Recursion in Hive – part 1

Posted in: Big Data, Hadoop, Technical Track

I am going to start this new series of blog posts by talking about code migration use cases. We will discuss migration from RDBMS to Hive while keeping the simplicity and flexibility of a SQL approach.

The first case is about recursive SQL. In most RDBMSes it is covered by recursive queries using a "with" clause. Unfortunately, this is not yet supported in Hive.

Let’s consider the following scenario. We have PRODUCTs and STATEs. The STATEs form a forest of trees. The facts are combinations of PRODUCT and STATE which may have some data. Here are the simplified DDLs:

```
create table t_product (
  product string);

create table t_state (
  state string,
  next_state string);

create table t_big_data (
  product string,
  state string,
  data string);
```

The task is: for an input set of (PRODUCT, STATE) pairs, find the next available STATE (following the next_state links) that has data in the fact table, or return NULL.

The input data is stored in t_input table:

```
create table t_input (
  product string,
  state string);
```

We need to populate t_output table:

```
create table t_output (
  product string,
  state string,
  found_state string,
  data string);
```

There are various methods to solve this: a procedural approach with recursive functions, recursive SQL, or multi-joins (in case we know the maximum depth).

For a modern RDBMS that supports recursive queries, the most reasonable solution would be something like this:

```
insert into t_output (product, state, found_state, data)
with rec (product, state1, state2, data) as (
  select i.product, i.state, i.state, d.data
  from t_input i
  left join t_big_data d
    on d.product = i.product and d.state = i.state
  union all
  select r.product, r.state1, s.next_state, d.data
  from rec r
  join t_state s
    on s.state = r.state2
  left join t_big_data d
    on d.product = r.product and d.state = s.next_state
  where r.data is null
)
select product, state1 as state, state2 as found_state, data
from rec
where data is not null
   or state2 is null;
```

An RDBMS can build a good execution plan for such queries, especially if there are correct indexes on the t_big_data table. We could take a multi-join approach in Hive, but the cost of each scan of the big table is too high.
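For illustration only, here is a rough sketch of that multi-join alternative, assuming a known maximum depth of 2 (the depth and the aliases are made up for this example). Each extra level joins t_big_data once more, which is exactly the cost mentioned above:

```sql
-- Sketch of the multi-join approach for a maximum tree depth of 2.
-- Each level adds one more scan/join of t_big_data.
select i.product, i.state,
       coalesce(
         case when d0.data is not null then i.state end,
         case when d1.data is not null then s1.next_state end,
         case when d2.data is not null then s2.next_state end) as found_state,
       coalesce(d0.data, d1.data, d2.data) as data
from t_input i
left join t_big_data d0 on d0.product = i.product and d0.state = i.state
left join t_state s1 on s1.state = i.state
left join t_big_data d1 on d1.product = i.product and d1.state = s1.next_state
left join t_state s2 on s2.state = s1.next_state
left join t_big_data d2 on d2.product = i.product and d2.state = s2.next_state;
```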

The trick we will use here is based on the observation that a tree structure is usually relatively small in comparison with the data table, so we can easily “expand” the tree into a flat denormalized structure: for each STATE from the initial table, we keep every STATE on the way to the root together with the path length. For example, for the following simple tree:

```
STATE NEXT_STATE
----- ----------
S1    NULL
S2    S1
S3    S1
S4    S2
```

We would have:

```
STATE1 STATE2 LEVEL
------ ------ -----
S1     S1     0
S2     S2     0
S2     S1     1
S3     S3     0
S3     S1     1
S4     S4     0
S4     S2     1
S4     S1     2
```

Using RDBMS recursive queries, we would create this table as:

```
create table t_expand_state (
  state1 string,
  state2 string,
  lvl integer);

insert into t_expand_state (state1, state2, lvl)
with rec (state1, state2, lvl) as (
  select state, state, 0
  from t_state
  union all
  select r.state1, s.next_state, r.lvl + 1
  from rec r
  join t_state s
    on r.state2 = s.state
  where s.next_state is not null
)
select * from rec;
```

For Oracle DB we could do this with “connect by”:

```
select connect_by_root state state1, state as state2, level - 1 lvl
from t_state
connect by prior next_state = state;
```

Having this t_expand_state table, we can rewrite our query as:

```
insert into t_output (product, state, found_state, data)
select t.product, t.state,
       case when t.min_lvl_with_data is not null then t.state2 end,
       t.data
from (
  select i.product, i.state, s.state2, s.lvl, d.data,
         min(case when d.data is not null then lvl end)
           over (partition by i.product, i.state) min_lvl_with_data
  from t_input i
  join t_expand_state s
    on s.state1 = i.state
  left join t_big_data d
    on d.product = i.product and d.state = s.state2) t
where t.lvl = t.min_lvl_with_data
   or (t.lvl = 0 and t.min_lvl_with_data is null);
```

This solution has its own edge cases of inefficiency:
- a big t_state that produces an abnormally large t_expand_state table;
- a dense t_big_data table, in which case the query has to carry many extra rows with “data” for states we don’t need;
- a big t_input, since joining it with all “next states” inflates the dataset.

In practice, though, t_input is usually relatively small and there isn’t much overhead in fetching the extra data for the next states. Another advantage is that we scan t_big_data only once.

To reach our goal, the only task left is: how do we build t_expand_state in Hive without recursion? We could certainly consider multi-joins once again, but my choice is to use a UDTF.

To make the recursion more natural, I implemented this function in Scala. In ExpandTree2UDTF we store the tree structure in a mutable map during the process() calls. Afterwards, close() expands this map using memoization:

```
package com.pythian.nikotin.scala

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector,
  ObjectInspectorFactory, PrimitiveObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class ExpandTree2UDTF extends GenericUDTF {
  var inputOIs: Array[PrimitiveObjectInspector] = null
  // child -> optional parent, collected from the input rows
  val tree: collection.mutable.Map[String, Option[String]] = collection.mutable.Map()

  override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {
    inputOIs = args.map { _.asInstanceOf[PrimitiveObjectInspector] }
    val fieldNames = java.util.Arrays.asList("id", "ancestor", "level")
    val fieldOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector.asInstanceOf[ObjectInspector]
    val fieldOIs = java.util.Arrays.asList(fieldOI, fieldOI, fieldOI)
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
  }

  def process(record: Array[Object]) {
    val id = inputOIs(0).getPrimitiveJavaObject(record(0)).asInstanceOf[String]
    val parent = Option(inputOIs(1).getPrimitiveJavaObject(record(1)).asInstanceOf[String])
    tree += (id -> parent)
  }

  def close() {
    // Memoized expansion: each node's ancestor list is computed only once.
    val expandTree = collection.mutable.Map[String, List[String]]()
    def calculateAncestors(id: String): List[String] =
      tree(id) match { case Some(parent) => id :: getAncestors(parent); case None => List(id) }
    def getAncestors(id: String): List[String] = expandTree.getOrElseUpdate(id, calculateAncestors(id))
    tree.keys.foreach { id =>
      getAncestors(id).zipWithIndex.foreach { case (ancestor, level) => forward(Array(id, ancestor, level.toString)) }
    }
  }
}
```

Having this, we can compile it to a jar, add it to Hive, create the function, and use it to build the t_expand_state table.

```
create function expand_tree as 'com.pythian.nikotin.scala.ExpandTree2UDTF';

insert overwrite table t_expand_state (state1, state2, lvl)
select expand_tree(state, next_state) from t_state;
```

Author


Valentin Nikotin

Valentin is a specialist in Big Data and Cloud solutions. He has extensive expertise in the Cloudera Hadoop Distribution and Google Cloud Platform, and is skilled in building scalable, performance-critical distributed systems and data visualization systems.

Hi,

I am getting the below error while trying to run the above scala UDTF:

OK
at scala.collection.mutable.HashTable$HashUtils$class.elemHashCode(HashTable.scala:398)
at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:39)
at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:130)
at scala.collection.mutable.HashMap.findEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.get(HashMap.scala:69)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:187)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)

Valentin Nikotin
February 21, 2017 4:19 am

It seems you have a cycle in your data.

Many Thanks Valentin for your prompt reply. I had an issue of cyclic loop in my data. Sorted now

Hi Ishan, could you please let me know how you resolved the cyclic loop?

Ishan Sharma
March 1, 2017 3:23 am

Hi Valentin,

How can we amend the above code to get paths as well in addition to levels? Can we implement something similar to SYS_CONNECT_BY_PATH in ORACLE?

Regards,
Ishan

Tom Christopher
May 15, 2017 9:47 pm

Hi, Thanks for this code. It seems to work on small data. But on big data, I am getting the below run time error. Any ideas?
Error: java.lang.RuntimeException: Hive Runtime Error while closing operators
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at ExpandTreeUDTF.ExpandTreeUDTF$$calculateAncestors$1(ExpandTreeUDTF.scala:45)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at ExpandTreeUDTF.ExpandTreeUDTF$$getAncestors$1(ExpandTreeUDTF.scala:46)
at ExpandTreeUDTF.ExpandTreeUDTF$$calculateAncestors$1(ExpandTreeUDTF.scala:45)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)

Valentin Nikotin
May 18, 2017 7:57 am

Check id=0318188127, it seems there is no information about its “parent” in the source table.

Is there any way we can handle this type of dataset? I have a dataset where in some cases the parentid is not available.

Hi, Valentin

I’d like to know what is the source code license.
If it’s MIT license, I’m very happy.

Valentin Nikotin
May 31, 2017 4:15 am

Hi, there is no specific licence for this code, feel free to use it with MIT licence.

Thank you Valentin !

Hi Ishan,

Please can you post the code here.

Hello,

val fieldOI = primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector.asInstanceOf[ObjectInspector]

In this line, what variable is primitive? Your help is much appreciated.

Hello Valentin,

Thanks for your help. When I try to create the function I get this error

The hive version we are using is Hive 2.1.1-amzn-0. Would this be an issue for the code? Please let me know if I need to download any sbt to resolve this.

I built my SBT using

libraryDependencies += "org.apache.hive" % "hive-exec" % "2.1.1"

I am getting this error FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. scala/collection/Seq

Thanks

Hi Valentin, can you provide Java code equivalent to the Scala code you have written? I am looking for a use case that shows how the recursion (connect by prior) functionality implemented in Oracle can be implemented in Hive, with examples.

Valentin Nikotin
November 10, 2017 4:21 pm

Hi Anuj,

I don’t have Java code for this, but feel free to share your attempts and the issues you have faced. It should be almost identical code.

Kaveh Hariri
April 27, 2018 6:29 pm

Here is a Java version of Valentin’s code. It ran very quickly, but my states table was 260k rows, which exploded to around 800k rows.

import java.util.*;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class mainrun extends GenericUDTF {
    PrimitiveObjectInspector[] inputOIs = null;
    // child -> parent (null for roots)
    Map<String, String> tree = new HashMap<>();

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) {
        inputOIs = new PrimitiveObjectInspector[args.length];
        for (int i = 0; i < inputOIs.length; i++) {
            inputOIs[i] = (PrimitiveObjectInspector) args[i];
        }
        List<String> fieldNames = Arrays.asList("id", "ancestor", "level");
        ObjectInspector fieldOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        List<ObjectInspector> fieldOIs = Arrays.asList(fieldOI, fieldOI, fieldOI);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    public void process(Object[] record) {
        String tmpId = (String) inputOIs[0].getPrimitiveJavaObject(record[0]);
        String id = (tmpId == null || tmpId.trim().isEmpty()) ? null : tmpId.trim();
        String tmpParent = (String) inputOIs[1].getPrimitiveJavaObject(record[1]);
        String parent = (tmpParent == null || tmpParent.trim().isEmpty()) ? null : tmpParent.trim();
        tree.put(id, parent);
    }

    // The node itself plus all its ancestors, memoized in expandTree.
    private List<String> getAncestors(String id, Map<String, List<String>> expandTree) {
        List<String> cached = expandTree.get(id);
        if (cached != null) {
            return cached;
        }
        List<String> ancestors = new ArrayList<>();
        ancestors.add(id);
        String parent = tree.get(id);
        if (parent != null) {
            ancestors.addAll(getAncestors(parent, expandTree));
        }
        expandTree.put(id, ancestors);
        return ancestors;
    }

    public void close() throws HiveException {
        Map<String, List<String>> expandTree = new HashMap<>();
        for (String id : tree.keySet()) {
            List<String> ancestors = getAncestors(id, expandTree);
            for (int level = 0; level < ancestors.size(); level++) {
                forward(new String[]{id, ancestors.get(level), String.valueOf(level)});
            }
        }
    }
}

Hi,

This UDTF works for the data set provided, but does not work on my data set. In a distributed environment, can we be sure that each container (mapper) has access to all the data stored in the tree hashmap?

I tried this, but got below error when creating Temporary Hive function. Can somebody please send me the completed code including import statements, and build.sbt file?

hive> create temporary function hierlevel AS 'com.skurra.hive.udf.hierarchy';
java.lang.NoClassDefFoundError: scala/collection/Seq
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)