Hadoop 2 was introduced to overcome the limitations of Hadoop 1.x. The Hadoop 2 architecture addresses those limitations and meets current data processing requirements.
Hadoop 2 Architecture – Key Design Concepts
Split up the two major functions of the job tracker:
Cluster resource management
Application life-cycle management
MapReduce becomes a user library, i.e. one of the applications running on Hadoop.
Key concepts to understand before getting into Hadoop 2 Architecture details.
Application
An application is a job submitted to the framework, for example a MapReduce job.
Container
A container is the basic unit of allocation and represents a slice of physical resources, for example Container A = 2 GB of memory and 1 CPU.
The node manager provides containers to an application. To be precise, each mapper and reducer runs in its own container. The Application Master obtains these containers for the mapper and reducer tasks.
Therefore, a container is supervised by the node manager and scheduled by the resource manager.
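As a hedged illustration of how an Application Master asks the resource manager for containers, the Hadoop 2.x YARN client library exposes an AMRMClient. The sketch below assumes a cluster with default configuration; the host, port, tracking URL, and priority are placeholders, and the 2 GB / 1 vCore figures simply mirror the "Container A" example above.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ContainerRequestSketch {
    public static void main(String[] args) throws Exception {
        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(new YarnConfiguration());
        rmClient.start();

        // Register this Application Master with the resource manager
        // (host, port, and tracking URL are placeholders here).
        rmClient.registerApplicationMaster("", 0, "");

        // Ask the scheduler for one container of 2 GB memory and 1 virtual core,
        // mirroring the "Container A = 2GB, 1CPU" example above.
        Resource capability = Resource.newInstance(2048, 1);
        rmClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(0)));

        // Allocated containers are later launched on node managers to run tasks.
    }
}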
Resource Manager
It is the global resource scheduler; it tracks resource usage and node health. The resource manager has two main components: the scheduler and the application manager.
The scheduler's responsibility is to allocate resources to the various running applications.
It is important to note that the resource manager does not perform any monitoring of the applications.
The application manager is responsible for negotiating the first container for executing the application-specific Application Master, and it provides the service for restarting containers on failure.
Node Manager
Node Manager manages the life cycle of the container. It also monitors the health of a node.
Application Master
It manages application scheduling and task execution. It interacts with the scheduler to acquire the required resources and with the node managers to execute and monitor the assigned tasks.
YARN is the key component of the Hadoop 2 architecture. HDFS is the underlying file system.
YARN is a framework to develop and/or execute distributed processing applications, for example MapReduce, Spark, and Apache Giraph.
Please see the post YARN Architecture to understand the Hadoop 2 architecture in detail.
Hadoop 2 Summary
Apache Hadoop 2.0 marked a generational shift in architecture, with YARN integrated into the whole Hadoop ecosystem.
It allows multiple applications to run on the same platform. YARN is not the only major feature of Hadoop 2.0; HDFS has also undergone major enhancements in terms of high availability (HA), snapshots, and federation.
Name node high availability provides automated failover with a hot standby and resiliency for the name node master service.
In a high availability (HA) cluster, two separate machines are configured as name nodes. One name node is in the active state while the other is in the standby state.
All client operations are handled by the active node, while the standby acts as a slave node. The standby stays in sync with the active node by having access to a directory on a shared storage device.
Snapshots capture the state of the file system at a given point in time, so data can be recovered from that point if needed.
To improve scalability and isolation, HDFS federation allows multiple namespaces (and name nodes) within a single cluster.
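As a minimal sketch of the snapshot feature mentioned above (assuming the default file system is HDFS and that an administrator is permitted to enable snapshots on the directory; the path and snapshot name are only examples), the HDFS Java API can be used like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SnapshotSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path dir = new Path("/user/training/important_data"); // example directory

        // An administrator must first mark the directory as snapshottable.
        ((DistributedFileSystem) fs).allowSnapshot(dir);

        // Take a point-in-time snapshot; data can later be recovered from it.
        fs.createSnapshot(dir, "backup-point-1"); // example snapshot name
    }
}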
In this post, we will learn about YARN architecture. YARN (Yet Another Resource Negotiator) is the key component of Hadoop 2.x.
The underlying file system continues to be HDFS. YARN is basically a framework to develop and/or execute distributed processing applications, for example MapReduce, Spark, and Apache Giraph.
Let us look at one of the scenarios to understand the YARN architecture better.
Suppose we have two client requests: one wants to run a MapReduce job while the other wants to execute a shell script.
The MapReduce job is represented in blue while the shell script request is represented in green.
The resource manager has two main components: the application manager and the scheduler. The scheduler is responsible for allocating resources to the various running applications. It is a pure scheduler in the sense that it performs no monitoring or tracking of application status.
The scheduler also offers no guarantees about restarting tasks that fail due to hardware or application failures. It performs its scheduling function based on the resource requirements of the applications, using the abstract notion of a resource container, which incorporates elements such as memory, CPU, disk, and network.
The application manager is responsible for accepting job submissions, negotiating the first container for executing the application-specific application master, and providing the service for restarting the application master container on failure.
The node manager is the per-machine framework agent responsible for containers, monitoring their resource usage (CPU, memory, disk, network, etc.) and reporting it to the resource manager/scheduler.
The application master is responsible for negotiating appropriate resource containers from the scheduler, tracking their status, and monitoring their progress.
In the diagram, the green job and the blue job each have their own application master, and each application master handles its own containers.
Another view of the YARN architecture shows the resource manager handling the job queue, resource allocation, and job scheduling; it allocates resources against the list of available resources. On the slave nodes, the application master handles the task queue and the job life-cycle logic.
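To make the scenario concrete, here is a hedged sketch (not a complete application) of how a client could submit the shell-script job from the green flow to the resource manager using the Hadoop 2.x YARN client API; the application name, script name, queue, and resource sizes are placeholders.
import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SubmitShellScriptApp {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Ask the resource manager for a new application id.
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
        context.setApplicationName("shell-script-app"); // placeholder name

        // Describe the container that will run the application master.
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList("/bin/bash my_script.sh")); // placeholder command
        context.setAMContainerSpec(amContainer);
        context.setResource(Resource.newInstance(1024, 1)); // 1 GB, 1 vCore for the AM
        context.setQueue("default");

        // Hand the application over to the resource manager's application manager.
        yarnClient.submitApplication(context);
    }
}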
All HDFS commands are invoked by the "bin/hdfs" script. Running the hdfs script without any arguments prints the description of all commands.
Command | Usage | Description
classpath | hdfs classpath | Prints the class path needed to get the Hadoop jar and the required libraries.
ls | hadoop fs -ls / | Lists the contents of the root directory in HDFS.
version | hadoop version | Prints the Hadoop version.
df | hadoop fs -df hdfs:/ | Shows the amount of space used and available on the currently mounted filesystem.
balancer | hadoop balancer | Runs the cluster balancing utility.
mkdir | hadoop fs -mkdir /usr/training/hadoop_files | Creates a new directory hadoop_files below the /usr/training directory in HDFS.
copyToLocal | hadoop fs -copyToLocal /usr/training/hadoop_files/myfile.txt data | Copies the myfile.txt file from the "hadoop_files" directory in HDFS to the "data" directory in your local file system.
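The same operations can also be performed from Java through the Hadoop FileSystem API. The sketch below mirrors a few of the shell commands above; it assumes the default configuration points at the cluster, and the paths are the example paths from the table.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCommandsInJava {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // hadoop fs -ls /  : list the contents of the HDFS root directory
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }

        // hadoop fs -mkdir /usr/training/hadoop_files
        fs.mkdirs(new Path("/usr/training/hadoop_files"));

        // Copy myfile.txt from HDFS to the local "data" directory
        fs.copyToLocalFile(new Path("/usr/training/hadoop_files/myfile.txt"),
                new Path("data/myfile.txt"));
    }
}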
The following Linux commands are also frequently used when working on a Hadoop cluster:
Command | Usage | Description
ls | ls [option(s)] | Lists the files in the current directory. Run without any additional parameters, it lists the contents of the current directory in short form. Options: -l gives a detailed list; -a also displays hidden files.
cp | cp [option(s)] <sourcefile> <targetfile>, e.g. cp file1 new-file2 or cp -r dir1 dir2 | Copies sourcefile to targetfile. Options: -i waits for confirmation, if necessary, before an existing targetfile is overwritten; -r copies recursively (includes subdirectories).
mv | mv file_1.txt /scratch/kmak | Moves or renames files. Copies sourcefile to targetfile, then deletes the original sourcefile.
rm | rm myfile.txt, rm -r mydirectory | Removes the specified files from the file system. Directories are not removed by rm unless the -r option is used.
ln | ln file1.txt file2.txt | Creates links between files.
cd | cd /scratch/kmak/bi | Changes the shell's current working directory.
pwd | pwd | Prints the working directory; it writes the full pathname of the current working directory to standard output.
mkdir | mkdir <mydir> | Creates directories on a file system.
rmdir | rmdir <emptydir> | Deletes the specified directory, provided it is already empty.
nl | nl myfile.txt | Numbers the lines in a file.
gedit | gedit myfile.txt | Opens the file in the gedit text editor.
stat | stat myfile.txt | Displays the detailed status of a file or file system.
wc | wc myfile.txt, wc -l myfile.txt, wc -c myfile.txt | Reports the newline, word, and byte/character counts for the specified file.
chown | chown chope file.txt, chown -R chope /scratch/work | Changes the owner and owning group of files.
chgrp | chgrp oracle myfile.txt | Changes the group ownership of a file or files.
ifconfig | ifconfig | Views and changes the configuration of the network interfaces on your system.
In this post, we will learn the Hadoop 1 architecture with a step-by-step description. The Hadoop 1 architecture had some limitations, which have been addressed in Hadoop 2.x.
Hadoop 1 Architecture Description
One or more HDFS Clients submit the job to Hadoop System.
When the Hadoop system receives a client request, it first contacts the master node. The name node, secondary name node, and data nodes form the HDFS layer, while the job tracker on the master node gives us the MapReduce layer.
Once you write a MapReduce Java program, say using the Eclipse IDE, you convert the program into a runnable jar file. The job tracker then receives this runnable jar.
Now, the job tracker needs to know on which commodity machines the data blocks reside. The name node provides this information, i.e., the IP addresses of the commodity machines where the data blocks are residing (see the sketch after these steps).
The slave node's MapReduce component, i.e., the task tracker, receives the runnable jar from the job tracker and performs the assigned tasks using the map and reduce components.
The task tracker creates a JVM (Java virtual machine) to execute the runnable jar. The program first runs the mapper routine. The mapper routine needs key-value pairs, which are fetched by the task tracker; the task tracker internally accesses the data blocks residing on the slave node.
The mapper routine writes its result set to the context.
The task tracker creates another JVM where the reducer routine runs. The reducer takes the mapper output as its input, shuffles, sorts, and reduces the data, and finally gives you the summarized information as output.
Once all task trackers finish their jobs, the job tracker takes those results and combines them into a final result set.
The Hadoop client then receives the final result.
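As a hedged illustration of the block placement lookup mentioned in the steps above, client code can ask for this information through the Hadoop FileSystem API; the file path below is only an example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/user/training/input.txt"); // example file
        FileStatus status = fs.getFileStatus(file);

        // The name node reports, for each block of the file, the hosts holding a replica.
        for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
            System.out.println(String.join(",", block.getHosts()));
        }
    }
}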
Hadoop 1.x Limitations
Hadoop 1.x supports only MapReduce-based Batch/Data Processing Applications.
It does not support Real-time data processing.
Only one name node and one namespace can be configured per cluster, i.e. it does not support a federated architecture. The entire Hadoop cluster goes down if the name node fails.
The job tracker is a single point of failure; it has to perform multiple activities such as resource management, job scheduling, and job monitoring.
Hadoop 1.x has limited horizontal scalability: it can support a maximum of 4,000 nodes and 40,000 concurrent tasks per cluster.
Hadoop MapReduce is a system for parallel processing, originally developed at Google, for executing a set of functions over large data sets in batch mode, with the data stored in a large fault-tolerant cluster.
The input data set, which can be a terabyte-scale file, is broken down into chunks of 64 MB by default, and these chunks are the input to the mapper function. The mapper function then filters and sorts these data chunks on the Hadoop cluster's data nodes according to the business requirement.
After the distributed computation is completed, the output of the mapper function is passed to the reducer function, which combines the elements back together to produce the resulting output.
An example of Hadoop MapReduce usage is the "word count" algorithm in raw Java, using classes provided by the Hadoop libraries: count how many times a given word such as "are", "Hole", or "the" occurs in a document, which serves as the input file.
To begin, consider the figure below, which breaks the word count process into steps.
The building blocks of Hadoop MapReduce programs are broadly classified into two phases: map and reduce.
Both phases take input data in the form of (key, value) pairs and produce output data as (key, value) pairs. The mapper program runs in parallel on the data nodes of the cluster; once the map phase is over, the reducers run in parallel on the data nodes.
The input file is split into 64 MB chunks and spread over the data nodes of the cluster. The mapper program runs on each data node and generates (K1, V1) key-value pairs.
The sort and shuffle stage creates an iterator over the values for each key, e.g. (are, [1, 1, 1]), which is passed to the reduce function; the reduce function sums up the values for each key to generate (K2, V2) as output. The same is illustrated in the figure above (word count MapReduce process).
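Before moving to the Hadoop classes, here is a plain-Java sketch (no Hadoop involved, purely illustrative) that simulates the same (key, value) flow: the "map" step emits (word, 1) pairs and the "reduce" step sums the values per key.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSimulation {
    public static void main(String[] args) {
        String[] lines = {"Hole are the", "are the are"}; // example input lines

        // "Map" step: emit a (word, 1) pair for every word in every line.
        List<String[]> mapOutput = new ArrayList<String[]>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                mapOutput.add(new String[] {word, "1"});
            }
        }

        // "Shuffle/sort + reduce" step: group the pairs by word and sum the counts.
        Map<String, Integer> reduced = new HashMap<String, Integer>();
        for (String[] pair : mapOutput) {
            Integer current = reduced.get(pair[0]);
            reduced.put(pair[0], (current == null ? 0 : current) + Integer.parseInt(pair[1]));
        }
        System.out.println(reduced); // e.g. {Hole=1, the=2, are=3}
    }
}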
A summary of the classes defined in the "word count" MapReduce program is given below:
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
// ... mapper implementation (see Step 3) ...
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
// ... reducer implementation (see Step 4) ...
}
public static void main(String[] args) throws Exception {
// ... job configuration (see Step 5) ...
}
}
We have created a package in Eclipse and defined a class named "WordCount". The "WordCount" class has two nested classes and one main method. "Mapper" and "Reducer" are not reserved keywords; they are classes provided by the Hadoop framework, and their source code is written by the Hadoop developers.
We extend the "Mapper" and "Reducer" classes with our own "Map" and "Reduce" classes respectively, using inheritance.
Let us understand what LongWritable, Text, and IntWritable are. To do that, we first need to understand serialization and de-serialization in Java.
Object serialization is a mechanism where an object can be represented as a sequence of bytes that includes the object’s data as well as information about the object’s type and the types of data stored in the object.
The serialized object is written in a file and then de-serialized to recreate the object back into memory.
For example, the word "Hai" might have a serialized value of, say, "0010110"; once it is written to a file, it can be de-serialized back into "Hai".
In the Hadoop MapReduce framework, the mapper output is fed in as the reducer input, and these intermediate values are always in serialized form.
In Hadoop MapReduce programming, serializable types implement the Writable interface, so the Hadoop developers have provided Writable counterparts for the common data types: int in Java is IntWritable in the MapReduce framework, String in Java is Text, long is LongWritable, and so on.
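As a small hands-on illustration of Writable serialization (a minimal sketch, separate from the word count job), a Text key and an IntWritable value can be written to a byte stream and read back:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableExample {
    public static void main(String[] args) throws Exception {
        // Serialize a Text key and an IntWritable value to bytes,
        // just as intermediate (key, value) pairs are handled by the framework.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new Text("Hai").write(out);
        new IntWritable(1).write(out);

        // De-serialize them back into objects.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text word = new Text();
        IntWritable count = new IntWritable();
        word.readFields(in);
        count.readFields(in);
        System.out.println(word + " -> " + count); // prints: Hai -> 1
    }
}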
The input and output of the mapper and the reducer are in (key, value) format. With the default text input format, the key of each input record is the byte offset at which the line starts in the file, and the value is the content of the line itself.
For example, if the first line of the file is "1, aaa", its key is 0, because the line starts at the very beginning of the file, and its value is the text "1, aaa".
Similarly, if the next line in the file is "2, bbb", its key is the byte offset at which that second line starts, i.e. the length of the first line plus the newline character, and its value is "2, bbb".
Now, let us try to understand the below with an example:
Mapper<LongWritable, Text, Text, IntWritable> {
Consider that the first line in the file is "Hi! How are you".
The mapper input key-value pair is (0, "Hi! How are you"): the key is the byte offset at which the line starts, which is why the first type parameter of the mapper is "LongWritable", and the value is the line itself, which is why the second type parameter is "Text".
The mapper output is each word together with a count of 1, i.e. (Hi!, 1), (How, 1), (are, 1), (you, 1).
If the word "are" appeared twice in the sentence, the mapper would emit (are, 1) twice, and the shuffle/sort would group them so that the reducer receives (are, [1, 1]). Hence, the key of the mapper output is "Text" while the value is "IntWritable".
This mapper output is fed as the input to the reducer. Therefore, if the reducer input is (are, [1, 1]), the output of the reducer will be (are, 2). Here, the reducer output key type is "Text" and the value type is "IntWritable".
Step 3
Define the Map class. The key and value types of the input pair have to be serializable by the framework and hence need to implement the Writable interface.
Output pairs do not need to be of the same types as the input pairs. Output pairs are collected with calls to context.write().
Inside the static class "Map", we declare an IntWritable object named "one" to hold the count of 1 emitted for each word, and the particular word itself is stored in the Text variable named "word".
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
The above piece of code takes each line as an input and stores it into the variable “line”. StringTokenizer allows an application to break a string into tokens. For example:
StringTokenizer st = new StringTokenizer("my name is kumar", " ");
The output of the above line will be: my
name
is
kumar
As long as the "tokenizer" has more tokens, the while loop keeps running; for each token, the word and the count "one" are written to the context. The framework calls the map method once for every line of the file, and during the shuffle it groups the emitted pairs by word. For example, if the input contains "hai hai hai", the reducer will eventually receive (hai, [1, 1, 1]).
Step 4
The Reduce class accepts the shuffled key-value pairs as input. The code then totals the values for the key-value pairs with the same key and outputs the totaled key-value pairs, e.g. <word, 3>.
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
Step 5
The main method sets up the MapReduce configuration by defining the type of input; in this case, the input is text. The code then sets the Mapper and Reducer classes, as well as the input/output formats.
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");//Name for the job is “wordcount”
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class); // Mapper Class Name
job.setReducerClass(Reduce.class); //Reducer Class Name
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Step 6
The full Java code for the “word count” program is as below:
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}