Friday, 27 June 2014

Update Fixed number of MongoDB Documents

Recently I worked on a project that uses MongoDB as the source data system, R for analysis, and MongoDB again for output storage.

In this project we faced an unusual problem. We were using R to process source data stored in MongoDB, and when we gave R a large number of documents to analyse, it slowed down and became a bottleneck. To avoid this, we had to process a fixed number of documents in R per batch.

To achieve this we needed some kind of record number in MongoDB, but MongoDB, being a distributed database, does not provide a built-in sequential number. Our MongoDB source was also populated by a distributed real-time stream, so implementing the numbering logic on the application side was not attractive either.

To add a batchId field to a fixed number of documents in MongoDB we implemented the algorithm below:
1. Find documents that don't have a batchId field.
2. Sort them by a timestamp field.
3. Limit the number of documents (say, 10000).
4. Add a batchId field to these documents and save them (the batchId value comes from an audit table).

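The MongoDB shell command for this is (the collection name "source" and the field name "timestamp" are placeholders):
    db.source.find({ batchId: { $exists: false } }).sort({ timestamp: 1 }).limit(10000).forEach(
        function (e) {
            // get value of batchId from audit table
            e.batchId = 1;
            db.source.save(e);
        });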

Using the above code we appended batchId to the MongoDB documents and picked only the current batchId for analysis in R.
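To make the four steps concrete without a running database, here is a minimal sketch in plain Java that applies the same logic to an in-memory list of documents. It does not use the MongoDB Java driver; the class name BatchTagger, the helper doc, and the field names "_id" and "timestamp" are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchTagger {

    /** Builds a document; "_id" and "timestamp" are illustrative field names. */
    public static Map<String, Object> doc(String id, long timestamp) {
        Map<String, Object> d = new HashMap<>();
        d.put("_id", id);
        d.put("timestamp", timestamp);
        return d;
    }

    /**
     * Tags at most batchSize untagged documents, oldest first, with batchId.
     * Returns the number of documents that were tagged.
     */
    public static int tagNextBatch(List<Map<String, Object>> docs, int batchSize, int batchId) {
        List<Map<String, Object>> batch = docs.stream()
                // step 1: only documents without a batchId field
                .filter(d -> !d.containsKey("batchId"))
                // step 2: sort by the timestamp field, ascending
                .sorted(Comparator.comparingLong(
                        (Map<String, Object> d) -> (Long) d.get("timestamp")))
                // step 3: keep a fixed number of documents
                .limit(batchSize)
                .collect(Collectors.toList());
        // step 4: add the batchId field (in MongoDB this is where the save happens)
        for (Map<String, Object> d : batch) {
            d.put("batchId", batchId);
        }
        return batch.size();
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = new ArrayList<>();
        docs.add(doc("a", 30L));
        docs.add(doc("b", 10L));
        docs.add(doc("c", 20L));
        // With a batch size of 2, the two oldest documents (b and c) get batchId 1.
        System.out.println(tagNextBatch(docs, 2, 1) + " documents tagged");
        for (Map<String, Object> d : docs) {
            System.out.println(d.get("_id") + " -> " + d.get("batchId"));
        }
    }
}
```

Calling tagNextBatch again with the next batchId picks up only the documents that are still untagged, which is what lets R work through the collection one fixed-size batch at a time.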

Java code for above MongoDB shell command is :

Wednesday, 5 March 2014

MapReduce on Avro data files

In this post we are going to write a MapReduce program to consume Avro input data and also produce data in Avro format.
We will write a program to calculate the average of student marks.


Data Preparation

The schema for the records is:
{
  "type" : "record",
  "name" : "student_marks",
  "namespace" : "com.rishav.avro",
  "fields" : [ {
    "name" : "student_id",
    "type" : "int"
  }, {
    "name" : "subject_id",
    "type" : "int"
  }, {
    "name" : "marks",
    "type" : "int"
  } ]
}

And some sample records are:

Now we will convert the above sample records to avro format and upload the avro data file to HDFS:
java -jar avro-tools-1.7.5.jar fromjson student.json --schema-file student.avsc > student.avro
hadoop fs -put student.avro student.avro

Avro MapReduce Program

In my program I have used the Avro-generated Java class for the student_marks schema. To generate the Java class from the schema file, use the command below:
java -jar avro-tools-1.7.5.jar compile schema student.avsc .
Then add the generated Java class to your IDE project.

I have written a MapReduce program that reads the Avro data file student.avro (passed as an argument), calculates the average marks for each student, and stores the output in Avro format as well. The program is given below:

  • In the program, the input key to the mapper is AvroKey<student_marks> and the input value is null. The output key of the map method is student_id and the output value is an IntPair holding marks and 1.
  • We also have a combiner, which aggregates partial sums for each student_id.
  • Finally, the reducer takes each student_id with its partial sums and counts and uses them to calculate the average for that student_id. The reducer writes the output in Avro format.
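The data flow described in these bullets can be sketched in plain Java without Hadoop or Avro on the classpath. This is only an illustration of the (sum, count) technique, not the MapReduce program itself; the class names AverageSketch and SumCount are assumptions, with SumCount standing in for the IntPair value.

```java
import java.util.Map;
import java.util.TreeMap;

public class AverageSketch {

    /** A partial sum of marks together with the number of marks it covers. */
    static final class SumCount {
        final long sum;
        final int count;

        SumCount(long sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        /** Merging two partial results is what the combiner and reducer both do. */
        SumCount plus(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }
    }

    /**
     * Each row is {student_id, subject_id, marks}.
     * Returns student_id -> average marks.
     */
    public static Map<Integer, Float> averages(int[][] rows) {
        // map + combine: emit (student_id, (marks, 1)) and merge the pairs per key
        Map<Integer, SumCount> partial = new TreeMap<>();
        for (int[] row : rows) {
            partial.merge(row[0], new SumCount(row[2], 1), SumCount::plus);
        }
        // reduce: average = total sum / total count
        Map<Integer, Float> result = new TreeMap<>();
        for (Map.Entry<Integer, SumCount> e : partial.entrySet()) {
            SumCount sc = e.getValue();
            result.put(e.getKey(), (float) sc.sum / sc.count);
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] rows = { {1, 10, 80}, {1, 11, 90}, {2, 10, 70} };
        System.out.println(averages(rows)); // prints {1=85.0, 2=70.0}
    }
}
```

Carrying the count alongside the sum is what makes a combiner safe here: partial (sum, count) pairs can be merged in any order, whereas averaging partial averages would give wrong results when the groups differ in size.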

For the Avro job setup we have added these properties:
// set InputFormatClass to AvroKeyInputFormat and define input schema
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, student_marks.getClassSchema());

// set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type
    job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
    AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
    AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));

Job Execution

We package our Java program as avro_mr.jar and add the Avro jars to libjars and the Hadoop classpath using the commands below:

export LIBJARS=avro-1.7.5.jar,avro-mapred-1.7.5-hadoop1.jar,paranamer-2.6.jar
export HADOOP_CLASSPATH=avro-1.7.5.jar:avro-mapred-1.7.5-hadoop1.jar:paranamer-2.6.jar
hadoop jar avro_mr.jar com.rishav.avro.mapreduce.AvroAverageDriver -libjars ${LIBJARS} student.avro output
You can verify the output using the avro-tools command.

To enable snappy compression for the output, add the lines below to the run method and add the snappy-java jar to libjars and the Hadoop classpath:
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

Thursday, 27 February 2014

Convert csv data to Avro data

In one of my previous posts I explained how we can convert json data to avro data and vice versa using the avro-tools command line. Today I looked at the options for converting csv data to avro format; as of now there is no avro-tools option for this. So we can either write our own Java program (a MapReduce program or a simple Java program), or we can use the SerDes available with Hive to do this quickly and without writing any code :)

To convert csv data to Avro data using Hive we need to follow the steps below:
  1. Create a Hive table stored as textfile and specify your csv delimiter.
  2. Load the csv file into this table using the "load data" command.
  3. Create another Hive table using AvroSerDe.
  4. Insert data from the first table into the new Avro Hive table using the "insert overwrite" command.

To demonstrate this I will use the data below (student.csv):
Now execute the queries below in Hive:

Now you can get the data in Avro format from the Hive warehouse folder. To dump this file to the local file system, use the command below:
hadoop fs -cat /path/to/warehouse/test.db/avro_table/* > student.avro

If you want to get json data from this avro file, you can use the avro-tools command:
java -jar avro-tools-1.7.5.jar tojson student.avro > student.json

So we can easily convert csv to avro, and csv to json as well, by writing just 4 HQL statements.

Tuesday, 25 February 2014

Implementing Custom WritableComparable

In one of my previous posts I wrote about Implementing Custom Writable, which can be used as values in a MapReduce program. To use a custom type as a key in MapReduce we need to implement the WritableComparable interface.

The WritableComparable interface is just a subinterface of the Writable and java.lang.Comparable interfaces. To implement a WritableComparable we must provide a compareTo method in addition to the readFields and write methods, as shown below:
public interface WritableComparable<T> extends Writable, Comparable<T> {
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
    int compareTo(T o);
}
Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another.

The code for the IntPair class, which is used in the In-mapper Combiner Program to Calculate Average post, is given below:

As you can see in compareTo(IntPair tp) of the above class, an IntPair needs to be deserialized before it can be compared. To avoid this we can implement a RawComparator, which compares two keys by looking only at their serialized representation. More on RawComparator is available in Hadoop: The Definitive Guide.
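To illustrate the difference between the two kinds of comparison, here is a sketch that uses only the JDK. It mirrors the write, readFields and compareTo methods of a pair of ints, and adds a comparison that reads the two ints straight from the serialized bytes. The class PairSketch and the methods toBytes and compareRaw are assumptions for illustration; Hadoop's own RawComparator.compare additionally takes offsets and lengths into the byte arrays.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class PairSketch implements Comparable<PairSketch> {
    private int first;
    private int second;

    public PairSketch() { }

    public PairSketch(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** Serializes both ints, big-endian, 8 bytes in total. */
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    /** Restores the fields in the same order in which write emitted them. */
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    /** Object comparison: needs two deserialized instances. */
    @Override
    public int compareTo(PairSketch other) {
        int c = Integer.compare(first, other.first);
        return c != 0 ? c : Integer.compare(second, other.second);
    }

    /** Convenience helper that returns the serialized form. */
    public byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            write(new DataOutputStream(buffer));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Raw comparison: reads the ints directly from the bytes, no objects created. */
    public static int compareRaw(byte[] a, byte[] b) {
        ByteBuffer x = ByteBuffer.wrap(a); // big-endian by default, like DataOutput
        ByteBuffer y = ByteBuffer.wrap(b);
        int c = Integer.compare(x.getInt(0), y.getInt(0));
        return c != 0 ? c : Integer.compare(x.getInt(4), y.getInt(4));
    }

    public static void main(String[] args) {
        PairSketch p = new PairSketch(3, 7);
        PairSketch q = new PairSketch(3, 9);
        System.out.println(p.compareTo(q));                       // prints -1
        System.out.println(compareRaw(p.toBytes(), q.toBytes())); // prints -1
    }
}
```

Both comparisons must agree on ordering; the raw one simply skips the deserialization step, which matters when the sort phase compares a very large number of keys.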

Wednesday, 19 February 2014

Avro Schema Evolution

Avro can use different schemas for serialization and deserialization, and it can handle removed, added and modified fields. Thus it helps in building decoupled and robust systems.

In this post we will serialize data using this schema:

and deserialize it using a different schema
which has the following modifications:
  1. university_id field is removed.
  2. age field is added.
  3. result_score field is renamed to score.
Before we see how Avro handles these modifications, I would like to mention the points below:
  • If a new field is added, it must have a default value. To make the field optional with a null default, specify the type as a union starting with null, e.g. "type": ["null", "string"]; otherwise you will get this error:
    Exception in thread "main" java.lang.NoSuchMethodError: org.codehaus.jackson.node.ValueNode.asText()Ljava/lang/String;
  • If a field is renamed, the old name must be listed in its aliases.
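The three rules (a removed field is dropped, an added field takes its default, a renamed field is found through its alias) can be sketched in plain Java. This is a simplified model of the idea only, not Avro's resolution code; the class ResolveSketch, the nested ReaderField type, and the sample values are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ResolveSketch {

    /** One field of the reader schema: current name, old names, and default value. */
    public static final class ReaderField {
        final String name;
        final List<String> aliases;
        final Object defaultValue;

        public ReaderField(String name, List<String> aliases, Object defaultValue) {
            this.name = name;
            this.aliases = aliases;
            this.defaultValue = defaultValue;
        }
    }

    /** Projects a record written with the old schema onto the reader's fields. */
    public static Map<String, Object> resolve(Map<String, Object> written,
                                              List<ReaderField> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            // Look for the field under its current name first, then under its aliases.
            String source = null;
            if (written.containsKey(field.name)) {
                source = field.name;
            } else {
                for (String alias : field.aliases) {
                    if (written.containsKey(alias)) {
                        source = alias;
                        break;
                    }
                }
            }
            // Added fields are not in the written data, so they take the default.
            result.put(field.name, source != null ? written.get(source) : field.defaultValue);
        }
        // Written fields that no reader field asked for (removed fields) are dropped.
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("university_id", 9);   // removed in the new schema
        written.put("result_score", 8.5);  // renamed to score

        List<ReaderField> reader = Arrays.asList(
                new ReaderField("age", Collections.<String>emptyList(), null),
                new ReaderField("score", Arrays.asList("result_score"), null));

        System.out.println(resolve(written, reader)); // prints {age=null, score=8.5}
    }
}
```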

In this Java program we serialize data using the StudentActivity.avsc schema and deserialize it using the StudentActivityNew.avsc schema:

On executing this code we see that Avro handles the modifications without any issues and our data is deserialized properly.

Tuesday, 18 February 2014

Getting started with Avro Part2

In the previous post we used avro-tools commands to serialize and deserialize data. In this post we will use the Avro Java API to achieve the same. We will use the same sample data and schema as in our previous post.

The Java code for serializing and deserializing data without generating code for the schema is given below:

To generate the Java code for the schema from the Avro json schema we can use the avro-tools jar. The command is given below:
java -jar avro-tools-1.7.5.jar compile schema StudentActivity.avsc <output_path>
The output path can be the source folder of the project, or we can add the generated Java class files to the Eclipse IDE manually.

The Java code for serializing and deserializing data with generated code for the schema is similar to the above code, except that in the previous code we were assigning values to a GenericRecord and in this one we assign values to the generated Avro object:

In the next post we will see how Avro deals with schema evolution.

Getting started with Avro Part1

In our previous post we got a basic idea about Avro; in this post we will use Avro for serializing and deserializing data.

We will look at these 3 ways of using Avro for serialization/deserialization:
  1. Using Avro command line tools.
  2. Using Avro Java API without code generation.
  3. Using Avro Java API with code generation.

Sample Data

We will use the sample data below (StudentActivity.json):
Note that the JSON records are nested.

Defining a schema

Avro schemas are defined using JSON. The avro schema for our sample data is defined as below (StudentActivity.avsc):

1. Serialization/Deserialization using Avro command line tools

Avro provides a jar file named avro-tools-<version>.jar, which provides many command line tools, as listed below:

To convert the json sample data to Avro binary format use the "fromjson" option, and to get json data back from Avro files use the "tojson" option.

Command for serializing json
Without any compression
java -jar avro-tools-1.7.5.jar fromjson --schema-file StudentActivity.avsc StudentActivity.json > StudentActivity.avro

With snappy compression
java -jar avro-tools-1.7.5.jar fromjson --codec snappy --schema-file StudentActivity.avsc StudentActivity.json > StudentActivity.snappy.avro

Command for deserializing to json
The same command is used for deserializing both compressed and uncompressed data
java -jar avro-tools-1.7.5.jar tojson StudentActivity.avro
java -jar avro-tools-1.7.5.jar tojson StudentActivity.snappy.avro

As an Avro data file also contains the schema, we can retrieve it using this command:
java -jar avro-tools-1.7.5.jar getschema StudentActivity.avro
java -jar avro-tools-1.7.5.jar getschema StudentActivity.snappy.avro
In our next post we will use Avro Java API for serialization/deserialization.