Sunday, March 31, 2013

Hadoop : Output of Combiner can be processed again by the Combiner

A combiner can be used in a map/reduce program to aggregate values locally at the mapper, before a call is made to the reducer. When a combiner is available, the output of the map() function is fed to the combine() function first. And the general understanding is that the output of the combine() function is sent over to the reduce() function on a reducer machine.

Except, it is not strictly correct. The output of combine() can be fed back into the combine() function for repeated aggregation. In general, this does not cause a problem, but one can write incorrect code, if one was not aware of this detail. An example would be a simple counting program.

This program counts the terms present in the given documents. In the reducer, the sum is incremented by 1 for each element in [c1, c2, ...] because, we know that all counts in this array are "1"s. That is because, the mapper always emits "1"s.

class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + 1
      Emit(term t, count sum)


Now let's add a combiner to aggregate the terms locally.

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + 1
      Emit(term t, count sum)


It is identical to the reducer. The sum is incremented by 1 for each "1" element in the array. But now, we see that we must change the reducer to not sum "1"s, as the combiner would be doing an aggregation first. So we change the reducer to this:

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

The thinking is that the array input into the combiner could never have values other than "1"s. But this is an incorrect assumption because, as we said, the output of a combiner, that would have produced aggregated values, other than "1"s, can be again input to the combine() function. Thus the correct combine() is as follows:

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)


In fact, this is identical to the reduce() function.

We can peek under the hood in Hadoop source code to see where this two step combining happens. Inside java.org.apache.hadoop.mapred.MapTask, a flush() method has two function calls of relevance.
sortAndSpill()
mergeParts()

sortAndSpill() is actually called early on by a spill thread as well. The flush() makes sure that any remaining data is properly spilled. flush() then interrupts the spill thread, waits for it to end, and then calls mergeParts().


sortAndSpill() is the section of code that runs as the mapper is writing the intermediate values into spill files. 


Inside sortAndSpill() :

            if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }


This is where the combine() function is called first. But when data volume is high, Hadoop is unable to wait for all output from the mappers, before spilling the data to disk. When a threshold is reached, the buffers are spilled into a spill file. So it is quite possible, that one key gets spilled into two spill files. And if this happens, Hadoop can do yet another aggregation on running the data in spill files through the combiner. And that is partly what the mergeParts() function does:


         if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector);
          }


This is how combine() can be called twice, or more times, for the same key. So even if the mapper always emits a "1", the combiner could get values much larger than "1". It is always good practice to not assume the values coming into the combiner based on what is output from map().


14 comments:

Hadoop online training said...

Hi,
excellent information real time online training on
hadoop online training
industry based projects

Unknown said...

This was the concept i was struggling with for a long time. Thanks for sharing this here.

Hadoop course in adyar
Hadoop training in Tambaram
Hadoop course in Tambaram

Unknown said...

Great information shared on this website. Thanks a lot and keep up the good work and research on hadoop. Regards, hadoop online training

Unknown said...

Learning new technology would give oneself a true confidence in the current emerging Information Technology domain. With the knowledge of big data the most magnificent cloud computing technology one can go the peek of data processing. As there is a drastic improvement in this field everyone are showing much interest in pursuing this technology. Your content tells the same about evolving technology. Thanks for sharing this.

Hadoop Training in Chennai | Big Data Training Chennai | Big Data Training | Big Data Course in Chennai

Arjun kumar said...

I have finally found a Worth able content to read. The way you have presented information here is quite impressive. I have bookmarked this page for future use. Thanks for sharing content like this once again. Keep sharing content like this.

Software testing training in chennai | Software testing training institutes in chennai | Manual testing training in Chennai

Unknown said...

SAS stands for statistical analysis system which is a analysis tool developed by SAS institute and with the help of this tool data driven decisions can be taken which is helpful for the bsuiness.
SAS training in Chennai | SAS course in Chennai | SAS training institute in Chennai

Unknown said...

Informative post,It is useful for me to clear my doubts.I hope others also like the information you gave in your blog.
german classes in bangalore
german language course in bangalore
German Training in Perambur
German Training in Ashok Nagar

Unknown said...

Your blog is so inspiring for the young generations.thanks for sharing your information with us and please update more new ideas.
vmware training institutes in bangalore
best vmware training in bangalore
vmware Training in Nolambur
vmware Training in Saidapet

Anonymous said...

Thank you for sharing a huge message.

Big Data Hadoop Training In Chennai | Big Data Hadoop Training In anna nagar | Big Data Hadoop Training In omr | Big Data Hadoop Training In porur | Big Data Hadoop Training In tambaram | Big Data Hadoop Training In velachery

Silent Girl said...

Its a cozy delight reading your claim.Its full of aspire i am looking for and i lionize to claim a comment that "The content material of your proclaim is incredible" extremely good accomplish..... Memorial Day 2022 Quotes

Sunny said...

Thanks for giving us good content, very nice to see your blog. Artificial Intelligence Training in Hyderabad

digitalbadi said...

digital marketing course in hyderabad
digital marketing course in telugu

wordpress training in hyderabad
video editing course in hyderabad

seo training in hyderabad

mulesoft training said...

thanks for valuable information
nice ariticle

dellbhoomi training

excel logic site said...

Nice article
vba macros course
advanced excel course
power bi course in hyderabad
microsoft office essentials course
advanced excel course in hyderabad