To understand the MapReduce algorithm, it helps to understand the problem it was designed to solve. With the rise of the digital age and the growing ability to capture and store data, the amount of data at our disposal has exploded. Businesses were quick to realize the potential of this data for gaining insight into customer needs and making predictions that support informed decisions; yet within only a few years, managing such volumes of data became a serious challenge for organizations. This is where Big Data comes into the picture.
Big Data refers to very large volumes of structured and unstructured data, and to the ways of handling that data to aid strategic business planning, reduce production costs, and support smart decision making. However, Big Data brought with it the challenge of capturing, storing, analyzing, and sharing this data using traditional database servers. As a major breakthrough in processing data at this scale, Google came up with the MapReduce algorithm, inspired by the classic divide-and-conquer technique.
MapReduce, when combined with the Hadoop Distributed File System (HDFS), plays a crucial role in Big Data analytics. It provides a way of performing operations on large volumes of data in parallel, in batch mode, using the key-value pair as the basic unit of data for processing.
The MapReduce algorithm involves two major components: Map and Reduce.
The Map component (aka Mapper) processes large data that has been split into equal-sized chunks. The chunks are distributed among a number of nodes (computers) so that the load is balanced across the cluster and a failed task can be re-executed on another node.
The Reduce component (aka Reducer) comes into play once the distributed computation is complete and acts as an accumulator, aggregating the intermediate results into the final output.
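To make the two roles concrete, here is a minimal in-memory sketch of a word count, assuming the data has already been split into chunks. The names `map_words` and `reduce_counts` and the sample chunks are hypothetical and chosen only for illustration; a real job would run the map calls on separate nodes rather than in a single loop.

```python
from collections import defaultdict


def map_words(chunk):
    """Map role: emit a (word, 1) pair for every word in one chunk of text."""
    return [(word, 1) for word in chunk.split()]


def reduce_counts(pairs):
    """Reduce role: accumulate the emitted pairs into one total per key."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# Hypothetical stand-ins for the chunks held by two different nodes.
chunks = ["big data big", "data big"]

# Each chunk is mapped independently; the results are then reduced together.
emitted = [pair for chunk in chunks for pair in map_words(chunk)]
result = reduce_counts(emitted)
print(result)
```

Because each call to `map_words` looks only at its own chunk, the map calls could run in any order or at the same time, which is what makes the approach easy to parallelize.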
The entire process of MapReduce includes four stages.
1. Input Split
In the first phase, the input file is located and prepared for processing by the Mapper. The file is split into fixed-size chunks on the Hadoop Distributed File System. The input file format decides how to split the data, and each resulting chunk is described by an InputSplit. The intuition behind splitting is simply that a single split takes less time to process than the whole dataset, and that splits allow the load to be balanced evenly across the nodes in the cluster.
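A rough sketch of the idea follows, under the simplifying assumption that a split is a fixed number of lines (Hadoop itself splits by size in bytes, not by line count). The helper `split_lines` is hypothetical and is not part of any Hadoop API.

```python
def split_lines(lines, lines_per_split):
    """Cut a list of input lines into consecutive fixed-size chunks."""
    if lines_per_split < 1:
        raise ValueError("lines_per_split must be at least 1")
    return [
        lines[start:start + lines_per_split]
        for start in range(0, len(lines), lines_per_split)
    ]


# Five made-up input lines, cut into splits of two lines each.
records = ["line %d" % n for n in range(1, 6)]
splits = split_lines(records, 2)
for index, split in enumerate(splits):
    print(index, split)
```

The last split may be smaller than the others when the input does not divide evenly.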
2. Mapping
Once all the data has been transformed into an acceptable form, each input split is passed to a distinct mapper instance, which performs computations that produce key-value pairs from the data. All nodes participating in the Hadoop cluster perform the same map computation on their respective local data simultaneously. Once mapping is complete, each node outputs a list of key-value pairs, which are written to the node's local disk rather than to HDFS. These outputs are then fed as input to the Reducer.
3. Shuffling and Sorting
Before the reducer runs, the intermediate results of the mappers are assigned to reducers by a Partitioner, then shuffled and sorted by key to prepare them for processing by the reducer.
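The following is a simplified, single-process sketch of this stage, not Hadoop's actual implementation. It assumes that keys are strings and that a key is assigned to a reducer by hashing it; `zlib.crc32` is used here only as a deterministic stand-in hash, and the function names are hypothetical.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def partition(key, num_reducers):
    """Pick a reducer for a key; equal keys always map to the same reducer."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle_and_sort(mapper_outputs, num_reducers):
    """Route every (key, value) pair to a reducer, then sort and group by key."""
    buckets = [[] for _ in range(num_reducers)]
    for output in mapper_outputs:
        for key, value in output:
            buckets[partition(key, num_reducers)].append((key, value))

    grouped = []
    for bucket in buckets:
        bucket.sort(key=itemgetter(0))  # sort this reducer's input by key
        grouped.append([
            (key, [value for _, value in pairs])
            for key, pairs in groupby(bucket, key=itemgetter(0))
        ])
    return grouped


# Made-up outputs of two mappers.
mapper_outputs = [[("b", 1), ("a", 1)], [("a", 1), ("c", 1)]]
for reducer_id, reducer_input in enumerate(shuffle_and_sort(mapper_outputs, 2)):
    print(reducer_id, reducer_input)
```

Each reducer ends up with a key-sorted list in which every key appears once, paired with all the values emitted for it by any mapper.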
4. Reducing
The reduce function is called once for each key, together with the values collected for that key. The reduce function is user-defined. The Reducer takes the shuffled intermediate output as its input and aggregates these results into the desired result set. The output of the reduce stage is also a set of key-value pairs, but it can be transformed according to application requirements by using OutputFormat, a feature provided by Hadoop.
It is clear from the order of the stages that MapReduce runs its phases sequentially: the Reducer cannot start its reduce operation until the Mappers have completed their execution. Despite being prone to I/O latency and sequential between phases, MapReduce is regarded as the heart of Big Data analytics owing to its parallelism and fault tolerance.
Now that we are familiar with the gist of the MapReduce algorithm, we will translate the Word Count example shown in the figure into Python code.
MapReduce in Python
We aim to write a simple MapReduce program for Hadoop in Python that counts the occurrences of each word in a given input file.
We will use the Hadoop Streaming API to pass data between the different phases of MapReduce through STDIN (standard input) and STDOUT (standard output).
1. First of all, we need to create an example input file.
Folio3 introduces ML.
Folio3 introduces BigData.
BigData facilitates ML.
2. Now, create “mapper.py” to be executed in the Map phase.
mapper.py will read data from standard input and print to standard output a tab-separated (word, 1) pair for each word occurring in the input file.
#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    for word in words:
        # write the results to STDOUT (standard output):
        # tab-delimited word with a default count of 1
        print('%s\t%s' % (word, 1))
3. Next, create a file named “reducer.py” to be executed in the Reduce phase. reducer.py will take the output of mapper.py as its input and sum the occurrences of each word into a final count.
#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this comparison only works because Hadoop sorts the mapper
    # output by key (here: word) before passing it to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# output the last word, unless there was no valid input at all
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))
4. Make sure you make the two programs executable by using the following commands:
> chmod +x mapper.py
> chmod +x reducer.py
You can find the full code at the Folio3 AI repository.
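The reducer in step 3 tracks the current word by hand. As an alternative sketch, the same run-based summing can be written with `itertools.groupby`, which also relies on the input being sorted by key. The functions `parse` and `reduce_sorted` are hypothetical names, and the version below reads from any iterable of lines instead of `sys.stdin` so that it can be tried without Hadoop.

```python
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs from tab-separated lines, skipping bad ones."""
    for line in lines:
        word, sep, count = line.rstrip("\n").partition("\t")
        if not sep:
            continue  # no tab on this line, so it is not mapper output
        try:
            value = int(count)
        except ValueError:
            continue  # count was not a number
        yield word, value


def reduce_sorted(lines):
    """Sum the counts of each run of identical words in key-sorted input."""
    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield "%s\t%d" % (word, sum(count for _, count in group))


# Made-up mapper output, already sorted by word.
sorted_mapper_output = ["BigData\t1", "Folio3\t1", "Folio3\t1", "ML.\t1"]
for out_line in reduce_sorted(sorted_mapper_output):
    print(out_line)
```

To use it as a streaming reducer, one would pass `sys.stdin` to `reduce_sorted` and print each yielded line.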
Running MapReduce Locally
> cat dummytext.txt | python mapper.py | sort -k1 | python reducer.py
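One way to sanity-check the result of this pipeline is to compute the same counts directly in a single Python process. This is only a cross-check, assuming dummytext.txt contains the three example lines from step 1; `count_words` is a hypothetical helper, and the order in which `sort` arranges the words may differ from Python's `sorted` depending on the locale.

```python
from collections import Counter


def count_words(lines):
    """Count whitespace-separated words, tokenizing the same way as mapper.py."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts


# The three example lines from step 1.
sample = [
    "Folio3 introduces ML.",
    "Folio3 introduces BigData.",
    "BigData facilitates ML.",
]

for word, count in sorted(count_words(sample).items()):
    print("%s\t%d" % (word, count))
```

Note that `split()` only breaks on whitespace, so "BigData" and "BigData." are counted as different words, just as they are by mapper.py.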
Running MapReduce on Hadoop Cluster
We assume that the default user created in Hadoop is f3user.
1. First, we copy the local dummy file to the Hadoop Distributed File System by running:
> hdfs dfs -put /src/dummytext.txt /user/f3user
2. Finally, we run our MapReduce job on the Hadoop cluster, using the Streaming API to pass data through standard I/O.
> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-file /src/mapper.py -mapper "python mapper.py" \
-file /src/reducer.py -reducer "python reducer.py" \
-input /user/f3user/dummytext.txt -output /user/f3user/wordcount
The job will take its input from ‘/user/f3user/dummytext.txt’ and write its output to ‘/user/f3user/wordcount’.
Running this job will produce the output as:
Congratulations, you just completed your first MapReduce application on Hadoop with Python!