Big Data Note

摘要:

hadoop、Kafka、Flink
持续更新中…

学习计划

Hadoop(3月20日~4月13日左右 20-25天)

Kafka(4月13日~5月8日左右 20-25天)

Flink(5月8日~6月底 20-25天)

  • 学习资料:尚硅谷教程

SpringBoot(6月底~实习前 20-25天)

学习目标

  • 搭建hadoop、flink、kafka系统并了解这3个开源系统的原理,达到可以流畅部署,可以基本上理解核心逻辑,比如了解flink作业启动过程、flink作业运行时处理过程、kafka系统数据写入过程、数据持久化过程、数据消费过程、hadoop系统hdfs数据写入过程、数据持久化过程、数据读取过程、主控fsimage数据结构、yarn队列原理

  • 使用taf-java构造数据实时写入数据到kafka,flink从kafka消费数据进行计算,然后把数据写入到mysql,可以计算一个网站常用的指标,比如UV、PV、DAU等指标。

  • 重点看Hadoop,Kafka,flink这3个组件,可以在云上买一些服务器搭建或者虚拟出多台服务器搭建环境,把这些组件代码也download下来,结合源码学习

  • 开发语言主要是java,然后是go和python开发一些工具,RPC框架是使用taf,这个框架已经开源,外部叫tars,可以基于tars来搭建开发环境,java的java springboot也需要学习

hadoop

  • hadoop两大核心组件:HDFS和MapReduce

  • MapReduce两大核心组件:JobTracker和TaskTraker

    前者对整个作业任务进行管理,会把用户的大作业拆分成很多的小作业,每一个小作业在一个机器上执行,这个机器上的小作业就由部署在该机器的TaskTraker进行追踪和执行。

  • DataNode和TaskTraker可以部署在同一台机器上

HDFS

  • 块的概念:与普通文件系统中块的概念类似,但是HDFS中的块会大很多。如果块太小的话,块的数量就会很多,后续的寻址开销就会很大。通过块的设计,将一个大文件拆分成若干的块,如此便可以存储在不同的机器上。每个块的大小是固定的,也方便对数据进行管理。

  • HDFS中的NamaNode和DataNode

    • NameNode用来记录文件的每一个部分都被存储在了哪台机器上面,DataNode用来实际存储数据。NameNode中存储的是元数据,元数据记录了文件是什么、文件被分成多少块、每个块和文件是怎么映射的、每个块被存储到那个服务器上面等等信息。

    • 元数据有两个关键的数据结构:FsImage和Editlog。

      FsImage用来保存系统文件树以及文件树中所有的文件和文件夹中的元数据。其中包括文件的复制等级、修改和访问时间、访问权限、块大小以及组成文件的块。但是,每个块具体没存储在哪个节点的信息不是由FsImage保存的。

      EditLog用来记录对数据进行的操作。

      在NameNode工作起来的时候,FsImage中的存的是历史数据,而EditLog中存的是对历史数据所有的操作。首先需要根据F中的历史数据以及E中记录的操作,得到最新的元数据信息。NameNode会把最新元数据的F保存下来,然后把旧版的删掉,同时再创建一个新的空的E。之后,在对数据进行操作的时候,因为F比较大,如果每次都去操作F的话,效率很低。所以我们只是将更新的操作存到E里面。

    • 在NN(NameNode)工作过程中,E会不断变大。这时候,每间隔一段时间,SecondaryNameNode(第二名称节点)会让主N停止向E中写东西,同时将主N的F和E都通过HTTP的方式下载过来。在这个过程中,主N会将新的操作记录在edit.new中。第二N会根据F和E合并出新的F,再将新的F发给主N。然后主N会将edit.new更改成新的E。这样就既解决了E不断变大的问题,同时实现了第二N的冷备份。(元数据的持久化)

  • FSImage数据结构(https://blog.csdn.net/CNHK1225/article/details/50786785)

    Fsimage是一个二进制文件,当中记录了HDFS中所有文件和目录的元数据信息,fsimage保存有如下信息:

    1. 首先是一个image head,其中包含:

    a) imgVersion(int):当前image的版本信息

    b) namespaceID(int):用来确保别的HDFS instance中的datanode不会误连上当前NN。

    c) numFiles(long):整个文件系统中包含有多少文件和目录

    d) genStamp(long):生成该image时的时间戳信息。

    1. 接下来便是对每个文件或目录的源数据信息,如果是目录,则包含以下信息:

    a) path(String):该目录的路径,如”/user/build/build-index”

    b) replications(short):副本数(目录虽然没有副本,但这里记录的目录副本数也为3)

    c) mtime(long):该目录的修改时间的时间戳信息

    d) atime(long):该目录的访问时间的时间戳信息

    e) blocksize(long):目录的blocksize都为0

    f) numBlocks(int):实际有多少个文件块,目录的该值都为-1,表示该item为目录

    g) nsQuota(long):namespace Quota值,若没加Quota限制则为-1

    h) dsQuota(long):disk Quota值,若没加限制则也为-1

    i) username(String):该目录的所属用户名

    j) group(String):该目录的所属组

    k) permission(short):该目录的permission信息,如644等,有一个short来记录。

    1. 若从fsimage中读到的item是一个文件,则还会额外包含如下信息:

    a) blockid(long):属于该文件的block的blockid,

    b) numBytes(long):该block的大小

    c) genStamp(long):该block的时间戳

    当该文件对应的numBlocks数不为1,而是大于1时,表示该文件对应有多个block信息,此时紧接在该fsimage之后的就会有多个blockid,numBytes和genStamp信息。

  • BlockMap(https://blog.csdn.net/CNHK1225/article/details/50786785)

    从以上fsimage中加载如namenode内存中的信息中可以很明显的看出,在fsimage中,并没有记录每一个block对应到哪几个datanodes的对应表信息,而只是存储了所有的关于namespace的相关信息。而真正每个block对应到datanodes列表的信息在hadoop中并没有进行持久化存储,而是在所有datanode启动时,每个datanode对本地磁盘进行扫描,将本datanode上保存的block信息汇报给namenode,namenode在接收到每个datanode的块信息汇报后,将接收到的块信息,以及其所在的datanode信息等保存在内存中。HDFS就是通过这种块信息汇报的方式来完成 block -> datanodes list的对应表构建。Datanode向namenode汇报块信息的过程叫做blockReport,而namenode将block -> datanodes list的对应表信息保存在一个叫BlocksMap的数据结构中。

  • HDFS HA

    HDFS2.0引入HDFS HA的概念,我们的名字节点将不止一个,其中有一个活跃名字节点和一个待命名字节点,具体哪个活跃哪个待命由zookeeper进行管理。当活跃名字节点出现故障是,待命名字节点可以立刻顶上去。要做到这一点,必须保证活跃名字节点和待命名字节点储存的元数据等等数据都是实时同步的。Editlog的同步是通过名字节点之间的共享存储系统进行同步的,对于映射表信息(例如一个节点包括几个块,某个快被存储到哪个节点上)的实时同步,是通过底层的数据节点一直都同时向活跃名字节点和待命名字节点汇报来进行维护的

  • HDFS Federation

    先前的HDFS架构仅允许整个群集使用单个命名空间,单个Namenode管理命名空间。 HDFS Federation通过向HDFS添加对多个Namenodes /Namespaces的支持来解决此限制,每个名字空间各自独立地管理自己的命名空间。每个DataNode要向集群中所有的namenode注册,且周期性的向所有namenode发送心跳和块报告,并执行来自所有namenode的命令,所以所有的名字节点都是共享底层的数据节点的。

    每个命名空间都有一个块池,块池是属于单个命名空间的一组块。 Datanode存储集群中所有块池的块。 每个Block Pool都是独立管理的。 这允许命名空间为新块生成块ID,而无需与其他命名空间协调。

    访问数据时,就相当于每个名字节点是一个文件夹,我要访问哪个名字节点的数据,就去访问哪个文件夹。

    Federation提高了HDFS的集群扩展性,同时各个名字节点可以同时对外提供服务,提高了吞吐率。再就是数据的隔离性,可以将不同业务类型的数据放在不同的名字节点进行管理。

  • HDFS存储原理

    通常来讲,所有的数据块都会被存放3份,也就是冗余数据,用来保证一个数据出问题后,还可以恢复。三个块不会存储在同一个节点内。对于第一个块,如果这个块是某一个集群内部的某一个节点产生的,那么就会吧这个快存放在这个节点上。如果这个块是外部产生的,HDFS会选一个比较空闲的节点来存放数据;对于第二个块,会放在不同机架上的某一个节点。对于第三个块,会放在和第一个节点相同的机架上面的不同节点上面。(集群中有很多机架,每个机架上有很多节点,同一机架的节点之间的数据传输带宽是很高的)

  • HDFS读取原理

    读取时,理论上可以从之前存储的任意一个冗余的块中进行读取。但是一般会采取就近原则。HDFS提供一个API,可以算出数据所在的机架UD,也算出需要读取数据的节点的机架ID,这样就可以选一个比较近的进行读取。

  • HDFS数据读写原理

  • HDFS编程

    使用java进行编程。首先需要将/usr/local/hadoop/share/hadoop中的commonjar包进行导入

    然后还需要将/hadoop/etc/hadoop中的core-site.xml和hdfs-site.xml文件拷贝到当前的java工程目录(bin)下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import org.apache.hadoop.conf.Configuration; //他的实例化对象会包含hadoop的配置信息
    import org.apache.hadoop.fs.FileSystem; //其实例为HDFS实例
    import org.apache.hadoop.fs.Path;//路径要用这个表示
    public class Chapter3 {
    public static void main(String[] args) {
    try {
    String filename = "hdfs://localhost:9000/user/hadoop/test.txt";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if(fs.exists(new Path(filename))){
    System.out.println("文件存在"); }
    else{
    System.out.println("文件不存在"); }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

MapReduce

  • 基本原理:分而治之

  • 计算向数据靠拢:寻找离数据块最近的map机器对数据进行处理

  • Master/Slave架构

    包含一个Master服务器,上面运行作业跟踪器JobTracker,负责整个作业的调度和处理以及失败的恢复。另外包含若干个slave服务器,上面运行负责具体任务执行的TaskTracker,负责接收JobTracker给他发的作业处理指令,完成具体的任务处理。

  • MR的体系结构

    • Client

      提交任务给JobTracker

    • JobTracker

      负责资源监控、检测各个TaskTracker的运行情况、一旦检测到错误,就把这个任务分配到其他节点继续执行。

      这些信息也会被JobTracker发送给Task scheduler

    • Task scheduler

      负责任务分配,决定哪个任务应该分配给哪个节点

    • TaskTracker

      执行具体任务

      把自己的资源使用情况以及任务执行进度通过心跳的方式反馈给JobTracker

      资源会被分为一个个的槽(slot),资源以slot的方式分配出去。slot会被分为map slot和reduce slot用于执行不同的任务

      在一台机器上可以同时执行map任务和reduce任务

  • MR的工作流程

    • HDFS中的数据会被分片,然后为每一个小分片启动一个map任务,map任务的输入时一个键值对,输出时很多键值对。map任务执行的结果要分发给reduce任务,类似于神经网络从一层传数据给下一层。要吧map任务的输出分为几类取决于接下来有多少reduce节点。比如有三个reduce节点,那么每个map都会吧自己的输出结果分为三类,这个过程叫做shuffle,shuffle结束之后,才会吧结果发送给reduce。reduce结束之后,再把结果输出到HDFS中去。
  • MapReduce各个执行阶段及Shuffle过程详解 https://blog.csdn.net/zhengwei223/article/details/78304764

    • 分片过多会导致map任务多,资源消耗大。分片太少会使得并行度降低。
    • map任务数量去觉得分片的数量,reduce任务的数量取决于整个集群中 reduce slot可用资源总数,一班reduce任务数会比这个资源总数略小
    • Map任务的输入是键值对,map任务处理完之后经过shuffle过程会生成若干的key-value list作为reduce任务的输入,reduce处理后输出的依然是K-V.
  • MR的整个过程,数据一开始从HDFS中读取,最终写入HDFS。中间产生的中间结果不会写入HDFS而是存在各个机器自己的磁盘中。

  • MR不是万能的,只有当一个大的任务可以拆分成若干小的任务而且各个任务之间不会相互依赖才可以用MR。

  • 简单的词频统计实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;//简单理解为hadoop中的Int类型
    import org.apache.hadoop.io.Text;//简单理解为hadoop中的String类型
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    public class WordCount {
    public WordCount() {
    }
    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();//程序的运行时参数
    String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
    if(otherArgs.length < 2) {
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
    System.exit(2);
    }
    //MR程序都是以job任务的形式来执行
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setCombinerClass(WordCount.IntSumReducer.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);//设置输出的key类型
    job.setOutputValueClass(IntWritable.class); //设置输出的value类型
    for(int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true)?0:1);
    }
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    //IntWritable是 Hadoop 中实现的用于封装 Java 数据类型的类,相当于java中Integer整型变量
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public TokenizerMapper() {
    }
    //重写map函数。map函数的参数一定是键值对,在本例中,键值为Objext key,其值为文本的行号。value值为文本中对应行的内容,内容是Text对象
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    //StringTokenizer用于分词,默认是按照空白字符进行分词
    StringTokenizer itr = new StringTokenizer(value.toString());
    while(itr.hasMoreTokens()) {
    this.word.set(itr.nextToken());
    context.write(this.word, one);//输出的键值对的key值为单词,value为1
    }
    }
    }
    //map输出的结果要进行shuffle后才会输入到reducer,shuffle的处理结果是<key, value-list>,本例中,key就是某个单词,value list就是1,1,1...这样的形式,有几个1就代表这个词出现了几次。value list使用Iterable容器来存储
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public IntSumReducer() {
    }
    //value list使用Iterable容器来存储
    public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    int sum = 0;
    IntWritable val;
    for(IntWritable val : values) {
    sum += val.get();
    }
    this.result.set(sum);
    context.write(key, this.result);//输出结果为K-V形式
    }
    }
    }

Yarn

  • MR1.0独揽了计算、资源分配、任务调度等任务,存在单点故障、内存溢出、资源分配不合理等问题。之后的MR2.0就演变成一个完全的计算框架,只负责计算,资源的调度则由YARN来完成。

  • YARN的体系结构:由ResourceManager ApplicationMaster NodeManager组成。

  • ResourceManager(RM):负责整个系统的资源管理和分配。包括两大核心组件,一个叫调度器(Scheduler)(会在后面详解),一个叫应用程序管理器(ApplicationManager)。调度器负责接收来自ApplicationMaster的应用程序资源请求,调度器将cpu、内存等资源以容器的形式分配给申请的应用程序。容器作为动态资源分配单位,每个容器都封装了一定数量的CPU、内存、磁盘等资源,从而限定每个应用程序可以使用的资源量。RM的另一个组件应用程序管理器(ApplicationManager),负责系统中所有应用程序的管理工作。应用程序管理器是管理ApplicationMaster的而不是直接去管理某个具体的任务。

  • ApplicationMaster:用户提交作业时,AM会与RM协商获取资源,RM以容器的形式为AM提供资源。AM将获得的资源进一步分配给内部的各个map任务和reduce任务,实现资源的二次分配。AM要与NodeManager一直保持通信,因为容器都是在NodeManager上面的,作业也是在NM上执行,所以AM要一直和NM保持通讯来监控作业的执行情况。在启动或者停止一个任务时,需要启动或者停止容器,也都是有AM通知NM的。

  • NodeManager:NM是驻留在一个YARN集群中的每个节点上的代理,一个YARN集群的每一个节点上面都有一个NM,NM有以下几个功能:

    1. 容器声明周期管理
    2. 监控每个容器的资源使用情况
    3. 以心跳的形式与RM保持通信,向RM汇报作业的资源使用情况和每个容器的运行状态。
    4. 跟踪节点的健康状况
    5. 接受来自AM的启动或者停止容器的各种请求。
    6. NM主要负责管理抽象的容器,只处理与容器相关的事情,不具体负责每个人物状态的管理。这些管理工作是由AM完成的,AM会不断与NM通信来掌握各个任务的执行状态。
  • YARN的各个组件是和hadoop集群统一部署的。

  • YARN工作流程:

    1. 用户编写客户端应用程序,向YARN提交应用程序,提交内容包括ApplicationMaster程序、AM启动命令和用户程序。
    2. RM负责接收和处理来自客户端的请求,为应用程序分配一个容器,在该容器中启动一个ApplicationMaster。ApplicationMaster也是需要在容器中执行的
    3. AM启动后,向RM注册,这样RM才能对AM进行监控。
    4. AM向RM申请资源
    5. RM以容器的形式向AM分配资源。
    6. AM获得资源后,对资源进行二次分配,将资源分配给自己管辖的各个map任务和reduce任务。然后在容器中启动任务。
    7. 执行过程中,NM不断向AM汇报任务执行情况。
    8. 程序执行完之后,AM想RM的应用程序管理器注销并关闭自己。
  • Yarn调度器Scheduler:

    理想情况下,我们应用对Yarn资源的请求应该立刻得到满足,但现实情况资源往往是有限的,特别是在一个很繁忙的集群,一个应用资源的请求经常需要等待一段时间才能的到相应的资源。在Yarn中,负责给应用分配资源的就是Scheduler。其实调度本身就是一个难题,很难找到一个完美的策略可以解决所有的应用场景。为此,Yarn提供了多种调度器和可配置的策略供我们选择。

    在Yarn中有三种调度器可以选择:FIFO SchedulerCapacity SchedulerFairS cheduler

    • FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。

      FIFO Scheduler是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞。在共享集群中,更适合采用Capacity SchedulerFair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。

    • 对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。

      Capacity Scheduler 是一种多租户、弹性的分配方式。
      支持多个队列,每个队列可配置一定量的资源,每个采用FIFO的方式调度。
      每个租户一个队列,每个队列可以配置能使用的资源上限与下限(如 50%,达到这个上限后即使其他的资源空置着,也不可使用),通过配置可以令队列至少有资源下限配置的资源可使用。

    • 在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。

      需要注意的是,在Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。

Kafka

  • Kafka是一个分布式流处理平台。

  • 流处理平台的特性:

    1. 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
    2. 可以储存流式的记录,并且有较好的容错性。
    3. 可以在流式记录产生时就进行处理
  • Kafka适用的场景:

    1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
    2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)
  • Kafka作为一个集群,运行在一台或者多台服务器上

  • Kafka通过topic对存储的流数据进行分类

  • 每条记录中包含一个key,一个value和一个timestamp

  • Kafka有四个核心API:

    1. The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
    2. The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
    3. The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
    4. The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。
  • Topics和日志

    Topic 就是数据主题,是流式记录,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。

    对于每一个topic, Kafka集群都会维持一个分区日志。每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。

    Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题.

    事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从”现在”开始消费。

    日志中的 partition(分区)有以下几个用途。第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。第二,可以作为并行的单元集—关于这一点,更多细节如下

  • 分布式

    日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性.

    每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。

  • 补充资料(来源:尚硅谷)

题目
题目
题目
题目
题目
题目
题目

  • Kafka写入数据流程

    题目

    我们从创建 一个 ProducerRecord 对象开始, ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord对象时,生产者要先把键和 值对象序列化成字节数组,这样它们才能够在网络上传输 。

    接下来,数据被传给分区器。如果之前在 ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区 ,那么分区器会根据 ProducerRecord对象的键来选择一个分区 。选好分区以后 ,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

    服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回 一 个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入 失败, 就会返回 一个错误 。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

  • kafka的ack应答机制

    ack = 0:producer不等待broker中leader的ack;broker接收消息后如果还没写入本地log,broker就发生故障,可能会丢失数据;

    ack = 1:producer等待leader的ack,但是不等待replication的ack;这样partition的leader消息落盘成功后返回ack,但是在follwer备份成功之前若leader发生故障,也会丢失数据;延迟时间短但是可靠性低;

    ack = -1:producer等待leader和replication的ack,这样只有等leader中partition消息落盘成功,并且follower中replication消息备份落盘成功,才会给producer返回ack,数据一般不会丢失,延迟时间长但是可靠性高。

  • 创建kafka生产者(https://www.jianshu.com/p/26532247d4cc)

    要往 Kafka写入消息,首先要创建一个生产者对象,井设置一些属性。

    下面的代码片段展示了如何创建一个新的生产者,这里只指定了必要的属性,其他使用默认设置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private Properties kafkaProps = new Properties();

    kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");

    kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

    kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<String, String>(kafkaProps);
  • Kafka生产者有 3个必选的属性

    • bootstrap.servers

    该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的broker地址,生产者会从给定的 broker里查找到其他 broker的信息。不过建议至少要提供两个 broker的信息, 一旦其中一个宕机,生产者仍然能够连接到集群上。

    • key.serializer

    broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把 Java对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生产者需要知道如何把这些 Java对象转换成字节数组。 key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。 Kafka 客户端默认提供了ByteArraySerializer(这个只做很少的事情)、 StringSerializer和 IntegerSerializer,因此,如果你只使用常见的几种 Java对象类型,那么就没必要实现自己的序列化器 。要注意, key.serializer是必须设置的,就算你打算只发送值内容。

    • value.serializer

    与 key.serializer一样, value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符扇 , 那么需要使用不同的序列化器。

  • 同步发送消息到kafka

    1
    2
    3
    4
    5
    6
    ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
    try{
    producer.send(record).get();
    } catch(Exception e) {
    e.printStack();
    }

    在这里, producer.send() 方住先返回一个 Future对象,然后调用 Future对象的 get() 方法等待 Kafka 响应。如果服务器返回错误, get()方怯会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata对象,可以用它获取消息的偏移量。如果在发送数据之前或者在发送过程中发生了任何错误 ,比如 broker返回 了一个不允许重发消息的异常或者已经超过了重发的次数 ,那么就会抛出异常。我们只是简单地把异常信息打印出来。

  • 异步发送消息到kafka

    假设消息在应用程序和 Kafka集群之间一个来回需要 10ms。如果在发送完每个消息后都等待回应,那么发送 100个消息需要 1秒。但如果只发送消息而不等待响应,那么发送100个消息所需要的时间会少很多。大多数时候,我们并不需要等待响应——尽管 Kafka 会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入 “错误消息”文件以便日后分析。

    为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持 。下面是使用异步发送消息、回调的一个例子。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    ProducerRecord<String, String> record = new ProducerRecord<>("CustomCountry", "Precision Products", "France");//Topic Key Value
    producer.send(record, new DemoProducerCallback());//发送消息时,传递一个回调对象,该回调对象必须实现org.apahce.kafka.clients.producer.Callback接口

    private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {//如果Kafka返回一个错误,onCompletion方法抛出一个non null异常。
    e.printStackTrace();//对异常进行一些处理,这里只是简单打印出来
    }
    }
    }

    这个例子中使用了回调函数,需要一个实现了Callback接口的类,这个接口只有一个onComplement方法。如果kafka返回一个错误,onComplement方法会抛出一个非空异常

  • kafka消费过程

    • 在一些消息系统中,消息代理会在消息被消费之后立即删除消息。如果有不同类型的消费者订阅同一个主题,消息代理可能需要冗余地存储同一消息;或者等所有消费者都消费完才删除,这就需要消息代理跟踪每个消费者的消费状态,这种设计很大程度上限制了消息系统的整体吞吐量和处理延迟。Kafka的做法是生产者发布的所有消息会一致保存在Kafka集群中,不管消息有没有被消费。用户可以通过设置保留时间来清理过期的数据,比如,设置保留策略为两天。那么,在消息发布之后,它可以被不同的消费者消费,在两天之后,过期的消息就会自动清理掉。

    • 消费者群组和分区再均衡

      一个新的消费者加 入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时 , 比如管理员添加了新的分区,会发生分区重分配。

      分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要, 它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者), 不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另 一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存 ,在它重新恢复状态之前会拖慢应用程序。我们将在本章讨论如何进行安全的再均衡,以及如何避免不必要的再均衡。

      消费者通过向被指派为 群组协调器的 broker (不同的群组可以有不同的协调器)发送 心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息 (为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

      如果一个消费者发生崩溃,井停止读取消息,群组协调器(broker)会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。在本章的后续部分,我们将讨论一些用于控制发送心跳频率和会话过期时间的配置参数,以及如何根据实际需要来配置这些参数 。

    • 分配分区是怎样的一个过程

      当消费者要加入群组时,它会向群组协调器发送 一 个 JoinGroup 请求。第 一 个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列 表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用 一个实现了 PartitionAssignor接口的类来决定哪些分 区应该被分配给哪个消费者 。

      Kafka 内置了两种分配策略,在后面的配置参数小节我们将深入讨论。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群 主知道群组 里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

  • 任务提交流程

    题目

    Flink任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager,之后ApplicationMaster向ResourceManager申请资源启动TaskManager,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。

  • 任务调度原理

    客户端不是运行时和程序执行的一部分,但它用于准备并发送dataflow(JobGraph)给Master(JobManager),然后,客户端断开连接或者维持连接以等待接收计算结果。

    当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

    Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。

    JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。

    TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

  • Worker与Slots

    每一个worker(TaskManager)是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。为了控制一个worker能接收多少个task,worker通过task slot来进行控制(一个worker至少有一个task slot)。

    每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个subtask将不需要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。

    通过调整task slot的数量,允许用户定义subtask之间如何互相隔离。如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的),而一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载。

    题目

    Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置,而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。

    也就是说,假设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。

    题目

    题目

  • Window

    streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

    Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的“buckets”桶,我们可以在这些桶上做计算操作。

    Window可以分成两类:

    • CountWindow:按照指定的数据条数生成一个Window,与时间无关。

    • TimeWindow:按照时间生成Window。

    对于TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

    1. 滚动窗口(Tumbling Windows)

      将数据依据固定的窗口长度对数据进行切片

      特点时间对齐,窗口长度固定,没有重叠

      滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

      题目

    2. 滑动窗口(Sliding Windows)

      滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成

      特点时间对齐,窗口长度固定,有重叠

      滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

      例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:

      题目

    3. 会话窗口(Session Windows)

      由一系列事件组合一个指定时间长度的timeout**间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口**。

      特点时间无对齐

      session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

      题目