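Step 1. Implement the mapper and the reducer (mapper.py and reducer.py below).
Step 2. Make both scripts executable: chmod a+x <mapper/reducer>
Step 3. Run the job with Hadoop Streaming, as shown below: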
> bin/hadoop jar contrib/streaming/hadoop-*-streaming.jar -file /home/hadoop/mapper.py -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py -reducer /home/hadoop/reducer.py -input inputdir/* -output outputdir
mapper.py
#!/usr/bin/env python                                                          
import sys
import itertools
def read_input(file, separator=','):
    """Lazily turn each line of *file* into a list of fields.

    Leading and trailing whitespace (including the newline) is removed
    from every line before it is split on *separator*.
    """
    for raw_line in file:
        fields = raw_line.strip().split(separator)
        yield fields
def main():
    """Read comma-separated rows from stdin and emit one line per valid row.

    The first column of every row is the record-type id used to pick a
    converter via ``get_convert_func``.  Rows whose id has no converter
    (unknown ids, or the empty string produced by a blank line) are
    skipped; previously they crashed the task by calling ``None``.
    """
    for row in read_input(sys.stdin):
        # str.split always yields at least one element, so row[0] exists.
        convert = get_convert_func(row[0])
        if convert is None:
            continue
        convert(row)
def get_convert_func(fid):
    """Return the converter function for record type *fid*.

    '001' maps to ``convert001`` and '002' to ``convert002``; any other
    value yields None.
    """
    converter = None
    if fid == '002':
        converter = convert002
    elif fid == '001':
        converter = convert001
    return converter
def convert001(row, separator='\t'):
    """Write the key/value line for a type-001 record to stdout.

    A valid *row* has exactly six columns: ``fid, k1, k2, k3, k4, v1``.
    The emitted line is ``k1,k2,k3,k4<separator>fid,v1``.  Rows with any
    other column count are silently ignored.

    Always returns None.
    """
    if len(row) != 6:
        return None
    fid, k1, k2, k3, k4, v1 = row
    key = ','.join((k1, k2, k3, k4))
    value = ','.join((fid, v1))
    # A single parenthesised argument prints identically under the
    # Python 2 print statement and the Python 3 print function.
    print("%s%s%s" % (key, separator, value))
def convert002(row, separator='\t'):
    """Write the key/value line for a type-002 record to stdout.

    A valid *row* has exactly seven columns:
    ``fid, k1, k2, k3, k4, v1, v2``.  The emitted line is
    ``k1,k2,k3,k4<separator>fid,v1,v2``.  Rows with any other column
    count are silently ignored.

    Always returns None.
    """
    if len(row) != 7:
        return None
    fid, k1, k2, k3, k4, v1, v2 = row
    key = ','.join((k1, k2, k3, k4))
    value = ','.join((fid, v1, v2))
    # A single parenthesised argument prints identically under the
    # Python 2 print statement and the Python 3 print function.
    print("%s%s%s" % (key, separator, value))
def get_key_or_value_string(*args):
    """Join the given string fields into one comma-separated string."""
    delimiter = ','
    return delimiter.join(args)
# Run the mapper only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter
def read_mapper_output(file, separator='\t'):
    """Lazily split each line of *file* into at most two parts.

    Every line is stripped of surrounding whitespace and then split on
    the first occurrence of *separator* only, so a line without the
    separator yields a one-element list.
    """
    for raw_line in file:
        stripped = raw_line.strip()
        yield stripped.split(separator, 1)
def main(separator='\t'):
    """Merge the type-001 and type-002 values of each key into one CSV line.

    Reads ``key<separator>value`` lines from stdin, where consecutive
    lines with the same key form one group.  For each group a
    seven-column row is built:

        k1, k2, k3, k4, <001 v1>, <002 v1>, <002 v2>

    This matches what the mapper emits: ``fid,v1`` for type 001 and
    ``fid,v1,v2`` for type 002.  A row is printed only when every value
    slot was filled, i.e. no '-1' placeholder is left.

    Malformed input is skipped instead of aborting the task: groups
    whose key does not have exactly four columns, lines without a value
    part, and values with too few columns.
    """
    data = read_mapper_output(sys.stdin, separator=separator)
    for key, group in groupby(data, itemgetter(0)):
        key_columns = get_columns_from_key(key, 4, ',')
        if key_columns is None:
            # Malformed key: unpacking None would raise TypeError.
            continue
        # '-1' marks a value slot that has not been filled yet.
        L = key_columns + ['-1', '-1', '-1']
        for record in group:
            if len(record) != 2:
                # Line had no separator, so there is no value part.
                continue
            columns = record[1].split(',')
            if columns[0] == '001' and len(columns) >= 2:
                L[4] = columns[1]
            elif columns[0] == '002' and len(columns) >= 3:
                L[5] = columns[1]
                L[6] = columns[2]
        if is_healthy_output(L):
            # Single-argument print(...) works on both Python 2 and 3.
            print(get_string_from_list(L))
def is_healthy_output(row):
    """Tell whether *row* is complete, i.e. no column equals '-1'."""
    return not any(column == '-1' for column in row)
def get_string_from_list(L):
    """Render the string items of *L* as one comma-separated line."""
    delimiter = ','
    return delimiter.join(L)
def get_columns_from_key(key, length, separator=','):
    """Split *key* into columns and validate the column count.

    *key* is split on *separator* (previously the argument was ignored
    and ',' was always used).  Returns the list of columns when there
    are exactly *length* of them, otherwise None.
    """
    columns = key.split(separator)
    if len(columns) != length:
        return None
    return columns
# Run the reducer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
I read your post from beginning to end; it was very interesting, and I am grateful to you for posting such a good blog. Please keep updating it regularly.
Regards,
Python Courses in Chennai|Python Classes in Chennai|Python training courses
Thanks for sharing this useful, informative post. SAP is ERP software that many companies use for their day-to-day business activities, and it has great scope in the future.
Regards,
SAP training in chennai|SAP course in chennai|SAP Training Chennai|sap training in Chennai