【mq】从零开始实现 mq-01-生产者、消费者启动

首页 / 新闻资讯 / 正文

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。

指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。

消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

MQ 的作用?

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。

削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

ps: 以上内容摘选自百科。

maven 引入

<dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.42.Final</version> </dependency>  <dependency>     <groupId>com.alibaba</groupId>     <artifactId>fastjson</artifactId>     <version>1.2.76</version> </dependency>

模块划分

The message queue in java. 作为 mq 的从零开始的学习项目,目前已开源。

项目的模块如下:

模块 说明
mq-common 公共代码
mq-broker 注册中心
mq-producer 消息生产者
mq-consumer 消息消费者

接口定义

package com.github.houbb.mq.consumer.api;  /**  * @author binbin.hou  * @since 1.0.0  */ public interface IMqConsumer {      /**      * 订阅      * @param topicName topic 名称      * @param tagRegex 标签正则      */     void subscribe(String topicName, String tagRegex);      /**      * 注册监听器      * @param listener 监听器      */     void registerListener(final IMqConsumerListener listener);  }

IMqConsumerListener 作为消息监听类的接口,定义如下:

public interface IMqConsumerListener {       /**      * 消费      * @param mqMessage 消息体      * @param context 上下文      * @return 结果      */     ConsumerStatus consumer(final MqMessage mqMessage,                             final IMqConsumerListenerContext context);  }

ConsumerStatus 代表消息消费的几种状态。

消息体

启动消息体 MqMessage 定义如下:

package com.github.houbb.mq.common.dto;  import java.util.Arrays; import java.util.List;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqMessage {      /**      * 标题名称      */     private String topic;      /**      * 标签      */     private List<String> tags;      /**      * 内容      */     private byte[] payload;      /**      * 业务标识      */     private String bizKey;      /**      * 负载分片标识      */     private String shardingKey;      // getter&setter&toString  }

push 消费者策略实现

消费者启动的实现如下:

/**  * 推送消费策略  *  * @author binbin.hou  * @since 1.0.0  */ public class MqConsumerPush extends Thread implements IMqConsumer  {      // 省略...      @Override     public void run() {         // 启动服务端         log.info("MQ 消费者开始启动服务端 groupName: {}, port: {}, brokerAddress: {}",                 groupName, port, brokerAddress);          EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workerGroup = new NioEventLoopGroup();          try {             ServerBootstrap serverBootstrap = new ServerBootstrap();             serverBootstrap.group(workerGroup, bossGroup)                     .channel(NioServerSocketChannel.class)                     .childHandler(new ChannelInitializer<Channel>() {                         @Override                         protected void initChannel(Channel ch) throws Exception {                             ch.pipeline().addLast(new MqConsumerHandler());                         }                     })                     // 这个参数影响的是还没有被accept 取出的连接                     .option(ChannelOption.SO_BACKLOG, 128)                     // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。                     .childOption(ChannelOption.SO_KEEPALIVE, true);              // 绑定端口,开始接收进来的链接             ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();             log.info("MQ 消费者启动完成,监听【" + port + "】端口");              channelFuture.channel().closeFuture().syncUninterruptibly();             log.info("MQ 消费者关闭完成");         } catch (Exception e) {             log.error("MQ 消费者启动异常", e);             throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);         } finally {             workerGroup.shutdownGracefully();             bossGroup.shutdownGracefully();         }       }       // 省略...  }

ps: 初期我们把 consumer 作为服务端,后续引入 broker 则只有 broker 是服务端。

MqConsumerHandler 处理类

这个类是一个空的实现。

public class MqConsumerHandler extends SimpleChannelInboundHandler {      @Override     protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {         //nothing     }  }

测试代码

MqConsumerPush mqConsumerPush = new MqConsumerPush(); mqConsumerPush.start();

启动日志:

[DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress:  [INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口

接口定义

最基本的消息发送接口。

package com.github.houbb.mq.producer.api;  import com.github.houbb.mq.common.dto.MqMessage; import com.github.houbb.mq.producer.dto.SendResult;  /**  * @author binbin.hou  * @since 1.0.0  */ public interface IMqProducer {      /**      * 同步发送消息      * @param mqMessage 消息类型      * @return 结果      */     SendResult send(final MqMessage mqMessage);      /**      * 单向发送消息      * @param mqMessage 消息类型      * @return 结果      */     SendResult sendOneWay(final MqMessage mqMessage);  }

生产者实现

MqProducer 启动的实现如下,基于 netty。

package com.github.houbb.mq.producer.core;  /**  * 默认 mq 生产者  * @author binbin.hou  * @since 1.0.0  */ public class MqProducer extends Thread implements IMqProducer {      //省略...      @Override     public void run() {         // 启动服务端         log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",                 groupName, port, brokerAddress);          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {             Bootstrap bootstrap = new Bootstrap();             ChannelFuture channelFuture = bootstrap.group(workerGroup)                     .channel(NioSocketChannel.class)                     .option(ChannelOption.SO_KEEPALIVE, true)                     .handler(new ChannelInitializer<Channel>(){                         @Override                         protected void initChannel(Channel ch) throws Exception {                             ch.pipeline()                                     .addLast(new LoggingHandler(LogLevel.INFO))                                     .addLast(new MqProducerHandler());                         }                     })                     .connect("localhost", port)                     .syncUninterruptibly();              log.info("MQ 生产者启动客户端完成,监听端口:" + port);             channelFuture.channel().closeFuture().syncUninterruptibly();             log.info("MQ 生产者开始客户端已关闭");         } catch (Exception e) {             log.error("MQ 生产者启动遇到异常", e);             throw new MqException(ProducerRespCode.RPC_INIT_FAILED);         } finally {             workerGroup.shutdownGracefully();         }     }      //省略... }

MqProducerHandler 处理类

默认的空实现,什么都不做。

package com.github.houbb.mq.producer.handler;  import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqProducerHandler extends SimpleChannelInboundHandler {      @Override     protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {         //do nothing now     }  }

启动代码

MqProducer mqProducer = new MqProducer(); mqProducer.start();

启动日志:

[DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者开始启动客户端 GROUP: P_DEFAULT_GROUP_NAME, PORT: 9527, brokerAddress:  四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x5cb48145] REGISTERED 四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect 信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527 四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE [INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527

基于 netty 最基本的服务端启动、客户端启动到这里就结束了。

千里之行,始于足下。

我们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

The message queue in java.(java 简易版本 mq 实现):https://github.com/houbb/mq

rpc-从零开始实现 rpc:https://github.com/houbb/rpc

【mq】从零开始实现 mq-01-生产者、消费者启动

上一个:Instant的使用