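Step 1. Implement the mapper and the reducer (mapper.py and reducer.py below).
Step 2. Make both scripts executable: chmod a+x mapper.py reducer.py
Step 3. Run the job with Hadoop streaming as shown below: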
> bin/hadoop jar contrib/streaming/hadoop-*-streaming.jar -file /home/hadoop/mapper.py -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py -reducer /home/hadoop/reducer.py -input inputdir/* -output outputdir
mapper.py
#!/usr/bin/env python
import sys
import itertools
def read_input(file, separator=','):
    """Lazily split each line of *file* into a list of fields.

    Surrounding whitespace (including the trailing newline) is stripped from
    every line before it is split on *separator*; a blank line yields [''].
    """
    for raw_line in file:
        cleaned = raw_line.strip()
        fields = cleaned.split(separator)
        yield fields
def main():
    """Map every comma-separated record on stdin to a key/value output line.

    The first field of each record is a file ID that selects a converter via
    get_convert_func. Records whose ID has no converter, including blank lines
    (which split to ['']), are skipped rather than crashing the mapper.
    """
    for row in read_input(sys.stdin):
        convert = get_convert_func(row[0])
        # get_convert_func returns None for unknown IDs; calling None would
        # raise TypeError and fail the whole map task.
        if convert is not None:
            convert(row)
def get_convert_func(fid):
    """Look up the converter for file ID *fid*.

    Returns convert001 for '001', convert002 for '002', and None for any
    other ID.
    """
    if fid not in ('001', '002'):
        return None
    return convert001 if fid == '001' else convert002
def convert001(row, separator='\t'):
    """Emit one mapper output line for a file-'001' record.

    Expects exactly six fields: fid, k1, k2, k3, k4, v1. Prints
    "k1,k2,k3,k4<separator>fid,v1" to stdout. Rows of any other length
    are ignored. Always returns None.
    """
    if len(row) != 6:
        return None
    fid, k1, k2, k3, k4, v1 = row
    K = get_key_or_value_string(k1, k2, k3, k4)
    V = get_key_or_value_string(fid, v1)
    # A parenthesized single-argument print works identically on Python 2
    # and Python 3 (the bare print statement is a SyntaxError on Python 3).
    print("%s%s%s" % (K, separator, V))
def convert002(row, separator='\t'):
    """Emit one mapper output line for a file-'002' record.

    Expects exactly seven fields: fid, k1, k2, k3, k4, v1, v2. Prints
    "k1,k2,k3,k4<separator>fid,v1,v2" to stdout. Rows of any other length
    are ignored. Always returns None.
    """
    if len(row) != 7:
        return None
    fid, k1, k2, k3, k4, v1, v2 = row
    K = get_key_or_value_string(k1, k2, k3, k4)
    V = get_key_or_value_string(fid, v1, v2)
    # A parenthesized single-argument print works identically on Python 2
    # and Python 3 (the bare print statement is a SyntaxError on Python 3).
    print("%s%s%s" % (K, separator, V))
def get_key_or_value_string(*args):
    """Join all positional string arguments with commas, e.g. ('a', 'b') -> 'a,b'."""
    delimiter = ','
    parts = list(args)
    return delimiter.join(parts)
if __name__ == '__main__':
    # Run only when executed as a script (e.g. launched by Hadoop streaming),
    # not when the module is imported.
    main()
reducer.py
#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter
def read_mapper_output(file, separator='\t'):
    """Lazily split each mapper output line into [key, value].

    Surrounding whitespace is stripped first, and only the first *separator*
    splits the line, so the value may itself contain separators. A line with
    no separator yields a single-element list.
    """
    for raw in file:
        stripped = raw.strip()
        pair = stripped.split(separator, 1)
        yield pair
def main(separator='\t'):
    """Join the mapper's tagged values for each key and print complete rows.

    Each input line is "k1,k2,k3,k4<separator>fid,values...". Lines that share
    a key are merged into one 7-column row
    [k1, k2, k3, k4, v1 of '001', v1 of '002', v2 of '002'], which is printed
    comma-joined only when every column was filled.

    groupby merges only adjacent lines, so the input must be sorted by key.
    Hadoop streaming sorts mapper output by key before the reducer sees it.
    """
    data = read_mapper_output(sys.stdin, separator=separator)
    for key, group in groupby(data, itemgetter(0)):
        # A line without a separator splits into a single field and carries
        # no value, so it is skipped instead of breaking the unpacking.
        values = (pair[1] for pair in group if len(pair) == 2)
        row = _merge_group(key, values)
        if row is not None and is_healthy_output(row):
            print(get_string_from_list(row))


def _merge_group(key, values):
    """Build the 7-column output row for one key; return None if the key is malformed.

    Value slots start as '-1', the "missing" marker that is_healthy_output
    rejects, and are filled from the '001' and '002' records in *values*.
    """
    key_columns = get_columns_from_key(key, 4, ',')
    if key_columns is None:
        return None
    row = list(key_columns) + ['-1', '-1', '-1']
    for value in values:
        columns = value.split(',')
        # Slots follow what the mapper emits: '001' carries one value
        # ("001,v1") and '002' carries two ("002,v1,v2").
        if columns[0] == '001' and len(columns) >= 2:
            row[4] = columns[1]
        elif columns[0] == '002' and len(columns) >= 3:
            row[5] = columns[1]
            row[6] = columns[2]
    return row
def is_healthy_output(row):
    """Report whether *row* is complete.

    A row is complete when none of its columns equals the '-1' placeholder.
    """
    return not any(column == '-1' for column in row)
def get_string_from_list(L):
    """Join the string columns of *L* into one comma-separated line."""
    delimiter = ','
    return delimiter.join(L)
def get_columns_from_key(key, length, separator=','):
    """Split a mapper key into its columns.

    :param key: the key string, e.g. "k1,k2,k3,k4".
    :param length: the exact number of columns the key must have.
    :param separator: the column delimiter used to split *key* (default ',').
    :returns: the list of columns, or None if *key* does not split into
        exactly *length* columns.
    """
    columns = key.split(separator)
    if len(columns) != length:
        return None
    return columns
if __name__ == '__main__':
    # Run only when executed as a script (e.g. launched by Hadoop streaming),
    # not when the module is imported.
    main()
I have been reading your post from the beginning, and it was very interesting. Thank you for posting such a good blog; please keep updating it regularly.
Regards,
Python Courses in Chennai|Python Classes in Chennai|Python training courses
Thanks for sharing this useful, informative post. SAP is ERP software that many companies use for their day-to-day business activities, and it has great scope in the future.
Regards,
SAP training in chennai|SAP course in chennai|SAP Training Chennai|sap training in Chennai