Apache Spark Streaming with Kafka, Cassandra, and Zookeeper

This article integrates the different components of a real-time streaming platform built around Apache Spark. The core components of a streaming platform are:

1. Messaging system – Apache Kafka, Kinesis (AWS)
2. Stream processing system – Apache Storm, Apache Spark (Spark Streaming), Samza, Apache Flink, and many more
3. Storage – HDFS/S3 (raw storage), NoSQL stores such as Cassandra, MongoDB, and HBase (including MapR-DB), or even in-memory storage systems

Each component in the architecture should satisfy the following:

1. Distribution and scalability
2. Fault tolerance
3. Low latency / high throughput
4. Integration with other components

1. Prerequisites for the Quickstart

Apache Zookeeper (Required for Kafka)
Apache Spark
Apache Cassandra
Apache Kafka
JDK 1.7
Eclipse
Maven

1.1 Environment variables to set

export JAVA_HOME=/home/datadotz/jdk1.7.0_45
(Note: the above can also be set in .bash_profile or .bashrc)

2. Installation of Apache Spark in Standalone Mode

Kindly refer to our wiki at chennaihug.org for Spark installation:

http://chennaihug.org/knowledgebase/spark-master-and-slaves-single-node-installation/

3. Installing Apache Zookeeper (optional if a ZooKeeper instance is already available)

3.1 Download Apache Zookeeper from the Apache site. If the link below is broken, check the Apache Zookeeper website for the current version:

http://www.us.apache.org/dist/zookeeper/zookeeper-3.3.6/

a. Untar zookeeper-3.3.6.tar.gz

b. Change to the conf directory

c. Create a new file named zoo.cfg and configure it as below.

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/home/dd/zookeeper/data/
# the port at which the clients will connect
clientPort=2181
# the directory where the transaction log is stored
dataLogDir=/home/dd/zookeeper/logs/
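
d. Start the ZooKeeper server. The standard zkServer.sh script reads conf/zoo.cfg by default; from the ZooKeeper home directory:

# start the server, then confirm it is running
bin/zkServer.sh start
bin/zkServer.sh status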

4. Installing Apache Cassandra

Kindly refer to our wiki at chennaihug.org for Apache Cassandra installation:

http://chennaihug.org/knowledgebase/cassandra-single-node-installation/

Note: Please use the latest stable version where possible.

5. Installing Apache Kafka

Kindly refer to our wiki at chennaihug.org for Kafka installation:

http://chennaihug.org/knowledgebase/kafka-single-node-cluster/

6. Integration steps to follow

1. Create a Maven Java project in the Eclipse IDE and use the code below. Declare the dependencies in pom.xml (a sketch follows) and build the project; this will generate a JAR.
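
The exact dependencies depend on the versions installed; the pom.xml fragment below is a minimal sketch, assuming Scala 2.10 builds of Spark 1.6.x with the receiver-based Kafka integration and a matching spark-cassandra-connector. Adjust the versions to your cluster.

<dependencies>
  <!-- Spark Streaming core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
  <!-- Receiver-based Kafka integration (KafkaUtils.createStream) -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
  <!-- DataStax connector providing saveToCassandra -->
  <dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
</dependencies>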

Scala code…

package datadotz.spark.training

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._
import com.datastax.spark.connector.SomeColumns

object KafkaSparkStreaming {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSparkStreaming <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    // Set up the streaming context with a 20-second batch interval
    // and point the Cassandra connector at the local node
    val sparkConf = new SparkConf()
      .setAppName("KafkaSparkStreaming")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Map each topic to the number of consumer threads reading it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Receiver-based Kafka stream; each record is a (key, message)
    // pair, so keep only the message value
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.print()

    // Each message is expected as "sno,pname"; split it and save the
    // pair to the sparkdata.sparktable table in Cassandra
    lines.map(line => { val arr = line.split(","); (arr(0).toInt, arr(1)) })
      .saveToCassandra("sparkdata", "sparktable", SomeColumns("sno", "pname"))

    ssc.start()
    ssc.awaitTermination()
  }
}

2. Start all the required daemon processes:

  • Spark
  • Cassandra
  • Zookeeper
  • Kafka

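The start commands below are a minimal sketch, assuming default single-node installations; run each from the respective home directory:

# ZooKeeper (skip if already started in section 3)
bin/zkServer.sh start

# Kafka broker
bin/kafka-server-start.sh config/server.properties

# Cassandra
bin/cassandra

# Spark standalone master and worker
sbin/start-master.sh
sbin/start-slaves.sh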

3. Create a new topic in Kafka named “kafka_spark_cassandra”:

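For the ZooKeeper-based Kafka versions used here, the topic can be created as below, assuming ZooKeeper at localhost:2181; a single partition and a replication factor of 1 are assumptions for a single-node setup:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_spark_cassandra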

4. Create the required schema in Cassandra:

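The Scala job writes the columns sno and pname into sparkdata.sparktable, so the schema must match. A minimal sketch in cqlsh; the SimpleStrategy replication factor of 1 is an assumption for a single-node setup:

CREATE KEYSPACE sparkdata
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE sparkdata.sparktable (
  sno int PRIMARY KEY,  -- first field of each "sno,pname" record
  pname text            -- second field
);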

5. Submit the JAR file:

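A sketch of the spark-submit call; the JAR name and master URL below are placeholders for your build output and cluster. The four trailing arguments are the ZooKeeper quorum, consumer group, topic list, and thread count expected by main():

bin/spark-submit \
  --class datadotz.spark.training.KafkaSparkStreaming \
  --master spark://localhost:7077 \
  kafka-spark-streaming.jar \
  localhost:2181 spark-consumer-group kafka_spark_cassandra 1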

6. Now, start the Kafka console producer and enter some data manually:
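
Records are expected in the "sno,pname" comma-separated form that the job splits on; a sketch, assuming a Kafka broker at localhost:9092:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_spark_cassandra
1,apple
2,banana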

7. Now, check for the consumed data in Cassandra:

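The ingested rows can then be verified from cqlsh:

SELECT * FROM sparkdata.sparktable;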

As shown above, we were able to ingest data into Apache Kafka, have Spark Streaming consume it, and have Spark process each record and store it in Cassandra.

———————————-

Article written by DataDotz Team

DataDotz is a Chennai-based Big Data team primarily focused on consulting and training in technologies such as Apache Hadoop, Apache Spark, NoSQL (HBase, Cassandra, MongoDB), search, and cloud computing.

Note: DataDotz also provides classroom-based Apache Kafka training in Chennai. The course includes Cassandra, MongoDB, Scala, and Apache Spark training. For more details related to Apache Spark training in Chennai, please visit http://datadotz.com/training/