互联网技术 / 互联网资讯 · 2024年3月8日

自主开发的 Pulsar Starter(Spring Boot 集成)

里程碑与功能要点

  • 1.0.0:支持 Pulsar 模板消息发送;支持自定义注解实例化 Consumer;监听消息;作者:howinfun
  • 1.1.0:支持动态开启/关闭 Consumer 消费线程池,支持自定义配置消费线程池参数;作者:howinfun
  • 1.2.0:在 Spring 容器停止时,释放 Pulsar 相关资源;作者:howinfun
  • 1.3.0:支持多 Pulsar 数据源;作者:howinfun

一、背景

Pulsar 作为新兴的云原生消息队列,逐渐受到开发者的关注。目前大多数项目基于 Spring Boot 开发,但针对 Pulsar 的起步集成仍缺乏成熟的通用 Starter。为避免在常规使用 Pulsar API 时产生重复代码,本文介绍自主开发的 Pulsar Starter 的设计初衷与实现思路。

二、设计思路

初版以简洁为目标,尽量保留 Pulsar API 的原生能力,避免过度抽象与复杂性。

2.1、PulsarClient

无论是 Producer 还是 Consumer,都是通过 PulsarClient 创建。第一版仅暴露常用参数,方便快速上手与集成。

本组件具备以下能力点:

  • 参数化外部化:支持通过 application.properties 或 application.yml 外部化配置 PulsarClient 参数。
  • 配置提示:提供配置项提示信息,便于快速定位参数含义。
  • 自动实例化与注入:读取外部配置,实例化 PulsarClient,并注入到 Spring 的 IoC 容器中。

2.2、Producer

Producer 是用于发送消息的组件。

实现提供一个模板类,便于按需创建对应的 Producer 实例;支持将 topic 与 Producer 的关系缓存,避免重复创建;支持同步与异步发送。

2.3、Consumer

Consumer 是用于消费消息的组件。

提供一个抽象类,开发者实现 doReceive 方法以完成消费逻辑;同时提供自定义注解,支持配置 topic、Tenant、Namespace 等属性。实现类加上自定义注解后,Starter 能自动识别并创建对应的 Consumer 实例;支持同步消费与线程池异步消费。

三、使用示例

3.1、引入依赖

3.2、配置示例
pulsaR.service-URL=pulsar://127.0.0.1:6650
pulsar.tenant=study
pulsar.namespace=default
pulsar.operation-timeout=30
pulsar.io-threads=10
pulsar.listener-threads=10

3.3、发送消息

3.4、消费消息

四、源码与后续

源码相关内容可在代码托管平台查看,如需建议或意见,欢迎提出 MR(合并请求)。