Multi phased processing in scala

Last time in our project we had to add progress bar for visualization of long time running process. Process was made of a few phases and we had to print in which phase we currently are. In first step we conclude that we need to create a class of Progre…

Last time in our project we had to add progress bar for visualization of long time running process. Process was made of a few phases and we had to print in which phase we currently are. In first step we conclude that we need to create a class of Progress which will be passed as an implicit parameter to our service. Then we will wrap method calls be inProgress method which will notify some e.g. akka actor about phase begin and phase end.

But this approach has some disadvantages. Firstly before we start service’s operation we need to init progress with count of all phases to get know ratio of progress finish. With this approach we had to add some extra counting before operation start.

If we want to keep real progress notifications the numbers of phases had to fit count of inPhase blocks. Some of phases were dynamically computed and some where omitted in case of failure validations results. This code become to be unmaintained.

We found that we need to join computation of phases with real phase processing. In this case we need to change approach from building process to building chain of phases that will run the process. Each phase will take the result of previous phase and transform it to new output. So example process will look like this:

Code giving this chain functionality looks like this:

We’ve used right associative operator :: for building chain of phases. “Body” of phases is piped by andThen: processPrevWrapped andThen processNext. For nil-tail we need to have a factory creating empty chain with identity “body” function.

Also if we have this kind of tool, we can modify piping code according to nature of our flow. For example if we are using scalaz.Validation we can do validating chain which will extract a success from n-step output and pass it to input of next step (like flatMap). In the other hand if n-step will return Failure, we will skip all remaining phases of validating chain.

To make building of chain more production-ready we add some extra features:

  • Chaining of chains (sth like ::: in scala Lists)
  • Transforming of input/output – for adding some “glue” code for simpler phases chaining
  • Wrapping of chains – also some “glue” code doing both input and output transformations
  • Sequencing of chains – sequenced processing of multiple phases with the same input

If you are interested in using similar approach, take a look at my github project: scala-phases-chain. If you want to integrate this tool with akka actors, simply change MultiPhasedProgress.notifyAboutStatus method to look like this:

You May Also Like

Recently at storm-users

I've been reading through storm-users Google Group recently. This resolution was heavily inspired by Adam Kawa's post "Football zero, Apache Pig hero". Since I've encountered a lot of insightful and very interesting information I've decided to describe some of those in this post.

  • nimbus will work in HA mode - There's a pull request open for it already... but some recent work (distributing topology files via Bittorrent) will greatly simplify the implementation. Once the Bittorrent work is done we'll look at reworking the HA pull request. (storm’s pull request)

  • pig on storm - Pig on Trident would be a cool and welcome project. Join and groupBy have very clear semantics there, as those concepts exist directly in Trident. The extensions needed to Pig are the concept of incremental, persistent state across batches (mirroring those concepts in Trident). You can read a complete proposal.

  • implementing topologies in pure python with petrel looks like this:

class Bolt(storm.BasicBolt):
    def initialize(self, conf, context):
       ''' This method executed only once '''
        storm.log('initializing bolt')

    def process(self, tup):
       ''' This method executed every time a new tuple arrived '''       
       msg = tup.values[0]
       storm.log('Got tuple %s' %msg)

if __name__ == "__main__":
    Bolt().run()
  • Fliptop is happy with storm - see their presentation here

  • topology metrics in 0.9.0: The new metrics feature allows you to collect arbitrarily custom metrics over fixed windows. Those metrics are exported to a metrics stream that you can consume by implementing IMetricsConsumer and configure with Config.java#L473. Use TopologyContext#registerMetric to register new metrics.

  • storm vs flume - some users' point of view: I use Storm and Flume and find that they are better at different things - it really depends on your use case as to which one is better suited. First and foremost, they were originally designed to do different things: Flume is a reliable service for collecting, aggregating, and moving large amounts of data from source to destination (e.g. log data from many web servers to HDFS). Storm is more for real-time computation (e.g. streaming analytics) where you analyse data in flight and don't necessarily land it anywhere. Having said that, Storm is also fault-tolerant and can write to external data stores (e.g. HBase) and you can do real-time computation in Flume (using interceptors)

That's all for this day - however, I'll keep on reading through storm-users, so watch this space for more info on storm development.

I've been reading through storm-users Google Group recently. This resolution was heavily inspired by Adam Kawa's post "Football zero, Apache Pig hero". Since I've encountered a lot of insightful and very interesting information I've decided to describe some of those in this post.

  • nimbus will work in HA mode - There's a pull request open for it already... but some recent work (distributing topology files via Bittorrent) will greatly simplify the implementation. Once the Bittorrent work is done we'll look at reworking the HA pull request. (storm’s pull request)

  • pig on storm - Pig on Trident would be a cool and welcome project. Join and groupBy have very clear semantics there, as those concepts exist directly in Trident. The extensions needed to Pig are the concept of incremental, persistent state across batches (mirroring those concepts in Trident). You can read a complete proposal.

  • implementing topologies in pure python with petrel looks like this:

class Bolt(storm.BasicBolt):
    def initialize(self, conf, context):
       ''' This method executed only once '''
        storm.log('initializing bolt')

    def process(self, tup):
       ''' This method executed every time a new tuple arrived '''       
       msg = tup.values[0]
       storm.log('Got tuple %s' %msg)

if __name__ == "__main__":
    Bolt().run()
  • Fliptop is happy with storm - see their presentation here

  • topology metrics in 0.9.0: The new metrics feature allows you to collect arbitrarily custom metrics over fixed windows. Those metrics are exported to a metrics stream that you can consume by implementing IMetricsConsumer and configure with Config.java#L473. Use TopologyContext#registerMetric to register new metrics.

  • storm vs flume - some users' point of view: I use Storm and Flume and find that they are better at different things - it really depends on your use case as to which one is better suited. First and foremost, they were originally designed to do different things: Flume is a reliable service for collecting, aggregating, and moving large amounts of data from source to destination (e.g. log data from many web servers to HDFS). Storm is more for real-time computation (e.g. streaming analytics) where you analyse data in flight and don't necessarily land it anywhere. Having said that, Storm is also fault-tolerant and can write to external data stores (e.g. HBase) and you can do real-time computation in Flume (using interceptors)

That's all for this day - however, I'll keep on reading through storm-users, so watch this space for more info on storm development.