Thursday, November 2, 2017

Oracle GoldenGate for Big Data and Adapter for Kafka

Before installing and running Oracle GoldenGate for Java, you must install Java (JDK or JRE) version 1.8 or later.

Installation of Oracle GoldenGate for Big Data:

  1. Create an installation directory that has no spaces in its name. Then extract the ZIP file into this new installation directory. For example:
    Shell> mkdir installation_directory
    Shell> cp path/to/installation_zip installation_directory
    Shell> cd installation_directory
    Shell> unzip installation_zip
    If you are on Linux or UNIX, run:
    Shell> tar -xf installation_tar
  2. Stay in the installation directory and start GGSCI to create the remaining subdirectories in the installation location.
    Shell> ggsci
    GGSCI> CREATE SUBDIRS
  3. Create a Manager parameter file:
    GGSCI> EDIT PARAM MGR
  4. Specify a port for the Manager to listen on by using the editor to add a line to the Manager parameter file. For example:
    PORT 7801
  5. If you are on Windows and running Manager as a service, set the system variable PATH to include the directory containing jvm.dll, then delete the Manager service and re-add it.
  6. Go to GGSCI, start the Manager, and check to see that it is running:
    GGSCI> START MGR
    GGSCI> INFO MGR
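
The two prerequisites above (Java 1.8 or later, and an installation directory with no spaces in its name) can be checked before unpacking anything. The following is a minimal sketch, not part of GoldenGate; the function names are our own, and it assumes the usual `java -version` banner format.

```python
import re


def parse_java_version(version_output: str) -> tuple:
    """Extract (major, minor) from the banner printed by `java -version`."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("could not find a version string")
    major = int(match.group(1))
    minor = int(match.group(2) or 0)
    return major, minor


def java_is_supported(version_output: str) -> bool:
    # Legacy numbering reports Java 8 as "1.8"; later releases report "9", "11", ...
    return parse_java_version(version_output) >= (1, 8)


def directory_name_is_valid(path: str) -> bool:
    # The installation directory must not contain spaces.
    return " " not in path
```

Note that `java -version` writes its banner to standard error, so when calling it through `subprocess.run(["java", "-version"], capture_output=True, text=True)` the text to pass in is the `stderr` attribute of the result.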
Note:
To check for environment variable problems locating the JVM at runtime:
  1. Add the parameter GETENV(PATH) for Windows or GETENV(LD_LIBRARY_PATH) for UNIX to the Replicat parameter file.
  2. Start the Replicat process.
  3. Check the output for the report using the GGSCI command: SEND REPLICAT group_name REPORT
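
Once the report shows the value of PATH or LD_LIBRARY_PATH, we still have to find out whether any of the listed directories actually holds the JVM library. The sketch below is our own helper, not a GoldenGate utility: it takes the variable's value and a library file name (typically libjvm.so on Linux, jvm.dll on Windows) and returns the directories that contain it.

```python
import os


def find_jvm_library(search_path: str, library_name: str,
                     separator: str = os.pathsep) -> list:
    """Return the directories listed in search_path that contain library_name."""
    hits = []
    for directory in search_path.split(separator):
        # Skip empty entries produced by leading, trailing, or doubled separators.
        if directory and os.path.isfile(os.path.join(directory, library_name)):
            hits.append(directory)
    return hits
```

For example, `find_jvm_library(os.environ.get("LD_LIBRARY_PATH", ""), "libjvm.so")` returning an empty list suggests that the Replicat process will not be able to load the JVM.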

Having installed OGG for Big Data, we need to set up the Kafka adapter. As for other adapters, we copy the configuration files from the $OGG_HOME/AdapterExamples/big-data directory. We then need to adjust our kafka.props file to define the Kafka/ZooKeeper topics for data and schema changes (TopicName and SchemaTopicName parameters), and the gg.classpath for the Kafka and Avro Java classes.
Configuring Oracle GoldenGate to send transactions to the Connect API in Kafka:
Kafka adapter:
Three files are important for the Kafka adapter:
1. custom_kafka_producer.properties
2. kafka.props
3. rkafka.prm
These three files need to be placed in /u05/ggadapter/dirprm.

1. Kafka Connect settings: Edit the existing /u05/ggadapter/dirprm/custom_kafka_producer.properties
custom_kafka_producer.properties:
bootstrap.servers=10.152.22.182:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
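# batch.size is in bytes (100KB per partition would be 102400); a value of 1 is so small that each record is sent in its own batch
batch.size=1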
linger.ms=10000
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
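
Before starting the Replicat it can be useful to confirm that the producer file at least names the broker and both serializers, since the Kafka producer has no defaults for these. The following is a rough sketch under simplifying assumptions: the parser handles only `key=value` lines and `#` or `!` comments, not the full Java properties syntax, and the function names are our own.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, ignoring blank lines and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


# Settings the Kafka producer cannot start without.
REQUIRED_KEYS = ("bootstrap.servers", "key.serializer", "value.serializer")


def missing_required(props: dict) -> list:
    """Return the required producer settings that are absent from props."""
    return [key for key in REQUIRED_KEYS if key not in props]
```

Reading custom_kafka_producer.properties into a string and passing the parsed result to `missing_required` should give an empty list for the file shown above.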

2. Handler configuration: Edit the existing /u05/ggadapter/dirprm/kafka.props and amend gg.classpath as shown below.
kafka.props:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#gg.handler.kafkahandler.sourceRecordGeneratorClass=oracle.goldengate.kafkaconnect.DefaultSourceRecordGenerator
gg.handler.kafkahandler.TopicName=elastic_test
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
#gg.handler.kafkahandler.includeTokens=false
#gg.handler.kafkahandler.topicPartitioning=table
gg.handler.kafkahandler.mode=tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
 
#gg.handler.kafkahandler.format=oracle.goldengate.kafkaconnect.formatter.KafkaConnectFormatter
#gg.handler.kafkahandler.format.insertOpKey=I
#gg.handler.kafkahandler.format.updateOpKey=U
#gg.handler.kafkahandler.format.deleteOpKey=D
#gg.handler.kafkahandler.format.treatAllColumnsAsStrings=false
#gg.handler.kafkahandler.format.iso8601Format=false
#gg.handler.kafkahandler.format.pkUpdateHandling=abend
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
auto.create.topics.enable=true
gg.log=log4j
#gg.log.level=DEBUG
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm:/u05/ggadapter/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/u05/ggadapter/ggjava/ggjava.jar
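
The handler settings in kafka.props follow the naming pattern gg.handler.<name>.<setting>, where <name> must match an entry in gg.handlerlist. A small sketch of that convention is shown here; it assumes the properties have already been loaded into a dictionary and that gg.handlerlist is a comma-separated list, and the helper names are hypothetical.

```python
def handler_settings(props: dict) -> dict:
    """Group gg.handler.<name>.<setting> entries by handler name."""
    names = [n.strip() for n in props.get("gg.handlerlist", "").split(",")]
    handlers = {name: {} for name in names if name}
    prefix = "gg.handler."
    for key, value in props.items():
        if not key.startswith(prefix):
            continue
        # Split "kafkahandler.TopicName" into the handler name and its setting.
        name, _, setting = key[len(prefix):].partition(".")
        if name in handlers:
            handlers[name][setting] = value
    return handlers


def handlers_missing_type(props: dict) -> list:
    """Return handlers named in gg.handlerlist that have no .type setting."""
    return [name for name, settings in handler_settings(props).items()
            if "type" not in settings]
```

A typo in the handler name (for example kafkahandler in gg.handlerlist but kafkahandlr in the setting keys) shows up here as a handler with no type.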


3. Replicat parameters: Create /u05/ggadapter/dirprm/rkafka.prm with the following contents:
rkafka.prm:
REPLICAT rkafka
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rkafka, exttrail AdapterExamples/u05/extract1/e1
TARGETDB LIBFILE /u05/ggadapter/libggjava.so SET property=/u05/ggadapter/dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
SOURCEDEFS ./dirsql/SOURCE_DEFGEN.def
--ASSUMETARGETDEFS
MAP vision_app.*, TARGET mySchemaTopic.*;
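
The wildcard MAP statement explains the table name that later appears in the Kafka messages: every table in vision_app keeps its own name but is placed under the target schema mySchemaTopic. Here is a simplified illustration of that resolution; it is not GoldenGate's implementation, it supports only a whole-name `*` wildcard, and the upper-casing is an assumption chosen to match the output shown later in this post.

```python
def resolve_target(source_table: str, source_pattern: str, target_pattern: str):
    """Resolve 'schema.table' against a 'schema.*' MAP and TARGET pattern pair."""
    src_schema, src_name = source_table.split(".", 1)
    pat_schema, pat_name = source_pattern.split(".", 1)
    # The schema must match, ignoring case.
    if src_schema.upper() != pat_schema.upper():
        return None
    # The table part must be the wildcard or the same table name.
    if pat_name != "*" and pat_name.upper() != src_name.upper():
        return None
    tgt_schema, tgt_name = target_pattern.split(".", 1)
    resolved_name = src_name if tgt_name == "*" else tgt_name
    return f"{tgt_schema}.{resolved_name}".upper()
```

With the MAP line above, `resolve_target("VISION_APP.ELASTIC_TEST", "vision_app.*", "mySchemaTopic.*")` gives MYSCHEMATOPIC.ELASTIC_TEST, while a table from any other schema gives None.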

The source is a table in Oracle, while the destination is a Kafka topic with a different structure, so we need to configure a SOURCEDEFS file.
The steps to create the source definitions file are below.

Create the SOURCEDEFS file using the GoldenGate DEFGEN utility

Data definitions are needed when the source and target tables have different definitions or the databases are of different types.
Perform the steps below on the SOURCE database from which you want to obtain metadata definitions.
From the Oracle GoldenGate directory, run GGSCI.
DEFGEN
 1. In GGSCI, issue the following command to create a DEFGEN parameter file, and enter the parameters below in the order shown:
     GGSCI> EDIT PARAMS defgen
     defsfile ./dirsql/SOURCE_DEFGEN.def
     userid gguser password ******
     TABLE schema.*;
 2. Exit GGSCI and invoke DEFGEN from the GG_HOME location:
     $ ./defgen paramfile ./dirprm/defgen.prm
 3. Copy the generated SOURCE_DEFGEN.def file to the target server and specify its location in the REPLICAT parameter file (rkafka.prm):
     ex: SOURCEDEFS ./dirsql/SOURCE_DEFGEN.def
For every DDL change made to the source tables (only tables that are captured for replication), generate a new source definitions file and copy it to the target server.

Testing the Replication:

Connect to Oracle and run some DML, remembering to commit each transaction.
Oracle DML on the source table:
SQL> insert into elastic_test values(2, 'Francis', 'Francis');
 
1 row created.
 
SQL> insert into elastic_test values (3, 'Ashfak', 'Ashfak');
 
1 row created.
 
SQL> commit;
 
Commit complete.
 
SQL> update elastic_test set test_name='updatefrancis' where test_id=2;
 
1 row updated.
 
SQL> commit;
 
Commit complete.
 
SQL> delete from elastic_test where test_id=2;
 
1 row deleted.
 
SQL> commit;
 
Commit complete.
 
SQL> update elastic_test set test_id=4 where test_id=3;
 
1 row updated.
 
SQL> commit;
 
Commit complete.
Now if you list the topics defined within Kafka, you should see that a new one has been created for the elastic_test table. Consuming it shows the following messages:
Kafka output (destination):
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"I","op_ts":"2017-10-22 11:28:51.000000","current_ts":"2017-10-24T12:50:00.660000","pos":"00000000030000001091","after":{"TEST_ID":1,"TEST_NAME":"Suhail","TEST_NAME_AR":"Suhail"}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"I","op_ts":"2017-10-24 08:52:01.000000","current_ts":"2017-10-24T12:52:04.960000","pos":"00000000030000001263","after":{"TEST_ID":2,"TEST_NAME":"Francis","TEST_NAME_AR":"Francis"}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"I","op_ts":"2017-10-24 08:52:01.000000","current_ts":"2017-10-24T12:52:04.961000","pos":"00000000030000001437","after":{"TEST_ID":3,"TEST_NAME":"Ashfak","TEST_NAME_AR":"Ashfak"}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"U","op_ts":"2017-10-24 08:55:04.000000","current_ts":"2017-10-24T12:55:07.252000","pos":"00000000030000001734","before":{"TEST_ID":null,"TEST_NAME":"Francis"},"after":{"TEST_ID":null,"TEST_NAME":"updatefrancis"}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"D","op_ts":"2017-10-24 08:56:11.000000","current_ts":"2017-10-24T12:56:14.365000","pos":"00000000030000001865","before":{"TEST_ID":2}}
{"table":"MYSCHEMATOPIC.ELASTIC_TEST","op_type":"U","op_ts":"2017-10-24 08:57:43.000000","current_ts":"2017-10-24T12:57:45.817000","pos":"00000000030000002152","before":{"TEST_ID":3},"after":{"TEST_ID":4}}
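
Each message is a JSON document carrying the table name, the operation type (I, U, or D) and the before and after images of the row. A downstream consumer usually wants to know which columns actually changed. The sketch below uses only the standard json module and works on the message text itself; how the text is read from Kafka is left out, and the function names are our own.

```python
import json


def changed_columns(record: dict) -> dict:
    """Map each column whose value differs to an (old, new) pair."""
    before = record.get("before", {})
    after = record.get("after", {})
    changes = {}
    for column in sorted(set(before) | set(after)):
        old, new = before.get(column), after.get(column)
        if old != new:
            changes[column] = (old, new)
    return changes


def summarize(message: str) -> tuple:
    """Return (table, op_type, changed columns) for one JSON message."""
    record = json.loads(message)
    return record["table"], record["op_type"], changed_columns(record)
```

For the first update message above, this reports only TEST_NAME as changed (from Francis to updatefrancis), because TEST_ID is null in both images. For the delete, the missing after image appears as TEST_ID going from 2 to None.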
If we do not want to replicate all columns of the table to the destination, we can filter them in the Extract parameter file on the source by specifying the column names.
Example:
In the example below we transfer only the columns test_id and test_name, and do not send test_name_ar to the destination.
Extract parameter file (source):
extract extract1
SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/dbhome")
SETENV (ORACLE_SID = "vapp3dev")
USERID ggs_admin, PASSWORD ggs_admin
GETUPDATEBEFORES
exttrail /u05/extract1/e1
table vision_app.elastic_test cols (test_id,test_name);
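
The effect of the cols clause on a single row can be pictured as a simple projection: only the listed columns reach the trail, and therefore the Kafka message. The following is an illustrative sketch only, with a hypothetical helper name and a case-insensitive column match assumed.

```python
def project_columns(row: dict, cols: list) -> dict:
    """Keep only the columns named in cols, matching names case-insensitively."""
    wanted = {name.upper() for name in cols}
    return {name: value for name, value in row.items() if name.upper() in wanted}
```

Applied to a row holding TEST_ID, TEST_NAME and TEST_NAME_AR with cols (test_id, test_name), the result keeps TEST_ID and TEST_NAME and drops TEST_NAME_AR.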