MIT 6.824 Distributed Systems 分布式系统课程系列1 Lab1-MapReduce

MIT 6.824 Distributed Systems 分布式系统课程系列1 Lab1-MapReduce


go 1.15安装

https://github.com/golang/go/wiki/Ubuntu

sudo add-apt-repository ppa:longsleep/golang-backports
sudo apt update
sudo apt install golang-go

export PATH=$PATH:/usr/lib/go-1.15/bin

实验主页

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

课程示例代码:

git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824

cd 6.824
cd src/main
go build -buildmode=plugin ../mrapps/wc.go      把wc.go编译成插件(生成wc.so)
go run mrsequential.go wc.so pg*.txt            跑mrsequential.go,用wc.so插件里的函数处理文件pg*.txt。

最终测试是运行 /src/main/test-mr.sh
需要看一下这个测试脚本来搞清流程。

看示例代码:

实验任务:

流程解析

  1. a个原始文本分为a个map任务,编号X。
  2. master分配map任务给worker,worker处理分到的文件,得到一串键值对;对于wc,value都是"1",如{"about", "1"}。
  3. 存入中间文件:对每个键值对计算 ihash(key) % nReduce 得到Y,再按map任务编号X和Y写入文件mr-X-Y。
  4. master分配reduce任务给worker。此时所有单词的数据是按单词的hash再%nReduce分割在不同Y编号中的。那么对于每个Y,可以把所有X文件里的键值对数量累加一下,存到mr-out-Y。此时每个mr-out-Y文件就存了以hash分割的所有原始数据的单词数据。
  5. test-mr.sh这个测试程序就是把所有mr-out-Y文件里的数据整合起来排序。得到的结果如果与mrsequential.go得到的结果一样就通过测试。

go相关:

代码:

没有做的很完善精细,只是跑通测试。仅供参考。

rpc.go

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

// ExampleArgs is the argument type of the example RPC. The worker code also
// sends it as the request of Master.FetchTask.
type ExampleArgs struct {
    // X is a sample payload; callers set it to 99.
    X int
}

// ExampleReply is the reply type of the example RPC and of Master.ReportTask.
type ExampleReply struct {
    // ServerStatus is set by Master.ReportTask: 1 once every map and reduce
    // task has been reported, 0 otherwise. Workers stop when they receive 1.
    ServerStatus int
}

// Add your RPC definitions here.

//type Task struct {
//    Text string
//}

// Task describes one unit of work. Master.FetchTask fills it in as its RPC
// reply and the worker executes it.
type Task struct {
    // TaskType is 0 for a map task and 1 for a reduce task.
    TaskType int
    // TaskID is the map task number (X in mr-X-Y) for a map task, or the
    // reduce task number (Y in mr-X-Y and mr-out-Y) for a reduce task.
    TaskID int
    // FileName is the input file of a map task; it is not set for reduce tasks.
    FileName string
    // NReduce is the number of reduce tasks; map workers use it as the modulus
    // when choosing the intermediate bucket of a key.
    NReduce int
    // NMap is the number of map tasks, i.e. how many mr-X-Y files a reduce
    // worker reads for its Y.
    NMap int
}

// TaskResult is sent by a worker to Master.ReportTask after it has run a task.
type TaskResult struct {
    // TaskType echoes Task.TaskType of the executed task (0 = map, 1 = reduce).
    TaskType int
    // Result reports whether the task succeeded; the worker always sends true.
    Result bool
    // TaskID echoes Task.TaskID of the executed task.
    TaskID int
    //Data []KeyValue

}

// masterSock returns the path of the UNIX-domain socket shared by the master
// and its workers: a fixed /var/tmp prefix followed by the numeric user ID,
// so that different users on one machine get different sockets.
// The socket lives in /var/tmp rather than the current directory because
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
    return "/var/tmp/824-mr-" + strconv.Itoa(os.Getuid())
}

master.go

package mr

import (
    "fmt"
    "log"
    "net"
    "net/http"
    "net/rpc"
    "os"
    "sync"
)
//import "io/ioutil"

// Task type codes carried in Task.TaskType and TaskResult.TaskType.
const (
    taskTypeMap    = 0
    taskTypeReduce = 1
)

// Per-task states kept in Master.mapStatus and Master.reduceStatus.
const (
    taskIdle    = iota // waiting to be handed to a worker
    taskRunning        // handed to a worker, no report received yet
    taskDone           // reported as successfully finished
)

// Master hands out map and reduce tasks to workers over RPC and tracks which
// of them have finished.
//
// RPC handlers run concurrently with each other and with Done, so every field
// below mu is only accessed while holding mu.
//
// Limitation: a task handed to a worker that never reports back stays in the
// running state forever; there is no timeout-based reassignment.
type Master struct {
    mu sync.Mutex
    // changed is broadcast whenever a task changes state, waking FetchTask
    // calls that are waiting for work. It uses mu as its lock.
    changed *sync.Cond

    mapFiles  []string // mapFiles[i] is the input file of map task i
    mapStatus []int    // state of map task i
    mapDone   int      // number of map tasks in state taskDone

    reduceStatus []int // state of reduce task i; its length is nReduce
    reduceDone   int   // number of reduce tasks in state taskDone
}

//
// Example is a no-op RPC handler kept from the lab skeleton.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {
    return nil
}

// firstIdle returns the index of the first idle task in status, or -1.
func firstIdle(status []int) int {
    for id, s := range status {
        if s == taskIdle {
            return id
        }
    }
    return -1
}

// finished reports whether every map and reduce task is done.
// The caller must hold m.mu.
func (m *Master) finished() bool {
    return m.mapDone == len(m.mapStatus) && m.reduceDone == len(m.reduceStatus)
}

// FetchTask is the RPC a worker calls to obtain its next task.
//
// Map tasks are handed out first. Reduce tasks are only handed out after
// every map task has been reported done, because a reduce task reads the
// intermediate output of all map tasks. While no task can be handed out but
// the job is still running, the call blocks until the state changes.
//
// It returns an error once the whole job has finished, so a worker never
// receives a task ID outside the valid range. args is not inspected.
func (m *Master) FetchTask(args *ExampleArgs, task *Task) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    for {
        if id := firstIdle(m.mapStatus); id >= 0 {
            m.mapStatus[id] = taskRunning
            task.TaskType = taskTypeMap
            task.TaskID = id
            task.FileName = m.mapFiles[id]
            task.NReduce = len(m.reduceStatus)
            task.NMap = len(m.mapStatus)
            return nil
        }

        if m.mapDone == len(m.mapStatus) {
            if id := firstIdle(m.reduceStatus); id >= 0 {
                m.reduceStatus[id] = taskRunning
                task.TaskType = taskTypeReduce
                task.TaskID = id
                task.NReduce = len(m.reduceStatus)
                task.NMap = len(m.mapStatus)
                return nil
            }
            if m.reduceDone == len(m.reduceStatus) {
                return fmt.Errorf("no tasks left: all %d map and %d reduce tasks are done",
                    len(m.mapStatus), len(m.reduceStatus))
            }
        }

        // Everything available is currently running; wait for a report.
        m.changed.Wait()
    }
}

// ReportTask is the RPC a worker calls after it has run a task.
//
// Only a report for a task that is currently running changes state: a
// successful one marks the task done, a failed one (Result == false) makes it
// available again. Duplicate reports and reports for tasks that were never
// handed out are ignored, so the completion counters cannot overshoot.
//
// reply.ServerStatus is set to 1 when the whole job has finished and to 0
// otherwise. An unknown task type or out-of-range task ID yields an error.
func (m *Master) ReportTask(result *TaskResult, reply *ExampleReply) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    var (
        status []int
        done   *int
    )
    switch result.TaskType {
    case taskTypeMap:
        status, done = m.mapStatus, &m.mapDone
    case taskTypeReduce:
        status, done = m.reduceStatus, &m.reduceDone
    default:
        return fmt.Errorf("report for unknown task type %d", result.TaskType)
    }
    if result.TaskID < 0 || result.TaskID >= len(status) {
        return fmt.Errorf("report for out-of-range task %d of type %d",
            result.TaskID, result.TaskType)
    }

    if status[result.TaskID] == taskRunning {
        if result.Result {
            status[result.TaskID] = taskDone
            *done++
        } else {
            status[result.TaskID] = taskIdle
        }
        m.changed.Broadcast()
    }

    if m.finished() {
        reply.ServerStatus = 1
    } else {
        reply.ServerStatus = 0
    }
    return nil
}

//
// server starts a goroutine that serves RPCs from worker.go on the
// UNIX-domain socket named by masterSock(). It exits the process if the
// receiver cannot be registered or the socket cannot be opened.
//
func (m *Master) server() {
    if err := rpc.Register(m); err != nil {
        log.Fatal("register error:", err)
    }
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", ":1234")
    sockname := masterSock()
    // Best effort: remove a stale socket left by a previous run. A failure
    // (typically "file does not exist") is irrelevant; Listen reports real problems.
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
}

//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
// Done returns true once every map and every reduce task has been reported
// as successfully finished.
//
func (m *Master) Done() bool {
    m.mu.Lock()
    defer m.mu.Unlock()
    return m.finished()
}

//
// MakeMaster creates a Master and starts its RPC server.
// main/mrmaster.go calls this function.
// Each entry of files becomes one map task, numbered in slice order;
// nReduce is the number of reduce tasks to use (negative values count as 0).
//
func MakeMaster(files []string, nReduce int) *Master {
    if nReduce < 0 {
        nReduce = 0
    }

    m := &Master{
        // Copy the slice so later changes by the caller cannot affect tasks.
        mapFiles: append([]string(nil), files...),
        // Zero-valued entries are taskIdle.
        mapStatus:    make([]int, len(files)),
        reduceStatus: make([]int, nReduce),
    }
    m.changed = sync.NewCond(&m.mu)

    m.server()
    return m
}

worker.go

package mr

import "fmt"
import "log"
import "net/rpc"
import "hash/fnv"
import "io/ioutil"
import "os"
import "encoding/json"
import "sort"

//
// KeyValue is a single key/value record. Map functions return a slice of
// them, and workers store them in the intermediate files.
//
type KeyValue struct {
    Key   string
    Value string
}

// ByKey is a KeyValue slice that implements sort.Interface, ordering the
// records by ascending Key.
type ByKey []KeyValue

// Len returns the number of records.
func (kvs ByKey) Len() int {
    return len(kvs)
}

// Swap exchanges the records at positions i and j.
func (kvs ByKey) Swap(i, j int) {
    kvs[j], kvs[i] = kvs[i], kvs[j]
}

// Less reports whether the key at position i sorts before the key at j.
func (kvs ByKey) Less(i, j int) bool {
    return kvs[j].Key > kvs[i].Key
}

//
// ihash maps a key to a non-negative int: the 32-bit FNV-1a hash of the key
// with its top bit cleared.
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
    // Clearing bit 31 keeps the value non-negative even where int is 32 bits.
    const nonNegativeMask = 0x7fffffff

    hasher := fnv.New32a()
    hasher.Write([]byte(key))
    return int(hasher.Sum32() & nonNegativeMask)
}

//
// Worker is the worker main loop; main/mrworker.go calls this function.
//
// It repeatedly asks the master for a task with the Master.FetchTask RPC,
// runs it with mapf (task type 0) or reducef (any other type), and reports
// completion with the Master.ReportTask RPC. It returns when the master
// reports that the whole job is finished, when FetchTask fails, or when it is
// handed a reduce task whose ID is not below NReduce (i.e. no real work is
// left). I/O errors while running a task terminate the process.
//
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {

    for {
        // X is a sample payload kept from the example code.
        args := ExampleArgs{X: 99}
        task := Task{}

        if !call("Master.FetchTask", &args, &task) {
            // call has already printed the RPC error.
            log.Print("rpc error: Master.FetchTask failed, worker exiting")
            return
        }

        fmt.Printf("task.TaskType %d\n", task.TaskType)

        if task.TaskType == 0 {
            runMapTask(&task, mapf)
        } else {
            if task.TaskID < 0 || task.TaskID >= task.NReduce {
                // Not a real reduce partition: running it would only
                // produce a bogus mr-out file.
                log.Printf("reduce task %d is outside 0..%d, worker exiting",
                    task.TaskID, task.NReduce-1)
                return
            }
            runReduceTask(&task, reducef)
        }

        if reportDone(&task) {
            fmt.Println("server tasks all done")
            return
        }
    }
}

// runMapTask applies mapf to the task's input file and writes one
// intermediate file mr-<TaskID>-<Y> per reduce partition Y in 0..NReduce-1.
// A file is written even for an empty partition, so that reduce workers can
// rely on all NMap*NReduce intermediate files existing.
func runMapTask(task *Task, mapf func(string, string) []KeyValue) {
    if task.NReduce <= 0 {
        log.Fatalf("map task %d: invalid NReduce %d", task.TaskID, task.NReduce)
    }

    content, err := ioutil.ReadFile(task.FileName)
    if err != nil {
        log.Fatalf("cannot read %v: %v", task.FileName, err)
    }

    // Map functions may use the file name (an inverted index does), so pass
    // the real one.
    buckets := make([][]KeyValue, task.NReduce)
    for _, kv := range mapf(task.FileName, string(content)) {
        y := ihash(kv.Key) % task.NReduce
        buckets[y] = append(buckets[y], kv)
    }

    for y, kvs := range buckets {
        kvs := kvs // each closure below gets its own copy
        name := fmt.Sprintf("mr-%d-%d", task.TaskID, y)
        writeFileAtomically(name, func(f *os.File) error {
            enc := json.NewEncoder(f)
            for i := range kvs {
                if err := enc.Encode(&kvs[i]); err != nil {
                    return err
                }
            }
            return nil
        })
    }
}

// runReduceTask merges the intermediate files mr-<X>-<TaskID> of all NMap map
// tasks, sorts the records by key, calls reducef once per distinct key and
// writes the results to mr-out-<TaskID>.
func runReduceTask(task *Task, reducef func(string, []string) string) {
    var kva []KeyValue
    for x := 0; x < task.NMap; x++ {
        name := fmt.Sprintf("mr-%d-%d", x, task.TaskID)
        kva = append(kva, readIntermediate(name)...)
    }

    sort.Sort(ByKey(kva))

    outName := fmt.Sprintf("mr-out-%d", task.TaskID)
    writeFileAtomically(outName, func(f *os.File) error {
        for i := 0; i < len(kva); {
            // kva[i:j] is the run of records sharing kva[i].Key.
            j := i + 1
            for j < len(kva) && kva[j].Key == kva[i].Key {
                j++
            }
            values := make([]string, 0, j-i)
            for _, kv := range kva[i:j] {
                values = append(values, kv.Value)
            }
            output := reducef(kva[i].Key, values)

            // this is the correct format for each line of Reduce output.
            if _, err := fmt.Fprintf(f, "%v %v\n", kva[i].Key, output); err != nil {
                return err
            }
            i = j
        }
        return nil
    })
}

// readIntermediate decodes the stream of JSON-encoded KeyValue records stored
// in the named file. A missing file or a malformed record terminates the
// process instead of silently yielding partial data.
func readIntermediate(name string) []KeyValue {
    f, err := os.Open(name)
    if err != nil {
        log.Fatalf("cannot open %v: %v", name, err)
    }
    defer f.Close()

    var kvs []KeyValue
    dec := json.NewDecoder(f)
    for dec.More() {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
            log.Fatalf("cannot decode %v: %v", name, err)
        }
        kvs = append(kvs, kv)
    }
    return kvs
}

// writeFileAtomically creates the file name in the current directory with the
// content produced by fill. The data is written to a temporary file first and
// renamed into place afterwards, so readers never observe a partial file.
// The temporary name starts with "tmp-" so that it matches neither mr-X-Y nor
// mr-out-Y patterns. Any error terminates the process.
func writeFileAtomically(name string, fill func(*os.File) error) {
    tmp, err := ioutil.TempFile(".", "tmp-"+name+"-*")
    if err != nil {
        log.Fatalf("cannot create %v: %v", name, err)
    }

    if err := fill(tmp); err != nil {
        tmp.Close()
        os.Remove(tmp.Name())
        log.Fatalf("cannot write %v: %v", name, err)
    }
    if err := tmp.Close(); err != nil {
        os.Remove(tmp.Name())
        log.Fatalf("cannot write %v: %v", name, err)
    }
    if err := os.Rename(tmp.Name(), name); err != nil {
        os.Remove(tmp.Name())
        log.Fatalf("cannot create %v: %v", name, err)
    }
}

// reportDone tells the master that task was executed successfully and returns
// true when the master answers that the whole job is finished. A failed RPC
// is logged and treated as "not finished", so the worker keeps asking for
// tasks.
func reportDone(task *Task) bool {
    result := TaskResult{
        TaskType: task.TaskType,
        Result:   true,
        TaskID:   task.TaskID,
    }
    reply := ExampleReply{}

    if !call("Master.ReportTask", &result, &reply) {
        log.Printf("rpc error: Master.ReportTask failed for task %d of type %d",
            task.TaskID, task.TaskType)
        return false
    }
    return reply.ServerStatus == 1
}

// GetTask is a debugging helper: it fetches one task from the master with the
// Master.FetchTask RPC and prints the task's type.
//
// The master treats the fetched task as handed out, and this helper neither
// runs nor reports it, so it is only meant for manual experiments.
func GetTask() {
    // X is a sample payload kept from the example code.
    args := ExampleArgs{X: 99}
    task := Task{}

    // send the RPC request, wait for the reply.
    if !call("Master.FetchTask", &args, &task) {
        // call has already printed the error; task holds no valid data.
        return
    }

    // TaskType is an int, so it must be printed with %d.
    fmt.Printf("task.TaskType %d\n", task.TaskType)
}

// ReportTask is an empty placeholder that does nothing. Worker sends its
// reports with the Master.ReportTask RPC without going through this function.
func ReportTask() {
}

//
// CallExample demonstrates a complete RPC round trip: it sends the
// Master.Example request with X set to 99 and waits for the reply, which it
// then discards.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {
    request := ExampleArgs{X: 99}
    var response ExampleReply

    // Blocks until the master has answered (or the call has failed).
    call("Master.Example", &request, &response)
}

//
// call sends one RPC request to the master over the UNIX-domain socket named
// by masterSock() and waits for the response. A new connection is opened for
// every call and closed before returning.
//
// It returns true when the RPC succeeded. When the RPC itself fails, the
// error is printed and false is returned. When the master cannot be reached
// at all, the process is terminated.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
    // TCP alternative: rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
    client, dialErr := rpc.DialHTTP("unix", masterSock())
    if dialErr != nil {
        log.Fatal("dialing:", dialErr)
    }
    defer client.Close()

    if callErr := client.Call(rpcname, args, reply); callErr != nil {
        fmt.Println(callErr)
        return false
    }
    return true
}

上次更新 2021-01-28