Sunday, May 6, 2018

Kafka Installation and basics

Installation was done following:

https://hevodata.com/blog/how-to-set-up-kafka-on-ubuntu-16-04/

Testing the installation:
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties


Create a topic in Kafka:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List topics in Kafka:

/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Publishing messages in Kafka (producer):

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

helloword!

Subscriber (receiving the messages):

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

helloword!
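
The same round trip can also be done from code. A minimal producer sketch in Scala using the Kafka Java client (an illustration only, assuming the broker on localhost:9092, the test topic created above, and kafka-clients on the classpath):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal producer: publishes one message to the "test" topic
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("test", "helloword!"))
producer.close()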

Enable the Kafka service so it starts at boot:

sudo systemctl enable kafka.service

Saturday, April 28, 2018

Creating Data Streaming in Spark from Cassandra Tables

Cassandra-side changes:

CREATE KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 2 };

USE vision_app;

CREATE TABLE TEST_CASSANDRA
(
  id               int,
  SPONSOR_FILE_NO  int,
  STREET           varchar,
  POBOX            varchar,
  WORK_PHONE       varchar,
  IS_ACCOMPANIED   int,
  PRIMARY KEY ((id))
);

CREATE TABLE TEST_CAS (id int, address varchar, PRIMARY KEY ((id)));

ALTER KEYSPACE vision_app WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };  (altering the keyspace as we have a single node)
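
A couple of sample rows for testing the join later on (illustrative values, not part of the original setup):

INSERT INTO test_cassandra (id, sponsor_file_no, street) VALUES (1, 100, 'Main Street');
INSERT INTO test_cas (id, address) VALUES (1, 'Main Street');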


Spark-side changes:

Step 1: Create the SQLContext (which lets us query the data)

import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()       // not needed if we are creating a table or temp view in Spark

import org.apache.spark.sql.SQLContext    // not needed if we are creating a table or temp view in Spark
val sqlContext = new SQLContext(sc)

Step 2: Read the Cassandra table into a DataFrame

val vision_app = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "test_cas", "keyspace" -> "vision_app"))
  .load()
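
An optional quick sanity check on the DataFrame loaded above:

vision_app.printSchema()
vision_app.show()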

Step 3: Register the Cassandra tables in Spark (as catalog tables / temp views)

:paste
val createDDL = """CREATE TABLE test_cas
     USING org.apache.spark.sql.cassandra
     OPTIONS (
       table "test_cas",
       keyspace "vision_app",
       pushdown "true")"""
spark.sql(createDDL) // Creates a catalog entry registering the existing Cassandra table


:paste
val createDDL = """CREATE TABLE test_cassandra
     USING org.apache.spark.sql.cassandra
     OPTIONS (
       table "test_cassandra",
       keyspace "vision_app",
       pushdown "true")"""
spark.sql(createDDL) // Creates a catalog entry registering the existing Cassandra table

Step 4: Read the data in Spark

spark.sql("SELECT * FROM test_cas").show

spark.sql("SELECT * FROM test_cassandra").show


Step 5: Join the two tables

spark.sql("SELECT * FROM test_cassandra ip, test_cas iv WHERE ip.id = iv.id").show

Tuesday, February 20, 2018

Initial Load of Data into Kafka with Oracle GoldenGate (Big Data)

1. On the source server we create the initial data extract process

Log on to the source server as oracle, go to the Oracle GoldenGate directory, and start GGSCI:
$ ./ggsci
Check the Manager process; if it is down, start it up:
ggsci > info mgr
ggsci > start mgr

Then we create the process:
ggsci> add extract initext, sourceistable
This is an initial load and the data is extracted once. At this point there are no transactions performed on this table. The data is extracted directly from the table, not from the redo logs.

2. On the source server we create a parameter file for the extract process initext.

ggsci > edit params initext
Then we add the following parameters:
EXTRACT initext
--SOURCEISTABLE
USERID ggs_admin, PASSWORD ggs_admin
RMTHOST 172.17.0.2, MGRPORT 7809
--RMTTASK REPLICAT, GROUP initrep
RMTFILE /u01/t2, MEGABYTES 100
TABLE scott.test;

Where:
  • extract is the extract process created previously: initext
  • userid/password of the Oracle Golden Gate user on source database: ggs_admin/ggs_admin
  • rmthost/mgrport is the name of the target server/port number 
  • table is the table we created to be processed
Save the parameter file created above.

3. On the target server we create the initial load process initrep.

Log on target server tgtdb as oracle and create the process for loading the data:
$ ./ggsci
ggsci > info mgr
If the Manager is down, we start it up.
ggsci > add replicat initrep, exttrail /u01/t2

ggsci > edit params initrep
Then we add the following parameters:
REPLICAT initrep
--SPECIALRUN
--END RUNTIME
TARGETDB LIBFILE /u01/ggadapter/libggjava.so SET property=/u01/ggadapter/dirprm/kafka_init.props
--EXTFILE /u01/t2
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
SOURCEDEFS ./dirsql/SOURCE_DEFGEN.def
MAP scott.test, TARGET myschematopic.test1;

Save the parameter file created above.
Note that on both servers (source and target) the parameter files created above reside in the folder:
$ <ORACLE_GOLDEN_GATE_HOME>/dirprm
In this case:
$ /db0/oracle/product/12.1.2.1/ogg_1/dirprm

4. Start the extract process

Log on to the source server as oracle and run the Oracle GoldenGate interface:
$ ./ggsci
ggsci > start extract initext
Sending START request to MANAGER ...
EXTRACT initext starting
***********************************************************************
* ** Run Time Statistics ** *
***********************************************************************

Report at 2018-02-20 07:34:17 (activity since 2018-02-20 07:34:10)
Output to /u01/t2:
From Table SCOTT.TEST:
# inserts: 1000002
# updates: 0
# deletes: 0
# discards: 0

REDO Log Statistics
Bytes parsed 0
Bytes output 83889477

5. Start the replicat (loading) process.


Log on to the target server as oracle and run the Oracle GoldenGate interface:
$ ./ggsci
ggsci > start replicat  initrep
GGSCI (5a077cd21ca2) 33> start replicat initrep
Sending START request to MANAGER ...
REPLICAT INITREP starting

GGSCI (5a077cd21ca2) 34> info replicat
ERROR: You must specify a group name.
GGSCI (5a077cd21ca2) 35> info initrep
REPLICAT INITREP Last Started 2018-02-20 08:16 Status RUNNING
Checkpoint Lag 00:42:42 (updated 00:00:00 ago)
Process ID 29424
Log Read Checkpoint File /u01/t2000001
2018-02-20 07:34:16.274206 RBA 32650606

GGSCI (5a077cd21ca2) 36> stats initrep
Sending STATS request to REPLICAT INITREP ...
Start of Statistics at 2018-02-20 08:17:04.
Replicating from SCOTT.TEST to myschematopic.test1:
*** Total statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
*** Daily statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
*** Hourly statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
*** Latest statistics since 2018-02-20 08:16:49 ***
Total inserts 643002.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 643002.00
End of Statistics.
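
To check that the initial load actually reached Kafka, the console consumer from the Kafka section above can be used; a sketch, assuming the broker runs on localhost:9092 and that the topic configured in kafka_init.props is test1 (adjust the topic name to whatever the handler is actually configured with):

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning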





Thursday, November 2, 2017

Oracle GoldenGate for Big Data and Adapter for Kafka

Before installing and running Oracle GoldenGate for Java, you must install Java (JDK or JRE) version 1.8 or later.

Installation of Oracle GoldenGate for Big Data:

  1. Create an installation directory that has no spaces in its name. Then extract the ZIP file into this new installation directory. For example:
    Shell> mkdir installation_directory
    Shell> cp path/to/installation_zip installation_directory
    Shell> cd installation_directory
    Shell> unzip installation_zip
    If you are on Linux or UNIX, run:
    Shell> tar -xf installation_tar
  2. Stay in the installation directory and bring up GGSCI to create the remaining subdirectories in the installation location.
    Shell> ggsci
    GGSCI> CREATE SUBDIRS
  3. Create a Manager parameter file:
    GGSCI> EDIT PARAM MGR
  4. Specify a port for the Manager to listen on by using the editor to add a line to the Manager parameter file. For example:
    PORT 7801
  5. If you are on Windows and running Manager as a service, set the system variable PATH to include jvm.dll, then delete the Manager service and re-add it.
  6. Go to GGSCI, start the Manager, and check to see that it is running:
    GGSCI>START MGR
    GGSCI>INFO MGR
Note:
To check for environmental variable problems locating the JVM at runtime:
Add the parameter GETENV(PATH) for Windows or GETENV(LD_LIBRARY_PATH) for UNIX to the Replicat parameter file.
Start the Replicat process
Check the output for the report using the GGSCI command: SEND REPLICAT group_name REPORT
Having installed OGG for Big Data, we need to set up the Kafka adapter. As with other adapters,
we copy the configuration files from the $OGG_HOME/AdapterExamples/big-data directory.
We need to adjust our kafka.props file to define the Kafka/Zookeeper topics for data and
schema changes (the TopicName and SchemaTopicName parameters), and gg.classpath for the Kafka and Avro Java classes.
Configuring Oracle GoldenGate to send transactions to the Connect API in Kafka:
Kafka adapter:
There are three important files for the Kafka adapter:
1.custom_kafka_producer.properties
2.kafka.props
3.rkafka.prm
The three files above need to be placed in /u05/ggadapter/dirprm

1. Kafka Connect settings: edit the existing /u05/ggadapter/dirprm/custom_kafka_producer.properties

custom_kafka_producer.properties:
bootstrap.servers=10.152.22.182:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=1
linger.ms=10000
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter

2. Handler configuration: edit the existing /u05/ggadapter/dirprm/kafka.props and amend gg.classpath as shown below.

kafka.props:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#gg.handler.kafkahandler.sourceRecordGeneratorClass=oracle.goldengate.kafkaconnect.DefaultSourceRecordGenerator
gg.handler.kafkahandler.TopicName=elastic_test
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
#gg.handler.kafkahandler.includeTokens=false
#gg.handler.kafkahandler.topicPartitioning=table
gg.handler.kafkahandler.mode=tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
 
#gg.handler.kafkahandler.format=oracle.goldengate.kafkaconnect.formatter.KafkaConnectFormatter
#gg.handler.kafkahandler.format.insertOpKey=I
#gg.handler.kafkahandler.format.updateOpKey=U
#gg.handler.kafkahandler.format.deleteOpKey=D
#gg.handler.kafkahandler.format.treatAllColumnsAsStrings=false
#gg.handler.kafkahandler.format.iso8601Format=false
#gg.handler.kafkahandler.format.pkUpdateHandling=abend
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
auto.create.topics.enable=true
gg.log=log4j
#gg.log.level=DEBUG
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm:/u05/ggadapter/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/u05/ggadapter/ggjava/ggjava.jar


3. Replicat parameters: create /u05/ggadapter/dirprm/rkafka.prm with the following contents:

rkafka.prm:
REPLICAT rkafka
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rkafka, exttrail AdapterExamples/u05/extract1/e1
TARGETDB LIBFILE /u05/ggadapter/libggjava.so SET property=/u05/ggadapter/dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
SOURCEDEFS ./dirsql/SOURCE_DEFGEN.def
--ASSUMETARGETDEFS
MAP vision_app.*, TARGET mySchemaTopic.*;
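
With the three files in place, the replicat can be added and started from GGSCI, following the same pattern as the initial-load section above (a sketch, assuming the trail written by the source extract at /u05/extract1/e1):

ggsci > add replicat rkafka, exttrail /u05/extract1/e1
ggsci > start replicat rkafka
ggsci > info rkafka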

Our source is a table in Oracle and our destination is different (topics in Kafka), so we need to configure a SOURCEDEF file.
Below are the steps to create the sourcedef file.

Create the SOURCEDEF file using the GoldenGate DEFGEN utility

Data definitions are needed when the source and target tables have different definitions or the databases are of different types.
Perform the steps below on the SOURCE database from which you want to obtain metadata definitions.
From the Oracle GoldenGate directory, run GGSCI.
Defgen
 1.In GGSCI, issue the following command to create a DEFGEN parameter file.
                     ggsci > Edit params defgen
                     defsfile ./dirsql/SOURCE_DEFGEN.def
                     userid gguser password ******
                     TABLE schema.*
 2.Enter the parameters listed above in the order shown and invoke DEFGEN from GG_HOME location 
                   $ ./defgen paramfile ./dirprm/defgen.prm
 3.Specify this copied SOURCE_DEFGEN.def file location in REPLICAT param file(rkafka.prm)
                   ex: SOURCEDEFS ./dirsql/SOURCE_DEFGEN.def
For every DDL change that happens to the source tables (only the tables captured for replication), generate a NEW sourcedef file and copy it to the target server.

Testing the Replication:

Connect to Oracle and insert rows, not forgetting to commit the transactions.
Oracle DML on the source table:
SQL> insert into elastic_test values (2, 'Francis', 'Francis');
 
1 row created.
 
SQL> insert into elastic_test values (3, 'Ashfak', 'Ashfak');
 
1 row created.
 
SQL> commit;
 
Commit complete.
 
SQL> update elastic_test set test_name='updatefrancis' where test_id=2;
 
1 row updated.
 
SQL> commit;
 
Commit complete.
 
SQL> delete from elastic_test where test_id=2;
 
1 row deleted.
 
SQL> commit;
 
Commit complete.
 
SQL> update elastic_test set test_id=4 where test_id=3;
 
1 row updated.
 
SQL> commit;
 
Commit complete.
Now if you list the topics defined within Kafka, you should see a new one has been created for the elastic_test table:
Kafka output (destination):
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"I","op_ts":"2017-10-22 11:28:51.000000","current_ts":"2017-10-24T12:50:00.660000","pos":"00000000030000001091","after":{"TEST_ID":1,"TEST_NAME":"Suhail","TEST_NAME_AR":"Suhail"}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"I","op_ts":"2017-10-24 08:52:01.000000","current_ts":"2017-10-24T12:52:04.960000","pos":"00000000030000001263","after":{"TEST_ID":2,"TEST_NAME":"Francis","TEST_NAME_AR":"Francis"}}{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"I","op_ts":"2017-10-24 08:52:01.000000","current_ts":"2017-10-24T12:52:04.961000","pos":"00000000030000001437","after":{"TEST_ID":3,"TEST_NAME":"Ashfak","TEST_NAME_AR":"Ashfak"}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"U","op_ts":"2017-10-24 08:55:04.000000","current_ts":"2017-10-24T12:55:07.252000","pos":"00000000030000001734","before":{"TEST_ID":null,"TEST_NAME":"Francis"},"after":{"TEST_ID":null,"TEST_NAME":"updatefrancis"}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"D","op_ts":"2017-10-24 08:56:11.000000","current_ts":"2017-10-24T12:56:14.365000","pos":"00000000030000001865","before":{"TEST_ID":2}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"U","op_ts":"2017-10-24 08:57:43.000000","current_ts":"2017-10-24T12:57:45.817000","pos":"00000000030000002152","before":{"TEST_ID":3},"after":{"TEST_ID":4}}
In case we don't want to replicate all the columns of a table to the destination, we can filter them in the extract parameter file on the source by specifying the column names.
Example:
In the example below we are transferring only the columns test_id and test_name, not sending test_name_ar to the destination.
param_file_source (extract parameter file):
extract extract1
SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/dbhome")
SETENV (ORACLE_SID = "vapp3dev")
USERID ggs_admin, PASSWORD ggs_admin
GETUPDATEBEFORES
exttrail /u05/extract1/e1
table vision_app.elastic_test cols (test_id,test_name);