里程碑与功能要点
- 1.0.0:支持 Pulsar 模板消息发送;支持自定义注解实例化 Consumer;监听消息;作者:howinfun
- 1.1.0:支持动态开启/关闭 Consumer 消费线程池,支持自定义配置消费线程池参数;作者:howinfun
- 1.2.0:在 Spring 容器停止时,释放 Pulsar 相关资源;作者:howinfun
- 1.3.0:支持多 Pulsar 数据源;作者:howinfun
一、背景
Pulsar 作为新兴的云原生消息队列,逐渐受到开发者的关注。目前大多数项目基于 Spring Boot 开发,但针对 Pulsar 的起步集成仍缺乏成熟的通用 Starter。为避免在常规使用 Pulsar API 时产生重复代码,本文介绍自主开发的 Pulsar Starter 的设计初衷与实现思路。
二、设计思路
初版以简洁为目标,尽量保留 Pulsar API 的原生能力,避免过度抽象与复杂性。
2.1、PulsarClient
无论是 Producer 还是 Consumer,都是通过 PulsarClient 创建。第一版仅暴露常用参数,方便快速上手与集成。
本组件具备以下能力点:
- 参数化外部化:支持通过 application.properties 或 application.yml 外部化配置 PulsarClient 参数。
- 配置提示:提供配置项提示信息,便于快速定位参数含义。
- 自动实例化与注入:读取外部配置,实例化 PulsarClient,并注入到 Spring 的 IoC 容器中。
2.2、Producer
Producer 是用于发送消息的组件。
实现提供一个模板类,便于按需创建对应的 Producer 实例;支持将 topic 与 Producer 的关系缓存,避免重复创建;支持同步与异步发送。
2.3、Consumer
Consumer 是用于消费消息的组件。
提供一个抽象类,开发者实现 doReceive 方法以完成消费逻辑;同时提供自定义注解,支持配置 topic、Tenant、Namespace 等属性。实现类加上自定义注解后,Starter 能自动识别并创建对应的 Consumer 实例;支持同步消费与线程池异步消费。
三、使用示例
3.1、引入依赖
3.2、配置示例
pulsaR.service-URL=pulsar://127.0.0.1:6650
pulsar.tenant=study
pulsar.namespace=default
pulsar.operation-timeout=30
pulsar.io-threads=10
pulsar.listener-threads=10
3.3、发送消息
3.4、消费消息
四、源码与后续
源码相关内容可在代码托管平台查看,如需建议或意见,欢迎提出 MR(合并请求)。
