跳转到主要内容

Kafka 插件

Kafka 模块是一个由 runtime 管理的客户端插件,内部支持命名生产者和命名消费者实例,而不是只有一个全局 producer。

Runtime 事实

项目
Go modulegithub.com/go-lynx/lynx-kafka
配置前缀lynx.kafka
Runtime 插件名kafka.client
主要 API 形态插件实例方法,例如 ProduceWithProduceBatchWithSubscribeWith

代码实际支持什么

从实现看,这个插件提供:

  • 多个命名 producer
  • 多个命名 consumer
  • SubscribeWith 时延迟初始化 consumer
  • 每个 producer 独立的 retry handler
  • 每个 producer 独立的 circuit breaker
  • 可选 SASL 和 TLS
  • producer 批处理
  • 连接管理与健康状态上报

第一个启用的 producer 会成为默认 producer,供未显式指定名称的方法使用。

配置形态

lynx:
kafka:
brokers:
- "127.0.0.1:9092"
producers:
- name: order-producer
enabled: true
topics: ["orders"]
batch_size: 1000
consumers:
- name: order-consumer
enabled: true
group_id: order-group
topics: ["orders"]
max_concurrency: 10

官方模板实际怎么用

官方模板默认并不会启用 Kafka。

这和当前脚手架分阶段接入的思路一致:

  • 模板先从同步的 HTTP / gRPC 和存储层起步
  • 只有服务真的需要异步工作流时,才补消息基础设施
  • 所以这页描述的是“如何加一层 runtime 管理的 Kafka”,不是 lynx-layout 第一天就会启动什么

代码里的校验要求必须配置 brokers;如果开启 SASL 或 TLS,对应字段也必须合法;consumer group 配置也会被校验。

如何使用

通常通过 runtime 取出插件实例,再调用它的方法:

plugin := lynx.Lynx().GetPluginManager().GetPlugin("kafka.client")
kafkaClient := plugin.(*kafka.Client)

err := kafkaClient.ProduceWith(ctx, "order-producer", "orders", key, value)
err = kafkaClient.SubscribeWith(ctx, "order-consumer", []string{"orders"}, handler)

实际注意点

  • producer 会在插件启动阶段初始化。
  • consumer 不会在启动时全部创建,而是在订阅时初始化。
  • required_acks、压缩、重试、批处理和 offset 策略都直接影响语义,不只是吞吐参数。

相关页面