RTMP – Real Time Messaging Protocol

Protokół RTMP jest to zamknięty standard przemysłowy stworzony przez Adobe System. Jest używany do przesyłania obiektów AMF  (Action Message Format) oraz danych w formie strumieniowej takich jaki audio i wideo pomiedzy klientem i serwerem Flash. RTMP do transportu wykorzustuje bezpośrednio protokół TCP/IP na porcie 1935 tunel HTTP na porcie 80 (RTMPT). Możliwe jest przesyłanie do 64 strumieni jednocześnie po tym samym połaczeniu. W kazdym pakiecie AMF znajduje sie numer identyfikujący strumien. Dana pakiet RTMP może zawierać wiele wiadmości AMF.

Pakiet RTMP

Pakiet rtmp składa się z nagłówka o stałej długości oraz o zmiennej długości zawartości. Długość nagłówka może przyjmować cztery wartości:

  • 00 – 12 bajtów
  • 01 – 8 bajtów
  • 10 – 4 bajty
  • 11 – 1 bajt

Krótsza długość nagłówka oznacza ze brakujace dane są takie same jak te wysłane w ostatnim pakiecie je zawierającym z tym samym object id.

Pierwszy bajt zawiera informacje o długości nagłówka oraz id obiektu. Długość nagłówka zawarta jest w pierwszych dwóch bitach a id obiektu w kolejnych 6. Id obiektu wskazuje na id wiadomości AMF powiązaną z danym strumieniem danych. Oznacza to ze możliwe jest przesłanie 64 typów obiektów czyli obsłużenie 64 strumieni w tym samym połaczeniu.

Kolejne trzy bajty zawierają timestamp, przesyłany zawsze gdy długość nagłówka jest wieksza/równa 4 bajty. Jego zastosowanie i znaczenie nie jest znane:)

Kolejne trzy bajty zawierają wielkość zawartości pakietu RTMP (bez nagłówka), domyślnie dla danych video oraz audio jest to 128 i 64 bajty. Natępujący bajt zawiera typ przkazywanego obiektu AMF.:

  • 0x03 (Bytes Read) – wysyłany co x odebranych bajtów przez obie strony
  • 0x04 (Ping) – używany do kontroli stanu strumienia, dzieli się na podtypy
  • 0x05 (Server) – odpowiedzi serwera
  • 0x06 (Client) – zapytania klienta
  • 0x08 (Audio Data) – dane audio
  • 0x09 (Video Data) – dane video
  • 0x12 (Notify) – wywołanie nie oczekujące na odpowiedź
  • 0x13 (Shared Object) – obiekt współdzielony, dzieli sie na podtypy
  • 0x14 (Invoke) – wywołanie metody RPC na obiekcie zdalnym

Ostatnie cztery bajty zawierają id strumienia. Jeśli nadawca pakietu jest klient to zawierają one obiekt źródłowy ‘NetStream’, natomiast jeśli nadawcą jest serwer to zawierają obiekt ‘NetStream’ przypisany do tego strumienia po stronie serwera.

Połaczenie (Handshake)

Każde połaczenie inicjowane jest przez klienta. Inicjalizacja ma miejsce w oparciu o tzw ‘handshake’. Klient wysyła do serwera pojedyńczy bajt o wartości 0x03 i tablicę bajtów o długości 1536 oraz zapamiętuje zawartość tablicy. Serwer w odpowiedzi wysyła pojedyńczy bajt o wartośći 0x03 oraz dwie tablice bajtów o długości 1536.  Zawartość drugiej tablicy jest kopią pierwszej tablicy wysłanej przez klienta. Klient porównuje zawartości tablicy i jeśli są zgodne wysyła do serwera ostateczne potwierdzenie w postaci pojedyńczego bajtu i tablicy bajtów o długości 1536 bedącej kopią pierwszej tablicy otrzymanej w odpowiedzi od serwera. Od tej pory połaczenie jest nawiązane.

Obiekty AMF

Po nawiązaniu połaczenia dane audio jak i video są przesyłane w obiektach używających struktury AMF. Format AMF jest wykorzystywany do przenoszenia klas LocalConnection, SharedObject, NetConnection, and NetStream. Wszystkie obiekty AMF są poprzedzone jedno bajtowym nagłówkiem lub nagłówkiem zgodnym z opakowującym nagłówkiem RTMP. Pierwsze bajt wskazuje na typ obiektu, zanczenie kolejnych bajtów zależy bezpośrednio od typu obiektu:

  • 0x00 – 8 bajtów, liczba
  • 0x01 – 1 bajt, wartość boolean
  • 0x02 – string
  • 0x03 – obiekt, jest to lista par typu klucz/wartość. Klucz jest reprezentowany jako String a wartość to obiekt AMF. Koniec obiektu jest wskazany za pomocą 0x000009 (parę o zerowej długości kluczu i wartości końca obiektu)
  • 0x04 – film flash
  • 0x05 – 0 bajtów, wartość NULL
  • 0x06 – 0 bajtów, wartość niezdefioniowana
  • 0x07 – referencja
  • 0x08 – tablica ECMA
  • 0x09 – 0 bajtów, koniec obiektu
  • 0x0a – stala tablica
  • 0x0b – data
  • 0x0c – string wielobajtowy
  • 0x0d – typ niewspierany
  • 0x0e – zbiór rekordów
  • 0x0f – obiekt XML
  • 0x10 – obiekt typowany
You May Also Like

How to use mocks in controller tests

Even since I started to write tests for my Grails application I couldn't find many articles on using mocks. Everyone is talking about tests and TDD but if you search for it there isn't many articles.

Today I want to share with you a test with mocks for a simple and complete scenario. I have a simple application that can fetch Twitter tweets and present it to user. I use REST service and I use GET to fetch tweets by id like this: http://api.twitter.com/1/statuses/show/236024636775735296.json. You can copy and paste it into your browser to see a result.

My application uses Grails 2.1 with spock-0.6 for tests. I have TwitterReaderService that fetches tweets by id, then I parse a response into my Tweet class.


class TwitterReaderService {
Tweet readTweet(String id) throws TwitterError {
try {
String jsonBody = callTwitter(id)
Tweet parsedTweet = parseBody(jsonBody)
return parsedTweet
} catch (Throwable t) {
throw new TwitterError(t)
}
}

private String callTwitter(String id) {
// TODO: implementation
}

private Tweet parseBody(String jsonBody) {
// TODO: implementation
}
}

class Tweet {
String id
String userId
String username
String text
Date createdAt
}

class TwitterError extends RuntimeException {}

TwitterController plays main part here. Users call show action along with id of a tweet. This action is my subject under test. I've implemented some basic functionality. It's easier to focus on it while writing tests.


class TwitterController {
def twitterReaderService

def index() {
}

def show() {
Tweet tweet = twitterReaderService.readTweet(params.id)
if (tweet == null) {
flash.message = 'Tweet not found'
redirect(action: 'index')
return
}

[tweet: tweet]
}
}

Let's start writing a test from scratch. Most important thing here is that I use mock for my TwitterReaderService. I do not construct new TwitterReaderService(), because in this test I test only TwitterController. I am not interested in injected service. I know how this service is supposed to work and I am not interested in internals. So before every test I inject a twitterReaderServiceMock into controller:


import grails.test.mixin.TestFor
import spock.lang.Specification

@TestFor(TwitterController)
class TwitterControllerSpec extends Specification {
TwitterReaderService twitterReaderServiceMock = Mock(TwitterReaderService)

def setup() {
controller.twitterReaderService = twitterReaderServiceMock
}
}

Now it's time to think what scenarios I need to test. This line from TwitterReaderService is the most important:


Tweet readTweet(String id) throws TwitterError

You must think of this method like a black box right now. You know nothing of internals from controller's point of view. You're only interested what can be returned for you:

  • a TwitterError can be thrown
  • null can be returned
  • Tweet instance can be returned

This list is your test blueprint. Now answer a simple question for each element: "What do I want my controller to do in this situation?" and you have plan test:

  • show action should redirect to index if TwitterError is thrown and inform about error
  • show action should redirect to index and inform if tweet is not found
  • show action should show found tweet

That was easy and straightforward! And now is the best part: we use twitterReaderServiceMock to mock each of these three scenarios!

In Spock there is a good documentation about interaction with mocks. You declare what methods are called, how many times, what parameters are given and what should be returned. Remember a black box? Mock is your black box with detailed instruction, e.g.: I expect you that if receive exactly one call to readTweet with parameter '1' then you should throw me a TwitterError. Rephrase this sentence out loud and look at this:


1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }

This is a valid interaction definition on mock! It's that easy! Here is a complete test that fails for now:


import grails.test.mixin.TestFor
import spock.lang.Specification

@TestFor(TwitterController)
class TwitterControllerSpec extends Specification {
TwitterReaderService twitterReaderServiceMock = Mock(TwitterReaderService)

def setup() {
controller.twitterReaderService = twitterReaderServiceMock
}

def "show should redirect to index if TwitterError is thrown"() {
given:
controller.params.id = '1'
when:
controller.show()
then:
1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }
0 * _._
flash.message == 'There was an error on fetching your tweet'
response.redirectUrl == '/twitter/index'
}
}

| Failure: show should redirect to index if TwitterError is thrown(pl.refaktor.twitter.TwitterControllerSpec)
| pl.refaktor.twitter.TwitterError
at pl.refaktor.twitter.TwitterControllerSpec.show should redirect to index if TwitterError is thrown_closure1(TwitterControllerSpec.groovy:29)

You may notice 0 * _._ notation. It says: I don't want any other mocks or any other methods called. Fail this test if something is called! It's a good practice to ensure that there are no more interactions than you want.

Ok, now I need to implement controller logic to handle TwitterError.


class TwitterController {

def twitterReaderService

def index() {
}

def show() {
Tweet tweet

try {
tweet = twitterReaderService.readTweet(params.id)
} catch (TwitterError e) {
log.error(e)
flash.message = 'There was an error on fetching your tweet'
redirect(action: 'index')
return
}

[tweet: tweet]
}
}

My tests passes! We have two scenarios left. Rule stays the same: TwitterReaderService returns something and we test against it. So this line is the heart of each test, change only returned values after >>:


1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }

Here is a complete test for three scenarios and controller that passes it.


import grails.test.mixin.TestFor
import spock.lang.Specification

@TestFor(TwitterController)
class TwitterControllerSpec extends Specification {

TwitterReaderService twitterReaderServiceMock = Mock(TwitterReaderService)

def setup() {
controller.twitterReaderService = twitterReaderServiceMock
}

def "show should redirect to index if TwitterError is thrown"() {
given:
controller.params.id = '1'
when:
controller.show()
then:
1 * twitterReaderServiceMock.readTweet('1') >> { throw new TwitterError() }
0 * _._
flash.message == 'There was an error on fetching your tweet'
response.redirectUrl == '/twitter/index'
}

def "show should inform about not found tweet"() {
given:
controller.params.id = '1'
when:
controller.show()
then:
1 * twitterReaderServiceMock.readTweet('1') >> null
0 * _._
flash.message == 'Tweet not found'
response.redirectUrl == '/twitter/index'
}


def "show should show found tweet"() {
given:
controller.params.id = '1'
when:
controller.show()
then:
1 * twitterReaderServiceMock.readTweet('1') >> new Tweet()
0 * _._
flash.message == null
response.status == 200
}
}

class TwitterController {

def twitterReaderService

def index() {
}

def show() {
Tweet tweet

try {
tweet = twitterReaderService.readTweet(params.id)
} catch (TwitterError e) {
log.error(e)
flash.message = 'There was an error on fetching your tweet'
redirect(action: 'index')
return
}

if (tweet == null) {
flash.message = 'Tweet not found'
redirect(action: 'index')
return
}

[tweet: tweet]
}
}

The most important thing here is that we've tested controller-service interaction without logic implementation in service! That's why mock technique is so useful. It decouples your dependencies and let you focus on exactly one subject under test. Happy testing!