Recursion in Hive – part 1

Posted in: Big Data, Hadoop, Technical Track

 

I am going to start this new series of blog posts talking about code migration use cases. We will talk about migration from RDBMS to Hive keeping the simplicity and flexibility of a SQL approach.

The first case is about recursive SQL. In most of the situations for RDBMS it covered by recursive queries by using a “with” clause. Though, unfortunately it’s not yet supported in Hive;.

Let’s consider the following scenario. We have PRODUCTs and STATEs. STATEs make the forest of trees structure. The facts are combinations of PRODUCTs and STATEs which may have some data. Here are the simplified DDLs:

create table t_product (
  product string);
create table t_state (
  state string,
  next_state string);
create table t_big_data (
  product string,
  state string,
  data string);

The task is: for an input set of pairs (PRODUCT, STATE) try to find the next available STATE with data in fact table or return NULL.

The input data is stored in t_input table:

create table t_input (
  product string,
  state string);

We need to populate t_output table:

create table t_output (
  product string,
  state string,
  found_state string,
  data string);

Here are various methods to solve this: procedural approach with recursive functions, recursive SQL, multi-joins (in case we know the max depth).

The most reasonable for the modern RDBMS supporting recursive queries would be something like this:

insert into t_output(product, state, found_state, data)
with rec (product, state1, state2, data) as (
  select i.product, i.state, i.state, d.data
  from t_input i
  left join t_big_data d
    on d.product = i.product and d.state = i.state
  union all
  select r.product, r.state1, s.next_state, b.data
  from rec r
  join t_state s
    on s.state = r.state2
  left join t_big_data d
    on d.product = r.product and d.state = s.next_state
  where r.data is null
)
select product, state1 as state, state2 as found_state, data
from rec
where data is not null
   or state2 is null;

RDBMS can make a good execution plan for such queries especially if there are correct indexes on t_big_data table. We could do a multi-join approach in Hive but the cost for each big table scan it too high.

The trick we will use here is based on an observation that a tree structure is usually relatively small in comparison with a data table. So that we can easily “expand” a tree into flat denormalized structure: for each STATE from the initial table, we keep all STATES and path length on the way to root. For example, for the following simple tree:

STATE NEXT_STATE
—– ———-
S1    NULL
S2    S1
S3    S1
S4    S2

We would have:

STATE1 STATE2 LEVEL
—— —— —–
S1     S1     0
S2     S2     0
S2     S1     1
S3     S3     0
S3     S1     1
S4     S4     0
S4     S2     1
S4     S1     2

Using the RDBMS recursive queries we would create this table as:

create table t_expand_state (
  state1 string,
  state2 string,
  lvl integer);
insert into t_expand_state (state1, state2, lvl)
with rec (state1, state2, lvl) as (
  select state, state, 0
  from t_state
  union all
  select r.state1, s.next_state, r.lvl + 1
  from rec r
  join t_state s
    on r.state2 = s.state
  where s.next_state is not null
)
select * from rec;

For Oracle DB we could do this with “connect by”:

select connect_by_root state state1, state as state2, level-1 lvl
from t_state
connect by prior next_state = state;

Having this t_expand_state table we can rewrite out query as:

insert into t_output(product, state, found_state, data)
select t.product, t.state,
       case when t.min_lvl_with_data is not null then t.state2 end,
       t.data
from (
  select i.product, i.state, s.state2, s.lvl, d.data,
         min(case when d.data is not null then lvl end)
           over(partition by i.product_id, i.state) min_lvl_with_data
  from t_input i
  join t_expand_state s
    on s.state1 = i.state
  left join t_big_data d
    on d.product = i.product and d.state = s.state2) t
  where t.lvl = t.min_lvl_with_data
     or (t.lvl = 0 and t.min_lvl_with_data is null);

This solution has its specific edge cases of inefficiency:
— big t_state that produce abnormal t_expand_state table;
— dense t_big_data table: so that during the query execution it has to keep a lot of extra-rows with “data” for states we don’t need;
— big t_input: joining it by all “next states” would inflate dataset.
But for practical use t_input is usually relatively small and there isn’t much overhead for getting extra-data for the next states. Another advantage is that we scan t_big_data only once.

To reach our goal the only task left is: how to build t_expand_state in Hive without recursion? Well we surely may consider multi-joins once again, but my choice is to use: UDTF.

In order to make recursion more natural I implemented this function using Scala. In the ExpandTreeUDTF we store tree structure in a mutable map during the “process” method call. After that it expands this map using memoization.

class ExpandTree2UDTF extends GenericUDTF {
  var inputOIs: Array[PrimitiveObjectInspector] = null
  val tree: collection.mutable.Map[String,Option[String]] = collection.mutable.Map()
  override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {
    inputOIs = args.map{_.asInstanceOf[PrimitiveObjectInspector]}
    val fieldNames = java.util.Arrays.asList("id", "ancestor", "level")
    val fieldOI = primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector.asInstanceOf[ObjectInspector]
    val fieldOIs = java.util.Arrays.asList(fieldOI, fieldOI, fieldOI)
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  }
  def process(record: Array[Object]) {
    val id = inputOIs(0).getPrimitiveJavaObject(record(0)).asInstanceOf[String]
    val parent = Option(inputOIs(1).getPrimitiveJavaObject(record(1)).asInstanceOf[String])
    tree += ( id -> parent )
  }
  def close {
    val expandTree = collection.mutable.Map[String,List[String]]()
    def calculateAncestors(id: String): List[String] =
      tree(id) match { case Some(parent) => id :: getAncestors(parent) ; case None => List(id) }
    def getAncestors(id: String) = expandTree.getOrElseUpdate(id, calculateAncestors(id))
    tree.keys.foreach{ id => getAncestors(id).zipWithIndex.foreach{ case(ancestor,level) => forward(Array(id, ancestor, level)) } }
  }
}

Having this we may compile it to jar, add it to Hive, create function and use it to build t_expand_state table.

create function expand_tree as ‘com.pythian.nikotin.scala.ExpandTreeUDTF’;
insert ovewrite table t_expand_state (state1, state2, lvl)
select expand_tree(state, next_state) from t_state;

 

 

email

Interested in working with Valentin? Schedule a tech call.

About the Author

Valentin is a specialist in Big Data and Cloud solutions. He has extensive expertise in Cloudera Hadoop Distribution, Google Cloud Platform and skilled in building scalable performance critical distributed systems and data visualization systems.

21 Comments. Leave new

Hi,

I am getting the below error while trying to run the above scala UDTF:

OK
Exception in thread “main” java.lang.StackOverflowError
at scala.collection.mutable.HashTable$HashUtils$class.elemHashCode(HashTable.scala:398)
at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:39)
at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:130)
at scala.collection.mutable.HashMap.findEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.get(HashMap.scala:69)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:187)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)

Could you please advise how to overcome the error?

Reply
Valentin Nikotin
February 21, 2017 4:19 am

It seems you have a cycle loop in your data.

Reply

Many Thanks Valentin for your prompt reply. I had an issue of cyclic loop in my data. Sorted now

Reply

Hi Ishan, could you please let me know how did you resolve cyclic loop?

Reply

Hi,Could you please paste java version of your code?

Reply
Ishan Sharma
March 1, 2017 3:23 am

Hi Valentin,

How can we amend the above code to get paths as well in addition to levels? Can we implement something similar to SYS_CONNECT_BY_PATH in ORACLE?

Regards,
Ishan

Reply
Tom Christopher
May 15, 2017 9:47 pm

Hi, Thanks for this code. It seems to work on small data. But on big data, I am getting the below run time error. Any ideas?
Error: java.lang.RuntimeException: Hive Runtime Error while closing operators
at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.close(ExecMapper.java:217)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1714)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.util.NoSuchElementException: key not found: 0318188127
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at ExpandTreeUDTF.ExpandTreeUDTF$$calculateAncestors$1(ExpandTreeUDTF.scala:45)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at ExpandTreeUDTF.ExpandTreeUDTF$$getAncestors$1(ExpandTreeUDTF.scala:46)
at ExpandTreeUDTF.ExpandTreeUDTF$$calculateAncestors$1(ExpandTreeUDTF.scala:45)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at ExpandTreeUDTF$$anonfun$ExpandTreeUDTF$$getAncestors$1$1.apply(ExpandTreeUDTF.scala:46)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)

Reply
Valentin Nikotin
May 18, 2017 7:57 am

Check id=0318188127, it seems there is no information about it’s “parent” in the source table.

Reply

Hi, Valentin

I’d like to know what is the source code license.
If it’s MIT license, I’m very happy.

Reply
Valentin Nikotin
May 31, 2017 4:15 am

Hi, there is no specific licence for this code, feel free to use it with MIT licence.

Reply

Thank you Valentin !

Reply

Hi Ishan,

Please can you post the code here.

Reply

Hello,

val fieldOI = primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector.asInstanceOf[ObjectInspector]

In this line, what variable is primitive? You help is much appreciated.

Reply

Hello Valentin,

Thanks for your help. When I try to create the function I get this error
FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. scala/collection/Seq

Reply

The hive version we are using is Hive 2.1.1-amzn-0. Would this be an issue for the code? Please let me know if I need to download any sbt to resolve this.

I built my SBT using

libraryDependencies += “org.apache.hive” % “hive-exec” % “2.1.1”

I am getting this error FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. scala/collection/Seq

Thanks

Reply

Hi Valentin, Can you provide JAVA code equivalent to Scala code you have written. I am looking for some use case that show the way how recursion (connect by prior) function implemented in Oracle ,equivalent is implemented in HIVE with examples.

Reply
Valentin Nikotin
November 10, 2017 4:21 pm

Hi Anuj,

I don’t have java code for this, but feel free to share you attempts and issues you have been faced. It should be almost identical code.

Reply
Kaveh Hariri
April 27, 2018 6:29 pm

Here is a java version of Valentin’s code, it ran very quickly, but my states table was 260k rows which exploded to around 800k rows.

import java.util.*;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;

public class mainrun extends GenericUDTF{
PrimitiveObjectInspector[] inputOIs = null;
Map tree = new HashMap();

@Override
public StructObjectInspector initialize(ObjectInspector[] args){
inputOIs = new PrimitiveObjectInspector[args.length];
for(int i =0; i<inputOIs.length; i++){
inputOIs[i] = (PrimitiveObjectInspector)args[i];
}
List fieldNames = Arrays.asList(“id”,”ancestor”, “level”);
ObjectInspector fieldOI = (ObjectInspector) PrimitiveObjectInspectorFactory.javaStringObjectInspector;
List fieldIOs = Arrays.asList(fieldOI,fieldOI,fieldOI);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldIOs);
}
public void process(Object[] record){
String tmpId = (String) inputOIs[0].getPrimitiveJavaObject(record[0]);
String id = (tmpId == null || tmpId.trim().equalsIgnoreCase(“”) ? null : tmpId.trim());
String tmpParent = (String) inputOIs[1].getPrimitiveJavaObject(record[1]);
String parent = (tmpParent == null || tmpParent.trim().equalsIgnoreCase(“”) ? null : tmpParent.trim());
tree.put(id,parent);

}

public List calculateAncestors(String id, Map<String,List> expandTree){
if(tree.get(id) == null) {
List rt = new ArrayList();
rt.add(id);
return rt;
}
else{
List rt = getAncestors(tree.get(id),expandTree);
rt.add(0,id);
return rt;
}
}

public List getAncestors(String id, Map<String,List> expandTree){
List parent = expandTree.get(id);
if(parent == null || parent.isEmpty()){
return calculateAncestors(id,expandTree);
}
else {
return parent;
}
}

public void close() throws HiveException {
Map<String,List> expandTree = new HashMap();

for(String id : tree.keySet()){
List al = new ArrayList();
List tmp = getAncestors(id,expandTree);
for(int i =0; i < tmp.size(); i++){
al.add(new anc_lev(tmp.get(i),Integer.toString(i)));
}
for(int i =0; i<al.size(); i++){
forward(new String[]{id,al.get(i).ancestor,al.get(i).level});
}
}
}
}

Reply

Hi,

This UDTF works for the data set provided, but does not work on my data set. In a distributed environment, can we be sure that each container (mapper) has access to all the data stored in the tree hashmap?

Reply

I tried this, but got below error when creating Temporary Hive function. Can somebody please send me the completed code including import statements, and build.sbt file?

hive> create temporary function hierlevel AS ‘com.skurra.hive.udf.hierarchy’ ;
java.lang.NoClassDefFoundError: scala/collection/Seq
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.hadoop.hive.ql.exec.FunctionTask.getUdfClass(FunctionTask.java:309)
at org.apache.hadoop.hive.ql.exec.FunctionTask.createTemporaryFunction(FunctionTask.java:165)
at org.apache.hadoop.hive.ql.exec.FunctionTask.execute(FunctionTask.java:72)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1676)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1435)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1218)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1082)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1072)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:213)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
… 23 more
FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.FunctionTask. scala/collection/Seq
hive>

Reply

Leave a Reply

Your email address will not be published. Required fields are marked *