
Thinking in MapReduce


MapReduce Overview

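Don't assume you have learned MapReduce just because you understand WordCount, and don't treat such a general framework as a dumb model that can only count. Only after finishing Programming Assignment 3 did I realize that my earlier understanding of MapReduce rested on two misconceptions.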



Map
  input: (record)
  output: (intermediate key, value)
Shuffle
  same_key -> Group
Reduce
  input: Group of (intermediate key, value)
  output: (key, bag of values)
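
To make the flow above concrete, here is a minimal in-memory sketch of the map, group, and reduce steps in plain Python. It is an illustration only, not a distributed implementation; the helper `run_mapreduce` and the sample temperature readings are hypothetical names and data invented for this example. It deliberately computes a maximum rather than a count, since the reducer can be any function of the grouped values.

```python
from collections import defaultdict

def run_mapreduce(records, mapper, reducer):
    """Run map, group-by-key, and reduce over an in-memory list (illustrative only)."""
    groups = defaultdict(list)
    for record in records:
        # Map: each record yields zero or more (intermediate key, value) pairs.
        for key, value in mapper(record):
            # Shuffle: values that share a key land in the same group.
            groups[key].append(value)
    output = []
    for key in sorted(groups):
        # Reduce: each (key, list of values) group yields zero or more outputs.
        output.extend(reducer(key, groups[key]))
    return output

# Hypothetical sample data: (city, temperature) readings.
readings = [("Oslo", 3), ("Lima", 19), ("Oslo", 7), ("Lima", 22)]

def max_mapper(record):
    city, temp = record
    yield city, temp

def max_reducer(city, temps):
    yield city, max(temps)

hottest = run_mapreduce(readings, max_mapper, max_reducer)
print(hottest)
```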


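  1. Counting (objects count): In the Map phase, split the input into the individual objects to be counted; in the Reduce phase, sum the values for each key and output the total. Typical uses include counting words in an input text and counting each person's friends in a social network. [example below]

  2. SQL Join query: The Map input is (table_name, raw_info). The mapper uses the value of the join column as the key and raw_info as the value. In the Reduce phase, raw_info rows that share a key but come from different tables are concatenated and output. [example below]

  3. Matrix multiplication (PageRank!): To compute the sparse matrix product A x B = C, each input record is [matrix, i, j, value], meaning that the entry at row i, column j of matrix is value. For each element of A, the Mapper sends it to every column of the same row in C; for each element of B, the Mapper sends it to every row of the same column in C. The Reduce phase then computes the sum sigma(a(i,k)*b(k,j)) over k. [example below]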
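
The counting pattern in item 1 also covers the friend-count case mentioned there. The sketch below assumes each input record is a [person, friend] pair, which is an assumption made for this example rather than a format given in the post; the grouping loop stands in for the framework's shuffle.

```python
from collections import defaultdict

# Hypothetical input: one [person, friend] record per friendship edge.
edges = [["Ann", "Bob"], ["Ann", "Cat"], ["Bob", "Ann"]]

def mapper(record):
    person, _friend = record
    # One count per edge, keyed by the person who owns the edge.
    return [(person, 1)]

def reducer(person, counts):
    # Sum the 1s emitted for this person.
    return (person, sum(counts))

# Group mapper output by key, then reduce each group.
groups = defaultdict(list)
for record in edges:
    for key, value in mapper(record):
        groups[key].append(value)

friend_counts = [reducer(person, counts) for person, counts in sorted(groups.items())]
print(friend_counts)
```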



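Finally, I recommend a website for learning, practicing, and debugging MapReduce online. Thanks to Bill Howe and his team for designing such excellent hands-on assignments. If you are interested, come and join in.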



import json

class MapReduce:
	def __init__(self):
		self.intermediate = {}
		self.result = []

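	def emit_intermediate(self, key, value):
		# Append value to the list of values collected for this key.
		self.intermediate.setdefault(key, [])
		self.intermediate[key].append(value)

	def emit(self, value):
		# Collect one final output value.
		self.result.append(value)

	def execute(self, data, mapper, reducer):
		# Map phase: parse each input line as JSON and hand it to the mapper.
		for line in data:
			record = json.loads(line)
			mapper(record)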

		for key in self.intermediate:
			reducer(key, self.intermediate[key])

		#jenc = json.JSONEncoder(encoding='latin-1')
		jenc = json.JSONEncoder()
		for item in self.result:
			print(jenc.encode(item))


WordCount example:

Create an Inverted index. Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the document identifiers in which that word appears.

Mapper Input: The input is a 2-element list: [document_id, text]

document_id: document identifier formatted as a string
text: text of the document formatted as a string

Reducer Output: The output should be a (word, document ID list) tuple where word is a string and document ID list is a list of strings.

import MapReduce
import sys

# Part 1
mr = MapReduce.MapReduce()

# Part 2
def mapper(record):
	# key: document identifier
	# value: document contents
	key = record[0]
	value = record[1]
	words = value.split()
	for w in words:
		mr.emit_intermediate(w, 1)

# Part 3
def reducer(key, list_of_values):
	# key: word
	# value: list of occurrence counts
	total = 0
	for v in list_of_values:
		total += v
	mr.emit((key, total))

# Part 4
inputdata = open(sys.argv[1])
mr.execute(inputdata, mapper, reducer)
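
The inverted index described before the word-count code follows the same structure; only the emitted value and the reducer change. Below is a minimal self-contained sketch, assuming the [document_id, text] input format from that description. The sample documents are made up, and a small grouping loop replaces `mr.execute` so the snippet runs without the MapReduce module.

```python
from collections import defaultdict

# Hypothetical documents in the [document_id, text] format.
documents = [
    ["doc1.txt", "map reduce map"],
    ["doc2.txt", "reduce shuffle"],
]

def mapper(record):
    document_id, text = record
    # Emit each distinct word once per document so IDs are not repeated.
    return [(word, document_id) for word in set(text.split())]

def reducer(word, document_ids):
    # The grouped values already form the document list for this word.
    return (word, sorted(document_ids))

# Group mapper output by word, then reduce each group.
groups = defaultdict(list)
for record in documents:
    for word, document_id in mapper(record):
        groups[word].append(document_id)

inverted_index = [reducer(word, ids) for word, ids in sorted(groups.items())]
print(inverted_index)
```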

Implement a relational join as a MapReduce query



SELECT *
FROM Orders, LineItem
WHERE Orders.order_id = LineItem.order_id

import MapReduce
import sys

# SQL Join in the Simple Python MapReduce Framework

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

def mapper(record):
	# key: order_id
	# value: row_record
	key = record[1]
	value = [record[0],record]
	mr.emit_intermediate(key, value)

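def reducer(key, list_of_values):
	# key: order_id
	# value: list of [table_name, record] pairs sharing that order_id
	for v in list_of_values:
		for u in list_of_values:
			# Pair rows from different tables once: v is the row whose table name sorts later.
			if v[0]>u[0]:
				value = v[1]+u[1]
				mr.emit(value)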

# Do not modify below this line
# =============================
if __name__ == '__main__':
  inputdata = open(sys.argv[1])
  mr.execute(inputdata, mapper, reducer)
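
The reducer above relies on comparing table names as strings to pick one row from each table. A possibly more explicit variant splits the group by table name first. This is a sketch under assumptions, not the assignment's reference solution: the table names "order" and "line_item" and the sample rows are hypothetical, and a small grouping loop stands in for `mr.execute`.

```python
from collections import defaultdict

# Hypothetical rows: [table_name, order_id, other columns...].
rows = [
    ["order", "1", "2024-01-05"],
    ["line_item", "1", "widget"],
    ["line_item", "1", "gadget"],
    ["order", "2", "2024-01-06"],
]

def mapper(record):
    # Key every row by its order_id (second field); keep the whole row as the value.
    return [(record[1], record)]

def reducer(order_id, records):
    # Split the group by table name, then pair every order with every line item.
    orders = [r for r in records if r[0] == "order"]
    items = [r for r in records if r[0] == "line_item"]
    return [order + item for order in orders for item in items]

# Group mapper output by order_id, then reduce each group.
groups = defaultdict(list)
for record in rows:
    for key, value in mapper(record):
        groups[key].append(value)

joined = [row for key in sorted(groups) for row in reducer(key, groups[key])]
for row in joined:
    print(row)
```

An order with no matching line items (order_id "2" here) produces no output, which matches inner-join semantics.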

Assume you have two matrices A and B in a sparse matrix format, where each record is of the form i, j, value. Design a MapReduce algorithm to compute matrix multiplication: A x B

Map Input: The input to the map function will be matrix row records formatted as lists. Each list will have the format [matrix, i, j, value] where matrix is a string and i, j, and value are integers.

The first item, matrix, is a string that identifies which matrix the record originates from. This field has two possible values:

‘a’ indicates that the record is from matrix A

‘b’ indicates that the record is from matrix B

Reduce Output: The output from the reduce function will also be matrix row records formatted as tuples. Each tuple will have the format (i, j, value) where each element is an integer.

import MapReduce
import sys

# Matrix Multiplication in the Simple Python MapReduce Framework

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

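def mapper(record):
	col_max = 5 # number of columns in the result (hard-coded)
	row_max = 5 # number of rows in the result (hard-coded)
	matrix_id = record[0]
	row = record[1]
	col = record[2]
	value = record[3]
	# The value layout [[matrix, row, col], value] matches what the reducer indexes.
	if matrix_id=='a':
		# a(row, col) is needed by every cell in the same row of the product
		for k in range(col_max):
			mr.emit_intermediate((row, k), [[matrix_id, row, col], value])
	if matrix_id=='b':
		# b(row, col) is needed by every cell in the same column of the product
		for i in range(row_max):
			mr.emit_intermediate((i, col), [[matrix_id, row, col], value])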

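def reducer(key, list_of_values):
	# key: (i, k), the row and column of one cell of the product
	# value: list of [[matrix, row, col], value] entries sent to that cell
	i = key[0]
	k = key[1]
	total = 0
	for term in list_of_values:
		if term[0][0]=='a':
			col = term[0][2]
			# multiply a(i, col) by the b entry whose row equals col
			for ob in list_of_values:
				if ob[0][0]=='b' and ob[0][1]==col:
					total += int(term[1])*int(ob[1])
	mr.emit((i, k, total))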


# Do not modify below this line
# =============================
if __name__ == '__main__':
  inputdata = open(sys.argv[1])
  mr.execute(inputdata, mapper, reducer)
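
The mapper above hard-codes the matrix size as 5. As a cross-check on the algorithm, here is a self-contained sketch on two small 2x2 matrices where the result can be verified by hand. The sample entries, the constants `N_ROWS_C` and `N_COLS_C`, and the tagged-value layout are all assumptions chosen for this example; the reducer indexes values by the shared index k instead of using a nested loop.

```python
from collections import defaultdict

# Hypothetical 2x2 sparse inputs as [matrix, i, j, value]; zero entries are omitted.
records = [
    ["a", 0, 0, 1], ["a", 0, 1, 2], ["a", 1, 1, 3],
    ["b", 0, 0, 4], ["b", 1, 0, 5], ["b", 1, 1, 6],
]
N_ROWS_C = 2  # rows of A (assumed known in advance)
N_COLS_C = 2  # columns of B (assumed known in advance)

def mapper(record):
    matrix, i, j, value = record
    if matrix == "a":
        # a(i, j) contributes to every cell in row i of C; j is the shared index k.
        return [((i, col), ("a", j, value)) for col in range(N_COLS_C)]
    # b(i, j) contributes to every cell in column j of C; i is the shared index k.
    return [((row, j), ("b", i, value)) for row in range(N_ROWS_C)]

def reducer(cell, tagged_values):
    # Index both sides by k, then sum a(i,k) * b(k,j) over the k they share.
    a_by_k = {k: v for tag, k, v in tagged_values if tag == "a"}
    b_by_k = {k: v for tag, k, v in tagged_values if tag == "b"}
    total = sum(a_by_k[k] * b_by_k[k] for k in a_by_k if k in b_by_k)
    return (cell[0], cell[1], total)

# Group mapper output by result cell, then reduce each cell.
groups = defaultdict(list)
for record in records:
    for key, value in mapper(record):
        groups[key].append(value)

product = [reducer(cell, groups[cell]) for cell in sorted(groups)]
print(product)
```

For A = [[1, 2], [0, 3]] and B = [[4, 0], [5, 6]], the product worked out by hand is [[14, 12], [15, 18]], which is what the sketch computes.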


