Mutterings and Musings

Samza and Jruby, or Streaming Dynamic Typing to the Masses (Pt 1)

PREFACE (AND A SHORTCUT)

If you're impatient, Elias Levy was (to my knowledge) the first to do this and make it publically available. His port used version 0.9.1 of Samza and JRuby 1.7.23 (equivalent to Ruby 1.9.3). His work provided an invaluable starting point. However, if you'd like to work with a different version of Samza or JRuby, or just generally want to have a stronger understanding of how JRuby (and other JVM-based languages) can integrate into Samza's build process, read on!

(I'm only going to as back-to-basics as the official hello-samza repository from the Samza project. The actual core of the project is elsewhere, but we're going to stick with adapting the officially-sanctioned project baseline.)

INTRO

Samza

Samza is a general-purpose distributed stream processing framework that uses queue-based message passing for communication (by default via Kafka and guarantees at-least-once delivery of messages. It stubs out pluggable functionality for message serialization/deserialization, metrics aggregation, node-local key-value storage (by default, RocksDB), and more. It's pretty open-ended in how it can be used -- basically, so long as your message can be serialized, you can do whatever you want with it. The only limits on the content or structure of the message are those imposed the JVM or the subsystems.

On the flip side, this freedom can make implementation an arduous process; since Samza is general by design, it doesn't prescribe how a system should be designed, and thus there are no real tools for topology declaration. That onus lies with the developer; I've personally found it helpful to settle on a basic message-passing structure up front (ie action/payload pairs).

JRuby

JRuby is a project to port Ruby to the JVM. Version 0.9.0 was released in 2006, and it has continued since then to be an actively maintained project with a host of contributors. There are 2 currently maintained release branches: the releases track Ruby 1.9.x, and the 9.x.x.x releases track Ruby 2.2.x. We're going with the latter (9.1.6.0) in this project.

FIRST STEPS

To start, let's pull down the latest version of hello-samza from Github into the local directory jruby-hello-samza:

$ git clone https://github.com/apache/samza-hello-samza jruby-hello-samza

Open pom.xml, the file used to coordinate Maven builds. Let's add the JRuby maven plugin to the <dependencies></dependencies> section:

org.jrubyjruby-complete9.1.6.0

Let's also add some plugins for JRuby->Java source transpilation and downloading of JRuby gems; add this w/in the <plugins></plugins> section:

de.saumya.mojojruby-maven-plugin1.1.5generate-sourcescompiletrue${jruby.generated.sources}truede.saumya.mojogem-maven-plugin1.1.5trueinitialize

While you're there, remove the org.apache.rat configuration from the <plugins> section; it's there to ensure that all source code files have a license attached, which is just going to be a bloody nuisance for our current project.

Let's also make a directory to store the Ruby source code:

$ mkdir -p src/main/ruby

and remove the Java source code and configuration files:

$ rm -r src/main/java $ rm -r src/main/config/*

At this point we can follow the instructions from the Samza project's Hello Samza documentation:

$ ./bin/grid bootstrap $ mvn clean package $ mkdir -p deploy/samza $ tar -zxf ./target/hello-samza-0.11.0-dist.tar.gz -C deploy/samza

You now have a Samza build system running the latest version of Samza with a recent version of JRuby. Those last three lines build your Samza source code and "deploy" it. You'll need to run them every time you make changes to your source or configuration files.

THERE'S NO RUBY LIKE J-RUBY

Now that we have Samza running with JRuby in tow, let's write some JRuby. We're going to start by creating a very simple task, one which will mirror the very basic elements of a Samza stream task. The purpose of this task is ludicrously simple(-minded): get a message from the Kafka input stream and write it to a file. While not in the slightest useful, it will demonstrate the minimum we need to get the two systems cooperating.

Add the following source code to a new file src/main/ruby/SourceStreamTask.rb:

require 'java'

java_package 'hello.jruby.test'

java_import 'org.apache.samza.system.IncomingMessageEnvelope' java_import 'org.apache.samza.task.MessageCollector' java_import 'org.apache.samza.task.TaskCoordinator' java_import 'org.apache.samza.task.StreamTask'

class SourceStreamTask java_implements StreamTask

java_signature 'void process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)' def process(envelope, collector, coordinator) msg = envelope.getMessage File.open("/tmp/message-stream-output.txt", "a") {|f| f.write("#{msg}
")} end end

The important elements here are:

java_package 'hello.jruby.test'

This is the full package path (important for the properties file).

java_implements StreamTask

java_signature 'void process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)'

The basic Java interface stream tasks must implement to process data. There are others; we'll get to those later.

CONFIGURATION

Each stream task needs to have a *.properties config file (Java properties file format) where to find the class, what systems it works with, etc. We see from the docs that the only truly required attributes are job.factory.class, job.name, task.class, and task.inputs, but let's fill out a few more items to demonstrate some of the basic configurability. Save the following to the file src/main/config/source-task.properties:

# Job job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=source-task

# YARN yarn.package.path=file://basedir/target/{project.artifactId}-${pom.version}-dist.tar.gz

# Task task.class=hello.jruby.test.SourceStreamTask task.inputs=kafka.source-input

# Serializers serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory systems.kafka.samza.msg.serde=string systems.kafka.consumer.zookeeper.connect=localhost:2181/ systems.kafka.producer.bootstrap.servers=localhost:9092

# Job Coordinator job.coordinator.system=kafka job.coordinator.replication.factor=1

A quick overview:

The rest of the options are system-level configuration options and can be left as-is. As you can probably see, cranking out more than a few of these properties files can be somewhat tiring; even if you reuse many of the options, this is still a lot of redundancy. Samza currently lacks a standard topology definition mechanism (a la Storm); this is by intent, as Samza aims to be a general stream processing framework (pass in anything, do anything, I don't care).

We now need to update our assembly instructions to include this properties file in the build. Open up the file at src/main/assmembly/src.xml, and find <files> within the <assembly> section. You'll see several entries for the deleted Wikipedia files; remove all of these <file> entries. Add the following to the <filesets> option group:

${basedir}/src/main/config**/*.propertiesconfigtrue

This tells the Pom assembler to read and interpret every *.properties file in that directory. The corrollary here is that every properties file in that directory will need to be valid -- ie, all of the wikpedia *.properties files (if you haven't removed them) will fail, being as we've removed all of their corresponding Java classes.

Quick overview of the other properties for this option:

COMPILING/RUNNING

We now have a valid, albeit silly, stream task that simply waits for something to come in on its input and writes that message to a file. Let's go ahead and compile it:

$ ./bin/grid stop all ## just in case $ mvn clean package $ tar -zxf ./target/hello-samza-0.11.0-dist.tar.gz -C deploy/samza $ ./bin/grid start all

Now we'll run it using Samza's run-job.sh script:

$ ./deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file:///Users/user-account/hello-samza/deploy/samza/config/source-task.properties

Notice how we're using a complete path to the assembled version of the properties file, not the one we're editing (ie not in src/main/config); the variables in this one have been interpolated by Maven.

Give the task a few (maybe 10) seconds to get running. You can see the input queue for this task listed as one of the available Kafka queues:

$ ./deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list

Let's go ahead and throw some data at it (using one of the scripts available for interaction with Kafka):

$ echo "This is a great line" | ./deploy/kafka/bin/kafka-console-producer.sh --topic source-input --broker-list localhost:9092

We can see that this was written to our output file:

$ cat /tmp/message-stream-output.txt

Hooray! A totally pointless, bare-basics demonstration of writing a Samza task using JRuby. Next, we'll actually do something useful.