An effective approach to migrate dynamic thrift data to CQL, part 3

Posted in: Cassandra, Technical Track

Note that this post is Part 3 of a 3-part blog series. If you haven’t read Part 1 and Part 2 of this series, please do so before continuing. Parts 1 and 2 give background on the problem we’re trying to solve, as well as some fundamental concepts used in the following discussion. The chapter numbering also continues from Part 1 and Part 2.

5. Static and Dynamic Cassandra Table Definition

From the analysis in Chapter 4, we can see that the problem we originally faced comes from the fact that the table in question is defined and operated in mixed Thrift mode: some columns are predefined statically, while others are added on the fly by a Thrift-based application. In such a case, when we examine the table with CQL-based tools such as the cqlsh utility or the COPY command, the dynamic columns are missing. So the question becomes: how can we migrate the data in dynamic (Thrift) columns into a CQL table with a static schema?

5.1. CQL Table Schema that Can Host Dynamic Column Data

As the first step of the solution, we need to design a CQL table schema that, although static, can manage dynamically generated data.
As briefly mentioned in Chapter 4, dynamism in CQL is achieved through mechanisms such as clustering columns and/or more complex data types such as collections or UDTs. For our problem, a map collection is the more appropriate choice because the dynamic columns generated by the Thrift application are quite different from one another and correspond to different logical entities. A clustering column is not suitable here.
For the purpose of general discussion, let’s assume that the static part of the table definition (in the Thrift way) looks something like the following, where “xxxn” represents a column validator class type.

create column family some_tbl_name
    with key_validation_class = BytesType
     and comparator = UTF8Type
     and column_metadata = [
       {column_name: StaticColName1, validation_class: xxx1},
       {column_name: StaticColName2, validation_class: xxx2},
       {column_name: StaticColName3, validation_class: xxx3}
       … …
     ]

Meanwhile, for each row, the Thrift application is generating some dynamic columns. Note that different rows may have different dynamic columns. To simplify our discussion, we’re going to use the following terms to refer to the static and dynamic sets of columns for each row, respectively.

StaticColNameSet = {StaticColName1, StaticColName2, StaticColName3, …}
DynamicColNameSet = {DynamicColName1, DynamicColName2, DynamicColName3, …}

In our CQL table design, we use a single CQL column of map collection type to hold all dynamic Thrift columns and leave the static Thrift columns as is in the CQL table schema, something like the following (yyyn represents the CQL column type that corresponds to the original Thrift column validator type).

CREATE TABLE some_tbl_name (
    key blob,
    StaticColName1 yyy1,
    StaticColName2 yyy2,
    StaticColName3 yyy3,
    … …,
    DynamicColMapName map<text,text>,
    PRIMARY KEY (key)
)

The map key is of TEXT type, and each key corresponds to a dynamic Thrift column name. The map value holds the actual value of the dynamic Thrift column; in the general case it should be of BLOB type. For our particular problem, however, all dynamic Thrift columns have TEXT values, so in our design the map is of type <TEXT, TEXT>.
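To make the xxxn-to-yyyn correspondence in the schema above more concrete, here is a minimal Java sketch of a lookup from common Thrift validator class names to their CQL type names. The class name `ValidatorToCqlType` and the method `toCqlType` are hypothetical helpers introduced for illustration, and falling back to `blob` for an unrecognized validator is our assumption, following the "BLOB as the general case" remark above.

```java
import java.util.Map;

// Hypothetical helper (not part of Cassandra): maps a Thrift validator
// class name (the "xxxn" above) to a CQL type name (the "yyyn" above).
final class ValidatorToCqlType {
    private static final Map<String, String> CQL_TYPES = Map.ofEntries(
        Map.entry("AsciiType", "ascii"),
        Map.entry("BooleanType", "boolean"),
        Map.entry("BytesType", "blob"),
        Map.entry("DateType", "timestamp"),
        Map.entry("DecimalType", "decimal"),
        Map.entry("DoubleType", "double"),
        Map.entry("FloatType", "float"),
        Map.entry("InetAddressType", "inet"),
        Map.entry("Int32Type", "int"),
        Map.entry("IntegerType", "varint"),
        Map.entry("LongType", "bigint"),
        Map.entry("TimestampType", "timestamp"),
        Map.entry("TimeUUIDType", "timeuuid"),
        Map.entry("UTF8Type", "text"),
        Map.entry("UUIDType", "uuid"));

    // Accepts either a short name ("UTF8Type") or a fully qualified one
    // ("org.apache.cassandra.db.marshal.UTF8Type").
    static String toCqlType(String validator) {
        String simpleName = validator.substring(validator.lastIndexOf('.') + 1);
        // Assumption: unknown validators are treated as raw bytes.
        return CQL_TYPES.getOrDefault(simpleName, "blob");
    }

    public static void main(String[] args) {
        System.out.println(toCqlType("UTF8Type"));
        System.out.println(toCqlType("org.apache.cassandra.db.marshal.LongType"));
    }
}
```

Composite or custom validators would need their own handling; the sketch only covers the simple one-to-one cases.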

5.2. A Natural Approach to Migrate Dynamic Data

Once the CQL table schema is in place, the next step is to develop an ETL program to transfer data from the Thrift table into the CQL table.
A natural approach is to let the program open sessions to both tables (the source Thrift table and the target CQL table). For each Thrift row, it transforms the data (putting the dynamic data into a map) and inserts the result as a new row in the CQL table. This approach works correctly but is inefficient, especially considering that the source Thrift table contains millions of rows.
We’re not going to cover the details of this approach in this post. It is listed here only for comparison with the more effective approach described in the next section.

5.3. A More Effective Approach to Migrate Dynamic Data

Cassandra offers a highly efficient bulk-loading technique based on “sstableloader”. This technique uses the core Cassandra engine code to generate SSTable files directly from any source of data to be bulk loaded. Since the Cassandra engine is written in Java, this technique is directly available only to Java programs.

To use this technique, one needs to write a custom Java program that calls the Cassandra core Java class SSTableSimpleUnsortedWriter, which can generate raw SSTable files directly. To run the Java program, the JAR file for Apache Cassandra itself needs to be on the Java class path.

With Cassandra’s shift from Thrift to CQL, a new SSTable writer class called CQLSSTableWriter was introduced in later versions of Cassandra to generate raw SSTable files using CQL.
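A CQLSSTableWriter is configured with two CQL strings: the CREATE TABLE statement of the target table (with a keyspace-qualified table name) and a parameterized INSERT statement. The following minimal sketch builds both strings for the table layout from Section 5.1 in plain Java. The class `WriterStatements`, its method names, and the keyspace name `ks` are hypothetical and used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds the two CQL strings a CQLSSTableWriter needs.
final class WriterStatements {

    // staticCols maps each static column name to its CQL type, in schema order.
    static String createTable(String keyspace, String table,
                              Map<String, String> staticCols, String mapCol) {
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE ").append(keyspace).append('.').append(table)
          .append(" (key blob, ");
        for (Map.Entry<String, String> col : staticCols.entrySet()) {
            sb.append(col.getKey()).append(' ').append(col.getValue()).append(", ");
        }
        // All dynamic Thrift columns end up in one map<text,text> column.
        sb.append(mapCol).append(" map<text,text>, PRIMARY KEY (key))");
        return sb.toString();
    }

    // One bind marker per column: key, each static column, then the map column.
    static String insert(String keyspace, String table,
                         Map<String, String> staticCols, String mapCol) {
        List<String> names = new ArrayList<>();
        names.add("key");
        names.addAll(staticCols.keySet());
        names.add(mapCol);
        String markers = String.join(", ", Collections.nCopies(names.size(), "?"));
        return "INSERT INTO " + keyspace + "." + table
             + " (" + String.join(", ", names) + ") VALUES (" + markers + ")";
    }

    public static void main(String[] args) {
        Map<String, String> staticCols = new LinkedHashMap<>();
        staticCols.put("StaticColName1", "text");
        staticCols.put("StaticColName2", "bigint");
        System.out.println(createTable("ks", "some_tbl_name", staticCols, "DynamicColMapName"));
        System.out.println(insert("ks", "some_tbl_name", staticCols, "DynamicColMapName"));
    }
}
```

With the Cassandra JAR on the class path, the two strings would then be passed to `CQLSSTableWriter.builder().inDirectory(...).forTable(...).using(...).build()`.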

Our improved approach adopts this technique to do a very fast data migration from dynamic Thrift tables into a CQL table. At a very high level, this approach has two steps:

  • Fetch the Thrift table rows and, for each row, transform the dynamic column data into map items. Call CQLSSTableWriter to generate SSTable files directly from the transformed data.
  • Use the sstableloader utility to load the generated SSTable files into the CQL table.

The second step is fast and fairly straightforward: it simply calls the Cassandra utility.

Conceptually, the first step is similar to the simple approach described in the previous section, except that instead of maintaining a live connection to the CQL table and inserting each transformed row on the fly, this approach directs the transformed data of each row into a CQLSSTableWriter instance. The instance is based on Java non-blocking IO (NIO) buffers and is automatically flushed to disk (in raw SSTable format) when the buffer is full.

The highly simplified pseudocode below clarifies the core idea.

// output_dir, create_table_stmt and insert_stmt are placeholders
writer = CQLSSTableWriter.builder()
             .inDirectory(output_dir)
             .forTable(create_table_stmt)
             .using(insert_stmt)
             .build()
thrift_rows = thrift_conn.fetchRows()
for each row in thrift_rows {
    keyValue = row.getKey()
    staticColVals = new Map()
    dynColMap = new Map()
    for each column in row.getColumns() {
        colName = column.getName()
        if colName is in DynamicColNameSet {
            // dynamic column: becomes one entry of the map column
            dynColMap.addKeyValuePair(colName, column.getValue())
        }
        else {
            // static column: keeps its own CQL column
            staticColVals.addKeyValuePair(colName, column.getValue())
        }
    }
    writer.addRow(keyValue, staticColVals[StaticColName1], staticColVals[StaticColName2], …, dynColMap)
}
writer.close()
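The per-row transformation in the pseudocode above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: a Thrift row is modeled as a simple name-to-value map, all values are treated as text (as in our particular problem), and any column not listed in the static set is considered dynamic. The class `ThriftRowSplitter` and its members are hypothetical names, not part of Cassandra or of our actual program.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: splits one Thrift row into its static columns and
// a map holding all of its dynamic columns.
final class ThriftRowSplitter {

    // Holder for the two halves of a transformed row.
    static final class SplitRow {
        final Map<String, String> staticCols = new LinkedHashMap<>();
        final Map<String, String> dynamicCols = new LinkedHashMap<>();
    }

    // Assumption: a column is dynamic exactly when its name is not in staticNames.
    static SplitRow split(Map<String, String> thriftColumns, Set<String> staticNames) {
        SplitRow result = new SplitRow();
        for (Map.Entry<String, String> col : thriftColumns.entrySet()) {
            if (staticNames.contains(col.getKey())) {
                result.staticCols.put(col.getKey(), col.getValue());
            } else {
                result.dynamicCols.put(col.getKey(), col.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("StaticColName1", "a");
        row.put("DynamicColName1", "x");
        row.put("DynamicColName2", "y");
        SplitRow split = split(row, Set.of("StaticColName1", "StaticColName2"));
        System.out.println("static:  " + split.staticCols);
        System.out.println("dynamic: " + split.dynamicCols);
    }
}
```

A static column that is absent from a given Thrift row simply does not appear in `staticCols`, so the caller would pass `null` for it when adding the row to the writer. The `dynamicCols` map is what would be bound to the map<text,text> column.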

5.3.1. Testing Result

With this approach, our actual Java implementation shows a dramatic ETL performance boost. Processing about 1.2 million records from a Thrift table with dynamically generated columns into a CQL table with a map collection column that holds the dynamic Thrift data went from several hours (using the simple approach) down to several minutes.

The Java program itself is only responsible for processing and transforming the source Thrift table into raw SSTable files that match the target CQL table schema. The actual data loading is done by the Cassandra sstableloader utility.

The Java program processes the source data in batches of 20,000 rows. When a batch is done, the program prints statistics about the batch execution time. When all data has been processed, it prints 1) the total number of rows processed and 2) the total execution time. The test result is as follows:

Fetching result from source table and generate SSTable ...
-------------------------
> batch_run: 1;   batch_size: 20001;   batch_runtime: PT5.594S
> batch_run: 2;   batch_size: 20000;   batch_runtime: PT4.368S
> batch_run: 3;   batch_size: 20000;   batch_runtime: PT3.937S
> batch_run: 4;   batch_size: 20000;   batch_runtime: PT3.838S
> batch_run: 5;   batch_size: 20000;   batch_runtime: PT2.015S
… … …
> batch_run: 59;   batch_size: 20000;   batch_runtime: PT1.941S
> batch_run: 60;   batch_size: 12581;   batch_runtime: PT2.82S
#### Total Records Processed: 1192582;  Total Run Time: PT2M35.178S
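The runtime values in the log above (for example PT5.594S and PT2M35.178S) have the ISO-8601 form that `java.time.Duration.toString()` produces. Assuming the statistics are produced that way, the following minimal sketch shows how such per-batch lines could be generated. The class `BatchStats` and its methods are hypothetical, and the batch work itself is represented by a placeholder callback that returns the number of rows it processed.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

// Hypothetical sketch of per-batch timing and reporting.
final class BatchStats {

    // Number of batches needed for totalRows (ceiling division).
    static long batchCount(long totalRows, long batchSize) {
        return (totalRows + batchSize - 1) / batchSize;
    }

    // Formats one statistics line in the style of the log above;
    // Duration.toString() yields the ISO-8601 "PT...S" form.
    static String batchLine(int run, int size, Duration runtime) {
        return String.format("> batch_run: %d;   batch_size: %d;   batch_runtime: %s",
                             run, size, runtime);
    }

    // Runs one batch, measures its wall-clock time, and returns the log line.
    static String runBatch(int run, IntSupplier batch) {
        long start = System.nanoTime();
        int rows = batch.getAsInt();
        Duration elapsed = Duration.ofMillis((System.nanoTime() - start) / 1_000_000);
        return batchLine(run, rows, elapsed);
    }

    public static void main(String[] args) {
        // Placeholder batch that pretends to have processed 20000 rows.
        System.out.println(runBatch(1, () -> 20000));
        System.out.println("batches needed: " + batchCount(1192582, 20000));
    }
}
```

For the 1,192,582 rows reported above, ceiling division by a batch size of 20,000 gives 60 batches, which matches the number of batch_run lines in the log.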

Once the raw SSTable files conforming to the target CQL table schema have been generated by the Java program, loading them into the target CQL table with the Cassandra “sstableloader” utility takes less than 1 minute. Since this is a standard Cassandra utility, we’re not going to paste the detailed execution log here, only the summary:

$ sstableloader -d <node_ip> <sstable_root>/<keyspace_name>/<target_cql_table_name>/
… … …
Summary statistics:
       Connections per host:         : 1
       Total files transferred:      : 15
       Total bytes transferred:      : 511566477
       Total duration (ms):          : 47451
       Average transfer rate (MB/s): : 10
       Peak transfer rate (MB/s):    : 20
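As a quick sanity check on the figures reported above, the short sketch below recomputes the throughput of both steps. The class `MigrationThroughput` and its methods are hypothetical. Whether sstableloader reports decimal or binary megabytes is not something we verified here; the sketch uses decimal megabytes (10^6 bytes), and either convention truncates to the reported average of 10.

```java
// Hypothetical sketch: recomputes throughput from the reported totals.
final class MigrationThroughput {

    // Rows processed per second by the SSTable generation step.
    static double rowsPerSecond(long rows, double seconds) {
        return rows / seconds;
    }

    // Average transfer rate in decimal megabytes per second.
    static double megabytesPerSecond(long bytes, long millis) {
        return (bytes / 1_000_000.0) / (millis / 1000.0);
    }

    public static void main(String[] args) {
        // 1,192,582 rows in PT2M35.178S (155.178 seconds)
        System.out.printf("generation: %.0f rows/s%n", rowsPerSecond(1192582L, 155.178));
        // 511,566,477 bytes in 47,451 ms
        System.out.printf("loading:    %.2f MB/s%n", megabytesPerSecond(511566477L, 47451L));
    }
}
```

The generation step works out to roughly 7,700 rows per second, and the load step to a little under 11 MB/s, which is in line with the average transfer rate in the summary.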

6. Summary and Conclusion

In this blog post series, we explored an approach to effectively migrate Cassandra data from Thrift to CQL. The storage engine discussion in this series mainly applies to Cassandra 2.2 and earlier, because the storage engine was substantially rewritten in Cassandra 3.0. However, the data migration approach we explored should apply to the new engine as well. In pre-3.0 versions, CQL provides a high-level abstraction on top of the old storage engine, whereas in 3.0 and beyond the CQL representation of the Cassandra storage model (e.g. clustering) is implemented directly at the storage engine layer. From the usage and application data modeling perspective, CQL remains the same.

Author

Interested in working with Yabin? Schedule a tech call.
