BigData Investigation 4 – MapReduce Explained

MapReduceIn this post I will explain MapReduce. MapReduce is Hadoop’s programming model to analyze data. I use the Hadoop Book for my investigation on BigData.  MapReduce is covered in chapter 2. Let’s study the examples to understand MapReduce.

All code examples of the Hadoop Book are available at GitHub. First we need to copy the example data and code from GitHub to a Linux server. I executed the examples on the Cloudera Quickstart VM which I have installed earlier. Though in a hindsight I have learned that any Linux server will work for this post. Hadoop is not yet required.

login as: cloudera
cloudera@127.0.0.1's password:
Last login: Sat Aug 13 11:42:17 2016 from 10.0.2.2

[cloudera@quickstart ~]$ git clone https://github.com/tomwhite/hadoop-book.git
Initialized empty Git repository in /home/cloudera/hadoop-book/.git/
remote: Counting objects: 4931, done.
remote: Total 4931 (delta 0), reused 0 (delta 0), pack-reused 4931
Receiving objects: 100% (4931/4931), 2.59 MiB | 462 KiB/s, done.
Resolving deltas: 100% (1975/1975), done.

[cloudera@quickstart ~]$ cd hadoop-book/

[cloudera@quickstart hadoop-book]$ ls
appc           ch06-mr-dev       ch14-flume   ch20-hbase         hadoop-meta
book           ch08-mr-types     ch15-sqoop   ch21-zk            input
ch02-mr-intro  ch09-mr-features  ch16-pig     ch22-case-studies  pom.xml
ch03-hdfs      ch10-setup        ch17-hive    common             README.md
ch04-yarn      ch12-avro         ch18-crunch  conf               snippet
ch05-io        ch13-parquet      ch19-spark   hadoop-examples

[cloudera@quickstart hadoop-book]

MapReduce programs can be written in Java or in any programming language which support Standard Streams, e.g. read from standard input (stdin) and write to standard output (stdout). The book provides examples for Java, Ruby and Python to process a text file with five lines of data. Let’s look into the text file first.

[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

[cloudera@quickstart hadoop-book]$

The lines represent weather data which is difficult to read for humans. MapReduce allows to quickly analyze billions of such lines on a Hadoop cluster to gain insight. Each of the above lines includes a year and a measured temperature. For instance, the third line tells that in the year 1950 a temperature of -1.1 °C was measured.

0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999

Each MapReduce application consists of three parts:

  1. The Map function is an application specific function which extracts the information of each line.
  2. The Reduce function is an application specific function which consolidates and integrates the extracted information of all lines.
  3. The Hadoop framework is a generic framework used by all applications which ensures parallel execution on hundreds or thousands of nodes.

The Hadoop Book provides example code for Java, Ruby and Python where the Map function extracts the year and the temperature of each line and and the Reduce function determines each year’s maximum temperature. I will start with the Python example, given that I do not know Ruby and have limited Java skills.

The book illustrates Python programs for map (max_temperature_map.py) and reduce (max_temperature_reduce.py) which in total comprise less than 20 lines of code. Follow the links in the previous sentence to look at the code. The interesting thing is that the Python scripts can be executed without Hadoop using Unix pipes. The temperature is scaled by a factor of ten, so that the maximum of year 1949 was 11.1°C and the maximum of 1950 was 2.2°C.

[cloudera@quickstart hadoop-book]$ ls -l ch02-mr-intro/src/main/python/
total 8
-rwxrwxr-x 1 cloudera cloudera 231 Aug 14 03:20 max_temperature_map.py
-rwxrwxr-x 1 cloudera cloudera 374 Aug 14 03:20 max_temperature_reduce.py

[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | \
ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949    111
1950    22

[cloudera@quickstart hadoop-book]

Let’s see, what each step of the Unix pipe does. The Unix cat command prints the content of the file with the sample data. We already used it at the beginning of this post, but for your convenience I paste the output here again.

[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

[cloudera@quickstart hadoop-book]

The Map function (max_temperature_map.py) extracts the year and the temperature of each line and prints them in a single line. The tuple of year and temperature is also referred to as key-value pair.

[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py
1950    +0000
1950    +0022
1950    -0011
1949    +0111
1949    +0078

[cloudera@quickstart hadoop-book]$

Hadoop sorts all key-value pairs before it passes them to the Reduce function. The Unix sort command does the same for our Unix pipe.

[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort
1949    +0078
1949    +0111
1950    +0000
1950    -0011
1950    +0022

[cloudera@quickstart hadoop-book]$

Finally the Reduce function (max_temperature_reduce.py) picks each year’s maximum temperature and prints it.

[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | \
ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949    111
1950    22

[cloudera@quickstart hadoop-book]$

The sort is not really needed for this example which processes five lines only. Though the algorithm of the max_temperature_reduce.py script assumes that the keys of the input stream are sorted. I suspect that this pattern improves the performance of the Reduce function, if millions of key-value pairs need to be reduced.

Our Unix pipe creates a single instance of the Map function (max_temperature_map.py) and a single instance of the Reduce function (max_temperature_reduce.py). This is OK for development and education purposes where only a few lines of data are analyzed. In contrast Hadoop spawns hundreds or thousands instances to analyze huge amount of data on many nodes in parallel.

Ulf’s Conclusion

MapReduce is a simple programming model for data analysis. The use of programming languages like Java or Python allows to analyze a broad range of unstructured data including measured data, log files, web pages, images and videos. The Hadoop framework takes care that respective analysis jobs will be executed on hundreds or thousands of nodes in parallel to analyze huge amounts of data quickly.

In the next post I will explain how to run the Python scripts on a Hadoop cluster.

Changes:
2016/09/16 – added link – “how to run the Python scripts on a Hadoop cluster” => BigData Investigation 5 – MapReduce with Python and Hadoop Streaming

Share this article

Leave a Reply

Your email address will not be published. Required fields are marked *