Monday, 16 July 2018

Apache Avro Format in Hadoop

Apache Avro, created by Doug Cutting, is a data serialization system for Hadoop. Avro provides simple integration with dynamic languages, and implementations are available for C, C++, C#, Java, PHP, Python, and Ruby.

Avro file

An Avro file has two parts-

  • Data definition (Schema)
  • Data

Both the data definition and the data are stored together in one file. Within the Avro data file there is a header containing a metadata section where the schema is stored, and all objects stored in the file must be written according to that schema.

Avro Schema

Avro relies on schemas for reading and writing data. Avro schemas are defined in JSON, which helps with data interoperability. Schemas are composed of primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed).

A schema is usually written in a separate file with the .avsc extension.

Avro Data

Avro data is serialized and stored in binary format, which makes for compact and efficient storage. The data itself is not tagged with type information because the schema used to write it is always available when the data is read; the schema is required to parse the data. This permits each datum to be written with no per-value overhead, making serialization both fast and small.
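As a minimal sketch of this using the Avro Java generic API, the following program writes a record into an Avro container file and reads it back; the person.avsc schema file, the personName field and the file paths are just placeholders for illustration. The point is that the schema travels with the file, so the reader needs no extra type information.

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroWriteReadExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical record schema with a single string field "personName"
        Schema schema = new Schema.Parser().parse(new File("person.avsc"));
        GenericRecord record = new GenericData.Record(schema);
        record.put("personName", "John");

        // Write - the schema is stored in the file header along with the data
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, new File("person.avro"));
        writer.append(record);
        writer.close();

        // Read - no schema needs to be supplied, it is read from the file itself
        DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new File("person.avro"), new GenericDatumReader<GenericRecord>());
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }
}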

Avro file format

Avro specifies an object container file format. A file has a schema, and all objects stored in the file must be written according to that schema, using binary encoding.

Objects are stored in blocks that may be compressed. Synchronization markers are used between blocks to permit efficient splitting of files for MapReduce processing.

A file consists of:

  • A file header, followed by
  • one or more file data blocks

The following image shows the Avro file format.

[Figure: File header | Data block | Data block | ...]

Avro file header consists of:

  1. Four bytes, ASCII 'O', 'b', 'j', followed by 1.
  2. File metadata, including the schema.
  3. The 16-byte, randomly-generated sync marker for this file.

A file header is thus described by the following schema:

{"type": "record", "name": "org.apache.avro.file.Header",
 "fields" : [
   {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
   {"name": "meta", "type": {"type": "map", "values": "bytes"}},
   {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}},
  ]
}
A file data block consists of:
  1. A long indicating the count of objects in this block.
  2. A long indicating the size in bytes of the serialized objects in the current block, after any codec is applied.
  3. The serialized objects. If a codec is specified, this is compressed by that codec.
  4. The file's 16-byte sync marker.

How a schema is defined in Avro

An Avro schema is defined in JSON as one of the following-
  1. A JSON string, naming a defined type.
  2. A JSON object, of the form: {"type": "typeName" ...attributes...}
    where typeName is either a primitive or derived type name, as defined below. Attributes not defined in this document are permitted as metadata, but must not affect the format of serialized data.
  3. A JSON array, representing a union of embedded types.

Primitive Types in Avro

Primitive types used in Avro are as follows-
  • null: no value
  • boolean: a binary value
  • int: 32-bit signed integer
  • long: 64-bit signed integer
  • float: single precision (32-bit) IEEE 754 floating-point number
  • double: double precision (64-bit) IEEE 754 floating-point number
  • bytes: sequence of 8-bit unsigned bytes
  • string: unicode character sequence
For example, a field of type string is defined as-
 {"name": "personName",  "type": "string"}

Complex Types in Avro

Avro supports six kinds of complex types: record, enum, array, map, union and fixed.

record- Records are defined using the type name "record" and support the following attributes:

  • name- A JSON string providing the name of the record; this is a required attribute.
  • doc- A JSON string providing documentation to the user of this schema; this is an optional attribute.
  • aliases- A JSON array of strings, providing alternate names for this record; this is an optional attribute.
  • fields- A JSON array, listing fields; this is a required attribute. Each field in a record is a JSON object with the following attributes:
    • name- A JSON string providing the name of the field; this is a required attribute.
    • doc- A JSON string describing this field for users; this is an optional attribute.
    • type- A JSON object defining a schema, or a JSON string naming a record definition; this is a required attribute.
    • default- A default value for this field, used when reading instances that lack this field; this is an optional attribute.
    • order- Specifies how this field impacts the sort ordering of this record; this is an optional attribute. Valid values are "ascending" (the default), "descending", or "ignore".
    • aliases- A JSON array of strings, providing alternate names for this field; this is an optional attribute.
For example, the schema for a Person record having Id, Name and Address fields-
{
 "type": "record",
 "name": "PersonRecord",
 "doc": "Person Record",
 "fields": [
  {"name":"Id",  "type":"long"},
  {"name":"Name",  "type":"string"},
  {"name":"Address",   "type":"string"}
 ]
}

enum- Enums use the type name "enum" and support the following attributes:

  • name- A JSON string providing the name of the enum; this is a required attribute.
  • namespace- A JSON string that qualifies the name; this is an optional attribute.
  • aliases- A JSON array of strings, providing alternate names for this enum; this is an optional attribute.
  • doc- A JSON string providing documentation to the user of this schema; this is an optional attribute.
  • symbols- A JSON array, listing symbols as JSON strings; this is a required attribute. All symbols in an enum must be unique; duplicates are prohibited.
For example, four seasons can be defined as:
{ "type": "enum",
  "name": "Seasons",
  "symbols" : ["WINTER", "SPRING", "SUMMER", "AUTUMN"]
}

array- Arrays use the type name "array" and support a single attribute:

  • items- The schema of the array's items.
For example, an array of strings is declared with:
{"type": "array", "items": "string"}

map- Maps use the type name "map" and support one attribute:

  • values- The schema of the map's values.
Map keys are assumed to be strings. For example, a map from string to int is declared with:
{"type": "map", "values": "int"}

union- Unions are represented using JSON arrays. For example, ["null", "string"] declares a schema which may be either a null or a string. Avro data conforming to this union must match one of the schemas represented by the union.
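A common use of a union is to make a record field optional by combining null with the field's type; a small illustrative snippet (the field name here is arbitrary):

 {"name": "middleName", "type": ["null", "string"], "default": null}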

fixed- Fixed uses the type name "fixed" and supports the following attributes:

  • name- A string naming this fixed type; this is a required attribute.
  • namespace- A string that qualifies the name; this is an optional attribute.
  • aliases- A JSON array of strings, providing alternate names for this fixed type; this is an optional attribute.
  • size- An integer specifying the number of bytes per value; this is a required attribute.
For example, a 16-byte quantity may be declared with:
{"type": "fixed", "size": 16, "name": "md5"}

Data encoding in Avro

Avro specifies two serialization encodings: binary and JSON. Most applications will use the binary encoding, as it is smaller and faster.
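As a rough sketch of the two encodings using the Avro Java generic API (assuming a schema and a record built as in the earlier sketch), the same record can be serialized either way:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class AvroEncodings {
    // Compact binary encoding - what most applications use
    static byte[] toBinary(Schema schema, GenericRecord record) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // JSON encoding - larger but human readable, useful for debugging
    static String toJson(Schema schema, GenericRecord record) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return out.toString();
    }
}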

Reference: https://avro.apache.org/docs/1.8.2/index.html

That's all for this topic Apache Avro Format in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Parquet File Format in Hadoop
  2. Sequence File in Hadoop
  3. How to Configure And Use LZO Compression in Hadoop
  4. File Write in HDFS - Hadoop Framework Internal Steps
  5. Java Program to Read File in HDFS


Thursday, 12 July 2018

Shuffle And Sort Phases in Hadoop MapReduce

When you run a MapReduce job and the mappers start producing output, a lot of processing is done internally by the Hadoop framework before the reducers get their input. This whole internal processing is known as the shuffle phase in the Hadoop framework.

The tasks done internally by the Hadoop framework within the shuffle phase are as follows-

  1. Data from the mappers is partitioned as per the number of reducers.
  2. Data is also sorted by keys within a partition.
  3. Output from the maps is written to disk as many temporary files.
  4. Once the map task is finished all the files written to disk are merged to create a single file.
  5. Data from a particular partition (from all mappers) is transferred to the reducer that is supposed to process that particular partition.
  6. If the data transferred to a reducer exceeds the memory limit it is spilled to disk.
  7. Once the reducer has got its portion of data from all the mappers, the data is merged again, while still maintaining the sort order of keys, to create the reduce task input.

As you can see some of the shuffle phase tasks happen at the nodes where mappers are running and some of them at the nodes where reducers are running.

Shuffle phase process at the mapper side

When the map task starts producing output it is not written directly to disk; instead there is a memory buffer (100 MB by default) where the map output is kept. This size is configurable through the parameter mapreduce.task.io.sort.mb.

The point at which the buffered data is spilled to disk is controlled by the configuration parameter mapreduce.map.sort.spill.percent (the default is 80% of the memory buffer). Once this threshold is reached, a background thread starts spilling the contents to disk.
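For illustration, both settings can be overridden per job through the Configuration object in the driver (the values below are arbitrary, only meant to show the idea):

 Configuration conf = new Configuration();
 // Raise the in-memory sort buffer from the default 100 MB to 256 MB
 conf.set("mapreduce.task.io.sort.mb", "256");
 // Start spilling when the buffer is 90% full instead of the default 80%
 conf.set("mapreduce.map.sort.spill.percent", "0.90");
 Job job = Job.getInstance(conf, "WordCount");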

Before being written to disk the mapper outputs are sorted and then partitioned per reducer. The total number of partitions is the same as the number of reduce tasks for the job. For example, let's say there are 4 mappers and 2 reducers for a MapReduce job; then the output of all of these mappers will be divided into 2 partitions, one for each reducer.

[Image: shuffle phase in Hadoop]
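Which partition a (key, value) pair goes to is decided by the job's Partitioner; unless you configure one, Hadoop's default partitioner hashes the key, roughly as in the sketch below (with 2 reducers every key lands in partition 0 or 1):

 // Rough sketch of the default hash-based partitioning logic
 public int getPartition(Text key, IntWritable value, int numReduceTasks) {
     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
 }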

If a Combiner is configured, it is also executed at this point in order to reduce the size of the data written to disk.
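Setting a Combiner is a one-line job configuration; for example, in a word count job the reducer class itself can often be reused (MyReducer here stands for whichever reducer implements the summing logic):

 job.setCombinerClass(MyReducer.class);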

This process of buffering data in memory, partitioning and sorting it, and creating a new spill file on disk every time the threshold is reached is repeated until all the records for the particular map task are processed. Before the map task finishes, all these spill files are merged, keeping the data partitioned and sorted by keys within each partition, to create a single merged file.

The following image illustrates the shuffle phase process at the map end.

[Image: shuffle phase at the map side]

Shuffle phase process at the reducer side

By this time the map output is ready and stored on the local disk of the node where the map task was executed. Now the relevant partition of the output of all the mappers has to be fetched by the framework to the nodes where the reducers are running.

Reducers don't wait for all the map tasks to finish before they start copying data; as soon as a map task finishes, transfer of data from that node starts. For example, if there are 10 mappers running, the framework won't wait for all 10 of them to finish before starting the map output transfer.

Data copied from the mappers is kept in a memory buffer at the reducer side too. The size of this buffer is configured using the following parameter.

mapreduce.reduce.shuffle.input.buffer.percent- The percentage of memory, relative to the maximum heap size as typically specified in mapreduce.reduce.java.opts, that can be allocated to storing map outputs during the shuffle. The default is 70%.

When the buffer reaches a certain threshold, the map output data is merged and written to disk.

This merging of Map outputs is known as sort phase. During this phase the framework groups Reducer inputs by keys since different mappers may have output the same key.

The threshold for triggering the merge to disk is configured using the following parameter.

mapreduce.reduce.merge.inmem.threshold- The number of sorted map outputs fetched into memory before being merged to disk. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk.
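As on the map side, these reducer-side knobs can be tuned per job through the Configuration (illustrative values only):

 // Allow 60% of the reducer heap for buffering copied map outputs
 conf.set("mapreduce.reduce.shuffle.input.buffer.percent", "0.60");
 // Merge the in-memory segments to disk after 500 map outputs have been fetched
 conf.set("mapreduce.reduce.merge.inmem.threshold", "500");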

The merged file, which is the combination of data written to disk as well as data still kept in memory, constitutes the input for the reduce task.

[Image: shuffle and sort phases in MapReduce]

Points to note-

  1. The Mapper outputs are sorted and then partitioned per Reducer.
  2. The total number of partitions is the same as the number of reduce tasks for the job.
  3. Reducer has 3 primary phases: shuffle, sort and reduce.
  4. Input to the Reducer is the sorted output of the mappers.
  5. In shuffle phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
  6. In sort phase the framework groups Reducer inputs by keys from different map outputs.
  7. The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.

That's all for this topic Shuffle And Sort Phases in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. MapReduce Flow in YARN
  2. Predefined Mapper And Reducer Classes in Hadoop
  3. Speculative Execution in Hadoop
  4. Uber Mode in Hadoop
  5. Data Compression in Hadoop


Wednesday, 11 July 2018

How to Check Hadoop MapReduce Logs

If you are wondering how to add logging to your Hadoop MapReduce job, or where to check MapReduce logs or even System.out statements, this post shows the same. Note that accessing logs is shown here for MapReduce 2.

Location of logs in Hadoop MapReduce

An application ID is created for every MapReduce job. You can get that application ID from the console itself after starting your MapReduce job; it will be similar to the one shown below.

18/07/11 14:39:23 INFO impl.YarnClientImpl: Submitted application application_1531299441901_0001 

A folder with the same application ID will be created in the logs/userlogs directory of your Hadoop installation. For example, I can see the following directory for the application ID mentioned above- HADOOP_INSTALLATION_DIR/logs/userlogs/application_1531299441901_0001

Within this directory you will find separate folders created for the mappers and reducers, and there you will have the following files for logs and sysouts.

syslog- Contains the log messages.

sysout- Contains the System.out messages.
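If log aggregation is enabled in your cluster (yarn.log-aggregation-enable set to true), you can also pull the aggregated logs for a finished job from the command line using the same application ID:

 yarn logs -applicationId application_1531299441901_0001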

MapReduce example with logs

Here is a simple word count MapReduce program with logs and sysouts added.

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
    public static final Log log = LogFactory.getLog(WordCount.class);
    // Map function
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
         private Text word = new Text();
         public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
             log.info("In map method");
             // Splitting the line on spaces
             String[] stringArr = value.toString().split("\\s+");
             System.out.println("Array length- " + stringArr.length);
             for (String str : stringArr) {
                 word.set(str);
                 context.write(word, new IntWritable(1));
             }
             
         }
    }
    
    // Reduce function
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                throws IOException, InterruptedException {
            log.info("In reduce method with key " + key);
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            System.out.println("Key - " + key + " sum - " + sum);
            result.set(sum);
            context.write(key, result);
        }
    }
    
    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new WordCount(), args);
        System.exit(exitFlag);
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(getClass());
        job.setMapperClass(MyMapper.class);    
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
 
Once you run this MapReduce job, using the application ID you can go to the location as already explained above and check the log and sysout messages.

That's all for this topic How to Check Hadoop MapReduce Logs. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. How to Handle Missing And Under Replicated Blocks in HDFS
  2. How to Compress Intermediate Map Output in Hadoop
  3. How to Write a Map Only Job in Hadoop MapReduce
  4. Predefined Mapper And Reducer Classes in Hadoop
  5. How to Configure And Use LZO Compression in Hadoop


Monday, 9 July 2018

Predefined Mapper And Reducer Classes in Hadoop

The Hadoop framework comes prepackaged with many Mapper and Reducer classes. This post explains some of these predefined Mappers and Reducers in Hadoop and shows examples using the predefined Mapper and Reducer classes.

Predefined Mapper classes in Hadoop

  1. ChainMapper- The ChainMapper class allows the use of multiple Mapper classes within a single Map task. Using this predefined class you can chain mapper classes where the output of one mapper becomes the input of the second. That helps in breaking a complex task with lots of data processing into a chain of smaller tasks.
  2. FieldSelectionMapper- This class implements a mapper that can be used to perform field selections in a manner similar to Unix cut. The input data is treated as fields separated by a user-specified separator (the default is "\t"). The user can specify a list of fields that form the map output keys, and a list of fields that form the map output values. The field separator is set through the attribute "mapreduce.fieldsel.data.field.separator" and the map output field list spec through "mapreduce.fieldsel.map.output.key.value.fields.spec". The value is expected to be of the form "keyFieldsSpec:valueFieldsSpec", where keyFieldsSpec and valueFieldsSpec are comma (,) separated field specs: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a simple number (e.g. 5) specifying a specific field, a range (like 2-5) specifying a range of fields, or an open range (like 3-) specifying all the fields starting from field 3. By using this predefined class you don't need to write your own mapper with the split logic; you can configure FieldSelectionMapper with the required data to split the record.
  3. InverseMapper- This predefined Mapper swaps keys and values.
  4. TokenCounterMapper- Tokenizes the input values and emits each word with a count of 1. This predefined class can be used where you want to sum values, as in a word count MapReduce program.
  5. MultithreadedMapper- A multithreaded implementation of org.apache.hadoop.mapreduce.Mapper. This predefined mapper is useful if your job is more I/O bound than CPU bound.
  6. ValueAggregatorMapper- This class implements the generic mapper of Aggregate.
  7. WrappedMapper- This predefined mapper wraps a given one to allow custom Mapper.Context implementations.
  8. RegexMapper- A Mapper that extracts text matching a regular expression.

Predefined Reducer classes in Hadoop

  1. ChainReducer- The ChainReducer class allows chaining of multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion: the output of the Reducer becomes the input of the first Mapper, the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper will be written to the task's output.
  2. IntSumReducer- This predefined Reducer is used to sum the int values grouped with a key. You can use this predefined reducer where you want to get the sum of values grouped by keys.
  3. LongSumReducer- This predefined Reducer is used to sum the long values grouped with a key.
  4. FieldSelectionReducer- This class implements a reducer that can be used to perform field selections in a manner similar to Unix cut. The input data is treated as fields separated by a user-specified separator (the default is "\t"). The user can specify a list of fields that form the reduce output keys, and a list of fields that form the reduce output values. The fields are the union of those from the key and those from the value. The field separator is set through the attribute "mapreduce.fieldsel.data.field.separator" and the reduce output field list spec through "mapreduce.fieldsel.reduce.output.key.value.fields.spec". The value is expected to be of the form "keyFieldsSpec:valueFieldsSpec", where keyFieldsSpec and valueFieldsSpec are comma (,) separated field specs: fieldSpec,fieldSpec,fieldSpec ... As an example, "4,3,0,1:6,5,1-3,7-" specifies to use fields 4, 3, 0 and 1 for the keys, and fields 6, 5, 1, 2, 3, 7 and above for the values.
  5. ValueAggregatorReducer- This class implements the generic reducer of Aggregate.
  6. WrappedReducer- A Reducer which wraps a given one to allow for custom Reducer.Context implementations.

Predefined Mapper and Reducer class examples

Example 1- If you have to get a few fields of the input file you can use FieldSelectionMapper for the same. Let's say you have data in the following format for item, zone and total sales.

Item1 zone-1 234
Item1 zone-2 456
Item3 zone-2 123 

And you need to find the total sales for each item, which means you'll have to extract field 0 and field 2 in your mapper.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SalesCalc extends Configured implements Tool {    
    
    // Reduce function
    public static class TotalSalesReducer extends Reducer<Text, Text, Text, IntWritable>{
        
        public void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text val : values) {
                sum += Integer.parseInt(val.toString());
            }      
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new SalesCalc(), args);
        System.exit(exitFlag);
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // setting the separator
        conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
        // Configure the fields that are to be extracted
        conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0:2");
        Job job = Job.getInstance(conf, "Sales");
        job.setJarByClass(getClass());
        // setting predefined FieldSelectionMapper
        job.setMapperClass(FieldSelectionMapper.class);    
 
        job.setReducerClass(TotalSalesReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
} 
Example 2- You can write a word count MapReduce program using the predefined TokenCounterMapper and IntSumReducer. In that case you don't need to write any logic; just configure these classes and run your MapReduce job.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class WordCount extends Configured implements Tool{
 
 public static void main(String[] args) throws Exception{
  int exitFlag = ToolRunner.run(new WordCount(), args);
  System.exit(exitFlag);
 
 }
 
 @Override
 public int run(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "WordCount");
  job.setJarByClass(getClass());
  // Setting predefined mapper and reducer
  job.setMapperClass(TokenCounterMapper.class);    
  job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      return job.waitForCompletion(true) ? 0 : 1;
 }
}

That's all for this topic Predefined Mapper And Reducer Classes in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. MapReduce Flow in YARN
  3. Speculative Execution in Hadoop
  4. How to Compress MapReduce Job Output in Hadoop
  5. Replica Placement Policy in Hadoop Framework


Chaining MapReduce Job in Hadoop

While processing data using MapReduce you may want to break the requirement into a series of tasks and do them as a chain of MapReduce jobs rather than doing everything within one MapReduce job and making it more complex. Hadoop provides two predefined classes, ChainMapper and ChainReducer, for the purpose of chaining MapReduce jobs in Hadoop.

ChainMapper class in Hadoop

Using the ChainMapper class you can use multiple Mapper classes within a single Map task. The Mapper classes are invoked in a chained fashion: the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper is written to the task's output.

For adding Mapper classes to the ChainMapper the addMapper() method is used.

ChainReducer class in Hadoop

Using the predefined ChainReducer class in Hadoop you can chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion: the output of the Reducer becomes the input of the first Mapper, the output of the first becomes the input of the second, and so on until the last Mapper; the output of the last Mapper is written to the task's output.

For setting the Reducer class of the chain job the setReducer() method is used.

For adding a Mapper class after the Reducer the addMapper() method is used.
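As a quick reference, the signatures of these methods (as they are used in the example program below) take the job, the Mapper or Reducer class, its input and output key/value classes, and an optional Configuration for that stage; the class names here are placeholders:

 ChainMapper.addMapper(job, AMapper.class, inputKeyClass, inputValueClass,
         outputKeyClass, outputValueClass, mapperConf);

 ChainReducer.setReducer(job, TheReducer.class, inputKeyClass, inputValueClass,
         outputKeyClass, outputValueClass, reducerConf);

 ChainReducer.addMapper(job, PostReduceMapper.class, inputKeyClass, inputValueClass,
         outputKeyClass, outputValueClass, mapperConf);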

How to chain MapReduce jobs

Using the ChainMapper and the ChainReducer classes it is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*].

In the chain of MapReduce job you can have-

  • A chain of map tasks executed using ChainMapper
  • A reducer set using ChainReducer.
  • A chain of map tasks added using ChainReducer (This step is optional).

Special care has to be taken when creating chains to ensure that the key/value types output by a Mapper are valid input for the next Mapper in the chain.

Benefits of using a chained MapReduce job

  • When mappers are chained, data from the intermediate mappers is kept in memory rather than being stored on disk, so the next mapper in the chain doesn't have to read data from disk. The immediate benefit of this pattern is a dramatic reduction in disk IO.
  • Gives you a chance to break the problem into simpler tasks and execute them as a chain.

Chained MapReduce job example

Let's take a simple example to show a chained MapReduce job in action. Here the input file has item, sales and zone columns in the format shown below (tab separated), and you have to get the total sales per item for zone-1.

Item1 345 zone-1
Item1 234 zone-2
Item3 654 zone-2
Item2 231 zone-3
    

For the sake of the example let's say the first mapper collects all the records and the second mapper filters them to keep only the records for zone-1. In the reducer you get the total for each item and then the records are flipped so that the key becomes the value and the value becomes the key. For that the InverseMapper, another predefined mapper in Hadoop, is used.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Sales extends Configured implements Tool{
    // First Mapper
    public static class CollectionMapper extends Mapper<LongWritable, Text, Text, Text>{
        private Text item = new Text();
        
        public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
            //splitting record
            String[] salesArr = value.toString().split("\t");
            item.set(salesArr[0]);
            // Writing (sales,zone) as value
            context.write(item, new Text(salesArr[1] + "," + salesArr[2]));
         }
    }
    
    // Mapper 2
    public static class FilterMapper extends Mapper<Text, Text, Text, IntWritable>{
        public void map(Text key, Text value, Context context) 
                 throws IOException, InterruptedException {
        
            String[] recordArr = value.toString().split(",");
            // Filtering on zone
            if(recordArr[1].equals("zone-1")) {
                Integer sales = Integer.parseInt(recordArr[0]);
                context.write(key, new IntWritable(sales));
            }
         }
    }
    
    // Reduce function
    public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        
        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }      
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new Sales(), args);
        System.exit(exitFlag);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sales");
        job.setJarByClass(getClass());
        
        // MapReduce chaining
        Configuration mapConf1 = new Configuration(false);
        ChainMapper.addMapper(job, CollectionMapper.class, LongWritable.class, Text.class,
                   Text.class, Text.class,  mapConf1);
        
        Configuration mapConf2 = new Configuration(false);
        ChainMapper.addMapper(job, FilterMapper.class, Text.class, Text.class,
                   Text.class, IntWritable.class, mapConf2);
        
        Configuration reduceConf = new Configuration(false);        
        ChainReducer.setReducer(job, TotalSalesReducer.class, Text.class, IntWritable.class,
                 Text.class, IntWritable.class, reduceConf);

        ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
                 IntWritable.class, Text.class, null);
         
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

That's all for this topic Chaining MapReduce Job in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Write a Map Only Job in Hadoop MapReduce
  3. Data Locality in Hadoop
  4. How to Compress Intermediate Map Output in Hadoop
  5. Java Program to Write File in HDFS


Friday, 6 July 2018

Converting Text File to Parquet File Using Hadoop MapReduce

This post shows how to convert existing data to the Parquet file format using MapReduce in Hadoop. In the example given here a text file is converted to a Parquet file.

You will need to put the following jars in the class path in order to read and write Parquet files in Hadoop.

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar

Using Avro to define schema

Rather than creating the Parquet schema directly, the Avro framework is used to create the schema, as it is more convenient. Then you can use the Avro API classes to write and read the files. The mapping between the Avro and Parquet schemas, and between Avro records and Parquet records, is taken care of by these classes themselves.

MapReduce code to convert file to Parquet

In the code the Avro schema is defined inline. The program uses the Avro generic API to create a generic record. Also, it's a mapper-only job since only conversion is required; records are not aggregated.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.example.data.Group;

public class ParquetConvert extends Configured implements Tool{
    
    /// Schema
    private static final Schema MAPPING_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "    \"type\":    \"record\",\n" +                
            "    \"name\":    \"TextFile\",\n" +
            "    \"doc\":    \"Text File\",\n" +
            "    \"fields\":\n" + 
            "    [\n" +  
            "            {\"name\":    \"line\", \"type\":    \"string\"}\n"+
            "    ]\n"+
            "}\n");
    
    // Map function
    public static class ParquetConvertMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {
        
        private GenericRecord record = new GenericData.Record(MAPPING_SCHEMA);
         public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
             record.put("line", value.toString());
             context.write(null, record); 
         }        
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetConvert");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetConvertMapper.class);    
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(Group.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        // setting schema
        AvroParquetOutputFormat.setSchema(job, MAPPING_SCHEMA);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetConvert(), args);
        System.exit(exitFlag);
    }

}   
On running the MapReduce code using the following command-
hadoop jar /PATH_TO_JAR org.netjs.ParquetConvert /test/input /test/output
You can see that the Parquet file is written at the output location.
hdfs dfs -ls /test/output

Found 4 items
-rw-r--r--   1 netjs supergroup          0 2018-07-06 09:54 /test/output/_SUCCESS
-rw-r--r--   1 netjs supergroup        276 2018-07-06 09:54 /test/output/_common_metadata
-rw-r--r--   1 netjs supergroup        429 2018-07-06 09:54 /test/output/_metadata
-rw-r--r--   1 netjs supergroup        646 2018-07-06 09:54 /test/output/part-m-00000.parquet

Reading Parquet file using MapReduce 

The following MapReduce program takes a Parquet file as input and outputs a text file. In the Parquet file the records are in the following format, so you need to write appropriate logic to extract the relevant part.

{"line": "Hello wordcount MapReduce Hadoop program."}
{"line": "This is my first MapReduce program."}
{"line": "This file will be converted to Parquet using MR."}
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.ExampleInputFormat;

public class ParquetRead extends Configured implements Tool{
    // Map function
    public static class ParquetMapper extends Mapper<NullWritable, Group, NullWritable, Text> {
         public void map(NullWritable key, Group value, Context context) 
                 throws IOException, InterruptedException {
             NullWritable outKey = NullWritable.get();
             String line = value.toString();
             
             String[] fields = line.split(": ");
             context.write(outKey, new Text(fields[1]));
                     
         }        
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ParquetRead");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetMapper.class);    
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setInputFormatClass(ExampleInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitFlag = ToolRunner.run(new ParquetRead(), args);
        System.exit(exitFlag);
    }

}
If you want to read back the data written by the Parquet conversion MapReduce program above, you can use the following command.
hadoop jar /PATH_TO_JAR org.netjs.ParquetRead /test/output/part-m-00000.parquet /test/out

That's all for this topic Converting Text File to Parquet File Using Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. How to Read And Write Parquet File in Hadoop
  2. How to Configure And Use LZO Compression in Hadoop
  3. MapReduce Flow in YARN
  4. Input Splits in Hadoop


How to Read And Write Parquet File in Hadoop

This post shows how to use Hadoop Java API to read and write Parquet file.

You will need to put the following jars in the class path in order to read and write Parquet files in Hadoop.

  • parquet-hadoop-bundle-1.10.0.jar
  • parquet-avro-1.10.0.jar
  • jackson-mapper-asl-1.9.13.jar
  • jackson-core-asl-1.9.13.jar
  • avro-1.8.2.jar

Using Avro to define schema

Rather than creating the Parquet schema directly and using ParquetWriter and ParquetReader to write and read the file, it is more convenient to use a framework like Avro to create the schema. Then you can use AvroParquetWriter and AvroParquetReader to write and read Parquet files. The mapping between the Avro and Parquet schemas, and between Avro records and Parquet records, is taken care of by these classes themselves.

Writing Parquet file – Java program

The first thing you'll need is the schema; since Avro is used, you will have to define an Avro schema.

EmpSchema.avsc

{
 "type": "record",
 "name": "empRecords",
 "doc": "Employee Records",
 "fields": 
  [{
   "name": "id", 
   "type": "int"
   
  }, 
  {
   "name": "Name",
   "type": "string"
  },
  {
   "name": "Dept",
   "type": "string"
  }
 ]
}

Java program

The tasks needed in the program are as follows-

  1. First thing is to parse the schema.
  2. Then create a generic record using the Avro generic API.
  3. Once you have the records, write them to a file using AvroParquetWriter.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetFileWrite {

    public static void main(String[] args) {
        // First thing - parse the schema as it will be used
        Schema schema = parseSchema();
        List<GenericData.Record> recordList = getRecords(schema);
        writeToParquet(recordList, schema);
    }
    
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // pass path to schema
            schema = parser.parse(ClassLoader.getSystemResourceAsStream(
                "resources/EmpSchema.avsc"));
            
        } catch (IOException e) {
            e.printStackTrace();            
        }
        return schema;
        
    }
    
    private static List<GenericData.Record> getRecords(Schema schema){
        List<GenericData.Record> recordList = new ArrayList<GenericData.Record>();
        GenericData.Record record = new GenericData.Record(schema);
        // Adding 2 records
        record.put("id", 1);
        record.put("Name", "emp1");
        record.put("Dept", "D1");
        recordList.add(record);
        
        record = new GenericData.Record(schema);
        record.put("id", 2);
        record.put("Name", "emp2");
        record.put("Dept", "D2");
        recordList.add(record);
        
        return recordList;
    }
    
    
    private static void writeToParquet(List<GenericData.Record> recordList, Schema schema) {
        // Path to Parquet file in HDFS
        Path path = new Path("/test/EmpRecord.parquet");
        ParquetWriter<GenericData.Record> writer = null;
        // Creating ParquetWriter using builder
        try {
            writer = AvroParquetWriter.
                <GenericData.Record>builder(path)
                .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
                .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
                .withSchema(schema)
                .withConf(new Configuration())
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withValidation(false)
                .withDictionaryEncoding(false)
                .build();
            
            for (GenericData.Record record : recordList) {
                writer.write(record);
            }
            
        }catch(IOException e) {
            e.printStackTrace();
        }finally {
            if(writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
    

To run this Java program in the Hadoop environment, export the class path where the .class file for the Java program resides.

$ export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin 

Then you can run the Java program using the following command.

$ hadoop org.netjs.ParquetFileWrite 
18/07/05 19:56:41 INFO compress.CodecPool: Got brand-new compressor [.snappy]
18/07/05 19:56:41 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 3072

Reading Parquet file – Java program

To read the Parquet file created above you can use the following program.

import java.io.IOException;

import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ParquetFileRead {

    public static void main(String[] args) {
        readParquetFile();
    }
        
    private static void readParquetFile() {
        ParquetReader<GenericData.Record> reader = null;
        Path path = new Path("/test/EmpRecord.parquet");
        try {
            reader = AvroParquetReader
                    .<GenericData.Record>builder(path)
                    .withConf(new Configuration())
                    .build();
            GenericData.Record record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }catch(IOException e) {
            e.printStackTrace();
        }finally {
            if(reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}    

Using parquet-tools jar

You can also download the parquet-tools jar and use it to see the content of a Parquet file, the file metadata and the Parquet schema. For example, to see the content of a Parquet file-

$ hadoop jar /parquet-tools-1.10.0.jar cat /test/EmpRecord.parquet 
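The same jar also provides, among other commands, schema and meta for inspecting the Parquet schema and the file metadata (assuming the jar sits at the path used above):

$ hadoop jar /parquet-tools-1.10.0.jar schema /test/EmpRecord.parquet

$ hadoop jar /parquet-tools-1.10.0.jar meta /test/EmpRecord.parquet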

That's all for this topic How to Read And Write Parquet File in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. How to Read And Write SequenceFile in Hadoop
  2. Java Program to Read File in HDFS
  3. File Read in HDFS - Hadoop Framework Internal Steps
