GoldenGate 12.2 big data adapters: part 3 – Kafka

Posted in: Big Data, Hadoop, Oracle, Technical Track

This post continues my review of the GoldenGate Big Data adapters, which started with the HDFS and Flume adapters. Here is the list of all posts in the series:

  1. GoldenGate 12.2 Big Data Adapters: part 1 – HDFS
  2. GoldenGate 12.2 Big Data Adapters: part 2 – Flume
  3. GoldenGate 12.2 Big Data Adapters: part 3 – Kafka

In this article I will try the Kafka adapter and see how it works. First, it may be worth reminding readers what Kafka is. Kafka is a publish-subscribe messaging system. One may ask how it differs from Flume, and that is a question I asked myself when I first heard about Kafka. I think one of the best comparisons between Flume and Kafka has been made by Gwen Shapira and Jeff Holoman in the blog post Apache Kafka for Beginners. In essence, Kafka is a general-purpose system where most of the control and consumer functionality relies on your own consumer programs, whereas in Flume you have pre-created sources and sinks, and can use interceptors to change data in flight. So, with Kafka you get at the destination exactly what you put in at the source. Kafka and Flume can work together quite well, and in this article I am going to use them both.
Let’s recall what we have in our configuration. We have an Oracle database running as a source, and Oracle GoldenGate for Oracle capturing changes for one schema in this database. We have OGG 12.2 with an integrated extract on the source. The replication goes directly to trail files on the destination side, where we have OGG for Big Data installed on a Linux box. You can get more details about the installation on source and target from the first post in the series. I’ve kept the configuration as simple as possible, dedicating most of the attention to the Big Data adapter functionality, which is after all the main point of the article.

Having installed OGG for Big Data, we need to set up the Kafka adapter. As with the other adapters, we copy the configuration files from the $OGG_HOME/AdapterExamples/big-data directory.

bash$ cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/

We need to adjust our kafka.props file to define the Kafka/Zookeeper topics for data and schema changes (the TopicName and SchemaTopicName parameters), and the gg.classpath for the Kafka and Avro Java classes. I left the rest of the parameters at their defaults, including the format for the changes, which was defined as “avro_op” in the example.

[oracle@sandbox oggbd]$ cat dirprm/kafka.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName =oggtopic
gg.handler.kafkahandler.format =avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/u01/kafka/libs/*:/usr/lib/avro/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
[oracle@sandbox oggbd]$

The next file we have to adjust is custom_kafka_producer.properties, which contains information about our running Kafka server and defines some additional parameters, like compression. I left all the parameters unchanged except “bootstrap.servers”, where I put the address of my Kafka service.

[oracle@sandbox oggbd]$ cat dirprm/custom_kafka_producer.properties
bootstrap.servers=sandbox:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
[oracle@sandbox oggbd]$
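As a side note on the two batching parameters above: conceptually, the producer flushes a buffered batch for a partition once it reaches batch.size bytes or has waited linger.ms milliseconds, whichever comes first. A minimal sketch of that decision logic (illustrative only, not the actual Kafka client code):

```python
def should_send(buffered_bytes, waited_ms, batch_size=102400, linger_ms=10000):
    """Illustrative sketch: a partition batch is flushed when it is full
    (batch.size bytes) or has waited linger.ms milliseconds."""
    return buffered_bytes >= batch_size or waited_ms >= linger_ms

print(should_send(102400, 0))   # True: a full 100KB batch goes out immediately
print(should_send(512, 10000))  # True: linger.ms expired
print(should_send(512, 50))     # False: keep buffering
```

The defaults above mirror the values in our custom_kafka_producer.properties; with small transactions this means messages can sit in the producer for up to ten seconds before appearing on the topic.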

If we plan an initial load through Kafka, we can use a parameter file like the one I prepared for a passive replicat:

[oracle@sandbox oggbd]$ cat dirprm/irkafka.prm
-- Trail file for this example is located in "dirdat" directory
-- Command to run passive REPLICAT
-- ./replicat paramfile dirprm/irkafka.prm reportfile dirrpt/irkafka.rpt
SPECIALRUN
END RUNTIME
EXTFILE /u01/oggbd/dirdat/initld
--
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ggtest.*, TARGET bdtest.*;
[oracle@sandbox oggbd]$

Before starting any replicat we need to prepare our system to receive the data. Since Kafka itself is a pure streaming system, it cannot write files to HDFS without another program or connector. In our case we will be passing data from Kafka to Flume, and Flume will use its sink to write to HDFS. Please be aware that you need a Zookeeper instance to manage the topics for Kafka. I am not going to discuss setting up Zookeeper in this article; just assume we already have it up and running on port 2181.
I used Kafka version 0.9.0.1, downloaded from https://kafka.apache.org/downloads.html. After downloading the archive I unpacked it, slightly adjusted the configuration, and started it in standalone mode.

[root@sandbox u01]# wget https://apache.parentingamerica.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
--2016-03-15 15:22:09--  https://apache.parentingamerica.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
Resolving apache.parentingamerica.com... 70.38.15.129
Connecting to apache.parentingamerica.com|70.38.15.129|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35650542 (34M) [application/x-gzip]
Saving to: `kafka_2.11-0.9.0.1.tgz'
100%[=========================================================================================================================================>] 35,650,542  2.95M/s   in 16s
2016-03-15 15:22:26 (2.10 MB/s) - `kafka_2.11-0.9.0.1.tgz' saved [35650542/35650542]
[root@sandbox u01]# tar xfz kafka_2.11-0.9.0.1.tgz
[root@sandbox u01]# ln -s kafka_2.11-0.9.0.1 kafka
[root@sandbox u01]# cd kafka
[root@sandbox kafka]# vi config/server.properties
[root@sandbox kafka]# grep -v '^$\|^\s*\#' config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
[root@sandbox kafka]#
[root@sandbox kafka]# nohup bin/kafka-server-start.sh config/server.properties > /var/log/kafka/server.log &
[1] 30669
[root@sandbox kafka]# nohup: ignoring input and redirecting stderr to stdout

Now we need to prepare our two topics for the data coming from GoldenGate. As you remember, we defined topic “oggtopic” for our data flow using the gg.handler.kafkahandler.TopicName parameter in the kafka.props file, and topic “mySchemaTopic” for schema changes. So, let’s create the first topic using Kafka’s supplied scripts:

[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --create --topic oggtopic --partitions 1 --replication-factor 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Created topic "oggtopic".
[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --list
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
oggtopic
[root@sandbox kafka]#

As a matter of fact, all the necessary topics will also be created automatically when you start your GoldenGate replicat. You only need to create a topic explicitly if you want to use custom parameters for it. You also have the option to alter the topic later on to change its configuration parameters.
Here is the list of the topics we have, where one of them was created manually and the second one was created automatically by the replicat process.

[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --describe --topic oggtopic
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Topic:oggtopic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: oggtopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --describe --topic mySchemaTopic
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Topic:mySchemaTopic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: mySchemaTopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
[root@sandbox kafka]#

In our configuration we have only one server and the simplest possible Kafka setup. In a real business case it can be far more complex. Our replicat is going to post data changes to oggtopic, and all schema changes and definitions to mySchemaTopic. We’ve already mentioned that we are going to use Flume to write to HDFS. I’ve prepared Flume with two sources and two sinks to write the changes to the /user/oracle/ggflume HDFS directory. We also had the option to split data and schema changes into different directories if we wished. Here is my configuration for Flume:

[root@sandbox ~]# cat /etc/flume-ng/conf/flume.conf
# Name/aliases for the components on this agent
agent.sources = ogg1 ogg2
agent.sinks = hdfs1 hdfs2
agent.channels = ch1 ch2
#Kafka source
agent.sources.ogg1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.ogg1.zookeeperConnect = localhost:2181
agent.sources.ogg1.topic = oggtopic
agent.sources.ogg1.groupId = flume
agent.sources.ogg1.kafka.consumer.timeout.ms = 100
agent.sources.ogg2.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.ogg2.zookeeperConnect = localhost:2181
agent.sources.ogg2.topic = mySchemaTopic
agent.sources.ogg2.groupId = flume
agent.sources.ogg2.kafka.consumer.timeout.ms = 100
# Describe the sink
agent.sinks.hdfs1.type = hdfs
agent.sinks.hdfs1.hdfs.path = hdfs://sandbox/user/oracle/ggflume
agent.sinks.hdfs2.type = hdfs
agent.sinks.hdfs2.hdfs.path = hdfs://sandbox/user/oracle/ggflume
#agent.sinks.hdfs1.type = logger
# Use a channel which buffers events in memory
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1001
agent.channels.ch1.transactionCapacity = 1000
agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1001
agent.channels.ch2.transactionCapacity = 1000
# Bind the source and sink to the channel
agent.sources.ogg1.channels = ch1
agent.sources.ogg2.channels = ch2
agent.sinks.hdfs1.channel = ch1
agent.sinks.hdfs2.channel = ch2

As you can see, we have a separate source for each of our Kafka topics, and two sinks pointing to the same HDFS location. The data is going to be written in Avro format.
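Had we wanted to split data and schema changes into different directories, only the two sink paths would need to change; a hypothetical variant (the directory names are invented for illustration):

```
agent.sinks.hdfs1.hdfs.path = hdfs://sandbox/user/oracle/ggflume/data
agent.sinks.hdfs2.hdfs.path = hdfs://sandbox/user/oracle/ggflume/schema
```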
All preparations are complete: the Kafka server is running, the two topics exist, and Flume is ready to write data to HDFS. Our HDFS directory is still empty.

[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
[oracle@sandbox oggbd]$

Let’s run the passive replicat with our initial data load trail file:

[oracle@sandbox oggbd]$ cd /u01/oggbd
[oracle@sandbox oggbd]$ ./replicat paramfile dirprm/irkafka.prm reportfile dirrpt/irkafka.rpt
[oracle@sandbox oggbd]$

Now we can have a look at the results. We got three files on HDFS: the first two describe the structure of TEST_TAB_1 and TEST_TAB_2 respectively, and the third contains the data changes, or rather the initial data for those tables. You can see that the schema definitions were put into separate files, while the data changes were posted together into one file.

[oracle@sandbox ~]$ hadoop fs -ls /user/oracle/ggflume/
Found 3 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
[oracle@sandbox ~]$
[oracle@sandbox ~]$ hadoop fs -cat  /user/oracle/ggflume/FlumeData.1458749691685
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?????k?\??????S?A?%?{
  "type" : "record",
  "name" : "TEST_TAB_1",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
.........................
[oracle@sandbox ~]$ hadoop fs -cat  /user/oracle/ggflume/FlumeData.1458749691686
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?*
?e????xS?A?%N{
  "type" : "record",
  "name" : "TEST_TAB_2",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
...............................
[oracle@sandbox ~]$ hadoop fs -cat  /user/oracle/ggflume/FlumeData.1458749691718
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable??????c?C n??S?A?b"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.373000(00000000-10000002012
PK_ID1371O62FX&2014-01-24:19:09:20RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405000(00000000-10000002155
PK_ID2371O62FX&2014-01-24:19:09:20HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405001(00000000-10000002298
PK_ID3RXZT5VUN&2013-09-04:23:32:56RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405002(00000000-10000002441
PK_ID4RXZT5VUN&2013-09-04:23:32:56HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_2I42016-02-16 19:17:40.76289942016-03-23T12:14:35.408000(00000000-10000002926
PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52[oracle@sandbox ~]$

Now we need to set up our ongoing replication. Our extract was configured the same way as described in the first post of the series. It is up and running, passing changes to the replicat side into the ./dirdat directory.

GGSCI (sandbox.localdomain) 1> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     GGEXT       00:00:09      00:00:03
[oracle@sandbox oggbd]$ ls -l dirdat/
total 240
-rw-r-----. 1 oracle oinstall   3028 Feb 16 14:17 initld
-rw-r-----. 1 oracle oinstall 190395 Mar 14 13:00 or000041
-rw-r-----. 1 oracle oinstall   1794 Mar 15 12:02 or000042
-rw-r-----. 1 oracle oinstall  43222 Mar 17 11:53 or000043
[oracle@sandbox oggbd]$

I’ve prepared a parameter file for the Kafka replicat:

[oracle@sandbox oggbd]$ cat dirprm/rkafka.prm
REPLICAT rkafka
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rkafka, exttrail dirdat/or, begin now
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP GGTEST.*, TARGET BDTEST.*;
[oracle@sandbox oggbd]$

We only need to add and start our rkafka replicat in the Big Data GoldenGate.

GGSCI (sandbox.localdomain) 1> add replicat rkafka, exttrail dirdat/or, begin now
REPLICAT added.
GGSCI (sandbox.localdomain) 2> start replicat rkafka
Sending START request to MANAGER ...
REPLICAT RKAFKA starting
GGSCI (sandbox.localdomain) 3> info rkafka
REPLICAT   RKAFKA    Last Started 2016-03-24 11:53   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:06 ago)
Process ID           21041
Log Read Checkpoint  File dirdat/or000000000
                     2016-03-24 11:53:17.388078  RBA 0

You may remember that we don’t have a dirdat/or000000000 file in our dirdat directory, so our replicat has to be slightly corrected to work with the proper trail files. I am altering the sequence number for my replicat to reflect the actual sequence number of my last trail file.

GGSCI (sandbox.localdomain) 10> stop replicat rkafka
Sending STOP request to REPLICAT RKAFKA ...
Request processed.
GGSCI (sandbox.localdomain) 11> alter replicat rkafka EXTSEQNO 43
2016-03-24 12:03:27  INFO    OGG-06594  Replicat RKAFKA has been altered through GGSCI. Even the start up position might be updated, duplicate suppression remains active in next startup. To override duplicate suppression, start RKAFKA with NOFILTERDUPTRANSACTIONS option.
REPLICAT altered.
GGSCI (sandbox.localdomain) 12> start replicat rkafka
Sending START request to MANAGER ...
REPLICAT RKAFKA starting
GGSCI (sandbox.localdomain) 13> info rkafka
REPLICAT   RKAFKA    Last Started 2016-03-24 12:03   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:12 ago)
Process ID           21412
Log Read Checkpoint  File dirdat/or000000043
                     First Record  RBA 0
GGSCI (sandbox.localdomain) 14>

Let’s change some data:

orclbd> select * from test_tab_2;
           PK_ID RND_STR_1  ACC_DATE
---------------- ---------- ---------------------------
               7 IJWQRO7T   07/07/13 08:13:52
orclbd> insert into test_tab_2 values (8,'TEST_INS1',sysdate);
1 row inserted.
orclbd> commit;
Commit complete.
orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 5 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
-rw-r--r--   1 flume oracle        278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086
-rw-r--r--   1 flume oracle       1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130
[oracle@sandbox oggbd]$
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836268086
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?Q???n?y?1?R#S?j???"BDTEST.TEST_TAB_2I42016-03-24 16:17:29.00033642016-03-24T12:17:31.733000(00000000430000043889
PK_IDRND_STR_1ACC_DATE8TEST_INS1&2016-03-24:12:17:26[oracle@sandbox oggbd]$
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836268130
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?6F!?Z?-?ZA8r^S?j?oN{
  "type" : "record",
  "name" : "TEST_TAB_2",
  "namespace" : "BDTEST",

We got our schema definition file and a file with data changes.

orclbd> update test_tab_2 set RND_STR_1='TEST_UPD1' where pk_id=8;
1 row updated.
orclbd> commit;
Commit complete.
orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 6 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
-rw-r--r--   1 flume oracle        278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086
-rw-r--r--   1 flume oracle       1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130
-rw-r--r--   1 flume oracle        316 2016-03-24 12:28 /user/oracle/ggflume/FlumeData.1458836877420
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836877420
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable]??u????????qS?t,??"BDTEST.TEST_TAB_2U42016-03-24 16:27:39.00035642016-03-24T12:27:42.177000(00000000430000044052
PK_IDRND_STR_1ACC_DATE8TEST_INS1&2016-03-24:12:17:268TEST_UPD1&2016-03-24:12:17:26[oracle@sandbox oggbd]$

You can see that we only got a file with data changes, since no DDL changes were made. The transactions are grouped into files according to our Flume parameters, as discussed in the previous blog post.

You can also see the old value for the updated record along with the new one. Using that information we can reconstruct the changes, but we need to apply certain logic on the consumer side to decode them.
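As an illustration of that consumer-side logic, here is a sketch of turning an already-decoded avro_op record into old/new row images. The op_type, before, and after field names follow the Avro operation format used here; the function itself and the sample dictionary are my own invention, not an Oracle API:

```python
def interpret_change(record):
    """Illustrative sketch: derive old/new row images from a decoded
    avro_op record ('I' = insert, 'U' = update, 'D'/'F' = delete)."""
    op = record["op_type"]
    if op == "I":
        return {"old": None, "new": record.get("after")}
    if op == "U":
        return {"old": record.get("before"), "new": record.get("after")}
    if op in ("D", "F"):
        return {"old": record.get("before"), "new": None}
    raise ValueError("unknown op_type: %s" % op)

# The update captured above carries both images of the row:
upd = {"op_type": "U",
       "before": {"PK_ID": 8, "RND_STR_1": "TEST_INS1"},
       "after":  {"PK_ID": 8, "RND_STR_1": "TEST_UPD1"}}
print(interpret_change(upd)["new"]["RND_STR_1"])  # TEST_UPD1
```

A real consumer would first deserialize the Avro payload against the schema published on mySchemaTopic before applying logic like this.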

For a delete operation we get the operation flag “F” and the values of the deleted record. Again, no schema definition file appears, since no schema changes were made.

Let’s try some DDL.

orclbd> truncate table test_tab_2;
Table TEST_TAB_2 truncated.
orclbd>
GGSCI (sandbox.localdomain) 4> info rkafka
REPLICAT   RKAFKA    Last Started 2016-03-24 12:10   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:02 ago)
Process ID           21803
Log Read Checkpoint  File dirdat/or000043
                     2016-03-24 12:40:05.000303  RBA 45760
GGSCI (sandbox.localdomain) 5>

No new files on HDFS.

orclbd> insert into test_tab_2 select * from test_tab_3;
1 row inserted.
orclbd> commit;
Commit complete.
orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 8 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
-rw-r--r--   1 flume oracle        278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086
-rw-r--r--   1 flume oracle       1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130
-rw-r--r--   1 flume oracle        316 2016-03-24 12:28 /user/oracle/ggflume/FlumeData.1458836877420
-rw-r--r--   1 flume oracle        278 2016-03-24 12:35 /user/oracle/ggflume/FlumeData.1458837310570
-rw-r--r--   1 flume oracle        277 2016-03-24 12:42 /user/oracle/ggflume/FlumeData.1458837743709
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458837743709
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable*?2??????>iS??\??"BDTEST.TEST_TAB_2I42016-03-24 16:42:04.00020042016-03-24T12:42:06.774000(00000000430000045760
PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52[oracle@sandbox oggbd]$

Again, we got only a file with data changes. I tried to compare the file we got for the previous insert with the one for the insert after the truncate, but couldn’t find any difference except in the binary part of the Avro file. It will require additional investigation, and maybe clarification from Oracle. As it stands, it looks like it would be easy to miss a truncate command for a table on the destination side.

Let’s change the table and add a column.

orclbd> alter table test_tab_2 add test_col varchar2(10);
Table TEST_TAB_2 altered.
orclbd>

We do not get any new files with the new table definition until we run some DML on the table. Both files (with the new schema definition and the data changes) appear after we insert, delete or update any rows.

orclbd> insert into test_tab_2 values (8,'TEST_INS1',sysdate,'TEST_ALTER');
1 row inserted.
orclbd> commit;
Commit complete.
orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 10 items
...................................................
-rw-r--r--   1 flume oracle       1654 2016-03-24 12:56 /user/oracle/ggflume/FlumeData.1458838582020
-rw-r--r--   1 flume oracle        300 2016-03-24 12:56 /user/oracle/ggflume/FlumeData.1458838584891
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458838582020
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable-??ip??/?w?S??/{
  "type" : "record",
  "name" : "TEST_TAB_2",
  "namespace" : "BDTEST",
................
        "name" : "TEST_COL",
        "type" : [ "null", "string" ],
        "default" : null
.................
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458838584891
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritabletr?V?_$???:2??S??/w?"BDTEST.TEST_TAB_2I42016-03-24 16:56:04.00026042016-03-24T12:56:08.370000(00000000430000047682
PK_IDRND_STR_1ACC_DATETEST_COL8TEST_INS1&2016-03-24:12:56:01TEST_ALTER
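Because TEST_COL is declared in the new schema as a nullable union with a null default, rows written before the ALTER TABLE simply carry no value for it. A consumer reading a mix of old and new records can sketch the handling like this (field names are taken from the example; the helper function is hypothetical):

```python
def read_test_col(row):
    """Illustrative: TEST_COL is ["null","string"] with default null,
    so pre-ALTER rows have no value for it."""
    return row.get("TEST_COL")  # None for rows written before the ALTER

old_row = {"PK_ID": 7, "RND_STR_1": "IJWQRO7T"}
new_row = {"PK_ID": 8, "RND_STR_1": "TEST_INS1", "TEST_COL": "TEST_ALTER"}
print(read_test_col(old_row))  # None
print(read_test_col(new_row))  # TEST_ALTER
```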

I used JMeter to generate some load, and the pipeline easily replicated 225 transactions per second (30% inserts, 80% updates) with almost no delays. It was not a test of Kafka or Flume, which could sustain far more load, but rather of the combination of GoldenGate with the Big Data infrastructure. It was stable, without any errors. I do understand that this test is very far from any real production workflow, which may include an Oracle database (or any other RDBMS) + GoldenGate + Kafka + Storm + ..., and the final data format may be completely different. So far the adapters are looking good and doing the job. In the next post I will look at the HBase adapter. Stay tuned.




About the Author

Regarded by his peers as an Oracle guru, Gleb is known for being able to resolve any problem related to Oracle. He loves the satisfaction of troubleshooting, and his colleagues even say that seeking Gleb’s advice regarding an issue is more efficient than looking it up. Gleb enjoys the variety of challenges he faces while working at Pythian, rather than working on the same thing every day. His areas of speciality include Oracle RAC, Exadata, RMAN, SQL tuning, high availability, storage, performance tuning, and many more. When he’s not working, running, or cycling, Gleb can be found reading.

22 Comments

Hi Gleb Otochkin,
This is very useful information, and I appreciate the way you have explained difficult things in a very easy way, so thank you for all your effort in providing such nice information. I need your help: I am trying to implement a real-time data pipeline from Oracle GoldenGate to Kafka to Spark to HBase, and I am struggling with the low-level data flow design. It would be great if you could help in this regard. Thanks, waiting for your reply.

Reply

Hi Ravi,
I would love to help you with any specific questions you have, and if you want Pythian to help with implementation or design we can set up a call and discuss it.

Thanks,
Gleb

Reply

Hi,

I tried to configure this using GG 12.3.0.1 and I got the following errors:
1) The command ‘add extract ext1, tranlog, begin now’ fails with “invalid parameter specified for ADD EXTRACT”.
Any idea?

2) The GG for BD installation does not contain
[oracle@sandbox oggbd]$ cat dirprm/kafka.props

I created it, the same as the other properties files.

3) Is it mandatory that Kafka and GG are deployed on the same machine? Or, using bootstrap.servers=sandbox:9092, can I point to an external Kafka installation?

thanks

Reply

Hi Ignacio,
1) Where do you run the ‘add extract’ command? You are supposed to run it on the source side, using Oracle GoldenGate for Oracle Database, MS SQL Server or MySQL.
2) The example kafka.props file is in the AdapterExamples/big-data/kafka/ directory of your OGG for Big Data installation.
3) The Kafka server can be deployed separately. On the GoldenGate machine you only need the Kafka client Java libraries.
Here is the list of required libs:
https://docs.oracle.com/goldengate/bd123010/gg-bd/GADBD/GUID-1C21BC19-B3E9-462C-809C-9440CAB3A427.htm#GADBD373

Thanks,
Gleb

Reply

Hi Gleb, great write-up. Do you know if it is possible, through the Big Data adapter, to develop your own custom Avro schema and have the adapter use it when pushing onto Kafka? Our use case is that all our events delivered over Kafka have a pre-defined header that should be present in each message, which we want populated.

Also, if I’m only interested in propagating INSERT transactions on a particular table, is that possible?

Appreciate your help
Thanks!

Reply
Gleb Otochkin
March 2, 2017 10:45 am

Hi Nick,
I am not sure if you can change the Avro format for an existing data adapter. It may be possible if the header is, let’s say, in the form of a column. In that case you can try to add it using standard GoldenGate methods.
About the inserts: it should be possible to filter out all but insert operations for a table.
You can ignore certain types of operations using parameters like IGNOREUPDATES and others.

Thanks,
Gleb

Reply

Hello Gleb,
It’s a great article for newbies. Thanks for that.
I am stuck at one place, Gleb. I have installed Kafka and Zookeeper and they are running fine. I have also installed GG, which is extracting the data from the Oracle DB and writing it to a trail file. But when I start the replicat it fails with the following error: “OGG-15050 Error loading Java VM runtime library: (126 The specified module could not be found.).”
I am not getting any clue on how to resolve this.
P.S.: I am doing all this on Windows.

Please suggest on how this can be resolved. I would be really thankful to you.

Thanks,
Bipul

Reply
Gleb Otochkin
June 23, 2017 8:10 am

Hi Bipul,
Do you have your Java runtime libs and JDK set up on the machine?
If you have installed/unzipped Java to a directory, it can be done by:
export JAVA_HOME=

export PATH=$JAVA_HOME/bin:$PATH

Thanks,
Gleb

Reply

Hi Bipul, I am facing a similar issue. Did you find a resolution yet?
Gleb, can you help with the same?

Reply

Hi Jasjyot,
As I’ve replied to Bipul, the problem is most likely coming from incorrect settings for JAVA_HOME.
I am not really working on Windows, but if you have your JDK installed in C:\Program Files\Java\jdk1.8.0_181 then you need to set the environment variable JAVA_HOME using:
set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_181
Also, you have to put the necessary classes on the gg.classpath in the replicat property file (kafka.props).

After that, start a Windows prompt and restart the manager and the replicat.

Hope it will help.
Gleb

Reply
Victor Feinstein
July 5, 2017 7:46 pm

Hi Gleb,

I have an Oracle 12.2 extract/pump running to an OGG 12.3 replicat, using kafka_2.11-0.9.0.0 as a target. I have enabled new topic creation for each table, and also validated the ‘schema evolution’ feature by doing DDL on the source and seeing the new metadata schema Avro record show up on the schema topic. That’s very nice. Now I am trying to understand how one would go about associating Avro schema records with the data records. I do not see any schema version numbers in the Avro schema records, nor in the data records. How do you match them up?

thanks in advance!!

Reply
Gleb Otochkin
July 6, 2017 1:51 pm

Good question, Victor. If I understood correctly, you have a separate Kafka topic for schema changes. What if we use the timestamp of the message as a marker for schema changes? And of course you are going to get the schema changes only with the following DML.

Thanks,
Gleb

Reply
Victor Feinstein
July 6, 2017 5:29 pm

Hi Gleb,
Yes, we could associate the Avro schema record with its payload messages using a qualified range of dates and times. It could get complex in a hurry, but this is one way to solve it.
I had hoped for something ‘off-the-shelf’ available from OGG, and just learned that they will have support for a Kafka Connect handler which can fully integrate with the Confluent schema registry. It’s supposed to be out in the 12.3.1.1 OGG patch later this month, so we shall wait and see.

thanks again!

Reply
Gleb Otochkin
July 7, 2017 7:18 am

Hi Victor,
You are right, the integration may help with it, but it may be a bit early to talk about any features until they are generally available. I may write an update about it as soon as it is out.
Cheers,
Gleb

Reply

Gleb,
Thanks for covering this topic.
The extract and replicat seem to be working fine; I’m able to see the change in RBAs after every transaction.
I manually tested the Kafka producer and consumer, and they seem to be working fine.
But I am not able to pass the trail messages from GG to Kafka, and I didn’t see any errors in the GG logs.
Please help

Reply
Gleb Otochkin
August 14, 2017 2:01 pm

Hi siva,
When you look at the logs, do you check the Java log for the Kafka replicat?
As an example, for a “rkafka” replicat it would be here:
$OGG_HOME/dirrpt/RKAFKA_info_log4j.log

Even if you don’t have any errors in the other logs, the RKAFKA_info_log4j.log can provide more details. You can also enable debug for the process by changing the parameter gg.log.level from INFO to DEBUG in $OGG_HOME/dirprm/kafka.props.
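The change amounts to one line in kafka.props. A sketch of the edit, demonstrated on a scratch copy of the file so it can be run anywhere (the real file lives in $OGG_HOME/dirprm):

```shell
# Flip gg.log.level from INFO to DEBUG on a scratch copy of kafka.props.
props=$(mktemp)
printf 'gg.log.level=INFO\n' > "$props"
sed -i 's/^gg.log.level=.*/gg.log.level=DEBUG/' "$props"
cat "$props"    # -> gg.log.level=DEBUG
```

After editing the real file and restarting the replicat, the extra detail shows up in $OGG_HOME/dirrpt/RKAFKA_info_log4j.log.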

Regards,
Gleb

Reply

Hi Gleb.

Congratulations for your post. This has been very useful.

I have installed OGG 12.3 to replicate data from an Oracle 12c database with OGG 12.1 on the source.

I am working with the Cloudera ecosystem, using its Kafka and Flume.

I configured the Kafka replicat and it is running, but the data is not in HDFS. I configured flume.conf the same way you mentioned above, but I have a question: if topics are created automatically, is it correct to configure the topic name in flume.conf? Each table will create a new topic. How will Flume get data from the new topics if it is configured to get data from the topic oggtopic?

Another question: did you configure OGG to talk to the Cloudera ecosystem? I saw your Kafka and Flume installations were standalone, from Apache.

Thank you

Nilton

Reply
Gleb Otochkin
July 27, 2018 9:00 am

Hi Nilton,
I used both the Cloudera distribution and “vanilla” Kafka and Flume, and it worked correctly in both cases.
If you need your data in HDFS, I would recommend using just the HDFS GoldenGate adapter instead of Kafka and Flume, since it makes more sense.
The combination used in the article was just to compare the output from the HDFS adapter with going through the Kafka adapter.
But if you want to use Kafka to pass the messages further between different systems, you can of course do that. And you are right: if you use templates to put different tables to different topics, you need either to prepare all the topics in advance and set up Flume for each topic, or not use a template and use one static topic for the data.
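For example, with the OGG for Big Data 12.3 Kafka handler the two choices look roughly like this (the property name is from the 12.3 handler; 12.2 used a different property for the static topic name, so check the docs for your release):

```properties
# One static topic for everything -- Flume can subscribe to it directly:
gg.handler.kafkahandler.topicMappingTemplate=oggtopic

# ...or one topic per source table; each new topic then needs its own
# Flume source (or a pattern subscription, if your Flume version supports it):
# gg.handler.kafkahandler.topicMappingTemplate=${tableName}
```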

Thanks,
Gleb

Reply

Hi Great article,

My question is about Kafka checkpointing with the replicat. For example, in Kafka producer asynchronous mode, suppose the replicat moved through operations 1 to 100, Kafka processed 80 operations, and at the 81st Kafka went down for an hour. After Kafka is up, the replicat has to reposition to the 81st operation. How does that happen with the Kafka handler? I did not see any checkpoint and comparison logic mentioned anywhere on the Oracle GG Big Data adapter pages. What is the guarantee that a restart of the replicat and adapter starts from the 81st operation?

Reply

Hi Raju,
I guess you mean the non-blocking mode for the handler. In that case the producer can keep several messages in its buffer. On each transaction commit it will flush data to the Kafka topic and save the checkpoint. If the next portion is not able to reach the topic because the Kafka server is down, it will not be reflected in the checkpoint file, and those messages will be sent again when the topic is available.
I tried to test it but didn’t manage to verify it completely. I can only say that when I was producing tons of messages in non-blocking mode and killed the Kafka server, I didn’t lose any data. I may try it again with the new versions of Kafka and OGG.
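A toy model of that at-least-once behavior (class and function names are invented for illustration, not the actual handler internals): the checkpoint only advances after a successful flush to the topic, so operations that never reached Kafka are re-sent on restart.

```python
class FlakyTopic:
    """A fake Kafka topic that 'goes down' after accepting some messages."""
    def __init__(self, fail_after):
        self.messages, self.fail_after = [], fail_after

    def flush(self, batch):
        if len(self.messages) + len(batch) > self.fail_after:
            raise ConnectionError("Kafka broker down")
        self.messages.extend(batch)

def replicate(ops, topic, checkpoint=0, batch_size=10):
    """Send ops[checkpoint:] in batches; return the new checkpoint."""
    while checkpoint < len(ops):
        batch = ops[checkpoint:checkpoint + batch_size]
        try:
            topic.flush(batch)
        except ConnectionError:
            return checkpoint          # checkpoint unchanged -> resend later
        checkpoint += len(batch)       # only advance after a clean flush
    return checkpoint

ops = list(range(1, 101))              # operations 1..100
topic = FlakyTopic(fail_after=80)      # broker dies after 80 messages
ckpt = replicate(ops, topic)           # stops with checkpoint at 80
topic.fail_after = 1000                # broker comes back
ckpt = replicate(ops, topic, ckpt)     # resumes from operation 81
print(ckpt, topic.messages[80])        # -> 100 81
```

In Raju’s scenario, nothing past operation 80 is acknowledged, so the restart naturally resumes at operation 81; the trade-off of this scheme is possible duplicates, never silent loss.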

Cheers,
Gleb

Reply

Q: Can we replicate multiple source tables to one Kafka topic?

Reply

Hi Gleb,

I am unable to send the log to Kafka. I am getting the error below. On a Linux server it worked, but after the migration to AIX we are unable to send.

ERROR 2021-06-24 09:02:23.000554 [kafka-producer-network-thread | producer-1] – A failure occurred sending a message to Kafka to topic [xxxx-DX_ALL_xxxx].
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxxx-DX_ALL_xxxx: 50013 ms has passed since batch creation plus linger time

Servername:oracle:client:/goldengate/12.3.BD/dirprm: kinit -kt /goldengate/12.3.BD/dirprm/kafka.keytab kafka/[email protected]
java.lang.IllegalArgumentException: Bad principal name: -kt

Please help me

Br,
Ram

Reply

Leave a Reply

Your email address will not be published. Required fields are marked *