Cassandra side Changes:
CREATE KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 2 };
USE vision_app;
CREATE TABLE TEST_CASSANDRA
(
id int ,
SPONSOR_FILE_NO int,
STREET varchar,
POBOX varchar,
WORK_PHONE varchar,
IS_ACCOMPANIED int,
PRIMARY KEY ((id))
);
CREATE TABLE TEST_CAS (id int,address varchar,primary key ((id)))
ALTER KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 }; (altering the keyspace replication factor to 1, as we have a single node)
Spark Side changes:
Step 1: Create SQLContext (create a SQLContext, which helps to query)
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate() (not needed if we are creating table or temp view in spark)
import org.apache.spark.sql.SQLContext (not needed if we are creating table or temp view in spark)
val sqlContext = new SQLContext(sc)
Step 2: The step below is for reading the Cassandra table
val vision_app = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "test_cas", "keyspace" -> "vision_app"))
.load()
--above step is for reading the cassandra table
Step 3: Load records into Spark (table or temp view) from Cassandra
:paste
val createDDL = """CREATE table test_cas
USING org.apache.spark.sql.cassandra
OPTIONS (
table "test_cas",
keyspace "vision_app",
pushdown "true")"""
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table
:paste
val createDDL = """CREATE table test_cassandra
USING org.apache.spark.sql.cassandra
OPTIONS (
table "test_cassandra",
keyspace "vision_app",
pushdown "true")"""
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table
Step 4: Read the data in Spark
spark.sql("SELECT * FROM test_cas").show
spark.sql("SELECT * FROM test_cassandra").show
Step 5: Join the two tables
spark.sql("SELECT * FROM test_cassandra ip,test_cas iv where ip.id=iv.id").show
CREATE KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 2 };
USE vision_app;
CREATE TABLE TEST_CASSANDRA
(
id int ,
SPONSOR_FILE_NO int,
STREET varchar,
POBOX varchar,
WORK_PHONE varchar,
IS_ACCOMPANIED int,
PRIMARY KEY ((id))
);
CREATE TABLE TEST_CAS (id int,address varchar,primary key ((id)))
ALTER KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 }; (altering the keyspace replication factor to 1, as we have a single node)
Spark Side changes:
Step 1: Create SQLContext (create a SQLContext, which helps to query)
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate() (not needed if we are creating table or temp view in spark)
import org.apache.spark.sql.SQLContext (not needed if we are creating table or temp view in spark)
val sqlContext = new SQLContext(sc)
Step 2: The step below is for reading the Cassandra table
val vision_app = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "test_cas", "keyspace" -> "vision_app"))
.load()
--above step is for reading the cassandra table
Step 3: Load records into Spark (table or temp view) from Cassandra
:paste
val createDDL = """CREATE table test_cas
USING org.apache.spark.sql.cassandra
OPTIONS (
table "test_cas",
keyspace "vision_app",
pushdown "true")"""
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table
:paste
val createDDL = """CREATE table test_cassandra
USING org.apache.spark.sql.cassandra
OPTIONS (
table "test_cassandra",
keyspace "vision_app",
pushdown "true")"""
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table
Step 4: Read the data in Spark
spark.sql("SELECT * FROM test_cas").show
spark.sql("SELECT * FROM test_cassandra").show
Step 5: Join the two tables
spark.sql("SELECT * FROM test_cassandra ip,test_cas iv where ip.id=iv.id").show