Step 1. Implement the mapper and the reducer (mapper.py and reducer.py below).
Step 2. Make both scripts executable: chmod a+x <mapper/reducer>
Step 3. Run the streaming job as shown below:
> bin/hadoop jar contrib/streaming/hadoop-*-streaming.jar -file /home/hadoop/mapper.py -mapper /home/hadoop/mapper.py -file /home/hadoop/reducer.py -reducer /home/hadoop/reducer.py -input inputdir/* -output outputdir
mapper.py
#!/usr/bin/env python
"""Hadoop Streaming mapper.

Reads comma-separated records, one per line. The first column is a
record-type id ('001' or '002'), the next four columns form the key and the
remaining columns are values. For each well-formed record one line is
written:

    k1,k2,k3,k4<TAB>fid,v1[,v2]

Records with an unknown id (including blank lines) or with an unexpected
number of columns are skipped instead of aborting the task.
"""
import sys
import itertools


def read_input(file, separator=','):
    """Yield every line of *file* stripped and split on *separator*."""
    for line in file:
        yield line.strip().split(separator)


def get_key_or_value_string(*args):
    """Join the given column strings with commas."""
    return ','.join(args)


def _emit(row, expected_length, separator, out):
    """Write 'key<separator>value' for *row* when it has the expected width.

    Columns 1-4 become the key; the id (column 0) followed by every column
    from index 5 onward becomes the value. Rows of any other width are
    ignored.
    """
    if len(row) != expected_length:
        return
    key = get_key_or_value_string(*row[1:5])
    value = get_key_or_value_string(row[0], *row[5:])
    # Resolve sys.stdout at call time so redirected output is honoured.
    stream = sys.stdout if out is None else out
    stream.write("%s%s%s\n" % (key, separator, value))


def convert001(row, separator='\t', out=None):
    """Emit a '001' record: id, four key columns and one value (6 columns).

    *out* is the writable stream to use; it defaults to sys.stdout.
    Always returns None.
    """
    _emit(row, 6, separator, out)


def convert002(row, separator='\t', out=None):
    """Emit a '002' record: id, four key columns and two values (7 columns).

    *out* is the writable stream to use; it defaults to sys.stdout.
    Always returns None.
    """
    _emit(row, 7, separator, out)


# Record-type id -> function that formats and emits that record type.
_CONVERTERS = {
    '001': convert001,
    '002': convert002,
}


def get_convert_func(fid):
    """Return the convert function for record id *fid*, or None if unknown."""
    return _CONVERTERS.get(fid)


def main(stdin=None, stdout=None):
    """Convert every record read from *stdin* and write the result to *stdout*.

    Both streams default to the process's standard streams. Rows whose id
    has no registered converter are skipped.
    """
    source = sys.stdin if stdin is None else stdin
    sink = sys.stdout if stdout is None else stdout
    for row in read_input(source):
        convert = get_convert_func(row[0])
        if convert is None:
            # Unknown id or blank line: nothing to emit for this row.
            continue
        convert(row, out=sink)


if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
"""Hadoop Streaming reducer.

Reads 'key<TAB>value' lines that are already grouped by key, where the key is
'k1,k2,k3,k4' and the value is either '001,v1' or '002,v1,v2'. For every key
that received both record types one joined line is written:

    k1,k2,k3,k4,v1(001),v1(002),v2(002)

NOTE(review): the output column order for the 001/002 values is an
assumption derived from the record widths ('001' carries one value, '002'
carries two) -- confirm against the consumer of this output.
"""
import sys
from itertools import groupby
from operator import itemgetter

# Placeholder marking an output column that has not been filled yet.
_MISSING = '-1'


def read_mapper_output(file, separator='\t'):
    """Yield each line of *file* stripped and split once on *separator*."""
    for row in file:
        yield row.strip().split(separator, 1)


def is_healthy_output(row):
    """Return True if no column of *row* still holds the '-1' placeholder."""
    return _MISSING not in row


def get_string_from_list(L):
    """Join the column strings in *L* with commas."""
    return ','.join(L)


def get_columns_from_key(key, length, separator=','):
    """Split *key* on *separator*.

    Returns the list of columns, or None when the number of columns is not
    exactly *length*.
    """
    columns = key.split(separator)
    if len(columns) != length:
        return None
    return columns


def main(separator='\t', stdin=None, stdout=None):
    """Join the '001' and '002' values of every key and write complete rows.

    *separator* splits key from value on each input line. *stdin* and
    *stdout* default to the process's standard streams. Keys that do not
    have exactly four columns, lines without a value part, and values with
    an unexpected shape are skipped; a row is written only when all seven
    output columns were filled.
    """
    source = sys.stdin if stdin is None else stdin
    sink = sys.stdout if stdout is None else stdout
    data = read_mapper_output(source, separator=separator)
    # groupby only merges adjacent rows; the input must arrive sorted by key.
    for key, group in groupby(data, itemgetter(0)):
        key_columns = get_columns_from_key(key, 4, ',')
        if key_columns is None:
            # Malformed key: drop the whole group.
            continue
        output = key_columns + [_MISSING] * 3
        for record in group:
            if len(record) != 2:
                # Line had no separator, so there is no value to merge.
                continue
            columns = record[1].split(',')
            if columns[0] == '001' and len(columns) == 2:
                output[4] = columns[1]
            elif columns[0] == '002' and len(columns) == 3:
                output[5] = columns[1]
                output[6] = columns[2]
        if is_healthy_output(output):
            sink.write(get_string_from_list(output) + "\n")


if __name__ == "__main__":
    main()
I have been reading your post from the beginning; it was very interesting to read, and I thank you for posting such a good blog. Please keep posting updates regularly.
ReplyDeleteRegards,
Python Courses in Chennai|Python Classes in Chennai|Python training courses
Thanks for sharing this useful and informative niche post. SAP is ERP software that many companies use for their day-to-day business activities, and it has great scope in the future.
ReplyDeleteRegards,
SAP training in chennai|SAP course in chennai|SAP Training Chennai|sap training in Chennai