Saturday, April 28, 2018

Creating Data Streaming in Spark from Cassandra Tables

Cassandra-side changes:

-- Create the demo keyspace with two replicas in datacenter1, then switch to it.
CREATE KEYSPACE vision_app
  WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 2
  };

USE vision_app;

-- Demo table with a single-column partition key (id).
CREATE TABLE TEST_CASSANDRA (
    id              int,
    SPONSOR_FILE_NO int,
    STREET          varchar,
    POBOX           varchar,
    WORK_PHONE      varchar,
    IS_ACCOMPANIED  int,
    PRIMARY KEY ((id))
);

-- Second demo table, joined with TEST_CASSANDRA on id in the Spark steps.
-- The terminating semicolon is required so cqlsh does not run this statement
-- into the one that follows.
CREATE TABLE TEST_CAS (
    id      int,
    address varchar,
    PRIMARY KEY ((id))
);

-- Lower the replication factor to 1 because we have a single node.
ALTER KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };


Spark-side changes:

Step 1: Create a SQLContext (used to run SQL queries)

// Optional: build a SQLContext by hand. Not needed if we are creating a table
// or temp view in Spark, because those steps go through `spark` instead.
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Obtain the SparkContext via getOrCreate() and wrap it in a SQLContext.
val sc = SparkContext.getOrCreate()
val sqlContext = new SQLContext(sc)

Step 2: Read the Cassandra table into a DataFrame
// Load the Cassandra table vision_app.test_cas through the connector's data source.
// Each line ends with a trailing dot so the expression stays a single statement when
// typed or pasted into spark-shell without :paste; with leading dots the shell may
// evaluate the first line on its own and bind vision_app to the reader instead.
val vision_app = spark.read.
  format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "test_cas", "keyspace" -> "vision_app")).
  load()

// The step above reads the Cassandra table into a DataFrame.

Step 3: Load records into Spark (as a table or temp view) from Cassandra

:paste
// Register the existing Cassandra table vision_app.test_cas in the Spark catalog.
// IF NOT EXISTS keeps the snippet re-runnable when the catalog entry is already there.
val createDDL =
  """CREATE TABLE IF NOT EXISTS test_cas
    |USING org.apache.spark.sql.cassandra
    |OPTIONS (
    |  table "test_cas",
    |  keyspace "vision_app",
    |  pushdown "true"
    |)""".stripMargin
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table


:paste
// Register the existing Cassandra table vision_app.test_cassandra in the Spark catalog.
// IF NOT EXISTS keeps the snippet re-runnable when the catalog entry is already there.
val createDDL =
  """CREATE TABLE IF NOT EXISTS test_cassandra
    |USING org.apache.spark.sql.cassandra
    |OPTIONS (
    |  table "test_cassandra",
    |  keyspace "vision_app",
    |  pushdown "true"
    |)""".stripMargin
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table

Step 4: Read the data in Spark

// Query each table by name and print the rows to the console.
spark.sql("SELECT * FROM test_cas").show()

spark.sql("SELECT * FROM test_cassandra").show()


Step 5: Join the two tables

// Join test_cassandra (alias ip) with test_cas (alias iv) on matching id and print the result.
spark.sql("SELECT * FROM test_cassandra ip,test_cas iv where ip.id=iv.id").show()

No comments:

Post a Comment