Streaming data processing is yet another interesting topic in data science. In this article, we will walk through the integration of Spark Streaming, Kafka streaming, and the Schema Registry for the purpose of communicating Avro-format messages. Spark, Kafka, and Zookeeper run on a single machine (standalone cluster). The actual configurations of Spark, Kafka, and Zookeeper are to some extent irrelevant to this integration.
All implementation code can be found in my code repositories bigdata ETL - kafka streaming and bigdata ETL - spark streaming.
NOTE: the implementation in this post is no longer maintained. Please check the second and third parts for a more detailed implementation.
We need Spark, Kafka, Zookeeper, and the schema registry server installed before running through the actual integration. The following installation guide targets OS X and will produce a standalone cluster.
First, Homebrew needs to be installed on OS X
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
Install Kafka via brew. Zookeeper comes as an extra since it is part of this Kafka distribution for OS X
brew install kafka
Spark can be ‘installed’ by downloading the distribution from the Spark download page.
The schema registry from confluent.io will be ‘installed’ by downloading the distribution from the confluent.io installation page.
Install Gradle in order to compile the project code written in Java
brew install gradle
Kafka can operate in command-line interface (CLI) mode, where messages can be produced and consumed via Kafka shell commands.
Kafka won’t work without Zookeeper, so Zookeeper needs to be started first
zkserver start
Start Kafka server
kafka-server-start /usr/local/etc/kafka/server.properties
Create a topic; the Zookeeper URL, number of partitions, and replication factor need to be given as input parameters. The number of partitions defines how the topic is split across the Kafka brokers (Kafka servers) and the replication factor defines how many times each message will be replicated. We use the default values partitions=1 and replication-factor=1 since we are running a standalone cluster.
kafka-topics --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
Start a Kafka producer to generate messages to the queue; the broker server URL needs to be specified
kafka-console-producer --topic test --broker-list localhost:9092
Type messages line by line in the console, or send a file to the Kafka stream
kafka-console-producer --topic test --broker-list localhost:9092 < test.csv
Start a Kafka consumer, after which messages sent to the Kafka stream will be consumed and printed to the screen
kafka-console-consumer --zookeeper localhost:2181 --topic test
The following Kafka and Spark versions are compatible and should work together. In short, Kafka version 0.10.1.0 should be paired with Spark version 1.6.2. In practice, the following Gradle dependencies need to be added to the Gradle build script.
compile( 'org.apache.kafka:kafka-clients:0.10.1.0' )
compile( 'org.apache.kafka:kafka_2.10:0.10.1.0' )
compile( 'org.apache.spark:spark-core_2.10:1.6.2')
compile( 'org.apache.spark:spark-streaming_2.10:1.6.2')
compile( 'org.apache.spark:spark-streaming-kafka_2.10:1.6.2' )
Write up the producer code in Java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// configure and create the producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// send 100 messages to the topic 'test'
for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("test", "value-" + i);
    producer.send(record);
}
producer.close();
This will generate 100 records and send them to the Kafka topic test.
The code should be compiled with Gradle. First, generate a Gradle wrapper
gradle wrapper
Add the following line to build.gradle to specify which class will be run by Gradle
mainClassName = 'streaming.KafkaCustomerProducer'
Compile the code with the Gradle wrapper
./gradlew build
The most straightforward way to run the code is via the Gradle wrapper
./gradlew run
As an alternative, build a fat JAR by adding the following task to build.gradle
task fatJar(type: Jar){
    zip64 true
    description = "Assembles a Hadoop ready fat jar file"
    baseName = project.name + '-all'
    doFirst {
        from {
            configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
        }
    }
    manifest {
        attributes( "Main-Class": "${archivesBaseName}/${mainClassName}")
    }
    exclude 'META-INF/*.RSA','META-INF/*.SF','META-INF/*.DSA'
    with jar
}
With all dependencies compiled into the fat JAR, the package can be submitted to the Spark engine
./gradlew fatJar
spark-submit --class streaming.KafkaCustomerProducer streaming.jar
While the code is running, execute a Kafka consumer to consume messages from the topic test.
kafka-console-consumer --zookeeper localhost:2181 --topic test
Records streamed from the Java Kafka producer will be received and printed directly to the terminal.
Write up the consumer code in Java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// configure and create the consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "mygroup");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
// poll the topic and print every received message
boolean running = true;
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}
consumer.close();
Add the following line to build.gradle to specify which class will be run by Gradle
mainClassName = 'streaming.KafkaCustomerConsumer'
While the Java Kafka consumer is running, execute a Kafka producer from the command line
kafka-console-producer --topic test --broker-list localhost:9092
or
kafka-console-producer --topic test --broker-list localhost:9092 < test.csv
Messages produced to the Kafka topic test will be consumed and printed to the terminal.
The complete code can be found in KafkaAvroProducer.java
Add the Avro and Twitter Bijection dependencies to work with the Avro format
compile( 'org.apache.avro:avro:1.8.0' )
compile( 'com.twitter:bijection-avro_2.10:0.9.2' )
and import packages
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
The schemas are defined as JSON strings
public class SchemaDefinition{
public static final String AVRO_SCHEMA_testout =
"{"
+ "\"type\":\"record\","
+ "\"name\":\"testout\","
+ "\"fields\":["
+ " {\"name\":\"testout_date\",\"type\":\"string\" },"
+ " {\"name\":\"testout_time\",\"type\":\"string\" },"
+ " {\"name\":\"testout_name\",\"type\":\"string\" },"
+ " {\"name\":\"testout_address\",\"type\":\"string\" },"
+ " {\"name\":\"testout_country\",\"type\":\"string\" },"
+ " {\"name\":\"testout_info_6\",\"type\":\"string\" },"
+ " {\"name\":\"testout_info_7\",\"type\":\"string\" },"
+ " {\"name\":\"testout_info_8\",\"type\":\"string\" },"
+ " {\"name\":\"testout_info_9\",\"type\":\"string\" },"
+ " {\"name\":\"testout_info_0\",\"type\":\"string\" }"
+ "]}";
public static final String AVRO_SCHEMA_test =
"{"
+ "\"type\":\"record\","
+ "\"name\":\"test\","
+ "\"fields\":["
+ " {\"name\":\"date\",\"type\":\"string\" },"
+ " {\"name\":\"time\",\"type\":\"string\" },"
+ " {\"name\":\"name\",\"type\":\"string\" },"
+ " {\"name\":\"address\",\"type\":\"string\" },"
+ " {\"name\":\"country\",\"type\":\"string\" },"
+ " {\"name\":\"info_6\",\"type\":\"string\" },"
+ " {\"name\":\"info_7\",\"type\":\"string\" },"
+ " {\"name\":\"info_8\",\"type\":\"string\" },"
+ " {\"name\":\"info_9\",\"type\":\"string\" },"
+ " {\"name\":\"info_0\",\"type\":\"string\" }"
+ "]}";
}
Define the Avro schema parser and create a binary injection via the Twitter Bijection API
Schema.Parser parser = new Schema.Parser();
Schema schema = null;
if (operation == PRODtest){
    schema = parser.parse(SchemaDefinition.AVRO_SCHEMA_test);
    topic = "test";
}
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
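The producer used in the loop below sends the Avro-encoded byte arrays, so its value serializer has to be a byte-array serializer. A minimal sketch of that configuration (the broker address is assumed to match the local setup above):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// the Avro record is sent as a byte array, so a byte-array serializer is used for the value
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);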
Read in a CSV file and, for each line in the file, parse all fields according to the schema definition. After that, build an Avro message, encode the message as a byte array, and send the byte array to the Kafka topic.
reader = new CSVReader( new FileReader(inputFilename) );
String[] line;
int messageCount = 0;
while ( (line = reader.readNext()) != null ){
    messageCount++;
    long startTime = System.currentTimeMillis();
    GenericData.Record avroRecord = new GenericData.Record(schema);
    for (int i = 0; i < line.length; i++){
        avroRecord.put(i, line[i]);
    }
    byte[] bytes = recordInjection.apply(avroRecord);
    ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
    if ( syncFlag ){
        try{
            producer.send(record).get();
            System.out.println("MESSAGE(" + messageCount + ")");
        } catch (InterruptedException | ExecutionException ex){
            ex.printStackTrace();
        }
    } else {
        producer.send(record, new KafkaAvroProducerCallback(Arrays.toString(line), messageCount, startTime));
    }
    Thread.sleep(250);
}
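The KafkaAvroProducerCallback used in the asynchronous branch lives in the repository and is not reproduced here; a minimal sketch of such a callback, assuming it simply logs success or failure of the send, could look like this:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaAvroProducerCallback implements Callback {
    private final String line;
    private final int messageCount;
    private final long startTime;

    public KafkaAvroProducerCallback(String line, int messageCount, long startTime) {
        this.line = line;
        this.messageCount = messageCount;
        this.startTime = startTime;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the send failed; report the error
            exception.printStackTrace();
        } else {
            // log the message number and how long the asynchronous send took
            System.out.println("MESSAGE(" + messageCount + ") sent in "
                + (System.currentTimeMillis() - startTime) + " ms");
        }
    }
}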
Add the main class information to the Gradle build script
mainClassName = 'streaming.KafkaAvroProducer'
Run the Kafka Avro producer. While running the producer, execute a Kafka CLI consumer to consume messages from the Kafka topic test
kafka-console-consumer --zookeeper localhost:2181 --topic test
The following messages are sent as Kafka Avro messages
date_1,time_1,name_1,address_1,time_1,info_6_1,info_7_1,info_8_1,info_9_1,info_0_1
date_2,time_2,name_2,address_2,time_2,info_6_2,info_7_2,info_8_2,info_9_2,info_0_2
...
The Kafka consumer will print out the following. The output does not match the input because the messages are Avro-encoded byte streams
date_1
time_1
name_1address_1
time_1info_6_1info_7_1info_8_1info_9_1info_0_1
date_2
time_2
name_2address_2
time_2info_6_2info_7_2info_8_2info_9_2info_0_2
...
The implementation described in the previous section makes sure messages are in Avro format before being sent to the Kafka topic. In this section, we introduce a consumer implemented in the Spark Streaming framework. The consumer will consume the Avro messages produced by the Kafka Avro producer and process them via Spark Streaming map/reduce operations.
It is worth pointing out that the difference between Spark Streaming processing/transformation and Kafka processing/transformation is that the Spark engine essentially works on a stream of RDDs, which are created within a certain time window/interval.
The complete code can be found in SparkKafkaConsumer.java
Initialize the Spark streaming context, in which the time window used to generate RDDs is specified via Durations.seconds()
SparkConf sparkConf = new SparkConf()
.setAppName("GFM-Spark-Consumer")
.setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(batchSize));
Initialize the Spark Streaming receiver connecting to the Kafka topic
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", zookeeperURL);
kafkaParams.put("group.id", groupName);
JavaPairReceiverInputDStream<String, byte[]> messages = KafkaUtils.createStream(
        jssc,
        String.class,
        byte[].class,
        StringDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        topicMap,
        StorageLevel.MEMORY_AND_DISK_SER());
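The topicMap argument passed to createStream maps each topic name to the number of receiver threads used to read it; a minimal sketch, assuming a single thread on the topic test:

// map topic name -> number of consumer threads for the receiver
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("test", 1);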
Process the Spark stream via Spark map/reduce operations
if (operation == CONStest){
JavaDStream<String> lines = messages.map( new MapperTestToTestout() );
lines.print();
}
Notice that the actual mapper is implemented as a custom function class. The implementation of the mapper function is as follows
public class MapperTestToTestout implements Function<Tuple2<String, byte[]>, String> {
    private static final long serialVersionUID = 1L;

    // injection for decoding the incoming 'test' messages, shared by all calls on a worker
    private static Injection<GenericRecord, byte[]> testInjection;
    static{
        Schema.Parser parserTest = new Schema.Parser();
        Schema schemaTest = parserTest.parse(SchemaDefinition.AVRO_SCHEMA_test);
        testInjection = GenericAvroCodecs.toBinary(schemaTest);
    }

    @Override
    public String call(Tuple2<String, byte[]> tuple2) {
        // output: definition of Testout in Avro
        Injection<GenericRecord, byte[]> testoutInjection;
        Schema.Parser parserTestout = new Schema.Parser();
        Schema schemaTestout = parserTestout.parse(SchemaDefinition.AVRO_SCHEMA_testout);
        testoutInjection = GenericAvroCodecs.toBinary(schemaTestout);
        GenericData.Record avroRecordTestout = new GenericData.Record(schemaTestout);
        // input: decode the Avro message from the byte array
        GenericRecord avroRecordInput = testInjection.invert(tuple2._2()).get();
        // copy the fields of interest into the output record, using the field names from the schemas above
        avroRecordTestout.put("testout_date", avroRecordInput.get("date"));
        avroRecordTestout.put("testout_time", avroRecordInput.get("time"));
        return avroRecordTestout.toString();
    }
}
The decoding from a byte array to an Avro message is done through invert() from the Bijection package.
As the mapper's call() will be executed on every worker, variables used inside the call need to be defined as static variables and initialized via a Java static initializer
private static Injection<GenericRecord, byte[]> testInjection;
static{
    Schema.Parser parserTest = new Schema.Parser();
    Schema schemaTest = parserTest.parse(SchemaDefinition.AVRO_SCHEMA_test);
    testInjection = GenericAvroCodecs.toBinary(schemaTest);
}
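Finally, note that nothing is consumed until the streaming context is started; the standard Spark Streaming calls for this are:

// start the streaming computation and block until it is stopped
jssc.start();
jssc.awaitTermination();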
Compile and run the Spark Kafka consumer. While the Kafka Avro Producer is being executed via
spark-submit --class streaming.KafkaAvroProducer build/libs/streaming-all.jar
start the Spark stream Avro consumer
spark-submit --class streaming.SparkKafkaConsumer build/libs/streaming-all.jar
The complete code for the schema registry CLI operations can be found in the Code repository.
The schema registry can be obtained from confluent.io by downloading the distribution. Then, start the schema registry server
cd confluent-3.0.0/bin/
# start registry server
schema-registry-start ../etc/schema-registry/schema-registry.properties
For this test integration, we disable the schema registry's backward compatibility checking
# disable compatibility check
curl -X PUT http://localhost:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}'
Define the new schemas as JSON strings in a shell script. The schemas test and testout are defined as follows
schemaTest='{"schema":'\
' "{\"type\":\"record\",'\
' \"name\":\"test\",'\
' \"fields\":['\
' {\"name\":\"date\",\"type\":\"string\" },'\
' {\"name\":\"time\",\"type\":\"string\" },'\
' {\"name\":\"name\",\"type\":\"string\" },'\
' {\"name\":\"address\",\"type\":\"string\" },'\
' {\"name\":\"country\",\"type\":\"string\" },'\
' {\"name\":\"info_6\",\"type\":\"string\" },'\
' {\"name\":\"info_7\",\"type\":\"string\" },'\
' {\"name\":\"info_8\",\"type\":\"string\" },'\
' {\"name\":\"info_9\",\"type\":\"string\" },'\
' {\"name\":\"info_0\",\"type\":\"string\" }]}"}'
schemaTestout='{"schema":'\
' "{\"type\":\"record\",'\
' \"name\":\"testout\",'\
' \"fields\":['\
' {\"name\":\"testout_date\",\"type\":\"string\" },'\
' {\"name\":\"testout_time\",\"type\":\"string\" },'\
' {\"name\":\"testout_name\",\"type\":\"string\" },'\
' {\"name\":\"testout_address\",\"type\":\"string\" },'\
' {\"name\":\"testout_country\",\"type\":\"string\" },'\
' {\"name\":\"testout_info_6\",\"type\":\"string\" },'\
' {\"name\":\"testout_info_7\",\"type\":\"string\" },'\
' {\"name\":\"testout_info_8\",\"type\":\"string\" },'\
' {\"name\":\"testout_info_9\",\"type\":\"string\" },'\
' {\"name\":\"testout_info_0\",\"type\":\"string\" }]}"}'
Via a shell command, register the new schemas with the schema registry server
# register 'testout' schema
curl -X POST http://localhost:8081/subjects/schemaTestout/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "${schemaTestout}"
# register 'test' schema
curl -X POST http://localhost:8081/subjects/schemaTest/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "${schemaTest}"
Retrieve registered schemas from registry
# retrieve schema: schemaTest with the latest version
curl -X GET -i http://localhost:8081/subjects/schemaTest/versions/latest
# retrieve schema: schemaTestout with the latest version
curl -X GET -i http://localhost:8081/subjects/schemaTestout/versions/latest
The purpose of the schema registry is to automatically check/encode/decode schemas while producing or consuming messages from a Kafka stream. In particular, we will NOT send a byte string (which encodes an actual Avro message) when we send an Avro message to a Kafka stream. Instead we directly send an Avro message containing both the schema and the actual data. Similarly, when consuming an Avro message from a Kafka stream, we consume the Avro message directly; it is decoded via the schema registry and there is no need to do the decoding in the actual code. The encoding in the producer and the decoding in the consumer are both automatic and hidden from the user.
Producer code
The complete producer code that reads messages from a CSV file and sends Avro messages to a stream while talking to the schema registry can be found in the Code Repository.
The key here is to add the key and value serializer configuration to the Kafka producer configuration, as in the following Java code
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
which makes sure Avro messages are properly encoded and serialized when being sent.
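Besides the serializer classes, the Confluent Avro serializer also needs to know where the schema registry lives. A minimal sketch of the full producer configuration, assuming the registry started above on http://localhost:8081:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
// URL of the schema registry server started earlier
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);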
The actual producer code is rather straightforward, as in the following Java code
for (int i = 0; i < line.length; i++){
    avroRecord.put(i, line[i]);
}
ProducerRecord record = new ProducerRecord<Object, Object>(topic, "key", avroRecord);
try{
    producer.send(record, new KafkaRegistrySerializerCallback("", messageCount, startTime));
} catch (SerializationException ex){
}
which prepares an Avro message and sends it via a simple Kafka producer.
If you watch the schema registry server while running this code, you will notice REST calls to the server which register the schema with the registry.
Consumer code
The consumer code initializes a Spark Streaming consumer which reads from the Avro stream produced above and processes the messages inside the Spark stream. In particular, this Spark consumer reads Avro messages directly; the decoding is automatic through the schema registry server. This approach is quite different from the previous case, where we read byte messages and decoded them into Avro. The full code can be found in the Code Repository.
The key is to define a proper Kafka stream inside Spark Streaming, as in the following Java code
kafkaMSG = KafkaUtils.createStream(
        jssc,
        String.class,
        GenericRecord.class,
        StringDecoder.class,
        KafkaAvroDecoder.class,
        kafkaParams,
        topicMap,
        StorageLevel.MEMORY_AND_DISK_SER());
in which the value type is GenericRecord.class and the decoder class is KafkaAvroDecoder.class.
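For the KafkaAvroDecoder to look up schemas, the Kafka parameters also need the schema registry URL; a minimal sketch, assuming the same local setup (Zookeeper on localhost:2181, registry on http://localhost:8081):

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "localhost:2181");
kafkaParams.put("group.id", "mygroup");
// URL of the schema registry used by KafkaAvroDecoder to fetch schemas for decoding
kafkaParams.put("schema.registry.url", "http://localhost:8081");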
Then there is a Spark Streaming map function that extracts the actual Avro message from the key-value pair, as follows
JavaDStream<GenericRecord> avroInMSG = kafkaMSG.map(
    new Function<Tuple2<String, GenericRecord>, GenericRecord>(){
        @Override
        public GenericRecord call(Tuple2<String, GenericRecord> tuple2) throws Exception{
            return tuple2._2();
        }
    });
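Once the stream carries decoded GenericRecord objects, further transformations work directly on the Avro fields. For example, a minimal sketch that extracts the name field (defined in the test schema above) and prints it:

JavaDStream<String> names = avroInMSG.map(
    new Function<GenericRecord, String>(){
        @Override
        public String call(GenericRecord record) throws Exception{
            // 'name' is one of the string fields in the 'test' schema defined earlier
            return record.get("name").toString();
        }
    });
names.print();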
We have briefly discussed the integration of Kafka streaming, Spark Streaming, and the Schema Registry server. The goal is to be able to produce and consume Avro messages to/from Kafka streaming and Spark Streaming in a native way. Code examples in Java are provided in the Code Repository.
Hongyu Su 06 January 2017 Helsinki