Tuesday, 28 July 2015

Hive - Initialization Script

Many Hive scripts use Hive UDFs and Hive configuration settings. Declaring these UDFs and configuration variables in many scripts, or at the start of each Hive session, is a cumbersome task, and maintaining multiple scripts becomes more and more difficult over time.

To overcome these problems we can use either of the following methods -
  1. Create an environment file which is executed at the start of each Hive CLI session.
  2. Create an initialization script file which is executed before the start of a Hive CLI session or before the execution of another Hive script.

Let us look at each of these methods in more detail -
1. Create an environment file -
Hive executes an environment file named .hiverc, present in the user's home directory. All commands in this file are executed at the start of each Hive CLI session, and also when Hive is run with the -e or -f options.
By default the .hiverc file is not present in the user's home directory, so the user needs to create it.

2. Create an initialization script file
This option is more useful when we want to execute a different set of initialization commands for different sets of scripts.
To use this option, create a text file with all the initialization commands and pass it to Hive with the -i <filename> option. This option can be used with the Hive or Beeline CLI, as well as in the -e and -f non-interactive modes.

An example initialization script looks like this (let's assume the filename is hive_init.hql) -

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

SET hive.vectorized.execution.enabled=false;
SET hive.vectorized.execution.reduce.enabled=false;

SET hive.enforce.bucketing=true;

SET hive.exec.parallel=true;

SET hive.auto.convert.join=false;
SET hive.enforce.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;
SET hive.optimize.bucketmapjoin=true;

ADD JAR /hadoop/rishav/udfs-0.0.1-SNAPSHOT.jar;
create temporary function getEvent as 'com.rishav.bigdata.hive.udfs.GetEventUDF';
create temporary function get_last_day as 'com.rishav.bigdata.hive.udfs.LastDayUDF';

To use this script in the different modes, run -
Hive CLI - hive -i hive_init.hql
Hive non-interactive command mode - hive -i hive_init.hql -e "sql command;"
Hive non-interactive script mode - hive -i hive_init.hql -f <script_file>

Tuesday, 9 December 2014

Implementing Apriori Algorithm In Hadoop-HBase - Part 2 : Conversion to MapReduce Codes

Let us assume that the transaction data we receive is in csv format like this - tId,pId
where tId is the transaction Id
and pId is the product Id.
A single transaction can have one or more product Ids spread across one or more csv records, e.g.

I have implemented the Apriori algorithm for 2-itemsets using 3 MapReduce jobs. The jobs and their functions are described below -

1. PopulateTranBasketAndProdPairJob - The mapper class of this job reads transaction records from the specified csv file and emits (tId, pId). The reducer class receives (tId, <pId1, pId2,..., pIdn>) as input, builds all the product pairs for this tId, and writes the individual pId(s) and product pair(s) to the HBase table 'tranBasket' with tId as the rowkey.

2. SupportCountJob - This job reads the 'tranBasket' table and calculates the support counts for all pIds and product pairs. The support counts of individual products are stored in the 'pCount' table with pId as the rowkey, and the support counts of product pairs are stored in the 'ppCount' table with the product pair as the rowkey. At the end of this job the transaction count is also printed to the screen; it acts as an input to the next job.

3. CalcSupportConfidenceJob - This is the last job in the series and gives us the support, confidence and lift values for the different product pairs. It takes the transaction count from the previous job as input to calculate support values. This job has only a mapper, which reads the complete 'pCount' table into memory, then reads the 'ppCount' table row by row, calculates the Apriori measures (support, confidence and lift), and writes the results to the HBase table 'apprOut'.
To verify the results we can check the mappers' sysout (stdout) files, which contain the values in a readable format.
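The logic of the three jobs can be sketched in a single JVM as below. This is a minimal illustration, not the source code referred to in this post: the HBase tables are replaced by in-memory maps, the class and method names are hypothetical, the pair key format "pIdA,pIdB" is an assumption, and the measures use the standard definitions support = count(A,B)/N, confidence(A -> B) = count(A,B)/count(A) and lift(A -> B) = confidence(A -> B)/(count(B)/N), where N is the transaction count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**
 * Single-JVM sketch of the logic of the three jobs (not the original source).
 * HBase tables are replaced by in-memory maps; names, the "pIdA,pIdB" pair key
 * format and the measure formulas (standard definitions) are assumptions.
 */
public class AprioriPairsSketch {
    public final Map<String, Integer> pCount = new HashMap<>();  // stands in for table 'pCount'
    public final Map<String, Integer> ppCount = new HashMap<>(); // stands in for table 'ppCount'
    public int transactionCount = 0;

    /** Job 1 (reducer side): every unordered product pair of one basket, each listed once. */
    public static List<String> pairsFor(List<String> pIds) {
        List<String> items = new ArrayList<>(new TreeSet<>(pIds)); // de-duplicate and sort
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            for (int j = i + 1; j < items.size(); j++) {
                pairs.add(items.get(i) + "," + items.get(j));
            }
        }
        return pairs;
    }

    /** Job 2: add one basket (the pIds of one tId) to the support counts. */
    public void addBasket(List<String> pIds) {
        transactionCount++;
        for (String pId : new TreeSet<>(pIds)) {
            pCount.merge(pId, 1, Integer::sum);
        }
        for (String pair : pairsFor(pIds)) {
            ppCount.merge(pair, 1, Integer::sum);
        }
    }

    /** Job 3: {support, confidence, lift} for the rule a -> b. */
    public double[] measures(String a, String b) {
        String key = a.compareTo(b) < 0 ? a + "," + b : b + "," + a;
        double n = transactionCount;
        double countAB = ppCount.getOrDefault(key, 0);
        double support = countAB / n;                   // fraction of baskets containing a and b
        double confidence = countAB / pCount.get(a);    // fraction of baskets with a that also contain b
        double lift = confidence / (pCount.get(b) / n); // confidence relative to b's own support
        return new double[] { support, confidence, lift };
    }

    public static void main(String[] args) {
        AprioriPairsSketch sketch = new AprioriPairsSketch();
        // Baskets as the reducer of job 1 would see them (duplicate pIds are allowed)
        sketch.addBasket(Arrays.asList("p1", "p2"));
        sketch.addBasket(Arrays.asList("p1", "p2", "p3"));
        sketch.addBasket(Arrays.asList("p2", "p3", "p2"));
        sketch.addBasket(Arrays.asList("p3"));
        System.out.println("N = " + sketch.transactionCount);
        System.out.println("p1 -> p2: " + Arrays.toString(sketch.measures("p1", "p2")));
    }
}
```

Sorting each basket before pairing means a pair is always stored under one canonical key, so (p1, p2) and (p2, p1) are counted together, which is also what keeps the 'ppCount' rowkeys unambiguous.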

The source code is available here.

Note - This is just a simple demo application and there is scope for improvement. Some modifications I can think of now are -
  1. Transaction ids are generally sequential numbers, and if they are stored in HBase as such we will experience region hot-spotting. Hence the rowkey design has to be reworked.
  2. The HBase scanner caching value needs to be checked and optimised.
  3. Currently pId and tId are stored as Text, which can be changed to Long.
References -
  • http://rakesh.agrawal-family.com/papers/vldb94apriori.pdf
  • http://www.slideshare.net/deepti92pawar/the-comparative-study-of-apriori-and-fpgrowth-algorithm
  • http://www3.cs.stonybrook.edu/~cse634/lecture_notes/07apriori.pdf

Implementing Apriori Algorithm In Hadoop-HBase - Part 1 : Introduction to Apriori Algorithm

The Apriori algorithm is a frequent itemset mining algorithm for transactional databases, proposed by Rakesh Agrawal and Ramakrishnan Srikant in 1994. It proceeds by identifying the frequent individual items in the database and extending them to larger and larger itemsets as long as those itemsets appear sufficiently often in the database. The frequent itemsets determined by Apriori can be used to derive association rules which highlight general trends in the database.
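The level-wise idea described above can be sketched in plain Java as below. This is a minimal in-memory illustration, not the HBase/MapReduce implementation from the following posts; the class and method names are hypothetical, and minSupportCount is an absolute count rather than a fraction. The join step is simplified: any two frequent k-itemsets whose union has k+1 items are combined, and the usual Apriori pruning then discards any candidate that has an infrequent k-subset.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class AprioriSketch {

    /** Returns every itemset whose support count is at least minSupportCount, with its count. */
    public static Map<Set<String>, Integer> frequentItemsets(List<Set<String>> transactions, int minSupportCount) {
        Map<Set<String>, Integer> result = new LinkedHashMap<>();
        // Level 1: every individual item is a candidate
        Set<Set<String>> candidates = new HashSet<>();
        for (Set<String> t : transactions) {
            for (String item : t) {
                candidates.add(new TreeSet<>(Arrays.asList(item)));
            }
        }
        while (!candidates.isEmpty()) {
            // One pass over the data counts every candidate of the current size
            Map<Set<String>, Integer> frequent = new HashMap<>();
            for (Set<String> c : candidates) {
                int count = 0;
                for (Set<String> t : transactions) {
                    if (t.containsAll(c)) count++;
                }
                if (count >= minSupportCount) frequent.put(c, count);
            }
            result.putAll(frequent);
            candidates = nextCandidates(frequent.keySet()); // extend to size k+1
        }
        return result;
    }

    /** Joins frequent k-itemsets into (k+1)-candidates; drops any candidate with an infrequent k-subset. */
    static Set<Set<String>> nextCandidates(Set<Set<String>> frequentK) {
        Set<Set<String>> next = new HashSet<>();
        for (Set<String> a : frequentK) {
            for (Set<String> b : frequentK) {
                Set<String> union = new TreeSet<>(a);
                union.addAll(b);
                if (union.size() != a.size() + 1 || next.contains(union)) continue;
                boolean allSubsetsFrequent = true;
                for (String item : union) {
                    Set<String> subset = new TreeSet<>(union);
                    subset.remove(item);
                    if (!frequentK.contains(subset)) {
                        allSubsetsFrequent = false;
                        break;
                    }
                }
                if (allSubsetsFrequent) next.add(union);
            }
        }
        return next;
    }

    public static Set<String> set(String... items) {
        return new TreeSet<>(Arrays.asList(items));
    }

    public static void main(String[] args) {
        // The five example transactions from the table in this post
        List<Set<String>> transactions = Arrays.asList(
                set("Bread", "Milk"),
                set("Bread", "Diaper", "Beer", "Milk"),
                set("Milk", "Diaper", "Beer", "Coke"),
                set("Bread", "Milk", "Diaper", "Beer"),
                set("Bread", "Milk", "Diaper", "Coke"));
        frequentItemsets(transactions, 3).forEach((itemset, count) -> System.out.println(itemset + " -> " + count));
    }
}
```

With the five transactions from the table in this post and minSupportCount = 3, {Coke} (support count 2) is dropped at the first level, {Bread, Beer} at the second, and the largest frequent itemsets are {Bread, Milk, Diaper} and {Milk, Diaper, Beer}, each with a support count of 3. The only 4-item candidate is pruned because its subset {Bread, Diaper, Beer} is not frequent, which is exactly the "extend only while frequent" property of Apriori.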

Before we go further and see how this algorithm works, it helps to be familiar with the terminology used in this algorithm -

Tid | Items
1   | Bread, Milk
2   | Bread, Diaper, Beer, Milk
3   | Milk, Diaper, Beer, Coke
4   | Bread, Milk, Diaper, Beer
5   | Bread, Milk, Diaper, Coke
  • Itemset
A collection of one or more items
Example: {Milk, Bread, Diaper}
k-itemset: An itemset that contains k items
  • Support count (σ)
Frequency of occurrence of an itemset
E.g. σ({Milk, Bread, Diaper}) = 3
  • Support (s)
Fraction of transactions that contain an itemset
E.g. s({Milk, Bread, Diaper}) = 3/5
  • Frequent Itemset
An itemset whose support is greater than or equal to a minsup threshold.

  • Association Rule
An implication expression of the form X → Y, where X and Y are itemsets.
Example: {Milk, Diaper} → {Beer}
  • Rule Evaluation Metrics
Support (s) - Fraction of transactions that contain both X and Y
Confidence (c) - Measures how often items in Y appear in transactions that
contain X.
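As a worked example on the five transactions in the table above: for the rule {Milk, Diaper} → {Beer}, σ({Milk, Diaper, Beer}) = 3 (transactions 2, 3 and 4), so s = 3/5 = 0.6; and since σ({Milk, Diaper}) = 4, c = 3/4 = 0.75. The small Java sketch below (class and method names are illustrative) computes the same metrics directly from their definitions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Support count, support and confidence computed over the five example transactions. */
public class RuleMetrics {
    public static Set<String> set(String... items) {
        return new HashSet<>(Arrays.asList(items));
    }

    public static final List<Set<String>> TRANSACTIONS = Arrays.asList(
            set("Bread", "Milk"),
            set("Bread", "Diaper", "Beer", "Milk"),
            set("Milk", "Diaper", "Beer", "Coke"),
            set("Bread", "Milk", "Diaper", "Beer"),
            set("Bread", "Milk", "Diaper", "Coke"));

    /** sigma(X): number of transactions that contain every item of X. */
    public static int supportCount(Set<String> itemset) {
        int count = 0;
        for (Set<String> t : TRANSACTIONS) {
            if (t.containsAll(itemset)) count++;
        }
        return count;
    }

    /** s(X): fraction of transactions that contain X. */
    public static double support(Set<String> itemset) {
        return (double) supportCount(itemset) / TRANSACTIONS.size();
    }

    /** c(X -> Y) = sigma(X union Y) / sigma(X). */
    public static double confidence(Set<String> x, Set<String> y) {
        Set<String> xy = new HashSet<>(x);
        xy.addAll(y);
        return (double) supportCount(xy) / supportCount(x);
    }

    public static void main(String[] args) {
        Set<String> x = set("Milk", "Diaper");
        Set<String> y = set("Beer");
        System.out.println(supportCount(set("Milk", "Bread", "Diaper"))); // 3
        System.out.println(support(set("Milk", "Diaper", "Beer")));       // 0.6
        System.out.println(confidence(x, y));                             // 0.75
    }
}
```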

In the next few posts I will describe how to implement this algorithm in HBase and MapReduce.

Monday, 17 November 2014

Writing Complex MongoDB Queries Using QueryBuilder


MongoDB provides many query selectors for filtering documents in a collection. Writing complex MongoDB queries in Java can sometimes be tricky.
Consider the following documents in the student_marks collection -
{"sid" : 1,"fname" : "Tom","lname" : "Ford","marks" : [ {"english" : 48}, {"maths" : 49}, {"science" : 50}]}
{"sid" : 2,"fname" : "Tim","lname" : "Walker","marks" : [ {"english" : 35}, {"maths" : 42}, {"science" : 37}]}
{"sid" : 3,"fname" : "John","lname" : "Ward","marks" : [ {"english" : 45}, {"maths" : 41}, {"science" : 37}]}
If we want to get the students whose last name is Ford and who have obtained more than 35 marks in English, the MongoDB shell command is -
db.student_marks.find({$and:[{"lname":"Ford"},{"marks.english": {$gt:35}}]})
The same query written in Java will look something like this -
        import java.util.ArrayList;
        import java.util.List;
        import com.mongodb.BasicDBObject;
        import com.mongodb.DBObject;

        DBObject query = new BasicDBObject();
        List<BasicDBObject> andQuery = new ArrayList<BasicDBObject>();
        andQuery.add(new BasicDBObject("lname", "Ford"));
        andQuery.add(new BasicDBObject("marks.english", new BasicDBObject("$gt", 35)));
        query.put("$and", andQuery);
Using MongoDB QueryBuilder we can rewrite the above query as -

        DBObject query = new QueryBuilder()
                .and(QueryBuilder.start("lname").is("Ford").get(),
                        QueryBuilder.start("marks.english").greaterThan(35).get())
                .get();

As you can see, QueryBuilder makes complex queries easier to write. The QueryBuilder class provides many methods such as and, not, greaterThan and exists, which make MongoDB queries easier to write and less prone to mistakes.

Wednesday, 1 October 2014

Introduction To MongoDB Aggregation Pipeline

MongoDB Aggregation Pipeline is a framework for data aggregation. It is modelled on the concept of data processing pipelines: documents enter a multi-stage pipeline that transforms them into aggregated results. It was introduced in MongoDB 2.2 to perform aggregation operations without needing map-reduce.
Key characteristics of the Aggregation Pipeline:
  • The $match and $sort pipeline operators can take advantage of an index when they occur at the beginning of the pipeline [Reference].
  • There are no restrictions on result size as a cursor is returned [Reference].
  • The output can be returned inline or written to a collection [Reference].
  • Pipeline stages have a limit of 100MB of RAM. To handle large datasets use allowDiskUse option [Reference].
  • The Aggregation Pipeline has an optimization phase which attempts to reshape the pipeline for improved performance [Reference].
For most aggregation operations, the Aggregation Pipeline provides better performance and a more coherent interface. However, map-reduce operations provide some flexibility that is not presently available in the aggregation pipeline.

The syntax for the aggregation pipeline is
db.collection.aggregate( [ { <stage> }, ... ] )
The MongoDB aggregation pipeline consists of stages. Each stage transforms the documents as they pass through the pipeline. Pipeline stages do not need to produce one output document for every input document; e.g., some stages may generate new documents or filter out documents. Pipeline stages can appear multiple times in the pipeline.

The stage operators supported by MongoDB are listed below -

$geoNear - Returns an ordered stream of documents based on the proximity to a geospatial point. Incorporates the functionality of $match, $sort, and $limit for geospatial data. The output documents include an additional distance field and can include a location identifier field.
$group - Groups input documents by a specified identifier expression and applies the accumulator expression(s), if specified, to each group. Consumes all input documents and outputs one document per each distinct group. The output documents only contain the identifier field and, if specified, accumulated fields.
$limit - Passes the first n documents unmodified to the pipeline where n is the specified limit. For each input document, outputs either one document (for the first n documents) or zero documents (after the first n documents).
$match - Filters the document stream to allow only matching documents to pass unmodified into the next pipeline stage. $match uses standard MongoDB queries. For each input document, outputs either one document (a match) or zero documents (no match).
$out - Writes the resulting documents of the aggregation pipeline to a collection. To use the $out stage, it must be the last stage in the pipeline.
$project - Reshapes each document in the stream, such as by adding new fields or removing existing fields. For each input document, outputs one document.
$redact - Reshapes each document in the stream by restricting the content for each document based on information stored in the documents themselves. Incorporates the functionality of $project and $match. Can be used to implement field level redaction. For each input document, outputs either one or zero documents.
$skip - Skips the first n documents where n is the specified skip number and passes the remaining documents unmodified to the pipeline. For each input document, outputs either zero documents (for the first n documents) or one document (if after the first n documents).
$sort - Reorders the document stream by a specified sort key. Only the order changes; the documents remain unmodified. For each input document, outputs one document.
$unwind - Deconstructs an array field from the input documents to output a document for each element. Each output document replaces the array with an element value. For each input document, outputs n documents where n is the number of array elements and can be zero for an empty array.
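To make the idea of stages concrete, here is a small in-memory analogy in Java. It is not the MongoDB driver API, and the class and method names are hypothetical: each stage maps a stream of documents to a new stream, so $unwind can emit several documents per input and $match zero or one. It mimics db.student_marks.aggregate([{$unwind: "$marks"}, {$match: {"marks.english": {$gt: 35}}}]) on the student_marks documents shown earlier.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * In-memory analogy of an aggregation pipeline (not the MongoDB driver API).
 * A "document" is a field -> value map; a stage maps a stream of documents to
 * a new stream, so it may emit zero, one or many documents per input document.
 */
public class PipelineAnalogy {
    public interface Stage extends Function<Stream<Map<String, Object>>, Stream<Map<String, Object>>> {}

    /** Like $match: zero or one output document per input document. */
    public static Stage match(Predicate<Map<String, Object>> predicate) {
        return docs -> docs.filter(predicate);
    }

    /** Like $unwind: one output document per array element (none for an empty array). */
    public static Stage unwind(String field) {
        return docs -> docs.flatMap(doc -> ((List<?>) doc.get(field)).stream().map(element -> {
            Map<String, Object> out = new LinkedHashMap<>(doc);
            out.put(field, element); // the array is replaced by a single element
            return out;
        }));
    }

    /** Runs the documents through each stage in order and collects the result. */
    public static List<Map<String, Object>> aggregate(List<Map<String, Object>> docs, Stage... stages) {
        Stream<Map<String, Object>> stream = docs.stream();
        for (Stage stage : stages) stream = stage.apply(stream);
        return stream.collect(Collectors.toList());
    }

    /** Builds a document shaped like the student_marks examples. */
    public static Map<String, Object> student(int sid, String lname, int english, int maths, int science) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("sid", sid);
        doc.put("lname", lname);
        doc.put("marks", Arrays.asList(
                Collections.singletonMap("english", english),
                Collections.singletonMap("maths", maths),
                Collections.singletonMap("science", science)));
        return doc;
    }

    /** True when the (unwound) marks element is {"english": n} with n > 35. */
    public static boolean englishAbove35(Map<String, Object> doc) {
        Object english = ((Map<?, ?>) doc.get("marks")).get("english");
        return english instanceof Integer && (Integer) english > 35;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> students = Arrays.asList(
                student(1, "Ford", 48, 49, 50),
                student(2, "Walker", 35, 42, 37),
                student(3, "Ward", 45, 41, 37));
        List<Map<String, Object>> result =
                aggregate(students, unwind("marks"), match(PipelineAnalogy::englishAbove35));
        result.forEach(System.out::println);
    }
}
```

Here $unwind turns the 3 student documents into 9 (one per marks element), and $match then keeps only the 2 elements for Ford and Ward; Walker's English mark of 35 does not pass the $gt: 35 filter.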

Different expressions supported by MongoDB are listed here.

Tuesday, 30 September 2014

What is Write Concern in MongoDB?

In MongoDB there are multiple guarantee levels available for reporting the success of a write operation, called Write Concerns. The strength of the write concern determines the level of guarantee. A weak Write Concern has better performance at the cost of a weaker guarantee, while a strong Write Concern gives a stronger guarantee because clients wait for the write operations to be confirmed.

MongoDB provides different levels of write concern to better address the specific needs of applications. Clients may adjust write concern to ensure that the most important operations persist successfully to an entire MongoDB deployment. For other less critical operations, clients can adjust the write concern to ensure faster performance rather than ensure persistence to the entire deployment.

Write Concern Levels

MongoDB has the following levels of conceptual write concern, listed from weakest to strongest:

Unacknowledged
With an unacknowledged write concern, MongoDB does not acknowledge the receipt of write operations. Unacknowledged is similar to errors ignored; however, drivers will attempt to receive and handle network errors when possible. The driver’s ability to detect network errors depends on the system’s networking configuration.
Figure: A write operation to a mongod instance with a write concern of unacknowledged. The client does not wait for any acknowledgment.

Acknowledged
With a receipt acknowledged write concern, the mongod confirms the receipt of the write operation. Acknowledged write concern allows clients to catch network, duplicate key, and other errors. This is the default write concern.
Figure: A write operation to a mongod instance with a write concern of acknowledged. The client waits for acknowledgment of success or an exception.

Journaled
With a journaled write concern, MongoDB acknowledges the write operation only after committing the data to the journal. This write concern ensures that MongoDB can recover the data following a shutdown or power interruption.
You must have journaling enabled to use this write concern.
Figure: A write operation to a mongod instance with a write concern of journaled. The mongod sends acknowledgment after it commits the write operation to the journal.
Replica Acknowledged
Replica sets present additional considerations with regard to write concern. The default write concern only requires acknowledgement from the primary. With a replica acknowledged write concern, you can guarantee that the write operation propagates to additional members of the replica set.
Figure: A write operation to a replica set with a write concern level of w:2, i.e. a write to the primary and at least one secondary.

Hive UDF to get Latitude and Longitude

In my previous post I explained Hive GenericUDF.
In this post I will give an example of a Hive GenericUDF that gets the latitude and longitude of a given location using the Google Geocoding API. Let's call this Hive function GeoEncodeUDF. The GeoEncodeUDF function takes a String location and returns an array of Float containing the latitude and longitude.

To obtain the latitude and longitude I am using the Google Geocoding API, available at http://maps.googleapis.com/maps/api/geocode/json?address=<address>, which returns a JSON object containing the matching places and their latitudes and longitudes. It might return multiple addresses, but for the sake of simplicity I am taking the first address's latitude and longitude. I have created a helper method getLatLng in the class GeoLatLng which takes a location string and returns the latitude and longitude as an array of float. This class is given below -
The GenericUDF itself is GeoEncodeUDF.
I have overridden the initialize(), evaluate() and getDisplayString() methods, which I have already described in my previous post.
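Since the GeoLatLng helper is not reproduced in this post, below is a hypothetical, dependency-free sketch of what such a helper could look like; it is not the author's class. It builds the request URL from the endpoint quoted above and pulls results[0].geometry.location out of the JSON text with a regular expression, which assumes "lat" is followed by "lng" inside the "location" object. A proper JSON library would be more robust for real use.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical, dependency-free sketch of a GeoLatLng-style helper. It builds
 * the geocode request URL and extracts the first "location" object from the
 * JSON text with a regex, assuming "lat" is followed by "lng" inside it.
 */
public class GeoLatLngSketch {
    private static final String GEOCODE_URL = "http://maps.googleapis.com/maps/api/geocode/json?address=";

    // Matches "location" : { "lat" : <number>, "lng" : <number> } (not "location_type")
    private static final Pattern LOCATION = Pattern.compile(
            "\"location\"\\s*:\\s*\\{\\s*\"lat\"\\s*:\\s*(-?[0-9.]+)\\s*,\\s*\"lng\"\\s*:\\s*(-?[0-9.]+)");

    /** URL-encodes the location string and appends it to the geocode endpoint. */
    public static String requestUrl(String location) throws UnsupportedEncodingException {
        return GEOCODE_URL + URLEncoder.encode(location, "UTF-8");
    }

    /** Returns {lat, lng} of the first "location" object in the JSON, or null if there is none. */
    public static float[] firstLatLng(String json) {
        Matcher m = LOCATION.matcher(json);
        if (!m.find()) {
            return null;
        }
        return new float[] { Float.parseFloat(m.group(1)), Float.parseFloat(m.group(2)) };
    }

    /** Fetches the geocode response for a location and returns the first lat/lng pair (network call). */
    public static float[] getLatLng(String location) throws IOException {
        try (InputStream in = new URL(requestUrl(location)).openStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            String json = scanner.hasNext() ? scanner.next() : "";
            return firstLatLng(json);
        }
    }
}
```

Keeping the parsing in firstLatLng separate from the network call makes it possible to check the extraction logic offline against a saved response, and it is the part a GenericUDF's evaluate() would ultimately rely on.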

Now to use this UDF in Hive we need to create a jar file of this UDF and add it to Hive. The commands to add this UDF to Hive are -
ADD JAR /path/to/HiveUDF.jar;
CREATE TEMPORARY FUNCTION geo_points AS 'com.rishav.hadoop.hive.ql.udf.generic.GeoEncodeUDF';
Now we can use the geo_points function on any table with an address string, like this -
hive> select geo_points("india") from test.x limit 1;
This HQL returns an array containing the latitude and longitude; to get them as separate columns use -
hive> select latlng[0], latlng[1] FROM (select geo_points("india") as latlng from test.x) tmp limit 1;
20.593683    78.96288