Monday, 28 July 2014

Hadoop Ecosystem - Projects

I discussed the Hadoop ecosystem in my last post. Let's have a look at some of the more dynamic projects in the ecosystem.

Pig

Pig is a data flow language that provides a high-level abstraction over the Hadoop framework. A Pig script is a sequence of operations to extract or query the relevant data from HDFS, and it gives you a flexible way of handling data. Pig is a batch query language: under the hood, the Pig interpreter converts the script into MapReduce jobs, and the generated MapReduce code works against HDFS to retrieve the desired data. Pig was developed at Yahoo and is an open-source project with a strong community.
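
To give a rough feel for this, here is a minimal sketch using Pig's embedded Java API (PigServer); the class name, HDFS paths and column names are hypothetical, and the Pig Latin statements inside the strings are what the interpreter turns into MapReduce jobs.

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class PigAverageSketch
{
    public static void main(String[] args) throws Exception
    {
        // Run the Pig Latin statements below in MapReduce mode against the cluster.
        PigServer pig = new PigServer(ExecType.MAPREDUCE);

        // Hypothetical tab-delimited (year, temperature) records on HDFS.
        pig.registerQuery("records = LOAD '/data/temps' AS (year:chararray, temp:int);");
        pig.registerQuery("grouped = GROUP records BY year;");
        pig.registerQuery("averages = FOREACH grouped GENERATE group, AVG(records.temp);");

        // The statements are compiled into MapReduce jobs when the result is stored.
        pig.store("averages", "/output/avg_temps");
    }
}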

Hive

Hive is a SQL-like language that provides a high-level abstraction over HDFS, just like Pig. Hive's SQL-like language is called HiveQL and is heavily inspired by MySQL. Hive was developed so that data analysts who want to use the Hadoop platform but are not well versed in writing MapReduce programs in Java can access the platform with SQL-style querying. Hive was developed at Facebook and is now an open-source Apache project. Currently, Hive is one of the most dynamic projects in the Hadoop ecosystem and keeps getting richer.
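
For a flavour of what HiveQL access looks like from Java, below is a minimal sketch using Hive's JDBC driver. It assumes a HiveServer2 instance running locally, and the connection URL, the weather table and its columns are illustrative assumptions only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQuerySketch
{
    public static void main(String[] args) throws Exception
    {
        // Load Hive's JDBC driver and connect to a (hypothetical) local HiveServer2.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");

        // A HiveQL query; under the hood Hive turns it into MapReduce jobs.
        Statement stmt = con.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT year, AVG(temp) FROM weather GROUP BY year");
        while (rs.next())
        {
            System.out.println(rs.getString(1) + "\t" + rs.getDouble(2));
        }
        con.close();
    }
}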

HBase

HBase is a column-oriented database that sits on top of the Hadoop framework and provides near real-time capability to the platform. HBase can work in tandem with Pig and Hive to provide flexible interaction with Hadoop. HBase is inspired by Google's BigTable and is another dynamic project in the Hadoop ecosystem.
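
To illustrate that near real-time access, here is a minimal sketch using the HBase Java client API of roughly this era (the old HTable style); the table name, column family, qualifier and row key are hypothetical and would have to exist in your HBase instance.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSketch
{
    public static void main(String[] args) throws Exception
    {
        // Reads hbase-site.xml from the classpath for cluster details.
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "weather");   // hypothetical table

        // Write a single cell: row "1901", column family "d", qualifier "avg_temp".
        Put put = new Put(Bytes.toBytes("1901"));
        put.add(Bytes.toBytes("d"), Bytes.toBytes("avg_temp"), Bytes.toBytes("20"));
        table.put(put);

        // Read it back immediately; this is the low-latency access HBase adds on top of HDFS.
        Result result = table.get(new Get(Bytes.toBytes("1901")));
        System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("avg_temp"))));

        table.close();
    }
}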

Sqoop

Sqoop is short for SQL-to-Hadoop. It is an open-source project under the Apache Foundation. Sqoop is used to transfer data between HDFS and any RDBMS.

Zookeeper

It is another open-source project under the Apache Foundation. ZooKeeper is a distributed coordination service. It provides primitives, such as distributed locks, that can be used for building distributed applications.
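
As a rough sketch of what such a primitive looks like with ZooKeeper's Java client, the snippet below tries to grab a lock by creating an ephemeral znode. The connection string and the znode path are hypothetical, and a production-grade lock recipe would also need watches and retries.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkLockSketch
{
    public static void main(String[] args) throws Exception
    {
        // Connect to a (hypothetical) ZooKeeper ensemble with a no-op watcher.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> { });

        try
        {
            // An ephemeral znode disappears automatically if this client dies,
            // so creating it successfully acts as acquiring a very simple lock.
            zk.create("/my-job-lock", new byte[0],
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("Lock acquired, do the critical work here");
            zk.delete("/my-job-lock", -1);   // release the lock
        }
        catch (KeeperException.NodeExistsException e)
        {
            System.out.println("Somebody else holds the lock");
        }
        finally
        {
            zk.close();
        }
    }
}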

Sunday, 20 July 2014

What is Hadoop Ecosystem

Hadoop is an open-source framework for storing and processing large amounts of data in a distributed manner. It is a batch processing system and has two main components: the Hadoop Distributed File System (HDFS) and MapReduce. HDFS is the distributed storage component, while MapReduce is the distributed computing component. It is an Apache Foundation project with a strong community. There are many real-world big data challenges, and Hadoop's core functionality alone was found wanting in the face of them; there are gaps between the solution Hadoop provides and specific big data challenges. For example, Hadoop is a batch processing system, so if you want a real-time solution, Hadoop by itself falls short. This particular gap led to another Apache project, HBase, springing up. HBase is a column-oriented database that sits on top of the Hadoop framework and provides near real-time capability to the framework.

Similarly, many projects have sprung up around the core Hadoop functionality, and together they are called the Hadoop ecosystem. Each project in the Hadoop ecosystem provides a solution to a specific big data problem or challenge. The ecosystem keeps getting bigger as the technology is used more and more. Many of the projects in the ecosystem belong to the Apache Foundation, while others belong to various vendors. Some projects are evolving quickly thanks to their strong developer communities, while others are relatively new.

Let's have a closer look at the Hadoop ecosystem projects in the coming posts.

Saturday, 12 July 2014

Comparison of Hadoop with Traditional RDBMS

Hadoop is an open source framework for writing and running distributed applications that process large amounts of data.

Comparison of Hadoop with Traditional RDBMS:

Data Size:

Traditional RDBMS: Can handle data in GBs.
Hadoop: Can handle data in PBs.

Access:

Traditional RDBMS: Interactive and Batch.
Hadoop: Batch.

Updates:

Traditional RDBMS: Read and write many times.
Hadoop: Write once, read many times.

Structure:

Traditional RDBMS: Static schema.
Hadoop: Dynamic schema.

Integrity:

Traditional RDBMS: High.
Hadoop: Low.

Scaling: 

Traditional RDBMS: Nonlinear.
Hadoop: Linear.

Saturday, 5 July 2014

MapReduce Program: To find average temperature for each year in NCDC data set.

Hadoop is a framework for the storage and processing of big data (structured or unstructured). Please check out the program below, which draws results out of semi-structured data from weather sensors. It's a MapReduce program written in Java.

The aim of the program is to find the average temperature for each year in the NCDC data set.

I am using Cloudera's VM - cloudera-quickstart-vm-4.6.0-0-virtualbox
(https://downloads.cloudera.com/demo_vm/virtualbox/cloudera-quickstart-vm-4.6.0-0-virtualbox.7z) for development. Make sure you use the VM that is compatible with your machine, as it comes in different configurations. Cloudera's VM is a great tool to start with for learning Hadoop.

There are a couple of other VMs available, one from Yahoo and another from Hortonworks, which are equally good and provide a great platform for learning Hadoop.

This program takes as input multiple files, where each file contains the weather data for a particular year. This weather data is shared by NCDC (National Climatic Data Center) and is collected by weather sensors at many locations across the globe. NCDC input data can be downloaded from the NCDC website.
There is a data file for each year, and each file contains, among other things, the year and the temperature information (which is what is relevant for this program).

Below is a snapshot of the data, taken from the year 1901 file, with the year and temperature fields highlighted in the green box:


So, in a MapReduce program there are two main phases: the map phase and the reduce phase.
You need an understanding of MapReduce concepts to follow the intricacies of MapReduce programming; MapReduce is one of the major components of Hadoop, along with HDFS. I will try to include more posts around the fundamentals of MapReduce in the coming weeks.

Continuing with our current program:
  
·    For writing any MapReduce program, you first need to figure out the data flow. In this example I am taking just the year and temperature information in the map phase and passing it on to the reduce phase, so the map phase here is essentially a data preparation phase, while the reduce phase is a data aggregation one.

·    Secondly, decide on the types for the key/value pairs. MapReduce uses lists and (key, value) pairs as its main data primitives, so you need to decide the types of the key/value pairs: K1, V1, K2, V2, K3, and V3 for the input, intermediate, and output pairs. In this example, I am taking LongWritable and Text as (K1, V1) for the input, and Text and IntWritable for both (K2, V2) and (K3, V3).

Map phase: I will be pulling out the year and temperature data from the log data in each file, as shown in the snapshot above.
Reduce phase: The data generated by the mapper(s) is fed to the reducer, which is another Java class. It takes all the values associated with a particular key and finds the average temperature for that key. So a key in our case is the year, and the value is a set of IntWritable objects representing all the captured temperature readings for that year.

I will be writing one Java class each for the map and reduce phases, and one driver class to create a job with the configuration information.

So, in this particular example I will be writing three Java classes:
  •          AverageMapper.java
  •          AverageReducer.java
  •          AverageDriver.java
 Let me share the code of all three classes, along with an explanation of how each class works:
   
AverageMapper.java

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;

public class AverageMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
    // Sentinel value used in the NCDC data for a missing temperature reading.
    public static final int MISSING = 9999;

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        String line = value.toString();

        // Year occupies characters 15-18 of each fixed-width record.
        String year = line.substring(15, 19);

        // Temperature occupies characters 87-91, optionally prefixed with '+'.
        int temperature;
        if (line.charAt(87) == '+')
            temperature = Integer.parseInt(line.substring(88, 92));
        else
            temperature = Integer.parseInt(line.substring(87, 92));

        // Quality code at character 92; emit (year, temperature) only for valid readings.
        String quality = line.substring(92, 93);
        if (temperature != MISSING && quality.matches("[01459]"))
            context.write(new Text(year), new IntWritable(temperature));
    }
}

Let us get into the details of the AverageMapper class. I need to extend the generic Mapper class with four formal type parameters: input key, input value, output key, output value. The input key for the map phase is the offset of the beginning of the line from the beginning of the file, but as we have no need for it, we can ignore it. The input value is the whole line of text (the record), the output key is the year, and the output value is the temperature, an integer. The data is fed to the map function one line or record at a time. The map() function converts the value into a string and reads the year and temperature parts from the applicable index positions. It then writes its output to the Context object passed to it: the year as Text and the temperature as IntWritable.

 AverageReducer.java

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;

public class AverageReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        // Sum all the temperature readings for this year and count them.
        int sum = 0;
        int count = 0;
        for (IntWritable value : values)
        {
            sum += value.get();
            count += 1;
        }
        // Emit (year, average temperature); note this is integer division.
        context.write(key, new IntWritable(sum / count));
    }
}

Now coming to the Reducer class. Again, four formal type parameters are specified: input key, input value, output key, output value. The input key and value types of the reduce function must match the output key and value types of the map function: Text and IntWritable. The reduce() function iterates through all the values for a key, finds their sum and count, and from those the average temperature. Note that the division is integer division, so the average is truncated to a whole number.

AverageDriver.java

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageDriver
{
    public static void main(String[] args) throws Exception
    {
        if (args.length != 2)
        {
            System.err.println("Please enter the input and output parameters");
            System.exit(-1);
        }

        // Create the job and point Hadoop at the jar containing this class.
        Job job = new Job();
        job.setJarByClass(AverageDriver.class);
        job.setJobName("Average temperature");

        // Input file/directory and output directory (must not already exist).
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(AverageMapper.class);
        job.setReducerClass(AverageReducer.class);

        // Output types for both the map and reduce functions.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit the job, wait for it to finish, and exit with 0 on success or 1 on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

A Job object forms the specification of the job and gives you control over how the job will be run. Hadoop has a special feature called data locality, wherein the code for the program is sent to the data instead of the other way around, so Hadoop distributes the jar file of the program across the cluster. We pass the name of the class to the setJarByClass() method, which Hadoop can use to locate the jar file containing this class. We need to specify the input and output paths: the input path can be a file or a directory that will be used as input to the program, and the output path is a directory that will be created by the reducer. If the output directory already exists, the job fails with an error. Then we specify the map and reduce types to use via setMapperClass() and setReducerClass(), and set the output types for the map and reduce functions. The waitForCompletion() method submits the job and waits for it to finish; it returns a boolean indicating success or failure, which we translate into an exit code of 0 or 1.