Partitioning a large table is common practice for a few reasons:
- It improves query efficiency by avoiding the transfer and processing of unnecessary data.
- It improves data lineage by isolating ingestion batches, so if an ingestion batch fails for some reason and introduces corrupted data, it’s safe to re-ingest that batch.
That said, this practice often results in a table with a very large number of partitions, which makes querying the full table, or a large part of it, very slow. It also makes the Hive client executing the query “memory hungry”. Both effects stem from how Hive processes a query: before generating a query plan, the Hive client needs to read the metadata of every partition involved. That means many RPC round trips between the Hive client and the Hadoop NameNode, as well as RDBMS transactions between the Hive client and the metastore. The process is slow and consumes a lot of memory. A simple experiment using Hive 0.12 shows that it takes around 50KB of heap space to store the data structures for each partition. Below are two examples from a heap dump of a Hive client executing a query that touches 13k+ partitions.
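As a back-of-the-envelope check, the per-partition figure above can be turned into a rough heap estimate. The sketch below is only arithmetic on the numbers quoted in this post; the function name is hypothetical, and the assumption that memory grows linearly with the partition count is an illustrative simplification, not something measured.

```python
# Rough heap estimate for a Hive client, ASSUMING (for illustration only)
# that memory grows linearly with the number of partitions a query touches.
KB_PER_PARTITION = 50  # figure quoted above for Hive 0.12


def estimate_partition_heap_mb(num_partitions, kb_per_partition=KB_PER_PARTITION):
    """Return the estimated heap (in MB) taken by partition data structures."""
    return num_partitions * kb_per_partition / 1024


if __name__ == "__main__":
    # 13,000 partitions, as in the heap dump mentioned above
    print(f"{estimate_partition_heap_mb(13_000):.0f} MB")
```

Under that assumption, a query touching 13,000 partitions needs roughly 635 MB of heap for partition data structures alone, before any other client overhead.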
We can set HADOOP_HEAPSIZE in hive-env.sh to a larger number to keep ourselves out of trouble. The value of HADOOP_HEAPSIZE is passed to the JVM as the -Xmx argument. But if we want to run multiple Hive queries at the same time on the same machine, we will run out of memory very quickly. Another thing to watch out for when increasing the heap size: if the parallel GC is used for the JVM, which is the default for the Java server VM, and the maximum GC pause time isn’t set properly, a Hive client dealing with a lot of partitions will quickly grow its heap to the maximum and never shrink it back down.
Another potential problem with querying a large number of partitions is that Hive uses CombineHiveInputFormat by default, which instructs Hadoop to combine all input files smaller than the “split size” into splits. The combining algorithm is “greedy”: it bins larger files into splits first, then smaller ones. So the “last” couple of splits usually contain a huge number of small files (how many depends on how unevenly the input file sizes are distributed). As a result, the “unlucky” map tasks that get these splits will be very slow compared to other map tasks and will consume a lot of memory collecting and processing the metadata of their input files. Usually you can tell how bad the situation is by comparing the SPLIT_RAW_BYTES counters of the map tasks.
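To see why the last splits end up overloaded, here is a minimal simulation of the greedy behaviour described above. It is a simplified model, not Hadoop's actual combine code: the function name, the file sizes, and the rule "close a split once it reaches the split size" are all assumptions made for illustration.

```python
def combine_greedy(file_sizes, split_size):
    """Group files into splits, largest files first.

    Simplified model of the behaviour described above, NOT Hadoop's
    real implementation: files are taken in descending size order and
    added to the current split until it reaches split_size, at which
    point a new split is started.
    """
    splits = []
    current, current_bytes = [], 0
    for size in sorted(file_sizes, reverse=True):
        current.append(size)
        current_bytes += size
        if current_bytes >= split_size:
            splits.append(current)
            current, current_bytes = [], 0
    if current:  # leftover files form the final split
        splits.append(current)
    return splits


if __name__ == "__main__":
    mb = 1024 * 1024
    # Hypothetical input: 4 larger files and 500 tiny ones
    files = [128 * mb] * 4 + [1 * mb] * 500
    for i, split in enumerate(combine_greedy(files, split_size=256 * mb)):
        print(f"split {i}: {len(split)} files, {sum(split) // mb} MB")
```

With this made-up input, the first two splits hold 2 files each while the last two hold 256 and 244 files. All four splits carry a similar number of bytes, but the map tasks for the last two have to handle over a hundred times as many files.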
A possible solution to this problem is to create two versions of the table: one partitioned and one non-partitioned. The partitioned one is still populated the way it is now. The non-partitioned one can be populated in parallel with the partitioned one using “INSERT INTO”. One disadvantage of the non-partitioned version is that it’s harder to revise if corrupted data is found in it, because in that case the whole table has to be rewritten. However, starting with Hive 0.14, UPDATE and DELETE statements are allowed for tables stored in ORC format. Another possible problem with the non-partitioned version is that the table may contain a large number of small files on HDFS, because every “INSERT INTO” creates at least one file. As the number of files in the table increases, queries against the table slow down. So periodic compaction is recommended to reduce the number of files in the table. It can be done by simply executing “INSERT OVERWRITE SELECT * FROM” periodically. You need to make sure no other inserts are running at the same time, or data loss will occur.
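The two statements involved might look like the following sketch, written here as small Python helpers that build the HiveQL text. The helper names, the table names, and the filter used to select a batch are hypothetical placeholders; adapt the column list to your own schema, and note that how many files the compaction job writes depends on how the job itself is configured.

```python
# All table names and filters below are hypothetical placeholders.
# Identifiers are interpolated directly, so only pass trusted values.


def insert_batch_sql(flat_table, source_table, batch_filter):
    """Build a statement that appends one ingestion batch to the
    non-partitioned table."""
    return (
        f"INSERT INTO TABLE {flat_table} "
        f"SELECT * FROM {source_table} WHERE {batch_filter}"
    )


def compaction_sql(flat_table):
    """Build a statement that rewrites the table onto itself in one job.

    Run it only while no other inserts into the table are in flight.
    """
    return f"INSERT OVERWRITE TABLE {flat_table} SELECT * FROM {flat_table}"


if __name__ == "__main__":
    print(insert_batch_sql("events_flat", "events_staging", "batch_id = 42"))
    print(compaction_sql("events_flat"))
```

Generating the statements from one place makes it easier to schedule the compaction in the same workflow as the inserts, so the two never overlap.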
Interested in working with Kai? Schedule a tech call.