MIT 6.824 Distributed Systems 分布式系统课程系列1 Lab1-MapReduce
该课程包括分布式系统相关的各种知识。包括mapreduce,raft算法,Zookeeper,比特币等等。
配备4个lab,需要动手写代码,实现mapreduce,raft算法,在raft基础上建立分布式kv服务。
同时可学一下go语言。go语言争议很大,之前粗略看过两次都丧失兴趣。这次正好强制再学一次并实际应用。
课程主页
https://pdos.csail.mit.edu/6.824/
https://pdos.csail.mit.edu/6.824/schedule.html
还有配套的视频。可去各大视频网站查找。
go 1.15安装
https://github.com/golang/go/wiki/Ubuntu
sudo add-apt-repository ppa:longsleep/golang-backports
sudo apt update
sudo apt install golang-go
export PATH=$PATH:/usr/lib/go-1.15/bin
实验主页
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
- 做一个mapreduce程序。处理多个文本,统计里面每个单词的数量。
- 基本原理就是分而治之,分割任务,分配给多个机器去处理,得到一些可独立处理的中间结果,再多个机器做汇总的任务,得到最终结果。
- 通过lab1可熟悉最基本的go代码:基本语法、文件操作、编解码、错误处理、rpc等等。
课程示例代码:
git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
cd 6.824
cd src/main
go build -buildmode=plugin ../mrapps/wc.go 把wc变成一个类似插件
go run mrsequential.go wc.so pg*.txt 跑mrsequential.go,用wc.so插件里的函数处理文件pg*.txt。
最终测试是运行 /src/main/test-mr.sh
需要看一下这个测试脚本来搞清流程。
看示例代码:
mrsequential.go是个顺序执行的流程,实现了最简单的mapreduce。
map函数把文字分割成单词,返回包含所有{单词,1}键值对的数组。框架把相同键的数据给reduce函数,reduce函数统计相同键数据的数量。
最后把每个单词的数量输出到mr-out-0文件。
import “plugin”
可把自己的代码编成plugin。选择plugin里的函数。
- intermediate = append(intermediate, kva…)
把kva里的每项添加到intermediate。这写法有点。。。
tgpl 139
实验任务:
完成mr/master.go, mr/worker.go, and mr/rpc.go
做两个程序,一个master一个worker。实现一个分布式的mapreduce。一个master进程,多个worker进程。worker通过rpc向master请求任务并执行。
流程解析
- 得看mapreduce论文或网友解读,不然只看实验要求的话很难猜出它的逻辑。
- 论文里的那张流程图非常清晰。程序分为两个过程,先多个worker做map,生成中间文件,再多个worker做reduce,生成数个最终文件。
- a个原始文本分为a个map任务,编号X。
- master分配map任务给worker,处理分到的文件,得到一串键值对,value都是1,如{‘about’, 1}。
- 存入中间文件。怎么存,要用ihash当前单词,再%nReduce得到Y。按XY存到mr-X-Y文件。
- master分配reduce任务给worker。此时所有单词的数据是按单词的hash再%nReduce分割在不同Y编号中的。那么对于每个Y,可以把所有X文件里的键值对数量累加一下,存到mr-out-Y。此时每个mr-out-Y文件就存了以hash分割的所有原始数据的单词数据。
- test-mr.sh这个测试程序就是把所有mr-out-Y文件里的数据整合起来排序。得到的结果如果与mrsequential.go得到的结果一样就通过测试。
go相关:
怎么好像每运行一次后都要再编译下plugin?
rpc: gob error encoding body: gob: type xxxx has no exported fields
定义结构,成员名字首字母要大写,不然rpc这边会报错。文件Create后马上Open。再写会失败。
go的rpc还挺方便。但如果要用别的语言交互,还是得整协议。
文档:
https://golang.google.cn/pkg/net/rpc/#Register
https://drstearns.github.io/tutorials/gojson/
代码:
没有做的很完善精细,只是跑通测试。仅供参考。
rpc.go
package mr
//
// RPC definitions.
//
// remember to capitalize all names.
//
import "os"
import "strconv"
//
// example to show how to declare the arguments
// and reply for an RPC.
//
type ExampleArgs struct {
X int
}
type ExampleReply struct {
ServerStatus int
}
// Add your RPC definitions here.
//type Task struct {
// Text string
//}
type Task struct {
TaskType int
TaskID int
FileName string
NReduce int
NMap int
}
type TaskResult struct {
TaskType int
Result bool
TaskID int
//Data []KeyValue
}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
s := "/var/tmp/824-mr-"
s += strconv.Itoa(os.Getuid())
return s
}
master.go
package mr
import "log"
import "net"
import "os"
import "net/rpc"
import "net/http"
import "fmt"
//import "io/ioutil"
type Master struct {
// Your definitions here.
map_task_status map[int]int
map_task_content map[int]string
map_task_count int
map_current_task int
map_done_task_count int
nReduce int
reduce_task_status map[int]int
reduce_task_count int
reduce_current_task int
reduce_done_task_count int
server_status int
}
// Your code here -- RPC handlers for the worker to call.
//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {
return nil
}
func (m *Master) FetchTask(args *ExampleArgs, task *Task) error {
fmt.Println("FetchTask")
//fmt.Println(m.map_current_task)
//task.Text = m.task_content[m.map_current_task]
//m.map_current_task += 1
if m.map_current_task < m.map_task_count { // map 未结束
task.TaskType = 0
task.TaskID = m.map_current_task
task.FileName = m.map_task_content[m.map_current_task]
task.NReduce = m.nReduce
task.NMap = m.map_task_count
m.map_current_task += 1
} else {
task.TaskType = 1
task.TaskID = m.reduce_current_task
task.NReduce = m.nReduce
task.NMap = m.map_task_count
m.reduce_current_task += 1
}
return nil
}
func (m *Master) ReportTask(result *TaskResult, reply *ExampleReply) error {
fmt.Println("ReportTask")
if result.TaskType == 0 { // 简化
m.map_done_task_count ++
} else {
m.reduce_done_task_count ++
}
if m.map_done_task_count == m.map_task_count && m.reduce_done_task_count == m.reduce_task_count {
reply.ServerStatus = 1
m.server_status = 1
} else {
reply.ServerStatus = 0
}
return nil
}
//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
rpc.Register(m)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := masterSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
ret := false
// Your code here.
//if m.map_done_task_count == m.map_task_count {
// ret = true
//}
if m.server_status == 1{
ret = true
}
return ret
}
//
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
m := Master{}
// Your code here.
m.server_status = 0
// 读所有输入文件。每个文件的内容存作为一个task,存到一个
m.map_task_status = make(map[int]int)
m.map_task_content = make(map[int]string)
m.map_task_count = 0
m.map_current_task = 0
m.map_done_task_count = 0
m.nReduce = nReduce
for _, filename := range files {
/*
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()*/
m.map_task_status[m.map_task_count] = 0
m.map_task_content[m.map_task_count] = string(filename)
fmt.Println(m.map_task_count)
fmt.Println(len(string(filename)))
m.map_task_count += 1
}
fmt.Println(m.map_task_content)
// reduce tasks
m.reduce_task_status = make(map[int]int)
m.reduce_task_count = nReduce
m.reduce_current_task = 0
m.reduce_done_task_count = 0
for i := 0; i < nReduce; i++ {
m.reduce_task_status[i] = 0
}
m.server()
return &m
}
worker.go
package mr
import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "io/ioutil"
import "os"
import "encoding/json"
import "sort"
//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}
// for sorting by key.
type ByKey []KeyValue
// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
// uncomment to send the Example RPC to the master.
// CallExample()
//GetTask()
for true {
// declare an argument structure.
args := ExampleArgs{}
// fill in the argument(s).
args.X = 99
// declare a reply structure.
//reply := ExampleReply{}
task := Task{}
// send the RPC request, wait for the reply.
err := call("Master.FetchTask", &args, &task)
if !err {
log.Fatal("rpc error:", err)
break
}
fmt.Printf("task.TaskType %d\n", task.TaskType)
if task.TaskType == 0 {
file_name := task.FileName
file, err := os.Open(file_name)
if err != nil {
log.Fatalf("cannot open %v", file_name)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", file_name)
}
file.Close()
map_data := make(map[int][]KeyValue)
kva := mapf("gg", string(content))
for _, kv := range kva {
//fmt.Printf("%s %s\n", kv.Key, kv.Value)
hash_id := ihash(kv.Key) % task.NReduce
map_data[hash_id] = append(map_data[hash_id], kv)
}
// 输出到临时文件 mr-task.TaskID-hash_id
for hash_id, data := range map_data {
file_name := fmt.Sprintf("mr-%d-%d", task.TaskID, hash_id)
file, err := os.Create(file_name)
//file, err := os.Open(file_name)
if err != nil {
log.Fatalf("cannot create %v", file_name)
}
enc := json.NewEncoder(file)
for _, kv := range data {
//fmt.Printf("%s %s\n", kv.Key, kv.Value)
err := enc.Encode(&kv)
if err != nil {
fmt.Println(err)
log.Fatalf("Encode error")
}
}
file.Close()
}
// report
// declare an argument structure.
reply := ExampleReply{}
// declare a reply structure.
//reply := ExampleReply{}
task_result := TaskResult{}
task_result.Result = true
task_result.TaskType = task.TaskType
task_result.TaskID = task.TaskID
//task_result.Data =
// send the RPC request, wait for the reply.
call("Master.ReportTask", &task_result, &reply)
if reply.ServerStatus == 1 {
fmt.Println("server tasks all done")
break
}
} else {
// 把所有mr-X-Y整合为mr-out-Y
//file_names := fmt.Sprintf("mr-*-%d", task.TaskID)
kva := []KeyValue{}
//for _, filename := range file_names {
for t := 0; t < task.NMap; t ++ {
filename := fmt.Sprintf("mr-%d-%d", t, task.TaskID)
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
/*
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}*/
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
file.Close()
//kva := mapf(filename, string(content))
//intermediate = append(intermediate, kva...)
}
sort.Sort(ByKey(kva))
file_name := fmt.Sprintf("mr-out-%d", task.TaskID)
file, err := os.Create(file_name)
if err != nil {
log.Fatalf("cannot create %v", file_name)
}
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(file, "%v %v\n", kva[i].Key, output)
i = j
}
file.Close()
// report
// declare an argument structure.
reply := ExampleReply{}
// declare a reply structure.
//reply := ExampleReply{}
task_result := TaskResult{}
task_result.Result = true
task_result.TaskType = task.TaskType
task_result.TaskID = task.TaskID
//task_result.Data =
// send the RPC request, wait for the reply.
call("Master.ReportTask", &task_result, &reply)
if reply.ServerStatus == 1 {
fmt.Println("server tasks all done")
break
}
}
}
}
func GetTask() {
// declare an argument structure.
args := ExampleArgs{}
// fill in the argument(s).
args.X = 99
// declare a reply structure.
//reply := ExampleReply{}
task := Task{}
// send the RPC request, wait for the reply.
call("Master.FetchTask", &args, &task)
// reply.Y should be 100.
fmt.Printf("task.TaskType %s\n", task.TaskType)
}
func ReportTask() {
}
//
// example function to show how to make an RPC call to the master.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {
// declare an argument structure.
args := ExampleArgs{}
// fill in the argument(s).
args.X = 99
// declare a reply structure.
reply := ExampleReply{}
// send the RPC request, wait for the reply.
call("Master.Example", &args, &reply)
}
//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := masterSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}