This post continues my review of the GoldenGate Big Data adapters, started with the reviews of the HDFS and Flume adapters. Here is a list of all posts in the series:
- GoldenGate 12.2 Big Data Adapters: part 1 – HDFS
- GoldenGate 12.2 Big Data Adapters: part 2 – Flume
- GoldenGate 12.2 Big Data Adapters: part 3 – Kafka
In this article I will try the Kafka adapter and see how it works. Firstly, it may be worth reminding readers what Kafka is. Kafka is a publish-subscribe streaming system. One may ask how it differs from Flume, a question I asked myself when I first heard about Kafka. I think one of the best comparisons between Flume and Kafka has been made by Gwen Shapira and Jeff Holoman in the blog post Apache Kafka for Beginners. In essence, Kafka is a general-purpose system where most of the control and consumer functionality relies on consumer programs you build yourself, whereas Flume gives you pre-created sources and sinks and lets you use interceptors to change data in flight. So with Kafka you get at the destination exactly what you put in at the source. Kafka and Flume can work together quite well, and in this article I am going to use them both.
Let’s recall what we have in our configuration. We have an Oracle database running as a source, and Oracle GoldenGate for Oracle capturing changes for one schema in that database. We have OGG 12.2 with an integrated extract on the source. The replication goes directly to trail files on the destination side, where we have OGG for Big Data installed on a Linux box. You can get more details about the installation on source and target from the first post in the series. I’ve made the configuration as simple as possible, dedicating most of the attention to the Big Data adapters themselves, which are after all the main point of the article.
Having installed OGG for Big Data, we need to set up the Kafka adapter. As with the other adapters, we copy the sample configuration files from the $OGG_HOME/AdapterExamples/big-data directory:
bash$ cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
We need to adjust our kafka.props file to define the Kafka/ZooKeeper topics for data and schema changes (the TopicName and SchemaTopicName parameters), and the gg.classpath for the Kafka and Avro Java classes. I left the rest of the parameters at their defaults, including the format for the changes, which was defined as “avro_op” in the example.
[oracle@sandbox oggbd]$ cat dirprm/kafka.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName =oggtopic
gg.handler.kafkahandler.format =avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/u01/kafka/libs/*:/usr/lib/avro/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
[oracle@sandbox oggbd]$
The next file we have to adjust is custom_kafka_producer.properties, which contains information about our running Kafka server and defines some additional parameters such as compression. I left all the parameters unchanged except “bootstrap.servers”, where I put the address of my Kafka service.
[oracle@sandbox oggbd]$ cat dirprm/custom_kafka_producer.properties
bootstrap.servers=sandbox:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
[oracle@sandbox oggbd]$
If we plan an initial load through Kafka, we can use something like the parameter file I prepared for a passive replicat:
[oracle@sandbox oggbd]$ cat dirprm/irkafka.prm
-- Trail file for this example is located in "dirdat" directory
-- Command to run passive REPLICAT
-- ./replicat paramfile dirprm/irkafka.prm reportfile dirrpt/irkafka.rpt
SPECIALRUN
END RUNTIME
EXTFILE /u01/oggbd/dirdat/initld
--
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ggtest.*, TARGET bdtest.*;
[oracle@sandbox oggbd]$
Before starting any replicat we need to prepare our system to receive the data. Since Kafka itself is a pure streaming system, it cannot write files to HDFS without another program or connector. In this first case we will pass data from Kafka to Flume and use the Flume HDFS sink to land it in HDFS. Please be aware that you need ZooKeeper to manage topics for Kafka. I am not going to discuss setting up ZooKeeper in this article; just assume we already have it up and running on port 2181.
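If you want a quick sanity check that ZooKeeper is actually listening before going further, the built-in "four letter word" commands work well. A minimal sketch, assuming ZooKeeper runs on localhost:2181 and the nc utility is available:
# Ask ZooKeeper for its health and status (a healthy server answers "imok" to ruok)
echo ruok | nc localhost 2181
echo stat | nc localhost 2181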
I used Kafka version 0.9.0.1, downloading it from https://kafka.apache.org/downloads.html. After downloading the archive I unpacked it, made a small adjustment to the configuration, and started it in standalone mode.
[root@sandbox u01]# wget https://apache.parentingamerica.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
--2016-03-15 15:22:09--  https://apache.parentingamerica.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
Resolving apache.parentingamerica.com... 70.38.15.129
Connecting to apache.parentingamerica.com|70.38.15.129|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35650542 (34M) [application/x-gzip]
Saving to: `kafka_2.11-0.9.0.1.tgz'
100%[==============================================>] 35,650,542  2.95M/s   in 16s
2016-03-15 15:22:26 (2.10 MB/s) - `kafka_2.11-0.9.0.1.tgz' saved [35650542/35650542]
[root@sandbox u01]# tar xfz kafka_2.11-0.9.0.1.tgz
[root@sandbox u01]# ln -s kafka_2.11-0.9.0.1 kafka
[root@sandbox u01]# cd kafka
[root@sandbox kafka]# vi config/server.properties
[root@sandbox kafka]# grep -v '^$\|^\s*\#' config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
[root@sandbox kafka]#
[root@sandbox kafka]# nohup bin/kafka-server-start.sh config/server.properties > /var/log/kafka/server.log &
[1] 30669
[root@sandbox kafka]# nohup: ignoring input and redirecting stderr to stdout
Now we need to prepare our two topics for the data received from GoldenGate. As you remember, we defined the topic “oggtopic” for our data flow using the gg.handler.kafkahandler.TopicName parameter in our kafka.props file, and the topic “mySchemaTopic” for schema changes. So, let’s create the topic using Kafka’s supplied scripts:
[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --create --topic oggtopic --partitions 1 --replication-factor 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Created topic "oggtopic".
[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --list
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
oggtopic
[root@sandbox kafka]#
As a matter of fact, all the necessary topics will also be created automatically when you start your GoldenGate replicat. You only need to create a topic explicitly if you want to use custom parameters for it. You also have the option to alter the topic later on to adjust its configuration parameters.
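For example, a topic that was auto-created with the broker defaults can be adjusted afterwards with the same kafka-topics.sh script. This is only a sketch; the partition count and retention value below are illustrative, not settings I used:
# Increase the partition count (it can only grow) and override a per-topic config
bin/kafka-topics.sh --zookeeper sandbox:2181 --alter --topic oggtopic --partitions 2
bin/kafka-topics.sh --zookeeper sandbox:2181 --alter --topic oggtopic --config retention.ms=259200000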
Here is a list of the topics we have, where one of them was created manually and the second one was created automatically by the replicat process.
[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --describe --topic oggtopic
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Topic:oggtopic   PartitionCount:1   ReplicationFactor:1   Configs:
    Topic: oggtopic   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
[root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --describe --topic mySchemaTopic
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Topic:mySchemaTopic   PartitionCount:1   ReplicationFactor:1   Configs:
    Topic: mySchemaTopic   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
[root@sandbox kafka]#
In our configuration we have only one server and the simplest possible Kafka setup. In a real business case it can be far more complex. Our replicat is going to post data changes to oggtopic, and all schema definitions and changes to mySchemaTopic. We’ve already mentioned that we are going to use Flume to write to HDFS. I’ve prepared Flume with two sources and two sinks to write the changes to the /user/oracle/ggflume HDFS directory. We could split data and schema changes into different directories if we wished. Here is my configuration for Flume:
[root@sandbox ~]# cat /etc/flume-ng/conf/flume.conf
# Name/aliases for the components on this agent
agent.sources = ogg1 ogg2
agent.sinks = hdfs1 hdfs2
agent.channels = ch1 ch2
# Kafka source
agent.sources.ogg1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.ogg1.zookeeperConnect = localhost:2181
agent.sources.ogg1.topic = oggtopic
agent.sources.ogg1.groupId = flume
agent.sources.ogg1.kafka.consumer.timeout.ms = 100
agent.sources.ogg2.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.ogg2.zookeeperConnect = localhost:2181
agent.sources.ogg2.topic = mySchemaTopic
agent.sources.ogg2.groupId = flume
agent.sources.ogg2.kafka.consumer.timeout.ms = 100
# Describe the sink
agent.sinks.hdfs1.type = hdfs
agent.sinks.hdfs1.hdfs.path = hdfs://sandbox/user/oracle/ggflume
agent.sinks.hdfs2.type = hdfs
agent.sinks.hdfs2.hdfs.path = hdfs://sandbox/user/oracle/ggflume
#agent.sinks.hdfs1.type = logger
# Use a channel which buffers events in memory
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1001
agent.channels.ch1.transactionCapacity = 1000
agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1001
agent.channels.ch2.transactionCapacity = 1000
# Bind the source and sink to the channel
agent.sources.ogg1.channels = ch1
agent.sources.ogg2.channels = ch2
agent.sinks.hdfs1.channel = ch1
agent.sinks.hdfs2.channel = ch2
As you can see, we have a separate source for each of our Kafka topics, and two sinks pointing to the same HDFS location. The data will be written to HDFS in Avro format.
All preparations are complete: the Kafka server is running, the two topics exist, and Flume is ready to write data to HDFS. Our HDFS directory is still empty.
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
[oracle@sandbox oggbd]$
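Before starting the replicat, we can optionally attach a console consumer to the topics to confirm that messages actually reach Kafka, independently of Flume and HDFS. The Avro payload shows up as mostly binary data, but it is a handy check; the sketch below uses the standard consumer script shipped with Kafka 0.9:
# run each command in its own terminal window from the Kafka home directory
bin/kafka-console-consumer.sh --zookeeper sandbox:2181 --topic oggtopic --from-beginning
bin/kafka-console-consumer.sh --zookeeper sandbox:2181 --topic mySchemaTopic --from-beginning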
Let’s run the passive replicat with our initial data load trail file:
[oracle@sandbox oggbd]$ cd /u01/oggbd
[oracle@sandbox oggbd]$ ./replicat paramfile dirprm/irkafka.prm reportfile dirrpt/irkafka.rpt
[oracle@sandbox oggbd]$
Now we can have a look at the results. We got three files on HDFS: the first two describe the structure of TEST_TAB_1 and TEST_TAB_2 respectively, and the third file contains the data changes, or rather the initial data for those tables. You can see that the schema definitions were put in separate files, while the data changes were all posted to one file.
[oracle@sandbox ~]$ hadoop fs -ls /user/oracle/ggflume/
Found 3 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
[oracle@sandbox ~]$
[oracle@sandbox ~]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458749691685
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?????k?\??????S?A?%?{
  "type" : "record",
  "name" : "TEST_TAB_1",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
.........................
[oracle@sandbox ~]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458749691686
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?* ?e????xS?A?%N{
  "type" : "record",
  "name" : "TEST_TAB_2",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
...............................
[oracle@sandbox ~]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458749691718
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable??????c?C n??S?A?b"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.373000(00000000-10000002012 PK_ID1371O62FX&2014-01-24:19:09:20RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405000(00000000-10000002155 PK_ID2371O62FX&2014-01-24:19:09:20HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405001(00000000-10000002298 PK_ID3RXZT5VUN&2013-09-04:23:32:56RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405002(00000000-10000002441 PK_ID4RXZT5VUN&2013-09-04:23:32:56HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_2I42016-02-16 19:17:40.76289942016-03-23T12:14:35.408000(00000000-10000002926 PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52
[oracle@sandbox ~]$
Now we need to create our ongoing replication. Our extract was set up the same way as described in the first post of the series (a hedged sketch of such an extract parameter file follows the listing below). It is up and running, passing changes to the replicat side into the ./dirdat directory.
GGSCI (sandbox.localdomain) 1> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
EXTRACT     RUNNING     GGEXT       00:00:09      00:00:03

[oracle@sandbox oggbd]$ ls -l dirdat/
total 240
-rw-r-----. 1 oracle oinstall   3028 Feb 16 14:17 initld
-rw-r-----. 1 oracle oinstall 190395 Mar 14 13:00 or000041
-rw-r-----. 1 oracle oinstall   1794 Mar 15 12:02 or000042
-rw-r-----. 1 oracle oinstall  43222 Mar 17 11:53 or000043
[oracle@sandbox oggbd]$
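For readers who skipped the first post, here is a rough sketch of what such a source-side integrated extract parameter file typically looks like. It is only an illustration of the approach; the credential alias and the integrated-capture options are assumptions, not my actual file from part one:
EXTRACT ggext
USERIDALIAS ogg
-- integrated capture options (illustrative values)
TRANLOGOPTIONS INTEGRATEDPARAMS (MAX_SGA_SIZE 256)
-- ship the changes straight to the trail files on the Big Data box
RMTHOST sandbox, MGRPORT 7809
RMTTRAIL ./dirdat/or
TABLE GGTEST.*;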
I’ve prepared a parameter file for the Kafka replicat:
[oracle@sandbox oggbd]$ cat dirprm/rkafka.prm
REPLICAT rkafka
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rkafka, exttrail dirdat/or, begin now
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP GGTEST.*, TARGET BDTEST.*;
[oracle@sandbox oggbd]$
We only need to add and start our rkafka replicat in GoldenGate for Big Data.
GGSCI (sandbox.localdomain) 1> add replicat rkafka, exttrail dirdat/or, begin now
REPLICAT added.

GGSCI (sandbox.localdomain) 2> start replicat rkafka

Sending START request to MANAGER ...
REPLICAT RKAFKA starting

GGSCI (sandbox.localdomain) 3> info rkafka

REPLICAT   RKAFKA    Last Started 2016-03-24 11:53   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:06 ago)
Process ID           21041
Log Read Checkpoint  File dirdat/or000000000
                     2016-03-24 11:53:17.388078  RBA 0
You may remember that we don’t have a dirdat/or000000000 file in our dirdat directory, so our replicat has to be adjusted slightly to work with the proper trail files. I am altering the sequence for my replicat to reflect the actual sequence number of my last trail file.
GGSCI (sandbox.localdomain) 10> stop replicat rkafka

Sending STOP request to REPLICAT RKAFKA ...
Request processed.

GGSCI (sandbox.localdomain) 11> alter replicat rkafka EXTSEQNO 43

2016-03-24 12:03:27  INFO    OGG-06594  Replicat RKAFKA has been altered through GGSCI. Even the start up position might be updated, duplicate suppression remains active in next startup. To override duplicate suppression, start RKAFKA with NOFILTERDUPTRANSACTIONS option.
REPLICAT altered.

GGSCI (sandbox.localdomain) 12> start replicat rkafka

Sending START request to MANAGER ...
REPLICAT RKAFKA starting

GGSCI (sandbox.localdomain) 13> info rkafka

REPLICAT   RKAFKA    Last Started 2016-03-24 12:03   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:12 ago)
Process ID           21412
Log Read Checkpoint  File dirdat/or000000043
                     First Record  RBA 0

GGSCI (sandbox.localdomain) 14>
Let’s change some data:
orclbd> select * from test_tab_2;

           PK_ID RND_STR_1  ACC_DATE
---------------- ---------- ---------------------------
               7 IJWQRO7T   07/07/13 08:13:52

orclbd> insert into test_tab_2 values (8,'TEST_INS1',sysdate);

1 row inserted.

orclbd> commit;

Commit complete.

orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 5 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
-rw-r--r--   1 flume oracle        278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086
-rw-r--r--   1 flume oracle       1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130
[oracle@sandbox oggbd]$
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836268086
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?Q???n?y?1?R#S?j???"BDTEST.TEST_TAB_2I42016-03-24 16:17:29.00033642016-03-24T12:17:31.733000(00000000430000043889 PK_IDRND_STR_1ACC_DATE8TEST_INS1&2016-03-24:12:17:26
[oracle@sandbox oggbd]$
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836268130
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?6F!?Z?-?ZA8r^S?j?oN{
  "type" : "record",
  "name" : "TEST_TAB_2",
  "namespace" : "BDTEST",
We got our schema definition file and a file with data changes.
orclbd> update test_tab_2 set RND_STR_1='TEST_UPD1' where pk_id=8;

1 row updated.

orclbd> commit;

Commit complete.

orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 6 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
-rw-r--r--   1 flume oracle        278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086
-rw-r--r--   1 flume oracle       1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130
-rw-r--r--   1 flume oracle        316 2016-03-24 12:28 /user/oracle/ggflume/FlumeData.1458836877420
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836877420
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable]??u????????qS?t,??"BDTEST.TEST_TAB_2U42016-03-24 16:27:39.00035642016-03-24T12:27:42.177000(00000000430000044052 PK_IDRND_STR_1ACC_DATE8TEST_INS1&2016-03-24:12:17:268TEST_UPD1&2016-03-24:12:17:26
[oracle@sandbox oggbd]$
You can see that we only got a file with data changes, since no DDL changes were made. The transactions will be grouped into files according to our Flume parameters, as discussed in the previous blog post.
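As a reminder from that post, the batching into HDFS files is driven by the HDFS sink roll settings. I kept the defaults in the flume.conf above, but one could add something like the following to control rolling by time, event count and size (the values here are purely illustrative):
agent.sinks.hdfs1.hdfs.rollInterval = 300
agent.sinks.hdfs1.hdfs.rollCount = 10000
agent.sinks.hdfs1.hdfs.rollSize = 134217728
# repeat the same settings for agent.sinks.hdfs2 if both sinks should behave alike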
You can also see the old value for the updated record along with the new one. Using that information we can reconstruct the changes, but we need to apply some logic of our own to decode them.
For a delete operation we get the operation flag “F” and the values of the deleted record. Again, there is no schema definition file, since no schema changes were made.
Let’s try some DDL.
orclbd> truncate table test_tab_2;

Table TEST_TAB_2 truncated.

orclbd>
GGSCI (sandbox.localdomain) 4> info rkafka

REPLICAT   RKAFKA    Last Started 2016-03-24 12:10   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:02 ago)
Process ID           21803
Log Read Checkpoint  File dirdat/or000043
                     2016-03-24 12:40:05.000303  RBA 45760

GGSCI (sandbox.localdomain) 5>
No new files on HDFS.
orclbd> insert into test_tab_2 select * from test_tab_3;

1 row inserted.

orclbd> commit;

Commit complete.

orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 8 items
-rw-r--r--   1 flume oracle       1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685
-rw-r--r--   1 flume oracle       1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686
-rw-r--r--   1 flume oracle        981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718
-rw-r--r--   1 flume oracle        278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086
-rw-r--r--   1 flume oracle       1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130
-rw-r--r--   1 flume oracle        316 2016-03-24 12:28 /user/oracle/ggflume/FlumeData.1458836877420
-rw-r--r--   1 flume oracle        278 2016-03-24 12:35 /user/oracle/ggflume/FlumeData.1458837310570
-rw-r--r--   1 flume oracle        277 2016-03-24 12:42 /user/oracle/ggflume/FlumeData.1458837743709
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458837743709
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable*?2??????>iS??\??"BDTEST.TEST_TAB_2I42016-03-24 16:42:04.00020042016-03-24T12:42:06.774000(00000000430000045760 PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52
[oracle@sandbox oggbd]$
Again, we got only a file with data changes. I tried to compare the file produced by the previous insert with the one produced by the insert after the truncate, but couldn’t find any difference except in the binary part of the Avro file. This will require additional investigation, and maybe clarification from Oracle. In its current state it looks easy to miss a truncate command for a table on the destination side.
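If capturing truncates matters for your case, one thing worth experimenting with is the GETTRUNCATES parameter on the extract and replicat sides, sketched below. Whether the Kafka handler then produces a record that can be reliably distinguished from a plain insert is exactly the part that needs the additional investigation mentioned above:
-- in the extract and replicat parameter files (illustrative, not from my test configuration)
GETTRUNCATES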
Let’s alter the table and add a column.
orclbd> alter table test_tab_2 add test_col varchar2(10);

Table TEST_TAB_2 altered.

orclbd>
We do not get any new files with the new table definition until we run some DML on the table. Both files (the new schema definition and the data changes) appear after we insert, delete or update any rows.
orclbd> insert into test_tab_2 values (8,'TEST_INS1',sysdate,'TEST_ALTER');

1 row inserted.

orclbd> commit;

Commit complete.

orclbd>
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/
Found 10 items
...................................................
-rw-r--r--   1 flume oracle       1654 2016-03-24 12:56 /user/oracle/ggflume/FlumeData.1458838582020
-rw-r--r--   1 flume oracle        300 2016-03-24 12:56 /user/oracle/ggflume/FlumeData.1458838584891
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458838582020
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable-??ip??/?w?S??/{
  "type" : "record",
  "name" : "TEST_TAB_2",
  "namespace" : "BDTEST",
................
    "name" : "TEST_COL",
    "type" : [ "null", "string" ],
    "default" : null
.................
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458838584891
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritabletr?V?_$???:2??S??/w?"BDTEST.TEST_TAB_2I42016-03-24 16:56:04.00026042016-03-24T12:56:08.370000(00000000430000047682 PK_IDRND_STR_1ACC_DATETEST_COL8TEST_INS1&2016-03-24:12:56:01TEST_ALTER
I used JMeter to generate some load, and the replication easily handled 225 transactions per second (30% inserts, 80% updates) with almost no delay. It was not a test of Kafka or Flume, which can sustain far more load, but rather of GoldenGate combined with the Big Data infrastructure. It ran stably, without any errors. I do understand that this test is very far from any potential production workflow, which may include an Oracle database (or any other RDBMS) + GoldenGate + Kafka + Storm + …, and the final data format may be completely different. So far the adapters are looking good and doing the job. In the next post I will look at the HBase adapter. Stay tuned.
22 Comments
Hi Gleb Otochkin,
This is very useful information, and I appreciate the way you have explained difficult things so simply. Thanks for all your effort in providing such nice information. I need your help: I am trying to implement a real-time data pipeline from Oracle GoldenGate to Kafka to Spark to HBase, and I am struggling with the low-level data flow design for it. It would be great if you could help in this regard. Thanks, waiting for your reply.
Hi Ravi,
I would love to help you with any specific questions you have, and if you want Pythian to help with implementation or design we can set up a call and discuss it.
Thanks,
Gleb
Hi,
I tried to configure this using GG 12.3.0.1 and I got the following errors:
1) The command 'add extract ext1, tranlog, begin now' fails with "invalid parameter specified for ADD EXTRACT".
Any idea?
2) The GG for BD installation does not contain dirprm/kafka.props. I created it, along with the other properties files.
3) Is it mandatory that Kafka and GG are deployed on the same machine, or can I use bootstrap.servers=sandbox:9092 to point to an external Kafka installation?
thanks
Hi Ignacio,
1) Where do you run the 'add extract' command? You are supposed to run it on the source side, using Oracle GoldenGate for Oracle Database, MS SQL Server or MySQL.
2) The example kafka.props file is in the AdapterExamples/big-data/kafka/ directory of your OGG for Big Data installation.
3) The Kafka server can be deployed separately. On the GoldenGate machine you only need the Kafka client Java libraries.
Here is a list of the required libs:
https://docs.oracle.com/goldengate/bd123010/gg-bd/GADBD/GUID-1C21BC19-B3E9-462C-809C-9440CAB3A427.htm#GADBD373
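In that remote setup, only the producer properties need to point at the broker(s); a sketch with hypothetical host names:
# dirprm/custom_kafka_producer.properties on the OGG for Big Data host
bootstrap.servers=kafka-host1:9092,kafka-host2:9092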
Thanks,
Gleb
Hi Gleb, great write-up. Do you know if it is possible, through the Big Data adapter, to develop your own custom Avro schema and have the adapter use it when pushing onto Kafka? Our use case is that all our events delivered over Kafka have a pre-defined header that should be present in each message, and we want it populated.
Also, if I'm only interested in propagating INSERT transactions on a particular table, is that possible?
Appreciate your help
Thanks!
Hi Nick,
I am not sure you can change the Avro format for an existing data adapter. It may be possible if the header is, let's say, in the form of a column. In that case you can try to add it using standard GoldenGate methods.
About the inserts: it should be possible to filter out all but insert operations for a table.
You can ignore certain types of operations using parameters like IGNOREUPDATES and others; see the sketch below.
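A sketch of how that could look in the replicat parameter file; these parameters are positional and apply to the MAP statements that follow them, and the table name here is only an example:
-- keep inserts and skip updates/deletes for the mapped table that follows
IGNOREUPDATES
IGNOREDELETES
MAP GGTEST.MY_TABLE, TARGET BDTEST.MY_TABLE;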
Thanks,
Gleb
Hello Gleb,
It's a great article for newbies. Thanks for that.
I am stuck at one place, Gleb. I have installed Kafka and ZooKeeper and they are running fine. I have also installed GG, which is extracting the data from the Oracle DB and writing it to a trail file. But when I start the replicat it fails with the following error: "OGG-15050 Error loading Java VM runtime library: (126 The specified module could not be found.)".
I am not getting any clue on how to resolve this.
P.S.: I am doing all this on windows.
Please suggest how this can be resolved. I would be really thankful to you.
Thanks,
Bipul
Hi Bipul,
Do you have your Java runtime libs and JDK set up on the machine?
If you have installed/unzipped Java into a directory, then:
export JAVA_HOME=
export PATH=$JAVA_HOME/bin:$PATH
Thanks,
Gleb
Hi Bipul, I am facing a similar issue. Did you find a resolution yet?
Gleb, can you help with the same?
Hi Jasjyot,
As I replied to Bipul, the problem is most likely coming from incorrect settings for JAVA_HOME.
I don't really work on Windows, but if you have your JDK installed in C:\Program Files\Java\jdk1.8.0_181 then you need to set the JAVA_HOME environment variable using:
set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_181
You also have to put the necessary classes into gg.classpath in the replicat property file (kafka.props).
After that, open a Windows prompt and restart the manager and the replicat.
Hope this helps.
Gleb
Hi Gleb,
I have an Oracle 12.2 extract/pump running to an OGG 12.3 replicat, using kafka_2.11-0.9.0.0 as a target. I have enabled new-topic creation for each table, and also validated the 'schema evolution' feature by doing DDL on the source and seeing the new metadata schema Avro record show up on the schema topic. That's very nice. Now I am trying to understand how one would go about associating Avro schema records with the data records. I do not see any schema version numbers in the Avro schema records, nor in the data records. How do you match them up?
thanks in advance!!
Good question, Victor. If I understood correctly, you have a separate Kafka topic for schema changes. What if you use the timestamp of the message as a marker for schema changes? And of course you are going to get the schema changes only with the DML that follows them.
Thanks,
Gleb
Hi Gleb,
Yes, we could associate the Avro schema record with its payload messages using a qualified range of dates and times. It could get complex in a hurry, but it is a way to solve it.
I had hoped for something off-the-shelf from OGG, and just learned that they will have support for a Kafka Connect handler which can fully integrate with the Confluent schema registry. It's supposed to be out in the 12.3.1.1 OGG patch later this month, so we shall wait and see.
thanks again!
Hi Victor,
You are right, the integration may help with that, but it may be a bit early to talk about any features until they are generally available. I may write an update about it as soon as it is out.
Cheers,
Gleb
Gleb,
Thanks for covering this topic.
The extract and replicat seem to be working fine. I am able to see the RBA change after every transaction.
However, I am not able to pass the trail messages from GG to Kafka.
I manually tested a Kafka producer and consumer, and they seem to be working fine.
But I am not able to pass messages from GG to Kafka, and I don't see any errors in the GG logs.
Please help
Hi siva,
When you look at the logs, do you check the Java log for the Kafka replicat?
For example, for an "rkafka" replicat it would be here:
$OGG_HOME/dirrpt/RKAFKA_info_log4j.log
Even if you don't have any errors in the other logs, RKAFKA_info_log4j.log can provide more details. You can also enable debug logging for the process by changing the gg.log.level parameter from INFO to DEBUG in $OGG_HOME/dirprm/kafka.props.
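A small sketch of the two things I usually check first:
# in dirprm/kafka.props, then restart the replicat
gg.log.level=DEBUG

# follow the java side of the replicat
tail -f $OGG_HOME/dirrpt/RKAFKA_info_log4j.log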
Regards,
Gleb
Hi Gleb.
Congratulations on your post. It has been very useful.
I have installed OGG 12.3 to replicate data from an Oracle 12c database with OGG 12.1 on the source.
I am working with the Cloudera ecosystem, using the Kafka and Flume that come with it.
I configured the Kafka replicat and it is running, but the data is not in HDFS. I configured flume.conf the same way you mentioned above, but I have a question: if the topic is created automatically, is it correct to configure the topic name in flume.conf? Each table will create a new topic, so how will Flume get data from a new topic if it is configured to read from the topic oggtopic?
Another question: did you configure OGG to talk to the Cloudera ecosystem? I saw your Kafka and Flume installations were standalone, from Apache.
Thank you
Nilton
Hi Nilton,
I used both the Cloudera distribution and "vanilla" Kafka and Flume, and it worked correctly in both cases.
If you need your data in HDFS, I would recommend just using the HDFS GoldenGate adapter instead of Kafka and Flume, since it makes more sense.
The combination used in the article was only there to compare the output of the HDFS adapter with the output that goes through the Kafka adapter.
But if you want to use Kafka to, say, pass the messages further between different systems, you can of course do that. And you are right: if you use templates to map tables to different topics, you either need to prepare all the topics in advance and set up Flume for each topic, or skip the template and send the data to one static topic.
Thanks,
Gleb
Hi, great article.
My question is about Kafka checkpointing with the replicat. For example, in Kafka producer asynchronous mode, say the replicat moved through operations 1 to 100, Kafka processed 80 of them, and at the 81st Kafka went down for an hour. After Kafka is back up, the replicat has to reposition to the 81st operation. How does that happen with the Kafka handler? I did not see any checkpoint and comparison logic mentioned anywhere in the GG Big Data adapter pages on the Oracle websites. What is the guarantee that a restart of the replicat and adapter starts from the 81st operation?
Hi Raju,
I guess you mean the non-blocking mode of the handler. In that case the producer can keep several messages in its buffer. On each transaction commit it flushes the data to the Kafka topic and saves the checkpoint. If the next portion cannot reach the topic because the Kafka server is down, it will not be reflected in the checkpoint file, and those messages will be sent again when the topic is available.
I tried to test it but didn't have much success testing it completely. I can only say that when I was producing tons of messages in non-blocking mode and killed the Kafka server, I didn't lose any data. I may try it again with the new versions of Kafka and OGG.
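If you want to trade some throughput for stricter delivery guarantees while testing this, two settings already shown in the article can be tightened; a sketch only, I have not benchmarked this combination:
# dirprm/kafka.props - make the handler wait for each send to complete
gg.handler.kafkahandler.BlockingSend=true

# dirprm/custom_kafka_producer.properties - require acknowledgement from all in-sync replicas
acks=all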
Cheers,
Gleb
Q: Can we replicate multiple source tables to one Kafka topic?
Hi Gleb,
I am unable to send the log to Kafka. I am getting the error below. It worked on a Linux server, but after migrating to AIX we are unable to send.
ERROR 2021-06-24 09:02:23.000554 [kafka-producer-network-thread | producer-1] – A failure occurred sending a message to Kafka to topic [xxxx-DX_ALL_xxxx].
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxxx-DX_ALL_xxxx: 50013 ms has passed since batch creation plus linger time
Servername:oracle:client:/goldengate/12.3.BD/dirprm: kinit -kt /goldengate/12.3.BD/dirprm/kafka.keytab kafka/[email protected]
java.lang.IllegalArgumentException: Bad principal name: -kt
Please help me
Br,
Ram