2011/06/17

hadoop-streaming: inner join

hadoop-streaming HOWTO
step1. implement mapper and reducer
step2. chmod a+x <mapper/reducer>
step3. run the job as shown below:
> bin/hadoop jar contrib/streaming/hadoop-*-streaming.jar -file /home/hadoop/mapper.py -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py -reducer /home/hadoop/reducer.py -input inputdir/* -output outputdir

mapper.py
#!/usr/bin/env python                                                          

import sys
import itertools

def read_input(file, separator=','):
    """Lazily yield each line of ``file`` as a list of fields.

    Leading and trailing whitespace (including the newline) is removed
    from the line before it is split on ``separator``.
    """
    for raw_line in file:
        cleaned = raw_line.strip()
        fields = cleaned.split(separator)
        yield fields

def main():
    """Read comma-separated records from stdin and convert each one.

    The first field of every record is its file id; it selects the
    converter function that is then called with the whole record.
    Records whose file id has no converter are skipped.
    """
    for row in read_input(sys.stdin):
        # get_convert_func() returns None for an unknown file id. The old
        # ``row[0] != None`` check was always true (row[0] is a string), so
        # the first blank or unrecognised line raised TypeError when the
        # None result was called.
        convert = get_convert_func(row[0])
        if convert is not None:
            convert(row)

def get_convert_func(fid):
    """Return the converter function for a file id.

    ``'001'`` maps to ``convert001`` and ``'002'`` to ``convert002``;
    any other value yields None.
    """
    if fid not in ('001', '002'):
        return None
    return convert001 if fid == '001' else convert002

def convert001(row, separator='\t'):
    """Print one ``key<separator>value`` line for a six-field record.

    ``row`` must hold exactly six fields: the file id, four key columns
    and one value column. The key is the four key columns joined by
    commas; the value is the file id and the value column joined by
    commas. Rows of any other length are ignored.

    Always returns None; the line is written to standard output.
    """
    if len(row) != 6:
        return None

    fid, k1, k2, k3, k4, v1 = row

    key = get_key_or_value_string(k1, k2, k3, k4)
    value = get_key_or_value_string(fid, v1)
    # A parenthesised single-argument print gives the same output under the
    # Python 2 print statement and the Python 3 print function; the bare
    # statement form was a syntax error on Python 3.
    print("%s%s%s" % (key, separator, value))

def convert002(row, separator='\t'):
    """Print one ``key<separator>value`` line for a seven-field record.

    ``row`` must hold exactly seven fields: the file id, four key columns
    and two value columns. The key is the four key columns joined by
    commas; the value is the file id and both value columns joined by
    commas. Rows of any other length are ignored.

    Always returns None; the line is written to standard output.
    """
    if len(row) != 7:
        return None

    fid, k1, k2, k3, k4, v1, v2 = row

    key = get_key_or_value_string(k1, k2, k3, k4)
    value = get_key_or_value_string(fid, v1, v2)
    # A parenthesised single-argument print gives the same output under the
    # Python 2 print statement and the Python 3 print function; the bare
    # statement form was a syntax error on Python 3.
    print("%s%s%s" % (key, separator, value))


def get_key_or_value_string(*args):
    """Join the given strings into one comma-separated string."""
    comma = ','
    return comma.join(args)


# Run the mapper only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

reducer.py
#!/usr/bin/env python

import sys
from itertools import groupby
from operator import itemgetter

def read_mapper_output(file, separator='\t'):
    """Lazily yield each line of ``file`` split into at most two parts.

    The line is stripped of surrounding whitespace and then split on the
    first occurrence of ``separator`` only, so a line without the
    separator yields a one-element list.
    """
    for raw_line in file:
        record = raw_line.strip()
        yield record.split(separator, 1)

def main(separator='\t'):
    """Inner-join the mapper output on its key and print the joined rows.

    Each input line is ``key<separator>value``. ``groupby`` only merges
    adjacent lines, so the input must already be grouped by key. The key
    holds four comma-separated columns; the value starts with the file id
    that produced it, matching what mapper.py emits:

    * ``001,v1``    fills output column 4
    * ``002,v1,v2`` fills output columns 5 and 6

    A joined row is printed as seven comma-separated columns, and only
    when no column is still the ``'-1'`` placeholder, i.e. both file ids
    contributed a value for that key.
    """
    data = read_mapper_output(sys.stdin, separator=separator)

    for key, group in groupby(data, itemgetter(0)):
        key_columns = get_columns_from_key(key, 4, ',')
        if key_columns is None:
            # Malformed key; previously unpacking the None result raised
            # TypeError and aborted the whole reducer.
            continue

        joined = list(key_columns) + ['-1', '-1', '-1']

        for record in group:
            if len(record) != 2:
                # The line had no separator, so there is no value part.
                continue
            columns = record[1].split(',')
            # The original branches were swapped relative to the mapper:
            # they read two values from '001' records (which carry one)
            # and so raised IndexError. Length checks skip short records.
            if columns[0] == '001' and len(columns) >= 2:
                joined[4] = columns[1]
            elif columns[0] == '002' and len(columns) >= 3:
                joined[5] = columns[1]
                joined[6] = columns[2]

        if is_healthy_output(joined):
            # Parenthesised single-argument print works on Python 2 and 3.
            print(get_string_from_list(joined))

def is_healthy_output(row):
    """Tell whether ``row`` is free of the ``'-1'`` placeholder.

    Returns True when no element of ``row`` equals ``'-1'`` (an empty row
    therefore counts as healthy) and False otherwise.
    """
    return not any(column == '-1' for column in row)

def get_string_from_list(L):
    """Join the strings in ``L`` into one comma-separated string."""
    comma = ','
    return comma.join(L)

def get_columns_from_key(key, length, separator=','):
    """Split ``key`` into columns, requiring exactly ``length`` of them.

    ``key`` is split on ``separator``. Returns the list of columns when
    there are exactly ``length`` of them, otherwise None.
    """
    # Honour the ``separator`` argument; the split used to be hard-coded to
    # ',' so any other separator was silently ignored.
    columns = key.split(separator)
    if len(columns) != length:
        return None
    return columns

# Run the reducer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

2 comments:

  1. I am reading your post from the beginning, it was so interesting to read & I feel thanks to you for posting such a good blog, keep updates regularly.
    Regards,
    Python Courses in Chennai|Python Classes in Chennai|Python training courses

    ReplyDelete
  2. Thanks for sharing this niche useful informative post to our knowledge, Actually SAP is ERP software that can be used in many companies for their day to day business activities it has great scope in future.
    Regards,
    SAP training in chennai|SAP course in chennai|SAP Training Chennai|sap training in Chennai

    ReplyDelete

100