These are simple algorithms written following the Map/Reduce model in Python. To successfully run them, you must download this archive containing:
- MapReduce.py and MapReduce.pyc: a simple implementation of the Map/Reduce programming model in Python
- books.json, dna.json, friends.json, matrix.json and records.json: simple datasets
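The MapReduce.py module itself is not reproduced here, but judging from how the scripts below use it (emit_intermediate, emit, execute and the intermediate dictionary), a minimal in-memory sketch of such a framework could look roughly like the following; the one-JSON-record-per-line parsing is an assumption about how the datasets are laid out, not something taken from the archive:

import json

class MapReduce:
    def __init__(self):
        self.intermediate = {}  # intermediate key -> list of values emitted by the mapper
        self.result = []        # values emitted by the reducer

    def emit_intermediate(self, key, value):
        # group every mapped value under its intermediate key
        self.intermediate.setdefault(key, []).append(value)

    def emit(self, value):
        # collect one final output record
        self.result.append(value)

    def execute(self, data, mapper, reducer):
        # map phase: parse one JSON record per input line and map it
        for line in data:
            mapper(json.loads(line))
        # reduce phase: call the reducer once per intermediate key
        for key in self.intermediate:
            reducer(key, self.intermediate[key])
        # print the final results as JSON
        for item in self.result:
            print(json.dumps(item))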
The first task involved the books dataset. The input is a 2-element list [document_id, text], where document_id is the document identifier and text is the text of the document, both formatted as strings. The output should be a (word, document ID list) tuple, where word is a string and document ID list is a list of strings.
import MapReduce
import sys

mr = MapReduce.MapReduce()

def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0]
    value = record[1]
    words = value.split()
    for w in words:
        # emit a (word, document id) pair for every word in the document
        mr.emit_intermediate(w, key)

def reducer(key, list_of_values):
    # key: word
    # list_of_values: document ids
    total = []
    # append each document id to the word's index if not already there
    for v in list_of_values:
        if v not in total:
            total.append(v)
    mr.emit((key, total))

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
You can test the solution by running:
python inverted_index.py books.json
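As a quick, made-up illustration of what the inverted index produces (the document ids and text here are hypothetical, not taken from books.json), two input lines such as

["doc1", "the quick fox"]
["doc2", "the quick dog"]

would yield, among the output tuples, ("quick", ["doc1", "doc2"]) and ("fox", ["doc1"]).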
The second task required us to replicate the following query:
SELECT *
FROM Orders, LineItem
WHERE Order.order_id = LineItem.order_id;
in Map/Reduce using the records dataset. The input consists of database records formatted as lists of strings, where every list element corresponds to a different field of its record. The first item (index 0) in each record is a string that identifies which table the record originates from; it has two possible values: line_item, indicating that the record is a line item, and order, indicating that the record is an order. The second element (index 1) in each record is the order_id. Lastly, LineItem records have 17 elements including the identifier string, while Order records have 10 elements including the identifier string. The output should be a joined record: a single list of length 27 containing the fields from the order record followed by the fields from the line item record, where each list element is a string.
import MapReduce
import sys

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

def mapper(record):
    # key: table name; record[1] = order id. Order records have 10 columns, line item records have 17
    # value: the full record, grouped under its order id
    key = record[0]
    orderid = record[1]
    mr.emit_intermediate(orderid, record)

def reducer(key, list_of_values):
    # key: order id
    # list_of_values: full records from both tables sharing that order id
    results = {}
    values = []
    out_counter = 0
    list_length = len(list_of_values)
    # the first record is always from the order table, the others are line items
    # and must each be joined with it
    for i in range(1, list_length):
        values += list_of_values[0]    # order record
        values += list_of_values[i]    # i-th line item
        results[out_counter] = values  # temporarily store the full joined row
        values = []
        mr.emit(results[out_counter])  # emit it and move on to the next one
        out_counter += 1

# Do not modify below this line
# =============================

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
Test it with:
python join.py records.json
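One thing worth noting: the reducer above relies on the order record always being the first value in list_of_values for its key. A slightly more defensive variant, shown below as a sketch rather than as the assignment's required solution, separates the two tables explicitly using the table identifier in field 0:

def reducer(key, list_of_values):
    # key: order id
    # list_of_values: all records sharing that order id, in no guaranteed order
    orders = [r for r in list_of_values if r[0] == "order"]
    line_items = [r for r in list_of_values if r[0] == "line_item"]
    for order in orders:
        for item in line_items:
            # joined row: the 10 order fields followed by the 17 line item fields
            mr.emit(order + item)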
The third task asked us to consider a simple social network dataset consisting of key-value pairs, where each key is a person and each value is a friend of that person, and then describe a MapReduce algorithm to count the number of friends each person has.
The input is a 2 element list: [personA, personB] where personA is the name of a person formatted as a string and personB is the name of one of personA’s friends formatted as a string.
This implies that personB is a friend of personA, but it does not imply that personA is a friend of personB. The output should be a (person, friend count) tuple where person is a string and friend count is an integer describing the number of friends person has.
import MapReduce
import sys

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

def mapper(record):
    # key: person
    # value: friend
    key = record[0]
    friend = record[1]
    mr.emit_intermediate(key, friend)

def reducer(key, list_of_values):
    # key: person
    # list_of_values: that person's friend list
    mr.emit((key, len(list_of_values)))

# Do not modify below this line
# =============================

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
You can test it using the friends dataset and running:
python friend_count.py friends.json
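For instance, with hypothetical names that are not necessarily in friends.json, the input lines ["Alice", "Bob"], ["Alice", "Carol"] and ["Bob", "Alice"] would produce the tuples ("Alice", 2) and ("Bob", 1).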
The next problem was still on the friends dataset and asked us to implement a MapReduce algorithm to generate a list of all non-symmetric friend relationships. The input is the same as in the previous problem, but the expected output this time is a tuple (person, friend) or (friend, person) for each asymmetric friendship, remembering that only one of those tuples will exist in the input.
import MapReduce
import sys

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

def mapper(record):
    # key: person
    # value: friend
    key = record[0]
    friend = record[1]
    mr.emit_intermediate(key, friend)

def reducer(key, list_of_values):
    # key: person
    # list_of_values: that person's friends
    # mr.intermediate: the full person -> friend list dictionary
    for v in list_of_values:                   # for every friend of this person
        if v not in mr.intermediate.keys():    # if v lists no friends at all, the relationship is asymmetric
            mr.emit((key, v))
            mr.emit((v, key))
        else:                                  # otherwise search v's friends for this person
            if key not in mr.intermediate[v]:  # if the person is not there, the relationship is asymmetric
                mr.emit((key, v))
                mr.emit((v, key))

# Do not modify below this line
# =============================

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
The solution can be tested with:
python asymmetric_friendships.py friends.json
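This solution works only because the toy framework exposes the whole mr.intermediate dictionary to the reducer, something a real MapReduce system would not allow. A more orthodox sketch, assuming the input contains no duplicate records, keys each relationship on the unordered pair of names so that both directions of a friendship end up in the same reducer call:

def mapper(record):
    person = record[0]
    friend = record[1]
    # key on the sorted pair so (A, B) and (B, A) meet in the same reducer
    pair = tuple(sorted([person, friend]))
    mr.emit_intermediate(pair, (person, friend))

def reducer(key, list_of_values):
    # a symmetric friendship contributes both directions, an asymmetric one only one
    if len(list_of_values) == 1:
        person, friend = list_of_values[0]
        mr.emit((person, friend))
        mr.emit((friend, person))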
The fifth problem moved over to the dna dataset and required us to consider a set of key-value pairs where each key is a sequence id and each value is a string of nucleotides. We had to write a MapReduce query that removes the last 10 characters from each string of nucleotides and then removes any duplicates generated. The input is a 2-element list [sequence id, nucleotides], where sequence id is a unique identifier formatted as a string and nucleotides is a sequence of nucleotides formatted as a string.
import MapReduce
import sys

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

def mapper(record):
    # key: sequence id
    # value: nucleotide sequence
    key = record[0]
    sequence = record[1]
    trimmed = sequence[:-10]
    # trim the last 10 characters, then pass the result along as the key,
    # which automatically removes all duplicates
    mr.emit_intermediate(trimmed, 0)

def reducer(key, list_of_values):
    # key: trimmed sequence
    # list_of_values: dummy zeros
    # nothing to do here, just emit every unique key
    mr.emit(key)

# Do not modify below this line
# =============================

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
Run it as:
python unique_trims.py dna.json
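As a tiny, made-up example that is not taken from dna.json: the records ["s1", "ACGTAAAAAAAAAA"] and ["s2", "ACGTCCCCCCCCCC"] both trim down to "ACGT", so the output would contain "ACGT" exactly once.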
Finally, the last problem asked us to compute the matrix multiplication between two non-square matrices, but it was a little flawed since we had no way of knowing the size of the resulting matrix beforehand. Given that we're working in Map/Reduce, we would have to read all the data once to determine the size before cycling over it a second time to perform the calculations; in a Big Data context, however, being allowed two passes over the same data is a rare luxury. This forced us to hardcode the size inside the script, somewhat crippling the exercise.
Anyway, the input was presented as matrix row records formatted as lists. Each list has the format [matrix, i, j, value], where matrix is a string and i, j, and value are integers. The first item, matrix, identifies which matrix the record originates from: either a, if the record is from matrix A, or b, if it is from matrix B. The output should be matrix row records formatted as tuples, each with the format (i, j, value) where every element is an integer.
import MapReduce
import sys

mr = MapReduce.MapReduce()

# =============================
# Do not modify above this line

def mapper(record):
    # key: matrix name ('a' or 'b')
    # value: i, j, value
    key = record[0]
    if key == 'a':  # since the product is A*B,
        mr.emit_intermediate(key, [record[1], record[2], record[3]])  # index a on (i, j, value)
    else:
        mr.emit_intermediate(key, [record[2], record[1], record[3]])  # and b on (j, i, value)

def reducer(key, list_of_values):
    # key: matrix 'a' or 'b'
    # list_of_values: (i, j, value) for a and (j, i, value) for b
    # mr.intermediate: all rows from both a and b in a dictionary keyed on 'a'/'b'
    a = {}
    b = {}
    if key == 'a':  # compute A*B only once, when handling the 'a' key
        # I NEED THE MATRIX DIMENSIONS!!! (hardcoded to 5)
        # populate two dictionaries with the known values
        for v in list_of_values:
            a[(v[0], v[1])] = v[2]
        for r in mr.intermediate['b']:
            b[(r[0], r[1])] = r[2]
        # and fill in the blanks with zeros
        for i in range(0, 5):
            for j in range(0, 5):
                if (i, j) not in a.keys():
                    a[(i, j)] = 0
                if (j, i) not in b.keys():
                    b[(j, i)] = 0
        result = 0
        # compute (A*B)[i][j] = SUM(A[i][k] * B[k][j]) for k in 0..4
        for i in range(0, 5):
            for j in range(0, 5):
                for k in range(0, 5):
                    result += a[(i, k)] * b[(j, k)]
                mr.emit((i, j, result))
                result = 0

# Do not modify below this line
# =============================

if __name__ == '__main__':
    inputdata = open(sys.argv[1])
    mr.execute(inputdata, mapper, reducer)
The script is based on the matrix dataset and can be run as:
python multiply.py matrix.json
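A possible workaround for the hardcoded 5x5 size complained about above, still within this framework, is to funnel every record into a single reducer and let it derive the dimensions from the largest indices it sees; this is only a sketch (everything ends up in one reducer, so it does not scale, and it assumes the last row and column of each matrix contain at least one non-zero entry):

def mapper(record):
    # record: [matrix, i, j, value]; group everything under a single key
    mr.emit_intermediate("all", record)

def reducer(key, list_of_values):
    a = {}
    b = {}
    rows = cols = inner = 0
    for m, i, j, v in list_of_values:
        if m == "a":
            a[(i, j)] = v
            rows = max(rows, i + 1)
            inner = max(inner, j + 1)
        else:
            b[(i, j)] = v
            inner = max(inner, i + 1)
            cols = max(cols, j + 1)
    # (A*B)[i][j] = SUM over k of A[i][k] * B[k][j]; missing entries count as 0
    for i in range(rows):
        for j in range(cols):
            total = 0
            for k in range(inner):
                total += a.get((i, k), 0) * b.get((k, j), 0)
            mr.emit((i, j, total))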
Again, another interesting exercise in Python programming as well as in the Map/Reduce model.