35 答疑解惑(三):主流消息队列都是如何存储消息的?

你好,我是李玥。

在我们一起做了两个实践案例以后,相信你或多或少都会有一些收获。在学习和练习这两个实践案例中,我希望你收获的不仅仅是流计算和 RPC 框架的设计实现原理,还能学会并掌握在实现这些代码过程中,我们用到的很多设计模式和编码技巧,以及代码背后无处不在的“松耦合”、“拥抱变化”这些设计思想。最重要的是,把这些学到的东西能最终用在你编写的代码中,才是真正的收获。

照例,在每一模块的最后一节课,我们安排热点问题答疑,解答同学们关注比较多的一些问题。

1. 主流消息队列都是如何存储消息的?

我在之前的课程中提到过,现代的消息队列它本质上是一个分布式的存储系统。那决定一个存储系统的性能好坏,最主要的因素是什么?就是它的存储结构。

很多大厂在面试的时候,特别喜欢问各种二叉树、红黑树和哈希表这些你感觉平时都用不到的知识,原因是什么?其实,无论是我们开发的应用程序,还是一些开源的数据库系统,在数据量达到一个量级之上的时候,决定你系统整体性能的往往就是,你用什么样的数据结构来存储这些数据。而大部分数据库,它最基础的存储结构不是树就是哈希表。

即使你不去开发一个数据库,在设计一个超大规模的数据存储的时候,你也需要掌握各种数据库的存储结构,才能选择一个适合你的业务数据的数据库产品。所以,掌握这些最基础的数据结构相关的知识,是很有必要的,不仅仅是为了应付面试。

在所有的存储系统中,消息队列的存储可能是最简单的。每个主题包含若干个分区,每个分区其实就是一个 WAL(Write Ahead Log),写入的时候只能尾部追加,不允许修改。读取的时候,根据一个索引序号进行查询,然后连续顺序往下读。

接下来我们看看,几种主流的消息队列都是如何设计它们的存储结构的。

先来看 Kafka,Kafka 的存储以 Partition 为单位,每个 Partition 包含一组消息文件(Segment file)和一组索引文件(Index),并且消息文件和索引文件一一对应,具有相同的文件名(但文件扩展名不一样),文件名就是这个文件中第一条消息的索引序号。

每个索引中保存索引序号(也就是这条消息是这个分区中的第几条消息)和对应的消息在消息文件中的绝对位置。在索引的设计上,Kafka 采用的是稀疏索引,为了节省存储空间,它不会为每一条消息都创建索引,而是每隔几条消息创建一条索引。

写入消息的时候非常简单,就是在消息文件尾部连续追加写入,一个文件写满了再写下一个文件。查找消息时,首先根据文件名找到所在的索引文件,然后用二分法遍历索引文件内的索引,在里面找到离目标消息最近的索引,再去消息文件中,找到这条最近的索引指向的消息位置,从这个位置开始顺序遍历消息文件,找到目标消息。

可以看到,寻址过程还是需要一定时间的。一旦找到消息位置后,就可以批量顺序读取,不必每条消息都要进行一次寻址。

然后我们再来看一下 RocketMQ,RocketMQ 的存储以 Broker 为单位。它的存储也是分为消息文件和索引文件,但是在 RocketMQ 中,每个 Broker 只有一组消息文件,它把在这个 Broker 上的所有主题的消息都存在这一组消息文件中。索引文件和 Kafka 一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件在 RocketMQ 中称为 ConsumerQueue。RocketMQ 中的索引是定长稠密索引,它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的 20 个字节。

写入消息的时候,Broker 上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 x 索引固定长度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。可以看到,这里两次寻址都是绝对位置寻址,比 Kafka 的查找是要快的。

img

对比这两种存储结构,你可以看到它们有很多共通的地方,都是采用消息文件 + 索引文件的存储方式,索引文件的名字都是第一条消息的索引序号,索引中记录了消息的位置等等。

在消息文件的存储粒度上,Kafka 以分区为单位,粒度更细,优点是更加灵活,很容易进行数据迁移和扩容。RocketMQ 以 Broker 为单位,较粗的粒度牺牲了灵活性,带来的好处是,在写入的时候,同时写入的文件更少,有更好的批量(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在 Broker 上有很多主题和分区的情况下,有更好的写入性能。

索引设计上,RocketMQ 和 Kafka 分别采用了稠密和稀疏索引,稠密索引需要更多的存储空间,但查找性能更好,稀疏索引能节省一些存储空间,代价是牺牲了查找性能。

可以看到,两种消息队列在存储设计上,有不同的选择。大多数场景下,这两种存储设计的差异其实并不明显,都可以满足需求。但是在某些极限场景下,依然会体现出它们设计的差异。比如,在一个 Broker 上有上千个活动主题的情况下,RocketMQ 的写入性能就会体现出优势。再比如,如果我们的消息都是几个、十几个字节的小消息,但是消息的数量很多,这时候 Kafka 的稀疏索引设计就能节省非常多的存储空间。

2. 流计算与批计算的区别是什么?

有些同学在《[29 | 流计算与消息(一):通过 Flink 理解流计算的原理]》的课后留言提问,对于“按照固定的时间窗口定时汇总”的场景,流计算和批计算是不是就是一样的呢?对于这个问题,我们通过一个例子来分析一下就明白了。

比如,你要在一个学校门口开个网吧,到底能不能赚钱需要事先进行调研,看看学生的流量够不够撑起你这个网吧。然后,你就蹲在学校门口数人头,每过来一个学生你就数一下,数一下一天中每个小时会有多少个学生经过,这是流计算。你还可以放个摄像头,让它自动把路过的每个人都拍下来,然后晚上回家再慢慢数这些照片,这就是批计算。简单地说,流计算就是实时统计计算,批计算则是事后统计计算,这两种方式都可以统计出每小时的人流量。

那这两种方式哪种更好呢?还是那句话,看具体的使用场景和需求。流计算的优势就是实时统计,每到整点的时候,上一个小时的人流量就已经数出来了。在 T+0 的时刻就能第一时间得到统计结果,批计算相对就要慢一些,它最早在 T+0 时刻才开始进行统计,什么时候出结果取决于统计的耗时。

但是,流计算也有它的一些不足,比如说,你在数人头的时候突然来了个美女,你多看了几眼,漏数了一些人怎么办?没办法,明天再来重新数吧。也就是说,对于流计算的故障恢复还是一个比较难解决的问题。

另外,你数了一整天人头,回去做分析的时候才发现,去网吧的大多数都是男生,所以你需要统计的是在校男生,而不是所有人的数量。这时候,如果你保存了这一天所有人的照片,那你重新数一遍照片就可以了,否则,你只能明天上街再数一次人头。这个时候批计算的优势就体现出来了,因为你有原始数据,当需求发生变化的时候,你可以随时改变算法重新计算。

总结下来,大部分的统计分析类任务,使用流计算和批计算都可以实现。流计算具有更好的实时性,而批计算可靠性更好,并且更容易应对需求变化。所以,大部分针对海量数据的统计分析,只要是对实时性要求没有那么高的场景,大多采用的还是批计算的方式。

3. RPC 框架的 JDBC 注册中心

上节课《[34 | 动手实现一个简单的 RPC 框架(四):服务端]》的课后思考题,要求你基于 JDBC 协议实现一个注册中心,这样就可以支持跨服务器来访问注册中心。这个作业应该是我们这个系列课程中比较难的一个作业了,我在这里也给出一个实现供你参考。

这个参考实现的代码同样在放在 GitHub 上,你可以在这里查看或者下载,它和之前的 RPC 框架是同一个项目的不同分支,分支名称是 jdbc-nameservice。同样,我把如何设置环境,编译代码,启动数据库,运行这个 RPC 框架示例的方法都写在了 README 中,你可以参照运行。

相比于原版的 RPC 框架,我们增加了一个单独的 Module:jdbc-nameservice,也就是 JDBC 版的注册中心的实现。这个实现中,只有一个类 JdbcNameService,和 LocalFileNameService 一样,他们都实现了 NameService 接口。在 JdbcNameService 这个注册中心实现中,它提供 JDBC 协议的支持,注册中心的元数据都存放在数据库中。

我们这个思考题,其中的一个要求就是,能兼容所有支持 JDBC 协议的数据库。虽然 JDBC 的协议是通用的,但是每种数据库支持 SQL 的语法都不一样,所以,我们这里把 SQL 语句作为一种资源文件从源代码中独立出来,这样确保源代码能兼容所有的 JDBC 数据库。不同类型的数据的 SQL 语句,可以和数据库的 JDBC 驱动一样,在运行时来提供就可以了。

这个数据库中,我们只需要一张表就够了,这里面我们的表名是 rpc_name_service,表结构如下:

img

为了能自动根据数据库类型去加载对应的 sql,我们规定 sql 文件的名称为:[SQL 名] [数据库类型].sql。比如我们使用的 HSQLDB 自动建表的 SQL 文件,它的文件名就是:ddl.hsqldb.sql。 JdbcNameService 这个类的实现就比较简单了,在 connect 方法中去连接数据库,如果 rpc_name_service 不存在,就创建这个表。在 registerService 中往数据库中插入或者更新一条数据,在 lookupService 中去数据库查询对应服务名的 URI。

在使用的时候,还需要在 CLASSPATH 中包含下面几个文件:

  1. add-service.[数据库类型].sql
  2. lookup-service.[数据库类型].sql
  3. ddl.[数据库类型].sql
  4. 数据库的 JDBC 驱动 JAR 文件。

在我们这个实现中,已经包含了 HSQLDB 这种数据库的 SQL 文件和驱动,你也可以尝试提供 MySQL 的 SQL 文件和驱动,就可以使用 MySQL 作为注册中心的数据库了。

4. 完成作业的最佳姿势

我们案例篇的几个编码的作业,都是基于课程中讲解的代码进行一些修改和扩展,很多同学在留言区分享了代码。为了便于你修改和分享代码,建议你使用 GitHub 的 Fork 功能,用法也很简单,在示例项目的 GitHub 页面的右上角,有一个 Frok 按钮,点击之后,会在你自己的 GitHub 账号下面创建一份这个项目的副本,你可以在这个副本上进行修改和扩展来完成你的作业,最后直接分享这个副本的项目就可以了。

总结

以上就是我们这次热点问题答疑的全部内容了,同时我们这个系列课程的最后一篇:案例篇到这里也就结束了。

这个案例篇模块不同于前两个模块,之前主要是讲解一些消息队列相关的实现原理、知识和方法技巧等等,案例篇的重点还是来通过实际的案例,来复习和练习前两篇中涉及到的一些知识。我们案例篇中每节课的作业,大多也都是需要你来写一些代码。

希望你在学习案例篇的时候,不要只是听和看,更重要的就是动手来写代码,通过练习把学到的东西真正的消化掉。也欢迎你在评论区留言,分享你的代码。

加餐 JMQ的Broker是如何异步处理消息的?

你好,我是李玥。

我们的课程更新到进阶篇之后,通过评论区的留言,我看到有一些同学不太理解,为什么在进阶篇中要讲这些“看起来和消息队列关系不大的”内容呢?

在这里,我跟你分享一下这门课程的设计思路。我们这门课程的名称是“消息队列高手课”,我希望你在学习完这门课程之后,不仅仅只是成为一个使用消息队列的高手,而是设计和实现消息队列的高手。所以我们在设计课程的时候,分了基础篇、进阶篇和案例篇三部分。

基础篇中我们给大家讲解消息队列的原理和一些使用方法,重点是让大家学会使用消息队列。

你在进阶篇中,我们课程设计的重点是讲解实现消息队列必备的技术知识,通过分析源码讲解消息队列的实现原理。希望你通过进阶篇的学习能够掌握到设计、实现消息队列所必备的知识和技术,这些知识和技术也是设计所有高性能、高可靠的分布式系统都需要具备的。

进阶篇的上半部分,我们每一节课一个专题,来讲解设计实现一个高性能消息队列,必备的技术和知识。这里面每节课中讲解的技术点,不仅可以用来设计消息队列,同学们在设计日常的应用系统中也一定会用得到。

前几天我在极客时间直播的时候也跟大家透露过,由我所在的京东基础架构团队开发的消息队列 JMQ,它的综合性能要显著优于目前公认性能非常好的 Kafka。虽然在开发 JMQ 的过程中有很多的创新,但是对于性能的优化这块,并没有什么全新的划时代的新技术,JMQ 之所以能做到这样的极致性能,靠的就是合理地设计和正确地使用已有的这些通用的底层技术和优化技巧。我把这些技术和知识点加以提炼和总结,放在进阶篇的上半部分中。

进阶篇的下半部分,我们主要通过分析源码的方式,来学习优秀开源消息队列产品中的一些实现原理和它们的设计思想。

在最后的案例篇,我会和大家一起,利用进阶篇中学习的知识,一起来开发一个简单的 RPC 框架。为什么我们要开发一个 RPC 框架,而不是一个消息队列?这里面就是希望大家不只是机械的去学习,仅仅是我告诉这个问题怎么解决,你就只是学会了这个问题怎么解决,而是能做到真正理解原理,掌握知识和技术,并且能融会贯通,灵活地去使用。只有这样,你才是真的“学会了”。

有的同学在看了进阶篇中已更新的这几节课程之后,觉得只讲技术原理不过瘾,希望能看到这些技术是如何在消息队列中应用并落地的,看到具体的实现和代码,所以我以京东 JMQ 为例,将这些基础技术点在消息队列实现中的应用讲解一下。

JMQ 的 Broker 是如何异步处理消息的?

对于消息队列的 Broker,它最核心的两个流程就是接收生产者发来的消息,以及给消费者发送消息。后者的业务逻辑相对比较简单,影响消息队列性能的关键,就是消息生产的这个业务流程。在 JMQ 中,经过优化后的消息生产流程,实测它每秒钟可以处理超过 100 万次请求。

我们在之前的课程中首先讲了异步的设计思想,这里给你分享一下我在设计这个流程时,是如何来将异步的设计落地的。

消息生产的流程需要完成的功能是这样的:

img

  • 首先,生产者发送一批消息给 Broker 的主节点;
  • Broker 收到消息之后,会对消息做一系列的解析、检查等处理;
  • 然后,把消息复制给所有的 Broker 从节点,并且需要把消息写入到磁盘中;
  • 主节点收到大多数从节点的复制成功确认后,给生产者回响应告知消息发送成功。

由于使用各种异步框架或多或少都会有一些性能损失,所以我在设计这个流程的时候,没有使用任何的异步框架,而是自行设计一组互相配合的处理线程来实现,但使用的异步设计思想和我们之前课程中所讲的是一样的。

对于这个流程,我们设计的线程模型是这样的:

img

图中白色的细箭头是数据流,蓝色的箭头是控制流,白色的粗箭头代表远程调用。蓝白相间的方框代表的是处理的步骤,我在蓝色方框中标注了这个步骤是在什么线程中执行的。圆角矩形代表的是流程中需要使用的一些关键的数据结构。

这里我们设计了 6 组线程,将一个大的流程拆成了 6 个小流程。并且整个过程完全是异步化的。

流程的入口在图中的左上角,Broker 在收到来自生产者的发消息请求后,会在一个 Handler 中处理这些请求,这和我们在普通的业务系统中,用 Handler 接收 HTTP 请求是一样的,执行 Handler 中业务逻辑使用的是 Netty 的 IO 线程。

收到请求后,我们在 Handler 中不做过多的处理,执行必要的检查后,将请求放到一个内存队列中,也就是图中的 Requests Queue。请求被放入队列后,Handler 的方法就结束了。可以看到,在 Handler 中只是把请求放到了队列中,没有太多的业务逻辑,这个执行过程是非常快的,所以即使是处理海量的请求,也不会过多的占用 IO 线程。

由于要保证消息的有序性,整个流程的大部分过程是不能并发的,只能单线程执行。所以,接下来我们使用一个线程 WriteThread 从请求队列中按照顺序来获取请求,依次进行解析请求等其他的处理逻辑,最后将消息序列化并写入存储。序列化后的消息会被写入到一个内存缓存中,就是图中的 JournalCache,等待后续的处理。

执行到这里,一条一条的消息已经被转换成一个连续的字节流,每一条消息都在这个字节流中有一个全局唯一起止位置,也就是这条消息的 Offset。后续的处理就不用关心字节流中的内容了,只要确保这个字节流能快速正确的被保存和复制就可以了。

这里面还有一个工作需要完成,就是给生产者回响应,但在这一步,消息既没有落盘,也没有完成复制,还不能给客户端返回响应,所以我们把待返回的响应按照顺序放到一个内存的链表 Pending Callbacks 中,并记录每个请求中的消息对应的 Offset。

然后,我们有 2 个线程,FlushThread 和 ReplicationThread,这两个线程是并行执行的,分别负责批量异步进行刷盘和复制,刷盘和复制又分别是 2 个比较复杂的流程,我们暂时不展开讲。刷盘线程不停地将新写入 Journal Cache 的字节流写到磁盘上,完成一批数据的刷盘,它就会更新一个刷盘位置的内存变量,确保这个刷盘位置之前数据都已经安全的写入磁盘中。复制线程的逻辑也是类似的,同样维护了一个复制位置的内存变量。

最后,我们设计了一组专门用于发送响应的线程 ReponseThreads,在刷盘位置或者复制位置更新后,去检查待返回的响应链表 Pending Callbacks,根据 QOS 级别的设置(因为不同 QOS 基本对发送成功的定义不一样,有的设置需要消息写入磁盘才算成功,有的需要复制完成才算成功),将刷盘位置或者复制位置之前所有响应,以及已经超时的响应,利用这组线程 ReponseThreads 异步并行的发送给各个客户端。

这样就完成了消息生产这个流程。整个流程中,除了 JournalCache 的加载和卸载需要对文件加锁以外,没有用到其他的锁。每个小流程都不会等待其他流程的共享资源,也就不用互相等待资源(没有数据需要处理时等待上游流程提供数据的情况除外),并且只要有数据就能第一时间处理。

这个流程中,最核心的部分在于 WriteThread 执行处理的这个步骤,对每条消息进行处理的这些业务逻辑,都只能在 WriteThread 中单线程执行,虽然这里面干了很多的事儿,但是我们确保这些逻辑中,没有缓慢的磁盘和网络 IO,也没有使用任何的锁来等待资源,全部都是内存操作,这样即使单线程可以非常快速地执行所有的业务逻辑。

这个里面有很重要的几点优化:

  • 一是我们使用异步设计,把刷盘和复制这两部分比较慢的操作从这个流程中分离出去异步执行;
  • 第二是,我们使用了一个写缓存 Journal Cache 将一个写磁盘的操作,转换成了一个写内存的操作,来提升数据写入的性能,关于如何使用缓存,后面我会专门用一节课来讲;
  • 第三是,这个处理的全流程是近乎无锁的设计,避免了线程因为等待锁导致的阻塞;
  • 第四是,我们把回复响应这个需要等待资源的操作,也异步放到其他的线程中去执行。

你看,一个看起来很简单的接收请求写入数据并回响应的流程,需要涉及的技术包括:异步的设计、缓存设计、锁的正确使用、线程协调、序列化和内存管理,等等。你需要对这些技术都有深入的理解,并合理地使用,才能在确保逻辑正确、数据准确的前提下,做到极致的性能。这也是为什么我们在课程的进阶篇中,用这么多节课来逐一讲解这些“看起来和消息队列没什么关系”的知识点和技术。

我也希望同学们在学习这些知识点的时候,不仅仅只是记住了,能说出来,用于回答面试问题,还要能真正理解这些知识点和技术背后深刻的思想,并使用在日常的设计和开发过程中。

比如说,在面试的时候,很多同学都可以很轻松地讲 JVM 内存结构,也知道怎么用 jstat、jmap、jstack 这些工具来查看虚拟机的状态。但是,当我给出一个有内存溢出的问题程序和源代码,让他来分析原因并改正的时候,却很少有人能给出正确的答案。在我看来,对于 JVM 这些基础知识,这样的同学他以为自己已经掌握了,但是,无法领会技术背后的思想,做不到学以致用,那还只是别人知识,不是你的。

再比如,我下面要说的这个俩大爷的作业,你是真的花时间把代码写出来了,还只是在脑子想了想怎么做,就算完成了呢?

俩大爷的思考题

我们在进阶篇的开始,花了 4 节课的内容,来讲解如何实现高性能的异步网络通信,在《13 | 传输协议:应用程序之间对话的语言》中,我给大家留了一个思考题:写一个程序,让俩大爷在胡同口遇见 10 万次并记录下耗时。

有几个同学在留言区分享了自己的代码,每一个同学分享的代码我都仔细读过,有的作业实现了异步的网络通信,有的作业序列化和协议设计实现得很好,但很遗憾的是,没有一份作业能在序列化、协议设计和异步网络传输这几方面都做到我期望的水平。

在这个作业中,应用到了我们进阶篇中前四节课讲到的几个知识点:

  • 使用异步设计的方法;
  • 异步网络 IO;
  • 专用序列化、反序列化方法;
  • 设计良好的传输协议;
  • 双工通信。

这里面特别是双工通信的方法,大家都没能正确的实现。所以,这些作业的实际执行性能也没能达到一个应有的水平。

这里,我也给出一个作业的参考实现,我们用 Go 语言来实现这个作业:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package main



import (

"encoding/binary"

"fmt"

"io"

"net"

"sync"

"time"

)



var count = uint32(0) // 俩大爷已经遇见了多少次

var total = uint32(100000) // 总共需要遇见多少次



var z0 = " 吃了没,您吶?"

var z3 = " 嗨!吃饱了溜溜弯儿。"

var z5 = " 回头去给老太太请安!"

var l1 = " 刚吃。"

var l2 = " 您这,嘛去?"

var l4 = " 有空家里坐坐啊。"



var liWriteLock sync.Mutex // 李大爷的写锁

var zhangWriteLock sync.Mutex // 张大爷的写锁



type RequestResponse struct {

Serial uint32 // 序号

Payload string // 内容

}



// 序列化 RequestResponse,并发送

// 序列化后的结构如下:

// 长度 4 字节

// Serial 4 字节

// PayLoad 变长

func writeTo(r *RequestResponse, conn *net.TCPConn, lock *sync.Mutex) {

lock.Lock()

defer lock.Unlock()

payloadBytes := []byte(r.Payload)

serialBytes := make([]byte, 4)

binary.BigEndian.PutUint32(serialBytes, r.Serial)

length := uint32(len(payloadBytes) + len(serialBytes))

lengthByte := make([]byte, 4)

binary.BigEndian.PutUint32(lengthByte, length)



conn.Write(lengthByte)

conn.Write(serialBytes)

conn.Write(payloadBytes)

// fmt.Println(" 发送: " + r.Payload)

}



// 接收数据,反序列化成 RequestResponse

func readFrom(conn *net.TCPConn) (*RequestResponse, error) {

ret := &RequestResponse{}

buf := make([]byte, 4)

if _, err := io.ReadFull(conn, buf); err != nil {

return nil, fmt.Errorf(" 读长度故障:%s", err.Error())

}

length := binary.BigEndian.Uint32(buf)

if _, err := io.ReadFull(conn, buf); err != nil {

return nil, fmt.Errorf(" 读 Serial 故障:%s", err.Error())

}

ret.Serial = binary.BigEndian.Uint32(buf)

payloadBytes := make([]byte, length-4)

if _, err := io.ReadFull(conn, payloadBytes); err != nil {

return nil, fmt.Errorf(" 读 Payload 故障:%s", err.Error())

}

ret.Payload = string(payloadBytes)

return ret, nil

}



// 张大爷的耳朵

func zhangDaYeListen(conn *net.TCPConn) {

for count < total {

r, err := readFrom(conn)

if err != nil {

fmt.Println(err.Error())

break

}

// fmt.Println(" 张大爷收到:" + r.Payload)

if r.Payload == l2 { // 如果收到:您这,嘛去?

go writeTo(&RequestResponse{r.Serial, z3}, conn, &zhangWriteLock) // 回复:嗨!吃饱了溜溜弯儿。

} else if r.Payload == l4 { // 如果收到:有空家里坐坐啊。

go writeTo(&RequestResponse{r.Serial, z5}, conn, &zhangWriteLock) // 回复:回头去给老太太请安!

} else if r.Payload == l1 { // 如果收到:刚吃。

// 不用回复

} else {

fmt.Println(" 张大爷听不懂:" + r.Payload)

break

}

}

}



// 张大爷的嘴

func zhangDaYeSay(conn *net.TCPConn) {

nextSerial := uint32(0)

for i := uint32(0); i < total; i++ {

writeTo(&RequestResponse{nextSerial, z0}, conn, &zhangWriteLock)

nextSerial++

}

}



// 李大爷的耳朵,实现是和张大爷类似的

func liDaYeListen(conn *net.TCPConn, wg *sync.WaitGroup) {

defer wg.Done()

for count < total {

r, err := readFrom(conn)

if err != nil {

fmt.Println(err.Error())

break

}

// fmt.Println(" 李大爷收到:" + r.Payload)

if r.Payload == z0 { // 如果收到:吃了没,您吶?

writeTo(&RequestResponse{r.Serial, l1}, conn, &liWriteLock) // 回复:刚吃。

} else if r.Payload == z3 {

// do nothing

} else if r.Payload == z5 {

//fmt.Println(" 俩人说完走了 ")

count++

} else {

fmt.Println(" 李大爷听不懂:" + r.Payload)

break

}

}

}



// 李大爷的嘴

func liDaYeSay(conn *net.TCPConn) {

nextSerial := uint32(0)

for i := uint32(0); i < total; i++ {

writeTo(&RequestResponse{nextSerial, l2}, conn, &liWriteLock)

nextSerial++

writeTo(&RequestResponse{nextSerial, l4}, conn, &liWriteLock)

nextSerial++

}

}



func startServer() {

tcpAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9999")

tcpListener, _ := net.ListenTCP("tcp", tcpAddr)

defer tcpListener.Close()

fmt.Println(" 张大爷在胡同口等着 ...")

for {

conn, err := tcpListener.AcceptTCP()

if err != nil {

fmt.Println(err)

break

}

fmt.Println(" 碰见一个李大爷:" + conn.RemoteAddr().String())

go zhangDaYeListen(conn)

go zhangDaYeSay(conn)

}



}



func startClient() {

var tcpAddr *net.TCPAddr

tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:9999")

conn, _ := net.DialTCP("tcp", nil, tcpAddr)



defer conn.Close()

var wg sync.WaitGroup

wg.Add(1)

go liDaYeListen(conn, &wg)

go liDaYeSay(conn)

wg.Wait()

}



func main() {

go startServer()

time.Sleep(time.Second)

t1 := time.Now()

startClient()

elapsed := time.Since(t1)

fmt.Println(" 耗时: ", elapsed)

}

在我的 Mac 执行 10 万次大约需要不到 5 秒钟:

1
2
3
4
5
6
7
go run hutong.go

张大爷在胡同口等着 ...

碰见一个李大爷:127.0.0.1:50136

耗时: 4.962786896s

在这段程序里面,我没有对程序做任何特殊的性能优化,只是使用了我们之前四节课中讲到的,上面列出来的那些知识点,完成了一个基本的实现。

在这段程序中,我们首先定义了 RequestResponse 这个结构体,代表请求或响应,它包括序号和内容两个字段。readFrom 方法的功能是,接收数据,反序列化成 RequestResponse。

协议的设计是这样的:首先用 4 个字节来标明这个请求的长度,然后用 4 个字节来保存序号,最后变长的部分就是大爷说的话。这里面用到了使用前置长度的方式来进行断句,这种断句的方式我在之前的课程中专门讲到过。

这里面我们使用了专有的序列化方法,原因我在之前的课程中重点讲过,专有的序列化方法具备最好的性能,序列化出来的字节数也更少,而我们这个作业比拼的就是性能,所以在这个作业中采用这种序列化方式是最合适的选择。

zhangDaYeListen 和 liDaYeListen 这两个方法,它们的实现是差不多的,就是接收对方发出的请求,然后给出正确的响应。zhangDaYeSay 和 liDaYeSay 这两个方法的实现也是差不多的,当俩大爷遇见后,就开始不停地说各自的请求,并不等待对方的响应,连续说 10 万次。

这 4 个方法,分别在 4 个不同的协程中并行运行,两收两发,实现了全双工的通信。在这个地方,不少同学还是摆脱不了“一问一答,再问再答”这种人类交流的自然方式对思维的影响,写出来的依然是单工通信的程序,单工通信的性能是远远不如双工通信的,所以,要想做到比较好的网络传输性能,双工通信的方式才是最佳的选择。

为了避免并发向同一个 socket 中写入造成数据混乱,我们给俩大爷分别定义了一个写锁,确保每个大爷同一时刻只能有一个协程在发送数据。后面的课程中,我们会专门来讲,如何正确地使用锁。

最后,我们给张大爷定义为服务端,李大爷定义为客户端,连接建立后,分别开启两个大爷的耳朵和嘴,来完成这 10 万次遇见。

结束语 程序员如何构建知识体系?

你好,我是李玥。

在课程即将结束的时候,我们不聊技术本身,我想坐下来,跟你聊聊怎么来构建个人的技术知识体系。

现在做技术的人普遍都有一种焦虑,相信你也或多或少有一点,焦虑什么呢?总是感觉,自己不懂的技术太多了。虽然你不停地去学习,拼命地扩充自己的技术栈,但是面对不断出现的新技术,学习的速度永远赶不上新技术发展的速度,就会感觉自己不会的东西越来越多,这其实就是一种技术焦虑。

焦虑的来源是什么?焦虑,其实是对某些不好的事情过度担心而产生的一种烦躁情绪。这种担心更多来源于“看不清”或者说是“未知”,人的本能就是对未知的事物会有莫名的恐惧。比如,我小时候考试考得不好,拿着成绩单回家的路上是最焦虑的时候,因为我不知道我爸妈看到成绩之后会不会给我一顿胖揍。成绩单交给爸妈之后,即使被揍了,也不再焦虑了,当然屁股痛是另外一回事儿。

对于技术焦虑来说,你所担心的“不好的事情”,其实就是担心自己技术成长跟不上技术环境的发展速度。就像一场赛跑,赛道是无数条路,所有人都在不同的路上拼命地往前跑,你不知道别人跑到哪儿了,也不知道还有多远才能终点,不焦虑才怪。解决焦虑的办法是,给你一个导航,你能看到自己处在什么位置,前面的路是什么样的,应该怎么来走,焦虑也就解除了。

缓解技术焦虑的“导航”是什么?如果你能跳出来,看清整个技术体系全貌,知道你自己的技术栈在这个技术体系中的位置,了解自己的长处和短板,也就不再焦虑了。

我们可以把整个技术体系理解为一个超大的倒立的锥形体,上大下小。这个锥形,**越靠上越偏重于应用,或者说偏重于业务,越靠下,越偏重于基础技术和理论。**整个技术知识结构是这样的模式,组成这个技术模型的每个技术点也呈现这样的状态。比如消息队列,就是整个技术体系中的一小块,它也是一个倒立的锥形。

img

最上层是消息队列相关的生态系统,这个里面涉及到的技术就非常多了,包括怎么和流计算配合,怎么和微服务配合,怎么来实现云原生等等。再往下一层,是各种消息队列产品,这里面任何一种消息队列产品,你要想把它学到精通,都需要花很多精力。

这个锥形越往下层,涉及到的技术就越少。比如说,消息队列的实现原理,我们这一整门课也就差不多讲全了。它用到的底层技术,就是异步、并发、锁等。直到这个锥形的尖尖,就一个数据结构,也是所有消息队列的理论基础:“队列”这个数据结构。

在回到宏观层面来看这个大锥形,虽然它越来越大,但是,新增的部分都在哪儿?都在上面是不是?也就是说,这个大锥形它上面的大饼越摊越大,但是底下的部分,其实变化很少。虽然计算机相关的科学也只有几十年的历史,但是,近二十几年,基础理论方面几乎没有任何突破性的进展,也即是说这个大锥形的尖尖,二十年没变过。我十几年前大学本科学的课程,和现在在校大学生学的课程相比,基本没什么变化,还是编译原理、图论这些课。

看清了技术体系的整体,再来看你自身这个个体。对于整个技术体系这个超大的锥形体,我们每个人能掌握的,也就是你个人的技术栈,也就只有其中很小的一部分。

你可能学了很多技术,包括大学里面教的基础理论知识、工作主要用的编程语言和一些框架等、为了面试,刷了好多的架构和算法题。你是不是感觉,这三部分完全没有任何关系?大学的课程早就忘得差不多了,因为工作中基本用不上;工作中每天用到的就是这点儿框架和增删改查,做得很熟练了,也没什么挑战;刷题的那些算法也仅仅是用来面试而已。

原因是什么?因为你的技术栈还没有打通形成体系,是断层的。这些知识其实是有联系的,无论你开发的是什么应用,使用什么编程语言,都免不了要使用一些基础组件或者存储系统,实现这些基础组件必然会用到一些设计模式、各种算法,那这些模式和算法,它的理论基础就是你在大学中学习的那些图论、计算机组成原理等等这些课中涉及的知识。所以说,并不是这些知识你用不到,而是你的知识体系没有建立起来。

那一个好的、成体系的技术栈应该是什么样的呢?应该是,“基础深厚,涉猎广泛,融汇贯通”。

把你个人的技术栈放到大锥形体中,应该像一个头向下倒立的鱿鱼。我们都知道,鱿鱼脑袋又大又尖,须子又多又长。把鱿鱼倒过来,它脑袋要尽量塞满这个大锥形的底部,也就是说,底层的大部分基础知识你要掌握。

向上延伸的很多触手,代表整个技术体系的最上层的众多领域中,其中的几个领域你也是要掌握的。并且,自上而下,最好不要有断层,上层你掌握的技术不能只是浮于表面,而是要足够的深入,深入到与你掌握的底层技术连通起来,代表你的知识体系是贯通的。

举个例子,比如你写了一段代码,往数据库中写了一条数据。你编写的程序,它在运行时是怎么存储和传输这条数据的?数据是如何从你的程序传递给数据库的?数据在数据库中是如何处理并存储的?数据库又是怎么把数据保存到磁盘上的?数据在磁盘上是以什么形式保存的?如果你可以回答出这些问题,那代表在这方面你的知识体系自上而下已经打通了。

img

这样的个人技术体系它有什么好处呢?你已经掌握的每项应用技术,都是你实际工作中最常用的东西,你掌握的足够深入,设计出来的技术方案或者写出来的代码质量就更高,遇到相关的疑难问题也不至于难倒你。有一个非常好的基础,你学习新的应用技术也会非常快,因为你只要学习它上层那部分就可以了,底层的技术很多是相通的。

就像《消息队列高手课》专栏,我们用 30 节课的时间,讲了从消息队列的应用到实现原理、再到底层技术。然后,我们在实践篇来开发 RPC 框架的时候,你会发现它和消息队列用到的很多底层技术就是一样的,那你学起来就会很容易很快。对于 RPC 框架这个知识体系,很多底层基础技术你都已经掌握了,你真正需要学习的,新的知识相对就会少一些。

那么,我们应该怎么样来构建自己的技术知识体系呢?这里我给你一些建议。

我们大部分程序员的成长轨迹都是差不多的。大学学习的那些计算机专业课,除了你学的那门编程语言以外,其他的专业课毕业之后大概率是用不到的。原因是,这些技术在整个技术体系中,基本上处于最底层。而我们毕业后,无论是做 Web 开发、做 APP 开发还是搞机器学习等等,在技术锥形结构中都是最上层的应用技术,距离底层的技术太远,联系不上。

这个阶段我给你的建议是,在一个技术领域内尽量扩展你的技术广度。比如,你是做 Web 开发的,你可以尝试多学多用相关的各种技术,像各种 Web 框架、HTTP 协议、JSON、数据库应用、相关的各种中间件和组件。你不要今天学了 Spring,明天又去学机器学习,后天再去学安卓,这样跨度太大很难建立体系。

当你在某个技术领域的技术广度足够了,达到一个什么程度呢?这个领域内常用的技术你都会使用,随便一个技术,你都知道,或者即使你不知道,简单看一下也能很快明白,了解这个技术在这个技术领域内是哪一类的,至少你知道的某个技术和它是类似的。

这个时候,你就可以去深入的,有目的的去找领域内最核心的几个技术,去深入地学习它的实现原理和底层技术。比如,你是做 Web 开发的,你可以去学习某个 Web 框架的实现原理,学习 Web 容器的工作原理、学习数据库的存储结构等等。当你把这个技术领域内的大部分技术研究到足够深入,能够联系起来,你在这个技术领域内的个人技术体系就建好了,你的第一条“小鱿鱼”就形成了。

所谓万事开头难,这一步其实是最难的,但是只要迈过这个门槛,后面的过程就相对简单了。你这个小鱿鱼可能个头很小,须子也不多,头扎的也不够深,但是没关系,只要是建立起了知识体系,这些问题都不是关键问题,都可以通过不断的学习来逐步成长。

建立起第一个小鱿鱼之后,你可以选择继续扩展你知识体系的广度和深度,养肥养大这条小鱿鱼。也可以换一个新的技术方向,再养一条新的小鱿鱼。随着你涉猎的技术广度和深度逐步增加,这些孤立的小鱿鱼,总会在底层的某个地方连通起来,变成一条大鱿鱼。当你个人的技术体系足够丰满了之后,大部分新技术对你来说不过都是一根儿鱿鱼须子而已。你了解了技术全景,再构建起你个人的技术体系,自然就不再焦虑了。

所谓,“大道至简,知易行难”。道理总是听起来很简单,真正能理解道理,落到实践中去,还是非常难的。在实践的过程中,可能有很多繁琐的、琐碎的问题都需要你去解决。希望你在构建自己的知识体系过程中,能沉下心,坚持去学习、练习和试错。勤勉之道无他,在有恒而已,愿你我共勉。

“送君千里,终有一别”,我们的《消息队列高手课》也该说再见了。在专栏开始更新时,我曾鼓励你立 Flag,不知道在这三个月的时间里,你是否坚持学习了每一节课?有没有离当初的目标更近一些?希望你能坚定不移地朝自己的目标走去,无悔初心。

感谢你的一路支持,专栏虽然更新结束,但我们依旧可以在留言区继续交流技术!最后,也祝福你不仅能成为消息队列的高手,还可以通过对消息队列的学习,打通任督二脉,不仅能在职场上披襟斩棘,更能实现你的技术梦想!