Sunday, May 6, 2018

Kafka Installation and basics

Installation was done following this guide:

https://hevodata.com/blog/how-to-set-up-kafka-on-ubuntu-16-04/

Testing the installation (start the Kafka broker):
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties


Create a topic in Kafka:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List topics in Kafka:

/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Publishing messages in Kafka (console producer):

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

helloworld!

Consumer (receiving the messages):

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

helloworld!

Enable Kafka to start on boot:

sudo systemctl enable kafka.service
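For reference, the kafka.service unit enabled above typically looks something like the sketch below. The paths assume the /opt/kafka layout from this install and a companion zookeeper.service unit; adjust to your setup:

```ini
[Unit]
Description=Apache Kafka broker
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties'
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```

With this unit in /etc/systemd/system/, `sudo systemctl start kafka` brings the broker up and the enable command above makes it persistent across reboots.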

Saturday, April 28, 2018

Creating a data stream in Spark from Cassandra tables

Cassandra side Changes:

CREATE KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 2 };

USE vision_app;

CREATE TABLE TEST_CASSANDRA
               (
                id                int,
                SPONSOR_FILE_NO   int,
                STREET            varchar,
                POBOX             varchar,
                WORK_PHONE        varchar,
                IS_ACCOMPANIED    int,
                PRIMARY KEY ((id))
               );

CREATE TABLE TEST_CAS (id int, address varchar, PRIMARY KEY ((id)));

ALTER KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };  (altering the keyspace as we have a single node)


Spark Side changes:

Step 1: Create the SQLContext (which lets us run queries).

import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()       // not needed if we are creating a table or temp view in Spark

import org.apache.spark.sql.SQLContext    // not needed if we are creating a table or temp view in Spark
val sqlContext = new SQLContext(sc)
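Note: for the Cassandra reads below to work, spark-shell has to be launched with the spark-cassandra-connector on its classpath and the Cassandra contact point configured. A typical launch looks like the following; the connector coordinates/version are an assumption and must be matched to your Spark and Scala versions:

```shell
spark-shell \
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.10 \
  --conf spark.cassandra.connection.host=localhost
```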

Step 2: Read the Cassandra table into a DataFrame:

val vision_app = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "test_cas", "keyspace" -> "vision_app"))
  .load()

Step 3: Register the Cassandra tables in Spark (as tables / temp views)

:paste
val createDDL = """CREATE TABLE test_cas
     USING org.apache.spark.sql.cassandra
     OPTIONS (
       table "test_cas",
       keyspace "vision_app",
       pushdown "true")"""
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table


:paste
val createDDL = """CREATE TABLE test_cassandra
     USING org.apache.spark.sql.cassandra
     OPTIONS (
       table "test_cassandra",
       keyspace "vision_app",
       pushdown "true")"""
spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table

Step 4: Query the data in Spark

spark.sql("SELECT * FROM test_cas").show

spark.sql("SELECT * FROM test_cassandra").show


Step 5: Join the two tables

spark.sql("SELECT * FROM test_cassandra ip, test_cas iv WHERE ip.id = iv.id").show

Tuesday, February 20, 2018

Initial Load of Data into Kafka with Oracle GoldenGate (Big Data)

1. On the source server, create the initial data extract process.

Log on to the source server as oracle, go to the Oracle GoldenGate directory, and start GGSCI:
$ ./ggsci
Check the manager process; if it's down, start it up:
ggsci > info mgr
ggsci > start mgr

Then we create the process:
ggsci> add extract initext, sourceistable
This is an initial load, so the data is extracted only once. At this point no transactions are being performed on this table; the data is extracted directly from the table, not from the redo logs.

2. On the source server, create a parameter file for the extract process initext.

ggsci > edit params initext
Then add the following parameters:
EXTRACT initext
--SOURCEISTABLE
USERID ggs_admin, PASSWORD ggs_admin
RMTHOST 172.17.0.2, MGRPORT 7809
--RMTTASK REPLICAT, GROUP initrep
RMTFILE /u01/t2, MEGABYTES 100
TABLE scott.test;

Where:
  • extract is the extract process created previously: initext
  • userid/password is the Oracle GoldenGate user on the source database: ggs_admin/ggs_admin
  • rmthost/mgrport is the target server name/manager port number
  • rmtfile is the remote file on the target to which the extracted data is written
  • table is the table to be processed
Save the parameter file created above.

3. On the target server, create the initial load process initrep.

Log on to the target server tgtdb as oracle and create the process for loading the data:
$ ./ggsci
ggsci > info mgr
If the manager is down, start it up.
ggsci > add replicat initrep, exttrail /u01/t2

ggsci > edit params initrep
Then we add the following parameters:
REPLICAT initrep
--SPECIALRUN
--END RUNTIME
TARGETDB LIBFILE /u01/ggadapter/libggjava.so SET property=/u01/ggadapter/dirprm/kafka_init.props
--EXTFILE /u01/t2
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
SOURCEDEFS ./dirsql/SOURCE_DEFGEN.def
MAP scott.test, TARGET myschematopic.test1;

Save the parameter file created above.
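The kafka_init.props adapter properties file referenced by TARGETDB above is not shown in these notes. A minimal sketch for the OGG Big Data Kafka handler follows; property names vary slightly between OGG Big Data versions, and the topic, format, and classpath values here are assumptions for this setup:

```properties
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicName=myschematopic.test1
gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.mode=tx
gg.classpath=dirprm/:/opt/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
```

The KafkaProducerConfigFile in turn points at a standard Kafka producer properties file (bootstrap.servers, key/value serializers) in the same dirprm folder.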
Note that on both the source and target servers, the parameter files created above reside in:
$ <ORACLE_GOLDEN_GATE_HOME>/dirprm
In this case:
$ /db0/oracle/product/12.1.2.1/ogg_1/dirprm
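The SOURCE_DEFGEN.def file referenced by SOURCEDEFS in the replicat parameters is generated on the source side with the defgen utility. A minimal defgen parameter file for this setup might look like the following (the file name dirprm/defgen.prm is an assumption):

```
DEFSFILE ./dirsql/SOURCE_DEFGEN.def
USERID ggs_admin, PASSWORD ggs_admin
TABLE scott.test;
```

Run it from the GoldenGate home with ./defgen paramfile dirprm/defgen.prm, then copy the generated .def file to the target's dirsql directory.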

4. Start the extract process.

Log on to the source server as oracle and run the GGSCI interface:
$ ./ggsci
ggsci > start extract initext
Sending START request to MANAGER ...
EXTRACT initext starting
***********************************************************************
* ** Run Time Statistics ** *
***********************************************************************

Report at 2018-02-20 07:34:17 (activity since 2018-02-20 07:34:10)
Output to /u01/t2:
From Table SCOTT.TEST:
# inserts: 1000002
# updates: 0
# deletes: 0
# discards: 0

REDO Log Statistics
Bytes parsed 0
Bytes output 83889477

5. Start the replicat process.

Log on to the target server as oracle and run the GGSCI interface:
$ ./ggsci
GGSCI (5a077cd21ca2) 33> start replicat initrep
Sending START request to MANAGER ...
REPLICAT INITREP starting

GGSCI (5a077cd21ca2) 34> info replicat
ERROR: You must specify a group name.
GGSCI (5a077cd21ca2) 35> info initrep
REPLICAT INITREP Last Started 2018-02-20 08:16 Status RUNNING
Checkpoint Lag 00:42:42 (updated 00:00:00 ago)
Process ID 29424
Log Read Checkpoint File /u01/t2000001
2018-02-20 07:34:16.274206 RBA 32650606

GGSCI (5a077cd21ca2) 36> stats initrep
Sending STATS request to REPLICAT INITREP ...
Start of Statistics at 2018-02-20 08:17:04.
Replicating from SCOTT.TEST to myschematopic.test1:
*** Total statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
*** Daily statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
*** Hourly statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
*** Latest statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
End of Statistics.