MIT6.824分布式系统学习笔记

摘要:

课程视频来源:www.simtoco.com

感谢simviso团队!

课程简介

  • MapReduce的核心思想是程序员只需要在分布式计算系统下编写自己的应用程序,但是不需要知道分布式的任何知识就可以写出一个简单的map函数和一个简单的reduce函数,MapReduce框架将负责其他所有工作。
  • MapReduce工作流程:首先输入数据被分割为许多的输入文件,然后会在每个输入文件上都执行map函数(并行执行),map函数的输出结果是一个list,list中的内容是很多的KV键值对。例如,如果执行的是字数统计,key就是某个单词,value就是1(稍候会统计这些1的个数)。接下来,MapReduce框架会收集所有的数据,例如将所有key为a的键值对收集起来,将他们作为参数交给reduce函数;然后收集所有key为b的键值对,作为参数传递给reduce函数……MapReduce框架会为出现在map输出结果中的每个key安排一个单独的reduce函数去处理他们。如果一个reduce函数被传入了两个(a, 1)键值对,那么他的输出就是(a, 2)。由于每个MapTask都有可能emit出key为a的键值对,所以执行reduce的worker机器需要与所有执行map的的机器通信,去询问他们是否有key为a的键值对emit出来。字数统计是一个十分简单基础的例子。上面的整个计算过程称为Job,每一个map或者reduce的调用被称为task。所以一个完整的Job就由一系列的MapTask和ReduceTask组成。
  • 下面讨论更加具体的内容。还是以字数统计为例,Map(k, v)函数有两个参数,k代表的是输入文件的文件名(这个文件名通常并不重要,我们不去关注它),v是输入文件的全部内容。接着,对于每个word,Map函数需要emit两个参数(即键值对word和1)
  • 实际应用中,解决一个问题可能需要一连串的MapReduce Job,前一个job的输出作为后一个job的输入。
  • 在谷歌关于MapReduce的那篇论文中,所有的原始数据都是存在GFS中,GFS实际上与运行MapReduce的worker在同一个服务器集合上,即服务器上同时部署了GFS和MapReduce(好处是,当需要将GFS中的数据作为参数传给map函数时,可以直接在本地硬盘读取,而不需要通过网络)。GFS会自动将存储在它之上的任何大文件分割为很多64M的数据块,并将他们分散存储在很多服务器上。如果随后要运行一个MapReduce job,它将整个已经爬过的网站作为输入,数据已经被拆分存储到了所有的服务器中(假设有1000台),这意味着我们在启动map worker的时候,他们可以从一千台GFS文件并行读取数据,这样就可以获得1000台服务器的吞吐量。

RPC和多线程

  • 在go中,所有线程公用一块地址空间,每个线程都有自己的一个栈。

  • 如果想要让main程序周期性地做一件事,比如周期性的检查worker是否还存活(MapReduce实际上就是这样做的),那就可以启动一个Goroutine,并在其中放入一个循环操作,该循环的大概操作是:让线程沉睡一秒,然后起来执行周期任务,接着继续沉睡,如此反复执行。

  • 进程与线程:一个进程拥有一片独立的内存(地址空间),一个进程中可以包含多个线程,这些线程公用这块内存,生活在同一个世界中,并且有着各自的栈,同一个进程中的线程之间可以访问对方的使用的地址空间(通常不会这么做,但是可以),因为他们毕竟存在于同于个世界中。而对于不同的进程之间,他们拥有完全独立的两块内存,类似于完全独立的两个世界。

  • 在go中,如果想用锁对一些共享数据进行保护(确保同一时间只有一个线程在修改这些变量),事实上go并不知道和变量(共享数据)有关的任何事,go只知道,一个线程拥有了锁,其他线程就必须等待。

  • sync.WaitGroup的一点介绍

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    //代码来自google官方
    package main

    import (
    "fmt"
    "sync"
    "net/http"
    )

    func main() {
    var wg sync.WaitGroup //WaitGroup用于线程同步
    var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.baiyuxiong.com/",
    }
    for _, url := range urls {
    // Increment the WaitGroup counter.
    wg.Add(1) //add操作一般在父线程中进行,会使得wg内部的计数器加上一个值,内部计数器变为0时,wait方法的阻塞才会取消
    // Launch a goroutine to fetch the URL.
    go func(url string) {
    // Decrement the counter when the goroutine completes.
    defer wg.Done() //Done会使得wg内部计数器减一,一般在线程最后执行。defer的功能是延迟执行,defer后面的语句会在defer所在函数马上要返回前执行
    // Fetch the URL.
    http.Get(url)
    fmt.Println(url);
    }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait() //wait会阻塞直到wg计数器减为0
    fmt.Println("over");
    }

    21行的函数是现场定义现场调用的。url string是函数的形参,后面的大括号里面是函数体,最后的括号里面的url是实参,可以将从func开始一直到’}’的部分理解成是一个函数的函数名,那么最后的括号里面自然就是函数的实参。前面的go会使得这个函数在一个新的goroutine里面执行。如果goroutine中只需要做25行的事情,那么直接go http.Get(url)就可以了,但是由于我们想要做的不止25行的事情,所以使用了这个特殊的func

  • 加上-race的flag,即go run -race xxx.go可以检测程序中是否有临界区没有被锁好

  • for urls := range ch {
    }

    ch是channel,这种情况下,如果ch中没有东西,第一行处会一直等待直到ch中有东西之后进入循环。如果不在for里面加入break,将永远不会结束。默认情况下,发送和接收操作在另一端准备好之前都会阻塞

GFS