Tuesday, 16 September 2014

HBase: MapReduce On Multiple Input Table

Starting with version 0.94.5 HBase supports reading multiple tables as input to MapReduce jobs using MultiTableInputFormat class.
In this post I am giving an example of MapReduce job which reads from two HBase tables performs some aggregation on one table and merges (SQL UNION ALL operation) it with the content of second table and stores the result in an output table.

The first table is 'storeSales' table and it has store-wise sales for each date. The create statements are -

create 'storeSales', 'cf1'
put 'storeSales', '20130101#1', 'cf1:sSales', '100'
put 'storeSales', '20130101#2', 'cf1:sSales', '110'
put 'storeSales', '20130102#1', 'cf1:sSales', '200'
put 'storeSales', '20130102#2', 'cf1:sSales', '210'


The second table is 'onlineSales' table and it has online sale for each date. The create statements are -
create 'onlineSales', 'cf2'
put 'onlineSales', '20130101', 'cf2:oSales', '400'
put 'onlineSales', '20130102', 'cf2:oSales', '130'

Using a MapReduce job I am going to merge aggregated (at date level) store sales with online sales.
Lets create a output table for the same -
create 'totalSales', 'cf1'

The mapper class for this job is -

Note that in mapper I am getting table name of current split and using different context.write based on table name. If your source tables have rowkeys with different prefixes you can use that also for different context.write logic.

The reducer class for this job is -

Based on intermediate key value I am using aggregation in reducer.

Finally the driver class for this job is

In the driver there are 2 HBase Scan for 2 input tables and I am passing these scans in a list to TableMapReduceUtil.initTableMapperJob method.

Package jar file (to hbase-union.jar) and execute below commands to invoke MapReduce job -
export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar hbase-union.jar com.rishav.hbase.union.UnionJob

Once the job is complete use HBase shell to verify output results -
hbase(main):034:0> scan 'totalSales'
ROW                                        COLUMN+CELL                                                                                                               
 o#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x90                                                        
 o#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\x82                                                        
 s#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\xD2                                                        
 s#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x9A                                                        
4 row(s) in 0.0410 seconds
hbase(main):035:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x90".to_java_bytes)
=> 400
hbase(main):036:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\x82".to_java_bytes)
=> 130
hbase(main):037:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):038:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410

MultiTableInputFormat can be used for doing HBase table joins too, I shall try that some time.

Friday, 27 June 2014

Update Fixed number of MongoDB Documents

Recently I worked on a project which uses MongoDB as source data system and uses R for analysis and MongoDB again for output storage.

In this project we faced a different problem. We were using R to process source data present in MongoDB and if we gave large number of documents for analysis to R it was becoming slower and a bottleneck. To avoid this bottleneck we had to implement processing of a fixed number of documents in R for a batch.

To achieve this we needed some kind of record number in MongoDB, but being a distributed database getting some sequential number in MongoDB was not supported. Also our MongoDB source was getting populated by a distributed real-time stream so implementing some logic on application side was also deterrent.

To have some batchId field for a fixed number of documents in MongoDB we implemented below algorithm :
1. Find for documents which didn't had batchId field.
2. Sort by some timestamp field.
3. Limit the number of documents (say 10000).
4. Append batchId field to documents and save them (get value of batchId from audit table).

MongoDB shell command for this is :
db['collection1'].find({batchId:null}).sort({systemTime:1}).limit(10000).forEach(
    function (e) {
// get value of batchId from audit table
        e.batchId = 1;
        db['collection1'].save(e);
    }
);

Using the above code we appeneded batchId to MongoDB documents and picked only current batchId for analysis in R.

Java code for above MongoDB shell command is :

Wednesday, 5 March 2014

MapReduce on Avro data files

In this post we are going to write a MapReduce program to consume Avro input data and also produce data in Avro format.
We will write a program to calculate average of student marks.

 

Data Preparation

The schema for the records is:
student.avsc
{
  "type" : "record",
  "name" : "student_marks",
  "namespace" : "com.rishav.avro",
  "fields" : [ {
    "name" : "student_id",
    "type" : "int"
  }, {
    "name" : "subject_id",
    "type" : "int"
  }, {
    "name" : "marks",
    "type" : "int"
  } ]
}

And some sample records are:
student.json
{"student_id":1,"subject_id":63,"marks":19}
{"student_id":2,"subject_id":64,"marks":74}
{"student_id":3,"subject_id":10,"marks":94}
{"student_id":4,"subject_id":79,"marks":27}
{"student_id":1,"subject_id":52,"marks":95}
{"student_id":2,"subject_id":34,"marks":16}
{"student_id":3,"subject_id":81,"marks":17}
{"student_id":4,"subject_id":60,"marks":52}
{"student_id":1,"subject_id":11,"marks":66}
{"student_id":2,"subject_id":84,"marks":39}
{"student_id":3,"subject_id":24,"marks":39}
{"student_id":4,"subject_id":16,"marks":0}
{"student_id":1,"subject_id":65,"marks":75}
{"student_id":2,"subject_id":5,"marks":52}
{"student_id":3,"subject_id":86,"marks":50}
{"student_id":4,"subject_id":55,"marks":42}
{"student_id":1,"subject_id":30,"marks":21}

Now we will convert the above sample records to avro format and upload the avro data file to HDFS:
java -jar avro-tools-1.7.5.jar fromjson student.json --schema-file student.avsc > student.avro
hadoop fs -put student.avro student.avro

Avro MapReduce Program

In my program I have used Avro Java class for student_marks schema. To generate Java class from the schema file use below command:
java -jar avro-tools-1.7.5.jar compile schema student.avsc .
Then add the generated Java class to IDE.

I have written a MapReduce program which reads Avro data file student.avro (passed as argument) and calculates average marks for each student and store the output also in Avro format. The program is given below:


  • In the program the input key to mapper is AvroKey<student_marks> and the input value is null. The output key of map method is student_id and output value is an IntPair having marks and 1.
  • We have a combiner also which aggregates partial sums for each student_id.
  • Finally reducer takes student_id and partial sums and counts and uses them to calculate average for each student_id. The reducer writes the output in Avro format.

For Avro job setup we have added these properties:
// set InputFormatClass to AvroKeyInputFormat and define input schema
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, student_marks.getClassSchema());

// set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type
    job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
    AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
    AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));

Job Execution

We package our Java program to avro_mr.jar and add Avro jars to libjars and hadoop classpath using below commands:

export LIBJARS=avro-1.7.5.jar,avro-mapred-1.7.5-hadoop1.jar,paranamer-2.6.jar
export HADOOP_CLASSPATH=avro-1.7.5.jar:avro-mapred-1.7.5-hadoop1.jar:paranamer-2.6.jar
hadoop jar avro_mr.jar com.rishav.avro.mapreduce.AvroAverageDriver -libjars ${LIBJARS} student.avro output
You can verify the output using avro-tool command.

To enable snappy compression for output add below lines to run method and add snappy-java jar to libjars and hadoop classpath:
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

Thursday, 27 February 2014

Convert csv data to Avro data

In one of my previous post I explained how we can convert json data to avro data and vice versa using avro tools command line option. Today I was trying to see what options we have for converting csv data to avro format, as of now we don't have any avro tool option to accomplish this . Now, we can either write our own java program (MapReduce program or a simple java program) or we can use various SerDe's available with Hive to do this quickly and without writing any code :)

To convert csv data to Avro data using Hive we need to follow below steps:
  1. Create a Hive table stored as textfile and specify your csv delimiter also.
  2. Load csv file to above table using "load data" command.
  3. Create another Hive table using AvroSerDe.
  4. Insert data from former table to new Avro Hive table using "insert overwrite" command.

To demonstrate this I will use use below data (student.csv):
0,38,91
0,65,28
0,78,16
1,34,96
1,78,14
1,11,43
Now execute below queries in Hive:

Now you can get data in Avro format from Hive warehouse folder. To dump this file to local file system use below command:
hadoop fs -cat /path/to/warehouse/test.db/avro_table/* > student.avro

If you want to get json data from this avro file you can use avro tools command:
java -jar avro-tools-1.7.5.jar tojson student.avro > student.json

So we can easily convert csv to avro and csv to json also by just writing 4 HQLs.

Tuesday, 25 February 2014

Implementing Custom WritableComparable

In one of my previous post I wrote about Implementing Custom Writable which can be used as values in MapReduce program. For using customized key in MapReduce we need to implement WritableComparable interface.

WritableComparable interface is just a subinterface of the Writable and java.lang.Comparable interfaces. For implementing a WritableComparable we must have compareTo method apart from readFields and write methods, as shown below:
public interface WritableComparable extends Writable, Comparable
{
    void readFields(DataInput in);
    void write(DataOutput out);
    int compareTo(WritableComparable o)
}
Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another.

The code for IntPair class which is used in In-mapper Combiner Program to Calculate Average post is given below:


As you can see in compareTo(IntPair tp) of above class that IntPair needs to be deserialized for comparison to take place, we can implement a RawComparator which can compare two keys by just checking their serialized representation. More on RawComparator is available in Hadoop: The Definitive Guide.

Wednesday, 19 February 2014

Avro Schema Evolution

Avro can use different schemas for serialization and deserialization, and it can handle removed, added and modified fields. Thus it helps in building decoupled and robust systems.

In this post we will serialize data using this schema:

and deserialize it using a different schema
which has following modifications:
  1. university_id field is removed.
  2. age field is added.
  3. result_score field is renamed to score.
Before we actually see how Avro handles these modification I would like to mention below points:
  • If a new field is added then it must have a default value. Also specify type as an array of types starting with null e.g. "type": ["null", "string"] otherwise you will get this error:
    Exception in thread "main" java.lang.NoSuchMethodError: org.codehaus.jackson.node.ValueNode.asText()Ljava/lang/String;
  • If a field is renamed then the old name must be present as aliases.

In the this java program we serialize data using StudentActivity.avsc schema and deserialize data using StudentActivityNew.avsc schema

On executing this code we see that Avro handles the modifications without any issues and our data is deserialized properly.

Tuesday, 18 February 2014

Getting started with Avro Part2

In the previous post we used avro-tools commands to serialize and deserialize data. In this post we post we will use Avro Java API for achieving the same. We will use same sample data and schema from our previous post.

The java code for serializing and deserializing data without generating the code for schema is given below:


For generating the schema java code from Avro json schema we can use avro-tools jar. The command for same is given below:
java -jar avro-tools-1.7.5.jar compile schema StudentActivity.avsc <output_path>
Output path can be source folder for the project or we can add the generated java class files to Eclipse IDE manually.

The java code for serializing and deserializing data with generating the code for schema is similar to above code except that in previous code we were assiging values to a GenericRecord and in this one we are assigning values to the generated Avro object:


In next post we will see how Avro deals with schema evolution.