GoldenGate 12.2 big data adapters: part 2 – FLUME

Posted in: Big Data, Hadoop, Oracle, Technical Track

In this blog post we continue our review of the new Oracle GoldenGate Big Data adapters. In the first part of the series I tested the basic HDFS adapter and checked how it worked with some DML and DDL. In this article I will try the Flume adapter and see how it works.

A quick reminder of what Flume is. This is not a post about the popular Australian musician; today we are talking about Apache Flume. In short, it is a pipeline, or a kind of streaming system, that allows you to move large amounts of data. It has a simple architecture and, in general, three main components:

a) Source: where data enters Flume from an outside system.
b) Sink: passes data to the destination system, whether that is the final destination or another flow.
c) Channel: connects the Source and the Sink.

I know that it is a rather simplistic description, but the main subject of this article is not what Flume can do, but how we can pass our data from Oracle to Flume using GoldenGate.
My first post discussed how to set up an Oracle source system and how to start the GoldenGate initial load and extract, so I am not repeating that here. Let's assume we have the source system: an Oracle database replicating DML and DDL for one particular schema, GGTEST, using Oracle GoldenGate 12.2 to write trail files to the box where we already have GoldenGate for Big Data. Have a look at the first part to see how to set up GoldenGate for Big Data (OGG BD).

So, we have our OGG BD setup and the manager up and running.

GGSCI (sandbox.localdomain) 1> info manager
Manager is running (IP port sandbox.localdomain.7839, Process ID 18521).
GGSCI (sandbox.localdomain) 2>

What we need now is to prepare our Flume agent to accept messages from OGG. I've already set up the flume-ng agent service on my Linux box, and now we need to prepare a configuration file for the agent to handle the incoming stream and pass it to the destination system. We will set our source to "avro", and the sink will write to HDFS. The source can be either Avro or Thrift: according to the Oracle documentation, the Flume handler can stream data from a trail file to Avro or Thrift RPC Flume sources.

I have to admit that HDFS as the destination looks quite artificial, since we have a dedicated adapter for HDFS and don't need Flume to write there. But such a configuration can help us compare the different adapters and what they can do.
I used Flume version 1.6.0:

[oracle@sandbox flume-ng]$ bin/flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f
[oracle@sandbox flume-ng]$

Here is my configuration file for the Flume agent:

# Name/aliases for the components on this agent
agent.sources = ogg1
agent.sinks = hdfs1
agent.channels = ch1
#Avro source
agent.sources.ogg1.type = avro
agent.sources.ogg1.bind = 0.0.0.0
agent.sources.ogg1.port = 4141
# Describe the sink
agent.sinks.hdfs1.type = hdfs
agent.sinks.hdfs1.hdfs.path = hdfs://sandbox/user/oracle/ggflume
#agent.sinks.hdfs1.type = logger
# Use a channel which buffers events in memory
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 100000
agent.channels.ch1.transactionCapacity = 10000
# Bind the source and sink to the channel
agent.sources.ogg1.channels = ch1
agent.sinks.hdfs1.channel = ch1

I've kept the configuration simple and clear. You may change agent.sources.ogg1.port and agent.sinks.hdfs1.hdfs.path depending on your system.
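
If you want the sink to write something other than the default SequenceFile format, or to use your own file name prefix instead of the default FlumeData, the HDFS sink has a few optional properties for that. This is only a hedged sketch with illustrative values, not part of the configuration I actually tested:

# Optional HDFS sink settings (illustrative values)
# write raw event bodies instead of a SequenceFile
agent.sinks.hdfs1.hdfs.fileType = DataStream
# file name prefix instead of the default "FlumeData"
agent.sinks.hdfs1.hdfs.filePrefix = ogg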

On the target HDFS we have to create the directory that was defined in our sink configuration.

[oracle@sandbox ~]$ hadoop fs -mkdir /user/oracle/ggflume
[oracle@sandbox ~]$ hadoop fs -ls /user/oracle/ggflume
[oracle@sandbox ~]$
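
One thing worth checking: the listings later in this post show the files arriving owned by the flume user, so the target directory has to be writable by whatever user the Flume agent runs as. If it is not, something along these lines (run as the HDFS superuser; users and groups here are assumptions, adjust them to your environment) should sort it out:

[root@sandbox ~]# sudo -u hdfs hadoop fs -chown flume:oracle /user/oracle/ggflume
[root@sandbox ~]# sudo -u hdfs hadoop fs -chmod 775 /user/oracle/ggflume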

We can start our Flume agent now.

[root@sandbox conf]# service flume-ng-agent start
Starting Flume NG agent daemon (flume-ng-agent):           [  OK  ]
[root@sandbox conf]# service flume-ng-agent status
Flume NG agent is running                                  [  OK  ]
[root@sandbox conf]#
[root@sandbox conf]# tail /var/log/flume-ng/flume.log
25 Feb 2016 11:56:37,113 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
25 Feb 2016 11:56:37,121 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: CHANNEL, name: ch1 started
25 Feb 2016 11:56:37,122 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173)  - Starting Sink hdfs1
25 Feb 2016 11:56:37,123 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184)  - Starting Source ogg1
25 Feb 2016 11:56:37,139 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.AvroSource.start:228)  - Starting Avro source ogg1: { bindAddress: 0.0.0.0, port: 4141 }...
25 Feb 2016 11:56:37,146 INFO  [lifecycleSupervisor-1-2] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: SINK, name: hdfs1: Successfully registered new MBean.
25 Feb 2016 11:56:37,147 INFO  [lifecycleSupervisor-1-2] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: SINK, name: hdfs1 started
25 Feb 2016 11:56:38,114 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: SOURCE, name: ogg1: Successfully registered new MBean.
25 Feb 2016 11:56:38,115 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: SOURCE, name: ogg1 started
25 Feb 2016 11:56:38,116 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.AvroSource.start:253)  - Avro source ogg1 started.
[root@sandbox conf]#
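
Before involving GoldenGate at all, it is easy to sanity-check the Avro source with the avro-client utility that ships with Flume. A quick test along these lines (the test file is just an example) should result in a new FlumeData file appearing in /user/oracle/ggflume:

[root@sandbox conf]# echo "avro source test" > /tmp/avro_test.txt
[root@sandbox conf]# flume-ng avro-client -H localhost -p 4141 -F /tmp/avro_test.txt
[root@sandbox conf]# hadoop fs -ls /user/oracle/ggflume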

Flume is ready, and we can now prepare our OGG configuration. There are example configuration files for the Flume adapter in $OGG_HOME/AdapterExamples/big-data/flume/:

[oracle@sandbox oggbd]$ ll AdapterExamples/big-data/flume/
total 12
-rw-r--r--. 1 oracle oinstall 107 Dec  9 12:56 custom-flume-rpc.properties
-r-xr-xr-x. 1 oracle oinstall 812 Dec  9 12:56 flume.props
-rw-r--r--. 1 oracle oinstall 332 Dec  9 12:56 rflume.prm
[oracle@sandbox oggbd]$

We can copy the examples to our configuration directory and adjust them to our needs:

[oracle@sandbox oggbd]$ cp AdapterExamples/big-data/flume/* dirprm/

Here is the configuration file for our adapter:

[oracle@sandbox oggbd]$ cat dirprm/flume.props
gg.handlerlist = flumehandler
gg.handler.flumehandler.type=flume
gg.handler.flumehandler.RpcClientPropertiesFile=custom-flume-rpc.properties
gg.handler.flumehandler.format=avro_op
gg.handler.flumehandler.mode=tx
#gg.handler.flumehandler.maxGroupSize=100, 1Mb
#gg.handler.flumehandler.minGroupSize=50, 500 Kb
gg.handler.flumehandler.EventMapsTo=tx
gg.handler.flumehandler.PropagateSchema=true
gg.handler.flumehandler.includeTokens=false
gg.handler.flumehandler.format.WrapMessageInGenericAvroMessage=true
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/usr/lib/flume-ng/lib/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

You will need to adjust gg.classpath for your system: it has to include the Flume Java classes and the file with the Flume source properties (the custom-flume-rpc.properties file).
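
A quick way to verify that the directory you put into gg.classpath really contains the Flume client classes is to look for the Flume SDK jar there (the exact path and version will differ on your system):

[oracle@sandbox oggbd]$ ls /usr/lib/flume-ng/lib/flume-ng-sdk*.jar
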
Here is my example of the custom-flume-rpc.properties file, which will be used by the OGG adapter to connect to the flume-ng agent. I've placed it in the dirprm directory along with the other parameter files.

[oracle@sandbox oggbd]$ cat dirprm/custom-flume-rpc.properties
client.type=default
hosts=h1
hosts.h1=localhost:4141
batch-size=100
connect-timeout=20000
request-timeout=20000

As you can see, my flume-ng agent is on the same host as OGG, which may not be the case for you. If your agent runs elsewhere, you will need to provide the hostname and port of your running flume-ng agent.
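
If you want to protect the handler against a single agent going down, the Flume RPC client properties also support a failover configuration with several hosts. I have not tested this with the OGG handler, so treat it as a sketch only (host names and the second port are made up):

client.type=default_failover
hosts=h1 h2
hosts.h1=flume-host1.example.com:4141
hosts.h2=flume-host2.example.com:4141
max-attempts=3
batch-size=100
connect-timeout=20000
request-timeout=20000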

Now we need to prepare the configuration file for our initial load. The OGG trail file for the initial load is located in the dirdat/ directory and is named initld.

Here is our parameter file for the initial load using a passive replicat:

[oracle@sandbox oggbd]$ cat dirprm/irflume.prm
--initial REPLICAT irflume
-- Command to run REPLICAT in passive mode
-- ./replicat paramfile dirprm/irflume.prm reportfile dirrpt/irflume.rpt
SPECIALRUN
END RUNTIME
EXTFILE /u01/oggbd/dirdat/initld
--DDLERROR default discard
DDL include all
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ggtest.*, TARGET bdtest.*;

Let’s run the load and see what we get in the end:

[oracle@sandbox oggbd]$ ./replicat paramfile dirprm/irflume.prm reportfile dirrpt/irflume.rpt

The command completed successfully, and we got three new files on HDFS. The first two files contained the schema description and the third one contained the data for the replicated tables.

[root@sandbox ~]# hadoop fs -ls /user/oracle/ggflume
Found 12 items
-rw-r--r--   1 flume oracle       1833 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634620
-rw-r--r--   1 flume oracle       1762 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634621
-rw-r--r--   1 flume oracle       1106 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634622
[root@sandbox ~]# hadoop fs -tail /user/oracle/ggflume/FlumeData.1457626634620
 {
        "name" : "PK_ID",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "PK_ID_isMissing",
        "type" : "boolean"
      }, {
        "name" : "RND_STR",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "RND_STR_isMissing",
        "type" : "boolean"
     ..................
[root@sandbox ~]# hadoop fs -tail /user/oracle/ggflume/FlumeData.1457626634621
 "string"
  }, {
    "name" : "primary_keys",
    "type" : {
      "type" : "array",
      "items" : "string"
    }
  }, {
    "name" : "tokens",
    "type" : {
      "type" : "map",
      "values" : "string"
    },
...........................
[root@sandbox ~]# hadoop fs -tail /user/oracle/ggflume/FlumeData.1457626634622
:?v??8?????	SaQm?"BDTEST.TEST_TAB_1Ñ??
                                          ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.448000(00000000-10000002012
PK_ID1371O62FX&2014-01-24:19:09:20RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ??
                                                                                   ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.459000(00000000-10000002155
PK_ID2371O62FX&2014-01-24:19:09:20HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_1Ñ??
                                                                                   ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.459001(00000000-10000002298
PK_ID3RXZT5VUN&2013-09-04:23:32:56RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ??
                                                                                   ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.460000(00000000-10000002441
PK_ID4RXZT5VUN&2013-09-04:23:32:56HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_2?????"BDTEST.TEST_TAB_2I42016-02-16 19:17:40.76289942016-03-10T11:17:14.466000(00000000-10000002926
PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52

The initial load has succeeded, and now we can create and start the proper ongoing replication to HDFS through Flume.
Let's prepare a new parameter file for our permanent Flume replicat and start it up.

GGSCI (sandbox.localdomain) 2> edit param rflume
REPLICAT rflume
-- Trail file for this example is located in "dirdat/" directory
-- Command to add REPLICAT
-- add replicat rflume, exttrail dirdat/or
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
HANDLECOLLISIONS
MAP ggtest.*, TARGET bdtest.*;
GGSCI (sandbox.localdomain) 1> add replicat rflume, exttrail dirdat/or, begin now
REPLICAT added.
GGSCI (sandbox.localdomain) 2> start replicat rflume
Sending START request to MANAGER ...
REPLICAT RFLUME starting

Let’s insert a row and see what we get on the target system.

orclbd> insert into ggtest.test_tab_1
  2  values (7,dbms_random.string('x', 8), sysdate-(7+dbms_random.value(0,1000)),
  3  dbms_random.string('x', 8), sysdate-(6+dbms_random.value(0,1000))) ;
1 row inserted.
orclbd> commit;
Commit complete.
orclbd>

As soon as the commit was executed, we received a couple of new files on HDFS: the first contained the schema for the changed table, and the second held the data for the transaction, or the "payload".

[root@sandbox ~]# hadoop fs -ls /user/oracle/ggflume
.................
-rw-r--r--   1 flume oracle       1833 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634620
-rw-r--r--   1 flume oracle       1762 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634621
-rw-r--r--   1 flume oracle       1106 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634622
-rw-r--r--   1 flume oracle       1833 2016-03-10 12:43 /user/oracle/ggflume/FlumeData.1457631817021
-rw-r--r--   1 flume oracle        605 2016-03-10 12:43 /user/oracle/ggflume/FlumeData.1457631817022
[root@sandbox ~]#
[root@sandbox ~]# hadoop fs -cat /user/oracle/ggflume/FlumeData.1457631817021
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable???:]B?9?k??	]kTSa?m??{
  "type" : "record",
  "name" : "TEST_TAB_1",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
    "name" : "op_type",
    "type" : "string"
  }, {
..............................
[root@sandbox ~]# hadoop fs -cat /user/oracle/ggflume/FlumeData.1457631817022
{EQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable~,?`?aHTZRB?Sa?ny
  "type" : "record",
  "name" : "generic_wrapper",
  "namespace" : "oracle.goldengate",
  "fields" : [ {
    "name" : "table_name",
    "type" : "string"
  }, {
    "name" : "schema_hash",
    "type" : "int"
  }, {
    "name" : "payload",
    "type" : "bytes"
  } ]
}Sa?nz?"BDTEST.TEST_TAB_1Ñ??
                            ?"BDTEST.TEST_TAB_1I42016-03-10 17:43:31.00169042016-03-10T12:43:33.464000(00000000080001408270
PK_ID7XYJN3Z31&2014-04-21:09:01:21FL6Z8RPN&2013-08-06:21:40:02

I prepared and executed a small regression test of inserts and updates against the table using JMeter, pushing changes at a rate of about 29 transactions per second. Even with one Flume channel and my small Hadoop environment, it showed pretty good response times without throwing any errors. Flume put about 900 transactions into each HDFS file; I come back to the file roll settings right after the listing below.

-rw-r--r--   1 flume oracle     123919 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485465
-rw-r--r--   1 flume oracle      35068 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485466
-rw-r--r--   1 flume oracle     145639 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485467
-rw-r--r--   1 flume oracle     178943 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485468
-rw-r--r--   1 flume oracle     103285 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485469
[oracle@sandbox Downloads]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457639485467 | wc -l
804
[oracle@sandbox Downloads]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457639485468 | wc -l
988
[oracle@sandbox Downloads]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457639485469 | wc -l
570
[oracle@sandbox Downloads]$
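
How many events end up in each HDFS file is controlled by the Flume HDFS sink roll settings rather than by GoldenGate. My agent configuration left them at their defaults; if you want fewer, larger files, the sink can be tuned roughly like this (illustrative values only):

# roll a new file every 10 minutes or at ~64 MB, whichever comes first;
# rollCount=0 disables rolling by event count
agent.sinks.hdfs1.hdfs.rollInterval = 600
agent.sinks.hdfs1.hdfs.rollSize = 67108864
agent.sinks.hdfs1.hdfs.rollCount = 0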

I've also tried the Thrift source for Flume, and it worked well too. To switch from Avro to Thrift I changed the value of the agent.sources.ogg1.type parameter in the Flume agent configuration and restarted the Flume agent. You also have to change client.type from default to thrift in your custom-flume-rpc.properties file. It worked fine, and I was able to get the information from the trail and write it to HDFS.
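
For reference, the switch boils down to one line on each side; the port stays whatever your Thrift source listens on (I reused 4141 here, which is an assumption, not a requirement):

# Flume agent configuration
agent.sources.ogg1.type = thrift

# dirprm/custom-flume-rpc.properties (OGG handler side)
client.type = thrift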

[oracle@sandbox oggbd]$ ./replicat paramfile dirprm/irflume.prm reportfile dirrpt/irflume.rpt
[oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume
Found 3 items
-rw-r--r--   1 flume oracle       1833 2016-02-25 16:05 /user/oracle/ggflume/FlumeData.1456434311892
-rw-r--r--   1 flume oracle       1762 2016-02-25 16:05 /user/oracle/ggflume/FlumeData.1456434311893
-rw-r--r--   1 flume oracle       1106 2016-02-25 16:05 /user/oracle/ggflume/FlumeData.1456434311894
[oracle@sandbox oggbd]$
[oracle@sandbox oggbd]$ hadoop fs -cat  /user/oracle/ggflume/FlumeData.1456434311892
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritableOG?????$?{[email protected]]?{
  "type" : "record",
  "name" : "TEST_TAB_1",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
.....
[oracle@sandbox oggbd]$ hadoop fs -cat  /user/oracle/ggflume/FlumeData.1456434311894
SEQ!org.apache.hadoop.io.LongWritable"[email protected]??"BDTEST.TEST_TAB_1Ñ??
                                                                                                              ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.480000(00000000-10000002012
PK_ID1371O62FX&2014-01-24:19:09:20RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ??
                                                                                   ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.498000(00000000-10000002155
PK_ID2371O62FX&2014-01-24:19:09:20HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_1Ñ??
                                                                                   ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.498001(00000000-10000002298
PK_ID3RXZT5VUN&2013-09-04:23:32:56RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ??
                                                                                   ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.499000(00000000-10000002441
PK_ID4RXZT5VUN&2013-09-04:23:32:56HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_2?????"BDTEST.TEST_TAB_2I42016-02-16 19:17:40.76289942016-02-25T16:05:11.505000(00000000-10000002926
PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52[oracle@sandbox oggbd]$

You can see from the output that the FlumeData.1456434311892 file contains the schema description, and FlumeData.1456434311894 contains the data from the tables TEST_TAB_1 and TEST_TAB_2.

Let’s try some simple DDL commands.
If we truncate a table:

orclbd> truncate table ggtest.test_tab_1;
Table GGTEST.TEST_TAB_1 truncated.
orclbd>

It is not replicated. If we alter the table, we don't see the ALTER as a separate command, but it is reflected in the new schema definition for any subsequent transaction replicated to HDFS: you get one file with the new schema definition, and the transaction itself in the next file.

orclbd> alter table ggtest.test_tab_1 add (new1 varchar2(10));
Table GGTEST.TEST_TAB_1 altered.
orcl> insert into ggtest.test_tab_1
  2  values (7,dbms_random.string('x', 8), sysdate-(7+dbms_random.value(0,1000)),
  3  dbms_random.string('x', 8), sysdate-(6+dbms_random.value(0,1000)),'new_col' );
1 row created.
orcl> commit;
Commit complete.
orcl>
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457117136700
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable????)0???4(SB?Yc{
  "type" : "record",
  "name" : "TEST_TAB_1",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
........
........
        "name" : "NEW1",
        "type" : [ "null", "string" ],
        "default" : null
      }, {
        "name" : "NEW1_isMissing",
        "type" : "boolean"
........
[oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457117136701
........
}SB???"BDTEST.TEST_TAB_1?????"BDTEST.TEST_TAB_1I42016-03-04 18:45:30.00131442016-03-04T13:45:34.156000(00000000000000014363
PK_ID7U09D0CTU&2013-08-15:12:53:50W0BSUWLL&2013-08-16:09:28:12new_col

As I mentioned in my previous post, a deeper investigation of the supported DDL changes will be the subject of a dedicated blog post. Here we can conclude that the adapter worked as expected and supported the flow of transactions from our Oracle database down to Flume using Avro and Thrift sources. Of course, this is not a production implementation; it serves only as basic functional and elementary regression testing. For a serious production workflow we would need to develop an appropriate architecture.

In my next few posts I plan to check the Kafka and HBase adapters and see how they work. Stay tuned!



6 Comments

Vasanth Kumar
May 20, 2016 12:48 am

Hi Gleb,
Firstly I would like to say these are very helpful blogs. All 3 in this series are very helpful.

We have a working GG replication for Oracle to Flume (Hadoop) using 12.1.0 version of GG with an unsupported version of Java Adapter (which was given by Oracle for AVRO support)

With GG 12.2.0 we know that GG supports the AVRO format. We are working on replacing 12.1.0 with the 12.2.0 version of GG, and we are looking at AVRO support and DDL replication using 12.2.0.

I followed all the steps as explained in your blog.

Below are my configurations –

FLUME(version 1.6) conf file –
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = n01dol411.tent.trt.csaa.pri
agent1.sources.avro-source1.port = 41414
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = file_roll
agent1.sinks.log-sink1.sink.directory = /u02/oracle

I am trying to write to a local file system using file_roll instead of hdfs
I can start flume without any issues.

Goldengate file from dirprm –
[email protected]:/u01/media/oracle/Java_Adapter_12.2/dirprm> cat custom-flume-rpc.properties
client.type=default
hosts=h1
hosts.h1=n01dol411.tent.trt.csaa.pri:61415
batch-size=100
connect-timeout=20000
request-timeout=20000

[email protected]:/u01/media/oracle/Java_Adapter_12.2/dirprm> cat flume.props

gg.handlerlist = flumehandler
gg.handler.flumehandler.type=flume
gg.handler.flumehandler.RpcClientPropertiesFile=custom-flume-rpc.properties
gg.handler.flumehandler.format=avro_op
gg.handler.flumehandler.mode=tx
#gg.handler.flumehandler.maxGroupSize=100, 1Mb
#gg.handler.flumehandler.minGroupSize=50, 500 Kb
gg.handler.flumehandler.EventMapsTo=tx
gg.handler.flumehandler.PropagateSchema=true
gg.handler.flumehandler.includeTokens=false
gg.handler.flumehandler.format.WrapMessageInGenericAvroMessage=true

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/u01/media/oracle/Flume/apache-flume-1.6.0-bin/lib/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

[email protected]:/u01/media/oracle/Java_Adapter_12.2/dirprm> cat rflume.prm
REPLICAT rflume
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP DATA.*, TARGET DATA.*;
TABLE DATA.*;

I added the replicat as

add replicate RFLUME, exttrail /u03/ogg_acfs/dirdat/mrm/mrmssprf/SP

When I start the replicat, report file shows no data replicated and replicat abends with errors.

Case 1 –
REPLICAT rflume
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP DATA.*, TARGET DATA.*;
--TABLE DATA.*;

I get below errors in report file
2016-05-19 22:14:59 INFO OGG-02243 Opened trail file /u03/ogg_acfs/dirdat/mrm/mrmssprf/SP000000 at 2016-05-19 22:14:59.789352.

2016-05-19 22:14:59 INFO OGG-03506 The source database character set, as determined from the trail file, is we8mswin1252.

2016-05-19 22:14:59 INFO OGG-06506 Wildcard MAP resolved (entry DATA.*): MAP “DATA”.”PC_DATA_UNIQUEID”, TARGET DATA.”PC_DATA_UNIQUEID”.
2016-05-19 22:14:59 ERROR OGG-00423 Could not find definition for DATA.PC_DATA_UNIQUEID.

Case 2 –
REPLICAT rflume
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
--MAP DATA.*, TARGET DATA.*;
TABLE DATA.*;

2016-05-19 22:41:01 INFO OGG-02243 Opened trail file /u03/ogg_acfs/dirdat/mrm/mrmssprf/SP000000 at 2016-05-19 22:41:01.967184.

2016-05-19 22:41:01 INFO OGG-03506 The source database character set, as determined from the trail file, is we8mswin1252.

Errors in report file

2016-05-19 22:41:01 INFO OGG-06506 Wildcard MAP resolved (entry DATA.*): TABLE “DATA”.”PC_DATA_UNIQUEID”.
2016-05-19 22:41:01 ERROR OGG-00204 Missing TARGET specification.

In GG 12.1.0 we were generating source definition file on source (oracle) and using it in the Java User Exit extract. We were using EXTRACT (not REPLICAT) to push data to flume. We were also generating AVSC file for the tables and place them in the HDFS before starting the extract.

My questions are,
Do I have to generate source definition file in 12.2.0 GG? If yes then how are DDLs replicated dynamically?
Do I have to generate AVSC file and place them in a path before I start the JAVA UE EXTRACT or REPLICAT?
Do I have to use a REPLICAT or EXTRACT in 12.2.0 GG?

Please let me know how I can move forward and implement this.

Thanks,
Vasanth

Gleb Otochkin
May 20, 2016 8:23 am

Hi Vasanth,
I see a couple of potential problems in your configuration.
1. In the custom-flume-rpc.properties file you have hosts.h1=n01dol411.tent.trt.csaa.pri:61415, while in the Flume configuration you have agent1.sources.avro-source1.port = 41414.

2. The second is about your replicat and trail files.
Is your trail /u03/ogg_acfs/dirdat/mrm/mrmssprf/SP generated by a 12.2 GoldenGate extract? In 12.2 we have the table definitions in the trail file. Looking at the log, it seems the replicat cannot find any table definitions in the trail. Have you tried the ASSUMETARGETDEFS parameter?
The second configuration for the replicat doesn't have a target specification.

And here I will try to answer to your questions:
We don't need a source definition file if your extract on the source side is 12.2; the definitions will be included in the trail file. And in case of DDL you are going to get new definitions in your Avro file. It is dynamic.

I haven't tried a mixed configuration like 12.1 on the source and 12.2 on the target, and cannot say without testing how it will work. I can guess, but I prefer to test it first.

You need to use a replicat on the target side (where your Flume is).


Gleb

Gleb Otochkin
May 20, 2016 3:17 pm

Hi Vasanth,
So, if you have a source extract on 11 or 12.1, you can use the defgen utility and create a file on the source with the definitions for your tables.
That file will work for Big Data GoldenGate and you will be able to replicate DML. But it also means no dynamic DDL replication in that case.
I am going to write a short blog post showing how to do that.

Thanks,
Gleb

Vasanth Kumar
May 23, 2016 6:55 pm

Hi Gleb,
Like you mentioned, I was trying to use the trail files created by 12.1 GoldenGate. After I started using 12.2 GoldenGate I was able to replicate data.
However, Flume is not creating multiple files as mentioned in your blog. It is creating one file for all the tables and for both DDL and DML. Is this because I am using file_roll as the sink type and not hdfs?
How can I create a separate file for each table, and how can I create separate files for the schema description and the table data?

Vasanth Kumar
May 23, 2016 7:03 pm

My configuration files,

Extract on Oracle db –
extract ggext
userid [email protected], PASSWORD ggsusr123
--RMTHOSTOPTIONS
RMTHOST n01dol411.tent.trt.csaa.pri, MGRPORT 7809
RMTFILE /u01/media/oracle/Java_Adapter_12.2/dirdat/EP, MEGABYTES 2, PURGE
DDL include objname gg_test.*
TABLE gg_test.*;

Flume config file –
# Name/aliases for the components on this agent
#
agent1.sources = avro-source1
agent1.sinks = log-sink1
agent1.channels = ch1
#
# #Avro source
#
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = sandbox
agent1.sources.avro-source1.port = 41414
#
#
# # Describe the sink
#
agent1.sinks.log-sink1.type = file_roll
agent1.sinks.log-sink1.sink.directory = /u02/oracle/
agent1.sinks.log-sink1.sink.rollInterval=0
agent1.sinks.log-sink1.serializer=avro_event
#
# # Use a channel which buffers events in memory
#
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 10000
#
# # Bind the source and sink to the channel
#
agent1.sources.avro-source1.channels = ch1
agent1.sinks.log-sink1.channel = ch1

flume.props –
gg.handlerlist = flumehandler
gg.handler.flumehandler.type=flume
gg.handler.flumehandler.RpcClientPropertiesFile=custom-flume-rpc.properties
gg.handler.flumehandler.format=avro_op
gg.handler.flumehandler.mode=tx
#gg.handler.flumehandler.maxGroupSize=100, 1Mb
#gg.handler.flumehandler.minGroupSize=50, 500 Kb
gg.handler.flumehandler.EventMapsTo=tx
gg.handler.flumehandler.PropagateSchema=true
gg.handler.flumehandler.includeTokens=false
gg.handler.flumehandler.format.WrapMessageInGenericAvroMessage=true

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/u01/media/oracle/Flume/apache-flume-1.6.0-bin/lib/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

custom-flume-rpc.properties –
client.type=default
hosts=h1
hosts.h1=sandbox:41414
batch-size=100
connect-timeout=20000
request-timeout=20000

Replicat param file –
REPLICAT rflume
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rflume, exttrail AdapterExamples/trail/tr
TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP GG_TEST.*, TARGET GG_TEST.*;

Gleb Otochkin
May 31, 2016 11:08 am

Hi Vasanth,
I think it is a limitation of the file_roll sink. As far as I can see, you can either set an interval for file rolling or prevent it from rolling at all; I don't see an option to split the events and send them to different files.
What you can try is to use Kafka as an intermediate step. The Kafka adapter can split the flow into different topics for schema changes and for data. Having that, you can define your Flume file roll for different sources, where one will be Kafka for schema changes and another will be for the data flow.


Gleb

