Pig Tutorial – Hadoop Pig Introduction, Pig Latin, Use Cases, Examples

In this tutorial series, we will cover Apache Pig. Apache Pig provides a platform for analyzing large data sets in a distributed fashion on a cluster of commodity machines.

Pig tutorial – Pig Latin Introduction

The language used to process these data sets is called Pig Latin. It allows you to perform data transformations such as joining, sorting, filtering, and grouping of records.

It is, in essence, an ETL process for the Big Data environment. It also lets users create their own functions for reading, processing, and writing data.

Pig is an open source project developed under the Apache Software Foundation (http://pig.apache.org), so users are free to download it as source or binary code.

Pig Latin programs run on a Hadoop cluster and make use of both the Hadoop Distributed File System (HDFS) and the MapReduce programming layer.

However, for prototyping, Pig Latin programs can also run in “local mode” without a cluster. In local mode, all the processes invoked while running the program reside in a single local JVM.

Using Pig Latin, programmers can perform MapReduce tasks easily without having to write complex Java code. The map, sort, shuffle, and reduce phases are taken care of internally by the operators and functions you use in the Pig script.

Basic “hello world program” using Apache Pig

The basic “hello world” program in Hadoop is the word count program.

The same example is explained in the “Hadoop and HDFS” tutorial using a Java MapReduce program.

Now, let’s look at the Pig Latin version. Assume our input is a text file named “src.txt”, with words delimited by spaces and lines terminated by ‘\n’. Sample data from the file is shown below:

Old MacDonald had a farm
And on his farm he had some chicks
With a chick chick here
And a chick chick there
Here a chick there a chick
Everywhere a chick chick

The word count program for the above sample data in Pig Latin is as follows:

A = load 'src.txt' as (line:chararray);

-- TOKENIZE splits the line into a field for each word, e.g. ({(Old),(MacDonald),(had),(a),(farm)})
B = foreach A generate TOKENIZE(line) as tokens;

-- FLATTEN converts the output of TOKENIZE into separate records, e.g. (Old)
--                                                                     (MacDonald)
--                                                                     (had)
--                                                                     (a)
--                                                                     (farm)
C = foreach B generate FLATTEN(tokens) as words;

-- To count the occurrences of each word, group all identical words together.
D = group C by words;
E = foreach D generate group, COUNT(C);

-- Order the results by count.
F = order E by $1;

-- Print the word counts to the console using DUMP.
dump F;

A portion of the output, ordered by count, will look like the following:

(Old,1)
(MacDonald,1)
...
(had,2)
...
(chick,8)

Pig Use Cases – Examples

Pig Use Case#1

Web server logs can be processed using Pig because they are a goldmine of information. Using this information, we can analyze overall server usage and improve server performance.

We can build usage-tracking mechanisms, such as monitoring users and processes and pre-empting security attacks on the server.

We can also analyze frequent errors and take corrective measures to enhance the user experience.
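
As a minimal sketch of this idea, assuming a hypothetical tab-delimited log file “weblog.txt” with an HTTP status-code field, the most frequent error URLs could be found like this:

logs = load 'weblog.txt' using PigStorage('\t')
       as (ip:chararray, ts:chararray, url:chararray, status:int);
-- Keep only error responses and count how often each URL fails.
errors = filter logs by status >= 400;
by_url = group errors by url;
error_counts = foreach by_url generate group as url, COUNT(errors) as hits;
top_errors = order error_counts by hits desc;
dump top_errors;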

Pig Use Case#2

Understanding the effectiveness of an advertisement is an important goal for any company.

Many companies invest millions of dollars in buying ad space, and it is critical for them to know how popular their advertisements are, both in physical and virtual space.

Gathering advertising data from multiple sources and analyzing it to understand customer behavior and ad effectiveness is therefore a common requirement for many companies, and it can be achieved easily using the Pig Latin language.
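
For illustration only (the file names and schemas below are assumptions), a Pig Latin script that joins ad impressions with clicks to estimate a per-ad click-through rate could look like this:

impressions = load 'impressions.txt' using PigStorage(',') as (ad_id:chararray, user_id:chararray);
clicks = load 'clicks.txt' using PigStorage(',') as (ad_id:chararray, user_id:chararray);
-- Count how often each ad was shown and how often it was clicked.
imp_count = foreach (group impressions by ad_id) generate group as ad_id, COUNT(impressions) as shown;
clk_count = foreach (group clicks by ad_id) generate group as ad_id, COUNT(clicks) as clicked;
-- Join the two counts and compute the click-through rate per ad.
joined = join imp_count by ad_id, clk_count by ad_id;
ctr = foreach joined generate imp_count::ad_id as ad_id, (double)clicked / shown as click_through_rate;
dump ctr;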

Pig Use Case#3

Processing healthcare information is another important use case for Pig.

The “Neural Network for Breast Cancer Data Built on Google App Engine” is one notable application developed using Pig and Hadoop.

Pig Use Case#4

Stock analysis using Pig Latin, such as calculating average dividends, estimating total trades, and grouping and joining stocks.
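
As a rough example (the file name, delimiter, and column layout are assumptions), the average dividend and total trade volume per stock could be computed and joined like this:

stocks = load 'stocks.csv' using PigStorage(',')
         as (symbol:chararray, trade_date:chararray, dividend:double, volume:long);
by_symbol = group stocks by symbol;
avg_div = foreach by_symbol generate group as symbol, AVG(stocks.dividend) as avg_dividend;
total_trades = foreach by_symbol generate group as symbol, SUM(stocks.volume) as total_volume;
-- Join the two per-symbol summaries back together.
summary = join avg_div by symbol, total_trades by symbol;
dump summary;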

Apache Flume Agent – Flume Hadoop | Spool directory to logger

In this tutorial, we will learn to set up an Apache Flume agent that monitors a spooling directory.

When log files are added to the directory, the agent reads them and pushes their content to the console log.

Here, the source is the spooling directory and the sink is the console log; the Apache Flume agent sits between the two.

Apache Flume Agent

Configuration of Apache Flume Agent

  1. We need to set all the options in a “.properties” file.
  2. Use the command line to start a Flume agent. The command to start a Flume agent is shown below (“ng” stands for next generation):
$ flume-ng agent
  3. Set up the “.properties” file as shown below.

spool-to-log.properties

# Agent name: agent1
agent1.sources = datasource1
agent1.sinks = datastore1
agent1.channels = ch1

agent1.sources.datasource1.channels = ch1
agent1.sinks.datastore1.channel = ch1

agent1.sources.datasource1.type = spooldir
agent1.sources.datasource1.spoolDir = /usr/kmayank/spooldir

agent1.sinks.datastore1.type = logger
agent1.channels.ch1.type = file

  4. Run the Flume agent using the command below:
$ flume-ng agent \
    --conf-file spool-to-log.properties \
    --name agent1 \
    -Dflume.root.logger=WARN,console

The last option, “-Dflume.root.logger”, is optional. It specifies what the logging output on the console should look like; here, only warnings are printed to the console.

  5. Place a file into the spooling directory. After a few seconds, you will notice that the file name gets a “.COMPLETED” suffix and the contents of the file are printed to the screen/console.

It is important to note that any file placed in the spooling directory must be a text file, uniquely named, and immutable (not modified after being placed there).

To read files in any other format, you have to write a custom deserializer in Java and plug it into your properties file, as sketched below.
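
As a rough illustration (the class name below is hypothetical), the spooling directory source exposes a “deserializer” property that points to the builder of your custom deserializer, which must be available on the Flume classpath:

# Hypothetical custom deserializer for a non-text format.
agent1.sources.datasource1.deserializer = com.example.flume.MyFormatDeserializer$Builder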

Flume Events in Apache Flume Agent

Data inside an Apache Flume agent is represented as Flume events. An event is the base unit of communication between the source, channel, and sink.

A Flume event is a discrete object that represents one record of data that needs to be transported from the source to the sink.

The source reads data in the form of Flume events and sends it to the channel; the sink then reads the data from the channel, again in the form of events.

An Apache Flume event consists of a key-value pair representing one record of data. The key is the event header, i.e. metadata that determines how the data is processed or routed to a channel or sink.

The value is the actual data, also called the event body. Each record is therefore represented by one Flume event.

For example, with the spooling directory source, each line of a file is read as one event by the default line-based deserializer. The event body is usually represented as a byte array. When the source writes this data to the channel, it may do so as one event or as multiple events.
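
To make the header/body split concrete, here is a minimal Java sketch using Flume’s SDK; the header name used here is only an illustrative assumption:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class FlumeEventSketch {
    public static void main(String[] args) {
        // The "key": event headers, i.e. metadata used to decide how the
        // event is processed or routed. The header name here is made up.
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("sourceFile", "app.log");

        // The "value": the event body, stored as a byte array.
        Event event = EventBuilder.withBody(
                "one record of log data", StandardCharsets.UTF_8, headers);

        System.out.println(event.getHeaders());
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
    }
}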

If an event body exceeds the channel’s capacity, Apache Flume will not be able to transport that event. For example, if you want to transport a very large file to, say, HDFS, a direct copy would be a better option than using Flume.

Flume Installation – Apache Flume Agent

Flume installation is a very simple process. Go through the following steps to install and configure Apache Flume.

Steps for Flume Installation

  1. Download Apache Flume from the Apache website (Apache Flume download URL).
  2. Extract the downloaded archive and point to the Flume folder in your bash profile. The bash profile entry makes sure that you can start the Flume agent from any directory. For example: export FLUME_HOME=$HOME/apache-flume-1.6.0-bin
  3. Append $FLUME_HOME/bin to the PATH variable, for example: export PATH=$PATH:$FLUME_HOME/bin. A consolidated sketch of these entries follows.
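
Assuming the 1.6.0 binary distribution extracted under your home directory, the bash profile entries and a quick verification might look like this:

# ~/.bash_profile (or ~/.bashrc)
export FLUME_HOME=$HOME/apache-flume-1.6.0-bin
export PATH=$PATH:$FLUME_HOME/bin

# Verify the installation from any directory.
flume-ng version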

Flume Agent

A Flume agent is an independent Java daemon (JVM) that receives events (data) from an external source and forwards them to the next destination. The next destination can be another agent or a sink. A Flume agent can connect any number of sources to any number of data stores.


Let’s understand this with an example:

Apache Flume Agent

Suppose you have two data sources, say DS1 and DS2, and you want to write DS1 data into HDFS and DS2 data into Cassandra.

For this scenario, one Flume agent is enough to complete the job. A Flume agent performs hop-by-hop operations, i.e. writing the data of DS1 and DS2 to HDFS and Cassandra is one complete hop.

Now suppose there is another hop for the data, i.e. the data written to HDFS is read by some other application and finally needs to go to another datastore, say Hive.

In that case, two Flume agents are required since there are two hops of data: one agent for moving DS1 and DS2 data to HDFS and Cassandra respectively, and another agent from HDFS to Hive.

There are three basic components of a flume agent:

  1. The source, the component that receives data.
  2. The channel, the component that buffers data.
  3. The sink, the component that writes data.

A Flume agent is set up using a configuration file, and within that configuration file we configure the sources and the format in which data is sent to each source.

It is important to configure the channel capacity according to the rates of the source and the sink. There are many types of channels, but two are most commonly used.

The first is the memory channel, which buffers data in the memory of the system where the Flume agent is running. The memory channel basically acts like an in-memory queue.

The source will write to the tail of the queue and the sink will read from the head of the queue.

But there are issues with the memory channel. One primary issue is that its capacity is constrained by the amount of memory the system has. And in case of a crash, the memory channel is not persistent.

Therefore, all the data present in the buffer might be lost. File channels are preferable when durability matters, because they give you fault tolerance and lossless delivery, i.e. a guarantee of no data loss.

Since a file channel buffers data on disk, you can have a larger buffer capacity as per your requirements. Channels are continuously polled by sink components, which write the data to the endpoints.
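
As a hedged sketch (the capacities and directories below are illustrative, not recommendations), the two channel types are tuned through properties such as the following:

# Memory channel: fast, but bounded by RAM and lost on a crash.
agent1.channels.memch.type = memory
agent1.channels.memch.capacity = 10000
agent1.channels.memch.transactionCapacity = 1000

# File channel: buffered on disk, survives agent restarts.
agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = /usr/kmayank/flume/checkpoint
agent1.channels.ch1.dataDirs = /usr/kmayank/flume/data
agent1.channels.ch1.capacity = 1000000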

Multiple sources can write to a single channel, and one source can write to multiple channels, i.e. there is a many-to-many relationship between sources and channels.

However, the channel-to-sink relationship is one-to-one. The channel does not delete data immediately as it is written to the sink; it waits for an acknowledgment from the sink before deleting any data.

YARN Architecture – Yet Another Resource Negotiator, Hadoop 2

In this post, we will learn about YARN architecture. YARN (Yet Another Resource Negotiator) is the key component of Hadoop 2.x.

The underlying file system continues to be HDFS. YARN is basically a framework for developing and/or executing distributed processing applications, for example MapReduce, Spark, Apache Giraph, etc.

Let us look at one of the scenarios to understand the YARN architecture better.

 

YARN Architecture

Suppose we have two client requests: one wants to run a MapReduce job while the other wants to execute a shell script.

In the diagram, the MapReduce job is represented in blue while the shell script job is represented in green.

The resource manager has two main components: the application manager and the scheduler. The scheduler is responsible for allocating resources to the various running applications. It is a pure scheduler in the sense that it performs no monitoring or tracking of application status.

The scheduler also offers no guarantees about restarting tasks that fail due to hardware or application failures. It performs its scheduling function based on the resource requirements of the applications, using the abstract notion of a resource container, which incorporates elements such as memory, CPU, disk, network, etc.

The application manager is responsible for accepting job submissions, negotiating the first container for executing the application-specific application master, and providing the service for restarting the application master container on failure.

The node manager is a per-machine framework agent responsible for containers, monitoring their resource usage (CPU, memory, network, etc.) and reporting it to the resource manager/scheduler.

The application master is responsible for negotiating appropriate resource containers from the scheduler, tracking their status, and monitoring their progress.

The green job in the diagram will have its own application master, and the blue job will have its own application master. Each application master handles the containers of its own application.
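
As a rough illustration (the paths, input/output directories, and arguments are placeholders), the two requests could be submitted roughly like this; the MapReduce job uses the bundled examples jar, and the shell script runs through YARN’s bundled distributed-shell application:

# MapReduce job (blue): word count from the examples jar.
yarn jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
    wordcount /user/kmayank/input /user/kmayank/output

# Shell command (green): run via the distributed-shell YARN application.
yarn jar $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar \
    org.apache.hadoop.yarn.applications.distributedshell.Client \
    -jar $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar \
    -shell_command "uptime" -num_containers 2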

Another view of the YARN architecture is one where the resource manager handles the job queue, resource allocation, and job scheduling.

It allocates resources against the list of available resources. The slave node hosts the application master, which handles the task queue and the job lifecycle logic.

Hadoop 2 Architecture – Key Design Concepts, YARN, Summary

Hadoop 2 came about to overcome the limitations of Hadoop 1.x. The Hadoop 2 architecture addresses those limitations and meets current data processing requirements.

Hadoop 2 Architecture – Key Design Concepts

  • Split up the two major functions of the job tracker:
      • Cluster resource management
      • Application life-cycle management
  • MapReduce becomes a user library, i.e. just one of the applications residing in Hadoop.

Here are the key concepts to understand before getting into the details of the Hadoop 2 architecture.

Application

An application is a job submitted to the framework, for example a MapReduce job.

Container

A container is the basic unit of allocation and represents a slice of physical resources, for example Container A = 2 GB memory, 1 CPU.

The node manager provides containers to an application. To be precise, each mapper and reducer runs in its own container; the application master requests these containers for the map and reduce tasks.

Therefore, a container is supervised by the node manager and scheduled by the resource manager.
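
As a hedged sketch (the values are illustrative only), container sizes on a cluster are bounded by yarn-site.xml properties such as these:

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>8192</value>   <!-- total memory a node manager can hand out to containers -->
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>1024</value>   <!-- smallest container the scheduler will allocate -->
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>4096</value>   <!-- largest container the scheduler will allocate -->
</property>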

Resource Manager

The resource manager is the global resource scheduler; it tracks resource usage and node health. It has two main components: the scheduler and the application manager.

The responsibility of the scheduler is to allocate resources to the various running applications.

It is important to note that the resource manager does not perform any monitoring of the applications.

The application manager is responsible for negotiating the first container for executing the application-specific application master and for providing the service that restarts that container on failure.

Node Manager

The node manager manages the life cycle of containers on its node. It also monitors the health of the node.

Application Master

The application master manages application scheduling and task execution. It interacts with the scheduler to acquire the required resources, and with the node managers to execute and monitor the assigned tasks.

YARN (Yet Another Resource Negotiator)

YARN is the key component of Hadoop 2 Architecture. HDFS is the underlying file system.

It is a framework for developing and/or executing distributed processing applications, for example MapReduce, Spark, Apache Giraph, etc.

Please go to the YARN Architecture post above to understand the Hadoop 2 architecture in detail.

Hadoop 2 Summary

Apache Hadoop 2.0 marked a generational shift in architecture, with YARN integrated into the whole Hadoop ecosystem.

It allows multiple applications to run on the same platform. YARN is not the only major feature in Hadoop 2.0; HDFS has also undergone major enhancements in terms of high availability (HA), snapshots, and federation.

NameNode high availability provides automated failover with a hot standby and resiliency for the NameNode master service.

In a high availability (HA) cluster, two separate machines are configured as NameNodes. One NameNode is in the active state while the other is in the standby state.

All client operations are handled by the active node, while the other acts as the standby. The standby node stays in sync with the active node by having access to the edits directory on a shared storage device.
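
As a small sketch (nn1 and nn2 are hypothetical NameNode IDs for the configured nameservice), the HA state can be inspected and a manual failover triggered with the hdfs haadmin tool:

# Check which NameNode is active and which is standby.
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2

# Manually fail over from nn1 to nn2 (automatic failover normally handles this).
hdfs haadmin -failover nn1 nn2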

Snapshots create recovery points for backups: a read-only image of the file system at a given point in time, which we call the snapshot at that point.
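
A quick sketch with a hypothetical directory shows how snapshots are enabled and created from the command line:

# Allow snapshots on a directory (admin command), then create one.
hdfs dfsadmin -allowSnapshot /user/kmayank/data
hdfs dfs -createSnapshot /user/kmayank/data backup-20160101

# The snapshot is then readable under /user/kmayank/data/.snapshot/backup-20160101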

To improve scalability and isolation, HDFS federation creates multiple namespaces, each managed by its own NameNode.