RTMP – Real Time Messaging Protocol

Protokół RTMP jest to zamknięty standard przemysłowy stworzony przez Adobe System. Jest używany do przesyłania obiektów AMF  (Action Message Format) oraz danych w formie strumieniowej takich jaki audio i wideo pomiedzy klientem i serwerem Flash. RTMP do transportu wykorzustuje bezpośrednio protokół TCP/IP na porcie 1935 tunel HTTP na porcie 80 (RTMPT). Możliwe jest przesyłanie do 64 strumieni jednocześnie po tym samym połaczeniu. W kazdym pakiecie AMF znajduje sie numer identyfikujący strumien. Dana pakiet RTMP może zawierać wiele wiadmości AMF.

Pakiet RTMP

Pakiet rtmp składa się z nagłówka o stałej długości oraz o zmiennej długości zawartości. Długość nagłówka może przyjmować cztery wartości:

  • 00 – 12 bajtów
  • 01 – 8 bajtów
  • 10 – 4 bajty
  • 11 – 1 bajt

Krótsza długość nagłówka oznacza ze brakujace dane są takie same jak te wysłane w ostatnim pakiecie je zawierającym z tym samym object id.

Pierwszy bajt zawiera informacje o długości nagłówka oraz id obiektu. Długość nagłówka zawarta jest w pierwszych dwóch bitach a id obiektu w kolejnych 6. Id obiektu wskazuje na id wiadomości AMF powiązaną z danym strumieniem danych. Oznacza to ze możliwe jest przesłanie 64 typów obiektów czyli obsłużenie 64 strumieni w tym samym połaczeniu.

Kolejne trzy bajty zawierają timestamp, przesyłany zawsze gdy długość nagłówka jest wieksza/równa 4 bajty. Jego zastosowanie i znaczenie nie jest znane:)

Kolejne trzy bajty zawierają wielkość zawartości pakietu RTMP (bez nagłówka), domyślnie dla danych video oraz audio jest to 128 i 64 bajty. Natępujący bajt zawiera typ przkazywanego obiektu AMF.:

  • 0x03 (Bytes Read) – wysyłany co x odebranych bajtów przez obie strony
  • 0x04 (Ping) – używany do kontroli stanu strumienia, dzieli się na podtypy
  • 0x05 (Server) – odpowiedzi serwera
  • 0x06 (Client) – zapytania klienta
  • 0x08 (Audio Data) – dane audio
  • 0x09 (Video Data) – dane video
  • 0x12 (Notify) – wywołanie nie oczekujące na odpowiedź
  • 0x13 (Shared Object) – obiekt współdzielony, dzieli sie na podtypy
  • 0x14 (Invoke) – wywołanie metody RPC na obiekcie zdalnym

Ostatnie cztery bajty zawierają id strumienia. Jeśli nadawca pakietu jest klient to zawierają one obiekt źródłowy ‘NetStream’, natomiast jeśli nadawcą jest serwer to zawierają obiekt ‘NetStream’ przypisany do tego strumienia po stronie serwera.

Połaczenie (Handshake)

Każde połaczenie inicjowane jest przez klienta. Inicjalizacja ma miejsce w oparciu o tzw ‘handshake’. Klient wysyła do serwera pojedyńczy bajt o wartości 0x03 i tablicę bajtów o długości 1536 oraz zapamiętuje zawartość tablicy. Serwer w odpowiedzi wysyła pojedyńczy bajt o wartośći 0x03 oraz dwie tablice bajtów o długości 1536.  Zawartość drugiej tablicy jest kopią pierwszej tablicy wysłanej przez klienta. Klient porównuje zawartości tablicy i jeśli są zgodne wysyła do serwera ostateczne potwierdzenie w postaci pojedyńczego bajtu i tablicy bajtów o długości 1536 bedącej kopią pierwszej tablicy otrzymanej w odpowiedzi od serwera. Od tej pory połaczenie jest nawiązane.

Obiekty AMF

Po nawiązaniu połaczenia dane audio jak i video są przesyłane w obiektach używających struktury AMF. Format AMF jest wykorzystywany do przenoszenia klas LocalConnection, SharedObject, NetConnection, and NetStream. Wszystkie obiekty AMF są poprzedzone jedno bajtowym nagłówkiem lub nagłówkiem zgodnym z opakowującym nagłówkiem RTMP. Pierwsze bajt wskazuje na typ obiektu, zanczenie kolejnych bajtów zależy bezpośrednio od typu obiektu:

  • 0x00 – 8 bajtów, liczba
  • 0x01 – 1 bajt, wartość boolean
  • 0x02 – string
  • 0x03 – obiekt, jest to lista par typu klucz/wartość. Klucz jest reprezentowany jako String a wartość to obiekt AMF. Koniec obiektu jest wskazany za pomocą 0x000009 (parę o zerowej długości kluczu i wartości końca obiektu)
  • 0x04 – film flash
  • 0x05 – 0 bajtów, wartość NULL
  • 0x06 – 0 bajtów, wartość niezdefioniowana
  • 0x07 – referencja
  • 0x08 – tablica ECMA
  • 0x09 – 0 bajtów, koniec obiektu
  • 0x0a – stala tablica
  • 0x0b – data
  • 0x0c – string wielobajtowy
  • 0x0d – typ niewspierany
  • 0x0e – zbiór rekordów
  • 0x0f – obiekt XML
  • 0x10 – obiekt typowany
You May Also Like

Simple trick to DRY your Grails controller

Grails controllers are not very DRY. It's easy to find duplicated code fragments in default generated controller. Take a look at code sample below. It is duplicated four times in show, edit, update and delete actions:

class BookController {
def show() {
def bookInstance = Book.get(params.id)
if (!bookInstance) {
flash.message = message(code: 'default.not.found.message', args: [message(code: 'book.label', default: 'Book'), params.id])
redirect(action: "list")
return
}
[bookInstance: bookInstance]
}
}

Why is it duplicated?

There is a reason for that duplication, though. If you move this snippet to a method, it can redirect to "list" action, but it can't prevent controller from further execution. After you call redirect, response status changes to 302, but after method exits, controller still runs subsequent code.

Solution

At TouK we've implemented a simple trick to resolve that situation:

  1. wrap everything with a simple withStoppingOnRender method,
  2. whenever you want to render or redirect AND stop controller execution - throw EndRenderingException.

We call it Big Return - return from a method and return from a controller at once. Here is how it works:

class BookController {
def show(Long id) {
withStoppingOnRender {
Book bookInstance = Book.get(id)
validateInstanceExists(bookInstance)
[bookInstance: bookInstance]
}
}

protected Object withStoppingOnRender(Closure closure) {
try {
return closure.call()
} catch (EndRenderingException e) {}
}

private void validateInstanceExists(Book instance) {
if (!instance) {
flash.message = message(code: 'default.not.found.message', args: [message(code: 'book.label', default: 'Book'), params.id])
redirect(action: "list")
throw new EndRenderingException()
}
}
}

class EndRenderingException extends RuntimeException {}

Example usage

For simple CRUD controllers, you can use this solution and create some BaseController class for your controllers. We use withStoppingOnRender in every controller so code doesn't look like a spaghetti, we follow DRY principle and code is self-documented. Win-win-win! Here is a more complex example:

class DealerController {
@Transactional
def update() {
withStoppingOnRender {
Dealer dealerInstance = Dealer.get(params.id)
validateInstanceExists(dealerInstance)
validateAccountInExternalService(dealerInstance)
checkIfInstanceWasConcurrentlyModified(dealerInstance, params.version)
dealerInstance.properties = params
saveUpdatedInstance(dealerInstance)
redirectToAfterUpdate(dealerInstance)
}
}
}

JBoss Envers and Spring transaction managers

I've stumbled upon a bug with my configuration for JBoss Envers today, despite having integration tests all over the application. I have to admit, it casted a dark shadow of doubt about the value of all the tests for a moment. I've been practicing TDD since 2005, and frankly speaking, I should have been smarter than that.

My fault was simple. I've started using Envers the right way, with exploratory tests and a prototype. Then I've deleted the prototype and created some integration tests using in-memory H2 that looked more or less like this example:

@Test
public void savingAndUpdatingPersonShouldCreateTwoHistoricalVersions() {
    //given
    Person person = createAndSavePerson();
    String oldFirstName = person.getFirstName();
    String newFirstName = oldFirstName + "NEW";

    //when
    updatePersonWithNewName(person, newFirstName);

    //then
    verifyTwoHistoricalVersionsWereSaved(oldFirstName, newFirstName);
}

private Person createAndSavePerson() {
    Transaction transaction = session.beginTransaction();
    Person person = PersonFactory.createPerson();
    session.save(person);
    transaction.commit();
    return person;
}    

private void updatePersonWithNewName(Person person, String newName) {
    Transaction transaction = session.beginTransaction();
    person.setFirstName(newName);
    session.update(person);
    transaction.commit();
}

private void verifyTwoHistoricalVersionsWereSaved(String oldFirstName, String newFirstName) {
    List<Object[]> personRevisions = getPersonRevisions();
    assertEquals(2, personRevisions.size());
    assertEquals(oldFirstName, ((Person)personRevisions.get(0)[0]).getFirstName());
    assertEquals(newFirstName, ((Person)personRevisions.get(1)[0]).getFirstName());
}

private List<Object[]> getPersonRevisions() {
    Transaction transaction = session.beginTransaction();
    AuditReader auditReader = AuditReaderFactory.get(session);
    List<Object[]> personRevisions = auditReader.createQuery()
            .forRevisionsOfEntity(Person.class, false, true)
            .getResultList();
    transaction.commit();
    return personRevisions;
}

Because Envers inserts audit data when the transaction is commited (in a new temporary session), I thought I have to create and commit the transaction manually. And that is true to some point.

My fault was that I didn't have an end-to-end integration/acceptance test, that would call to entry point of the application (in this case a service which is called by GWT via RPC), because then I'd notice, that the Spring @Transactional annotation, and calling transaction.commit() are two, very different things.

Spring @Transactional annotation will use a transaction manager configured for the application. Envers on the other hand is used by subscribing a listener to hibernate's SessionFactory like this:

<bean id="sessionFactory" class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean" >        
...
 <property name="eventListeners">
     <map key-type="java.lang.String" value-type="org.hibernate.event.EventListeners">
         <entry key="post-insert" value-ref="auditEventListener"/>
         <entry key="post-update" value-ref="auditEventListener"/>
         <entry key="post-delete" value-ref="auditEventListener"/>
         <entry key="pre-collection-update" value-ref="auditEventListener"/>
         <entry key="pre-collection-remove" value-ref="auditEventListener"/>
         <entry key="post-collection-recreate" value-ref="auditEventListener"/>
     </map>
 </property>
</bean>

<bean id="auditEventListener" class="org.hibernate.envers.event.AuditEventListener" />

Envers creates and collects something called AuditWorkUnits whenever you update/delete/insert audited entities, but audit tables are not populated until something calls AuditProcess.beforeCompletion, which makes sense. If you are using org.hibernate.transaction.JDBCTransaction manually, this is called on commit() when notifying all subscribed javax.transaction.Synchronization objects (and enver's AuditProcess is one of them).

The problem was, that I used a wrong transaction manager.

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
    <property name="dataSource" ref="dataSource"/>
</bean>

This transaction manager doesn't know anything about hibernate and doesn't use org.hibernate.transaction.JDBCTransaction. While Synchronization is an interface from javax.transaction package, DataSourceTransactionManager doesn't use it (maybe because of simplicity, I didn't dig deep enough in org.springframework.jdbc.datasource), and thus Envers works fine except not pushing the data to the database.

Which is the whole point of using Envers.

Use right tools for the task, they say. The whole problem is solved by using a transaction manager that is well aware of hibernate underneath.

<bean id="transactionManager" class="org.springframework.orm.hibernate3.HibernateTransactionManager" >
    <property name="sessionFactory" ref="sessionFactory"/>
</bean>

Lesson learned: always make sure your acceptance tests are testing the right thing. If there is a doubt about the value of your tests, you just don't have enough of them,