Recently at storm-users

I’ve been reading through the storm-users Google Group recently. This decision was heavily inspired by Adam Kawa’s post “Football zero, Apache Pig hero”. Since I’ve come across a lot of insightful and very interesting information, I’ve decided to describe some of it in this post.

  • nimbus will work in HA mode – There’s a pull request open for it already… but some
    recent work (distributing topology files via Bittorrent) will greatly
    simplify the implementation. Once the Bittorrent work is done we’ll look
    at reworking the HA pull request. (storm’s pull request)

  • pig on storm – Pig on Trident would be a cool and welcome project. Join
    and groupBy have very clear semantics there, as those concepts exist
    directly in Trident. The extensions needed to Pig are the concept of
    incremental, persistent state across batches (mirroring those concepts
    in Trident). You can read a complete proposal.

  • implementing topologies in pure python with petrel looks like this:

import storm  # Storm's multilang Python helper module (storm.py), must be on the path


class Bolt(storm.BasicBolt):
    def initialize(self, conf, context):
        ''' This method is executed only once '''
        storm.log('initializing bolt')

    def process(self, tup):
        ''' This method is executed every time a new tuple arrives '''
        msg = tup.values[0]
        storm.log('Got tuple %s' % msg)


if __name__ == "__main__":
    Bolt().run()

  • Fliptop is happy with storm – see their presentation here

  • topology metrics in 0.9.0: The new metrics feature allows you to collect
    arbitrary custom metrics over fixed windows. Those metrics are
    exported to a metrics stream that you can consume by implementing
    IMetricsConsumer and configuring it via Config.java#L473.
    Use TopologyContext#registerMetric to register new metrics.

  • storm vs flume – some users’ point of view: I use Storm and Flume and
    find that they are better at different things – it really depends on
    your use case as to which one is better suited. First and foremost, they
    were originally designed to do different things: Flume is a reliable
    service for collecting, aggregating, and moving large amounts of data
    from source to destination (e.g. log data from many web servers to
    HDFS). Storm is more for real-time computation (e.g. streaming
    analytics) where you analyse data in flight and don’t necessarily land
    it anywhere. Having said that, Storm is also fault-tolerant and can
    write to external data stores (e.g. HBase), and you can do real-time
    computation in Flume (using interceptors).
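The metrics bullet above describes a simple pipeline. A component registers a metric, the metric is read and reset once per fixed window, and the resulting data points go to a consumer. The following is a minimal pure-Python model of that flow under those assumptions. It is not Storm's API: CountMetric, MetricsRegistry, register_metric, tick and RecordingConsumer are hypothetical names that only loosely mirror the roles of a Storm metric, TopologyContext#registerMetric and IMetricsConsumer.

```python
class CountMetric:
    """Hypothetical counter metric: accumulates increments until it is read."""

    def __init__(self):
        self._value = 0

    def incr(self, by=1):
        self._value += by

    def get_value_and_reset(self):
        # Report the value collected in the window that just closed, then start over.
        value, self._value = self._value, 0
        return value


class MetricsRegistry:
    """Hypothetical per-task registry: samples each metric once per fixed window."""

    def __init__(self, consumers):
        self._metrics = {}  # name -> (metric, window length in seconds)
        self._consumers = consumers

    def register_metric(self, name, metric, bucket_secs):
        self._metrics[name] = (metric, bucket_secs)
        return metric

    def tick(self, now_secs):
        """Called once per second; emits data points for windows that just closed."""
        points = [
            (name, metric.get_value_and_reset())
            for name, (metric, bucket_secs) in self._metrics.items()
            if now_secs % bucket_secs == 0
        ]
        if points:
            for consumer in self._consumers:
                consumer.handle_data_points(now_secs, points)


class RecordingConsumer:
    """Hypothetical consumer standing in for an IMetricsConsumer implementation."""

    def __init__(self):
        self.received = []

    def handle_data_points(self, timestamp, points):
        for name, value in points:
            self.received.append((timestamp, name, value))


if __name__ == "__main__":
    consumer = RecordingConsumer()
    registry = MetricsRegistry([consumer])
    processed = registry.register_metric("processed", CountMetric(), bucket_secs=10)

    for second in range(1, 21):
        processed.incr()  # pretend one tuple is processed every second
        registry.tick(second)

    print(consumer.received)  # [(10, 'processed', 10), (20, 'processed', 10)]
```

In Storm itself the framework drives the windows and the metrics stream. The sketch only illustrates why a counter that is reset on every read reports per-window values rather than a running total.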

That’s all for today. I’ll keep on reading through storm-users, though, so watch this space for more info on storm development.

Grails session timeout without XML

This article shows a clean, non-hacky way of configuring feature-rich event listeners for a Grails application's servlet context. It features an HttpSessionListener as a Spring bean, with a session timeout that depends on whether the user account is premium or not.

Common approaches

When it comes to configuring the session timeout in Grails, the default approach is to install the templates with a command. This gives direct access to the web.xml file, but it also creates more unnecessary files. Clutter aside, we should also remember another piece of common knowledge: XML is not for humans.

Another, slightly more hacky, way is to create the mysterious scripts/_Events.groovy file. Inside it, using the no less enigmatic closure eventWebXmlEnd = { filename -> ... }, we can parse and hack into web.xml with the help of XmlSlurper.
Even though a lot of Grails plugins do it in a similar way, it’s still not really straightforward, is it? Besides, where’s the IDE support? Hello!?

Examples of both approaches can be found on StackOverflow.

Simpler and cleaner way

By adding just a single line to the already generated init closure, we get it done:
class BootStrap {

    def init = { servletContext ->
        servletContext.addListener(OurListenerClass)
    }
}

Alrighty, this is enough to avoid XML. Dessert is served after the main course, though :)

Listener as a Spring bean

Let us assume we have a requirement: set a longer session timeout for premium user accounts.
Users are authenticated through SSO upon session creation.

To meet the requirement easily, just instantiate CustomTimeoutSessionListener as a Spring bean in resources.groovy. We are also going to need some source of the user's custom session timeout; let's say a ConfigService.
beans = {
    customTimeoutSessionListener(CustomTimeoutSessionListener) {
        configService = ref('configService')
    }
}

With this approach, BootStrap.groovy has to be slightly modified. To keep control over listener instantiation, instead of passing the listener class type, we let Grails inject the Spring bean and pass that instance:
class BootStrap {

    def customTimeoutSessionListener

    def init = { servletContext ->
        servletContext.addListener(customTimeoutSessionListener)
    }
}

An example CustomTimeoutSessionListener implementation could look like this:
import javax.servlet.http.HttpSessionEvent
import javax.servlet.http.HttpSessionListener
import your.app.ConfigService

class CustomTimeoutSessionListener implements HttpSessionListener {

    ConfigService configService

    @Override
    void sessionCreated(HttpSessionEvent httpSessionEvent) {
        httpSessionEvent.session.maxInactiveInterval = configService.sessionTimeoutSeconds
    }

    @Override
    void sessionDestroyed(HttpSessionEvent httpSessionEvent) { /* nothing to implement */ }
}

With the full power of Spring IoC at hand, this is also a good place to load persisted user account data into the session or to notify any other interested bean about the user's presence.

Wait, what about the user context?

The honest answer is: it depends on your case. Still, here’s an example getSessionTimeoutSeconds() implementation using Spring Security:
import org.springframework.security.core.context.SecurityContextHolder

class ConfigService {

    // identifiers cannot start with a digit, so "3H" is spelled out
    static final int THREE_HOURS = 3 * 60 * 60
    static final int QUARTER = 15 * 60

    int getSessionTimeoutSeconds() {
        // assumes the principal is already set and is the unique username String
        String username = SecurityContextHolder.context?.authentication?.principal
        def account = Account.findByUsername(username)

        return account?.premium ? THREE_HOURS : QUARTER
    }
}

This example is simplified and does not contain much defensive programming; it simply assumes that the principal is already set and is a String holding a unique username. Thanks to Grails conventions, our ConfigService is transactional, so the Account domain class can use a GORM dynamic finder.
Anyway, config-fetching implementation details are out of scope here. You can get, load, fetch or obtain it from wherever you like: domain persistence, the principal object, role config, an external file and so on...

Any gotchas?

There is one. When running the grails test command, servletContext comes as a mocked class instance without an addListener method. Thus we are going to get a MissingMethodException when running tests :(

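The solution is typical:
import grails.util.Environment  // at the top of BootStrap.groovy

def init = { servletContext ->
    // skip registration in tests: the mocked servletContext has no addListener method
    if (Environment.current != Environment.TEST) {
        servletContext.addListener(customTimeoutSessionListener)
    }
}
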
An unnecessary obstacle if you ask me. Should I submit a Jira issue about that?

TL;DR

Just implement an HttpSessionListener. Create a Spring bean of the listener. Inject it into BootStrap.groovy and call servletContext.addListener(injectedListener).