BigData Investigation 5 – MapReduce with Python and Hadoop Streaming

In this post I will explain the Hadoop Streaming utility. Hadoop Streaming uses executables or scripts to create a MapReduce job and submits the job to a Hadoop cluster. Hadoop’s programming model is called MapReduce. In a previous post I explained MapReduce using a Unix pipe which includes two Python scripts and a few Linux commands. In this post I will use the same Python scripts and Hadoop Streaming to run them as a MapReduce job on a Hadoop cluster.

In hindsight, this post was the most difficult to write so far. I am using the Hadoop Book as a travel guide for my BigData Investigation. The book provides an example for running Ruby scripts on Hadoop, but I failed to run it on my Hadoop cluster. I now understand that the Hadoop cluster of my setup was configured in Pseudo-Distributed Mode, whereas the example requires a Hadoop cluster in Standalone (Local) Mode. I will explain the different Hadoop cluster modes in the next post.

I recommend that you read my introduction to MapReduce (BigData Investigation 4 – MapReduce Explained) before you continue with this post. In the MapReduce Explained post I analyzed the sample data and the Python scripts from the Hadoop Book and executed them in a Unix pipe. The example provides separate scripts for map (max_temperature_map.py) and reduce (max_temperature_reduce.py). For your convenience I repeat the output of the Unix pipe here. See the MapReduce Explained post for details.

[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | \
ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949    111
1950    22

[cloudera@quickstart hadoop-book]$
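
The three stages of the pipe above can also be pictured in plain Python. This is only a minimal sketch under my own assumptions: the function names are made up, the five "year<TAB>temperature" records are illustrative values and are not read from sample.txt, and the real scripts of the Hadoop Book are not reproduced here. It shows why the sort step matters: the reduce step only works because records with the same key arrive next to each other.

```python
from itertools import groupby


def sort_stage(lines):
    """Stand-in for the `sort` command: order records so equal keys are adjacent."""
    return sorted(lines)


def reduce_max(sorted_lines):
    """Yield (key, maximum value) per key; relies on the input being sorted by key."""
    pairs = (line.split("\t") for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, max(int(value) for _, value in group)


# Hypothetical mapper output, one "year<TAB>temperature" record per line.
mapped = ["1950\t0", "1950\t22", "1950\t-11", "1949\t111", "1949\t78"]

for year, temperature in reduce_max(sort_stage(mapped)):
    print("%s\t%d" % (year, temperature))
```

Without the call to sort_stage, groupby would start a new group every time the year changes, so a year that appears in two separate runs would be reported twice.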

For this post I use the Cloudera QuickStart VM. Log in to the VM (user: cloudera, password: cloudera) and get the examples and the sample data of the Hadoop Book.

login as: cloudera
cloudera@127.0.0.1's password:
Last login: Mon Aug 29 13:12:04 2016 from 10.0.2.2

[cloudera@quickstart ~]$ git clone https://github.com/tomwhite/hadoop-book.git 
Initialized empty Git repository in /home/cloudera/hadoop-book/.git/
remote: Counting objects: 4931, done.
remote: Total 4931 (delta 0), reused 0 (delta 0), pack-reused 4931
Receiving objects: 100% (4931/4931), 2.59 MiB | 59 KiB/s, done.
Resolving deltas: 100% (1975/1975), done.

[cloudera@quickstart ~]$ cd hadoop-book/

[cloudera@quickstart hadoop-book]$ ls -l ch02-mr-intro/src/main/python/
total 8
-rwxrwxr-x 1 cloudera cloudera 231 Aug 29 13:13 max_temperature_map.py
-rwxrwxr-x 1 cloudera cloudera 374 Aug 29 13:13 max_temperature_reduce.py

[cloudera@quickstart hadoop-book]$ ls -l input/ncdc/sample.txt
-rw-rw-r-- 1 cloudera cloudera 529 Aug 29 13:13 input/ncdc/sample.txt

[cloudera@quickstart hadoop-book]$

Hadoop Streaming supports any executable or script as mapper or reducer. The only requirement is that the executables or scripts read their input from Standard Input (stdin) and write their results to Standard Output (stdout). Here is the documentation of Hadoop Streaming for Hadoop 2.7.2 (Hadoop 2.7.2 was the latest stable Hadoop release when I wrote this post). Hadoop Streaming supports plenty of additional command options which we do not need for this post. See the documentation for details.
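
To make the stdin/stdout contract concrete, here is a minimal Python 3 sketch of a mapper-style script. It is not one of the scripts from the Hadoop Book: the function name run_mapper and the comma-separated input format are assumptions chosen for illustration. The one convention it relies on is that, by default, Hadoop Streaming treats the part of each output line up to the first tab as the key and the rest as the value.

```python
import io
import sys


def run_mapper(stdin, stdout):
    """Read raw records line by line and emit "key<TAB>value" lines."""
    for line in stdin:
        fields = line.strip().split(",")
        if len(fields) != 2:
            continue  # skip records that do not match the assumed "year,temperature" format
        year, temperature = fields
        stdout.write("%s\t%s\n" % (year, temperature))


# In a real streaming script the call would be run_mapper(sys.stdin, sys.stdout).
# Here an in-memory stream stands in for stdin so that the sketch runs on its own.
demo_input = io.StringIO("1950,22\nnot a record\n1949,111\n")
run_mapper(demo_input, sys.stdout)
```

Passing the streams in as parameters keeps the logic testable without a cluster, which is the same idea as testing the scripts in a Unix pipe first.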

The documentation starts with the following example.

hadoop jar hadoop-streaming-2.7.2.jar \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc

We need to adjust the values of the options to run the example Python scripts with the sample data provided by the Hadoop Book on the Cloudera QuickStart VM. I found the correct syntax after some trial and error.

Hadoop Streaming is shipped as a jar file which needs to be passed as an argument to the hadoop CLI command. First we need to find the hadoop-streaming jar file.

[cloudera@quickstart hadoop-book]$ locate hadoop-streaming | grep jar
/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar
/usr/jars/hadoop-streaming-2.6.0-mr1-cdh5.7.0.jar
/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.7.0.jar
/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar
/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.7.0.jar
/usr/lib/hadoop-mapreduce/hadoop-streaming.jar
/usr/lib/oozie/oozie-sharelib-mr1/lib/mapreduce-streaming/hadoop-streaming.jar
/usr/lib/oozie/oozie-sharelib-yarn/lib/mapreduce-streaming/hadoop-streaming.jar

[cloudera@quickstart hadoop-book]$ ls -l /usr/lib/hadoop-mapreduce/hadoop-streaming.jar
lrwxrwxrwx 1 root root 35 Apr  5 23:49 /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -> hadoop-streaming-2.6.0-cdh5.7.0.jar

[cloudera@quickstart hadoop-book]$

From my various failed attempts I know that we need to specify the full path for the mapper script and the reducer script. The following command brings us closer to the correct syntax, but we are not there yet. The issue is that Hadoop expects the input data in the HDFS file system, while I have stored it on the local Linux file system. HDFS is the default distributed file system which is shipped with Hadoop. I will explain HDFS in a future post.

[cloudera@quickstart hadoop-book]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /home/cloudera/hadoop-book/input/ncdc/sample.txt \
-output output \
-mapper /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_map.py \
-reducer /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_reduce.py

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob305702993711004322.jar tmpDir=null
16/08/29 13:44:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 13:44:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 13:44:09 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/cloudera/.staging/job_1472469537947_0002
16/08/29 13:44:09 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera
 (auth:SIMPLE) cause:org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
 hdfs://quickstart.cloudera:8020/home/cloudera/hadoop-book/input/ncdc/sample.txt
16/08/29 13:44:09 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera
 (auth:SIMPLE) cause:org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
 hdfs://quickstart.cloudera:8020/home/cloudera/hadoop-book/input/ncdc/sample.txt
16/08/29 13:44:09 ERROR streaming.StreamJob: Error Launching job : Input path does not exist:
 hdfs://quickstart.cloudera:8020/home/cloudera/hadoop-book/input/ncdc/sample.txt
Streaming Command Failed!

[cloudera@quickstart hadoop-book]$
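
The error message shows what happened to the input path: it had no scheme, so it was looked up on the cluster's default file system, hdfs://quickstart.cloudera:8020. The following Python 3 sketch is a deliberately simplified model of that behavior, not Hadoop's actual code. The function name qualify is made up, and the sketch only handles absolute paths.

```python
from urllib.parse import urlparse

# Default file system as it appears in the error message above.
DEFAULT_FS = "hdfs://quickstart.cloudera:8020"


def qualify(path, default_fs=DEFAULT_FS):
    """Simplified model: keep a path that has an explicit scheme,
    otherwise prefix the default file system (absolute paths only)."""
    if urlparse(path).scheme:
        return path
    return default_fs + path


# A path without a scheme ends up on HDFS, where the file does not exist.
print(qualify("/home/cloudera/hadoop-book/input/ncdc/sample.txt"))

# A path that already carries a scheme is left untouched.
print(qualify("file:/home/cloudera/hadoop-book/input/ncdc/sample.txt"))
```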

For execution on a multi-node Hadoop cluster we would need to copy the sample data into HDFS, but for our purposes we can tweak the URI of the input data to tell the hadoop CLI command to read the input data from the local Linux file system. I used the same trick to write the output to the local Linux file system instead of HDFS. This command completes successfully. The output illustrates that Hadoop Streaming monitors the progress of the job execution and reports regular status updates.

[cloudera@quickstart hadoop-book]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input file:/home/cloudera/hadoop-book/input/ncdc/sample.txt \
-output file:/tmp/output_storageulf \
-mapper /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_map.py \
-reducer /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_reduce.py

packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob3758230746893225679.jar tmpDir=null
16/08/29 14:26:48 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 14:26:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 14:26:49 INFO mapred.FileInputFormat: Total input paths to process : 1
16/08/29 14:26:49 INFO mapreduce.JobSubmitter: number of splits:2
16/08/29 14:26:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1472469537947_0005
16/08/29 14:26:50 INFO impl.YarnClientImpl: Submitted application application_1472469537947_0005
16/08/29 14:26:50 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1472469537947_0005/
16/08/29 14:26:50 INFO mapreduce.Job: Running job: job_1472469537947_0005
16/08/29 14:27:00 INFO mapreduce.Job: Job job_1472469537947_0005 running in uber mode : false
16/08/29 14:27:00 INFO mapreduce.Job:  map 0% reduce 0%
16/08/29 14:27:13 INFO mapreduce.Job:  map 50% reduce 0%
16/08/29 14:27:14 INFO mapreduce.Job:  map 100% reduce 0%
16/08/29 14:27:21 INFO mapreduce.Job:  map 100% reduce 100%
16/08/29 14:27:21 INFO mapreduce.Job: Job job_1472469537947_0005 completed successfully
16/08/29 14:27:21 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=865
                FILE: Number of bytes written=347468
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=210
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=2
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=0
        Job Counters
                Launched map tasks=2
                Launched reduce tasks=1
                Rack-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=22524
                Total time spent by all reduces in occupied slots (ms)=5034
                Total time spent by all map tasks (ms)=22524
                Total time spent by all reduce tasks (ms)=5034
                Total vcore-seconds taken by all map tasks=22524
                Total vcore-seconds taken by all reduce tasks=5034
                Total megabyte-seconds taken by all map tasks=23064576
                Total megabyte-seconds taken by all reduce tasks=5154816
        Map-Reduce Framework
                Map input records=5
                Map output records=5
                Map output bytes=55
                Map output materialized bytes=77
                Input split bytes=210
                Combine input records=0
                Combine output records=0
                Reduce input groups=2
                Reduce shuffle bytes=77
                Reduce input records=5
                Reduce output records=2
                Spilled Records=10
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=351
                CPU time spent (ms)=1450
                Physical memory (bytes) snapshot=558338048
                Virtual memory (bytes) snapshot=4510806016
                Total committed heap usage (bytes)=391979008
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=794
        File Output Format Counters
                Bytes Written=29
16/08/29 14:27:21 INFO streaming.StreamJob: Output directory: file:/tmp/output_storageulf

[cloudera@quickstart hadoop-book]$
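
If you want to start such a job from a Python script instead of typing the command, the argument list can be assembled as follows. This is a sketch only: the helper name build_streaming_command is my own, it uses just the four options shown in the command above, and it does not execute anything.

```python
def build_streaming_command(jar, input_uri, output_uri, mapper, reducer):
    """Return the argument list for a Hadoop Streaming run (nothing is executed)."""
    return [
        "hadoop", "jar", jar,
        "-input", input_uri,
        "-output", output_uri,
        "-mapper", mapper,
        "-reducer", reducer,
    ]


# Paths as used on the Cloudera QuickStart VM in this post.
scripts = "/home/cloudera/hadoop-book/ch02-mr-intro/src/main/python"
command = build_streaming_command(
    jar="/usr/lib/hadoop-mapreduce/hadoop-streaming.jar",
    input_uri="file:/home/cloudera/hadoop-book/input/ncdc/sample.txt",
    output_uri="file:/tmp/output_storageulf",
    mapper=scripts + "/max_temperature_map.py",
    reducer=scripts + "/max_temperature_reduce.py",
)
print(" ".join(command))
```

On a machine with Hadoop installed, passing the list to subprocess.call(command) should launch the job. Keeping the arguments in a list avoids quoting problems with paths.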

The output of the reduce function was written to /tmp/output_storageulf/part-00000. It is the same as the output of the Unix pipe which I have pasted at the beginning of this post.

[cloudera@quickstart hadoop-book]$ ls -l /tmp/output_storageulf/
total 4
-rw-r--r-- 1 yarn yarn 17 Aug 29 14:27 part-00000
-rw-r--r-- 1 yarn yarn  0 Aug 29 14:27 _SUCCESS

[cloudera@quickstart hadoop-book]$ cat /tmp/output_storageulf/part-00000
1949    111
1950    22

[cloudera@quickstart hadoop-book]$
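
A job with more than one reducer would write several part files into the output directory, so a small helper to collect them can be handy. The sketch below is an illustration under assumptions: the function name read_results is made up, I assume that key and value are separated by a tab, and the demo runs against a temporary directory that mimics the listing above instead of the real /tmp/output_storageulf.

```python
import glob
import os
import tempfile


def read_results(output_dir):
    """Collect key/value pairs from all part-* files of a job output directory."""
    results = {}
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path) as part_file:
            for line in part_file:
                key, value = line.rstrip("\n").split("\t", 1)
                results[key] = int(value)
    return results


# Demo on a throwaway directory that mimics the listing above.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "part-00000"), "w") as part:
    part.write("1949\t111\n1950\t22\n")
open(os.path.join(demo_dir, "_SUCCESS"), "w").close()  # empty marker file, ignored by the glob

print(read_results(demo_dir))
```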

Please note that all output files and directories are owned by yarn:yarn. YARN is Hadoop's resource manager and job scheduler. The ownership of the output directory and its files by yarn:yarn indicates that the Hadoop Streaming utility indeed created a MapReduce job which was submitted to the Hadoop cluster for execution. I will explain YARN in a future post.

Ulf’s Conclusion

The Hadoop Streaming utility is easy to use. It allows you to use executables or any scripting language to create MapReduce applications. In contrast to a Unix pipe, Hadoop Streaming runs the executables or scripts on many nodes in parallel and monitors the progress of the job until it completes.

In the next post I will explain the Hadoop Cluster modes.

Changes:
2016/09/23 – added link – “Hadoop Cluster modes” => BigData Investigation 6 – Hadoop Cluster Modes
