跳转到主要内容

Kafka 插件

本页逐项解释 lynx-kafka/conf/example_config.yml 里的 YAML 字段。仓库示例使用独立的 kafka: 块;当你把它合并进 Lynx 启动配置时,对应路径是 lynx.kafka

Runtime 事实

项目
Go modulegithub.com/go-lynx/lynx-kafka
配置前缀lynx.kafka
Runtime 插件名kafka.client
主要 API 形态插件实例方法,例如 ProduceWithProduceBatchWithSubscribeWith

YAML 字段拆解

顶层 kafka

字段作用何时生效默认值 / 交互影响常见误配
brokers所有 producer / consumer 共用的 broker 种子地址列表。始终生效。为空会直接启动失败。没有安全默认值。所有运行时客户端都复用它。把 TLS / SASL 流量打到明文端口,或者只留下一个已经失效的 broker。
tlsbroker 连接的 TLS / mTLS 配置。只有 broker 暴露 TLS listener 时才需要。只有 tls.enabled: true 时子字段才有意义。明文 broker 也开 TLS,或者只配了 cert_file 没配 key_file
saslSASL 鉴权配置。只有 Kafka 集群要求 SASL 时才需要。只有 sasl.enabled: true 时子字段才有意义;可以和 TLS 叠加。填了用户名密码但没启用,或机制和集群要求不一致。
dial_timeoutproducer / consumer 建连超时。启动和重连阶段。省略时默认 10s跨可用区环境里配得太小,把超时噪声误判成 broker 不稳定。
producers命名 producer 定义列表。服务要发布消息时。第一个启用的 producer 会成为未显式传名时的默认 producer。以为禁用项还能在运行时占位保留名字。
consumers命名 consumer 定义列表。服务要订阅 topic 时。consumer 会在 SubscribeWith 时懒初始化,不会在启动时全部建好。以为只写配置就会自动开始消费。

kafka.tls

字段作用何时生效默认值 / 交互影响常见误配
tls.enabled开关 broker TLS。连接 TLS 或 mTLS 集群时。默认 false;为 false 时其余 tls.* 都会被忽略。对非 TLS 端口开启它。
tls.ca_file自定义 CA 证书链路径。broker 证书不在系统信任库时。公共 CA 场景可不填。把客户端证书误填成 CA 文件。
tls.cert_filemTLS 客户端证书。只有 broker 要求客户端证书时。纯服务端 TLS 可不填;必须和 tls.key_file 成对出现。只配证书不配私钥。
tls.key_filemTLS 客户端私钥。只有 broker 要求客户端证书时。纯服务端 TLS 可不填;必须和 tls.cert_file 成对出现。私钥和证书不匹配。
tls.insecure_skip_verify跳过 broker 证书校验。只建议本地、临时测试环境。默认 false;会弱化证书和主机名校验。在线上环境遗留为 true
tls.server_name显式指定 SNI / 主机名校验名。证书 SAN/CN 与 brokers 里的地址不一致时。默认空;必须配合 tls.enabled: true填成证书里根本不存在的 IP 或别名。

kafka.sasl

字段作用何时生效默认值 / 交互影响常见误配
sasl.enabled开关 SASL。只有 SASL 集群才需要。默认 false;为 false 时其余 sasl.* 被忽略。配了凭证但没打开开关。
sasl.mechanismSASL 机制名。sasl.enabled: true 时。允许值为 PLAINSCRAM-SHA-256SCRAM-SHA-512集群要求 SCRAM 却仍写成 PLAIN
sasl.usernameSASL 用户名。sasl.enabled: true 时。没有默认值;启用 SASL 后启动校验要求必填。非本地环境忘记注入密钥。
sasl.passwordSASL 密码。sasl.enabled: true 时。没有默认值;启用 SASL 后启动校验要求必填。把明文密码直接提交进仓库。

kafka.producers[]

字段作用何时生效默认值 / 交互影响常见误配
name业务代码引用的 producer 逻辑名。所有命名发布调用。名称应保持稳定,因为 ProduceWith 直接按它查找。YAML 改名了,代码调用名没同步改。
enabled是否启用该 producer。想让该实例真正可用时。关闭后该项会被忽略。把示例 producer 原样保留为启用,结果启动出多余实例。
required_acksKafka ACK 级别。所有发布路径。允许值为 -110。示例显式写了 1;建议保持显式,因为整数字段省略后可能落成 0(无 ACK)。忘了填它,业务 topic 变成 fire-and-forget。
batch_size批发送前允许缓存的最大消息数。吞吐导向 producer。省略时默认 1000;和 batch_timeout 联动。低延迟链路里盲目调大,尾延迟反而上升。
batch_timeout未攒满批次时的最长等待时间。吞吐导向 producer。省略时默认 1sbatch_size: 1batch_timeout: 0s 基本等于关闭异步批处理。低延迟 producer 还保留较长等待时间。
compression压缩算法。带宽和 broker 存储敏感时。默认 snappy;允许值为 nonegzipsnappylz4zstd选了 broker / 工具链不支持的算法,或对极小消息也期待明显收益。
topics该 producer 的 topic 白名单和评审提示。一个服务内有多个 producer 身份时更有价值。实际发布仍要在代码里显式传 topic,因此代码和配置里的列表要同步。误以为只改这里就能改掉代码实际发送的 topic。

kafka.consumers[]

字段作用何时生效默认值 / 交互影响常见误配
nameconsumer 实例逻辑名。所有 SubscribeWith 调用。名称应保持稳定,代码和监控都会依赖它。改了配置名,订阅代码还在用旧名字。
enabled是否启用该 consumer 定义。想让它可被订阅时。关闭后该项会被忽略。以为禁用项还能被代码选中。
group_idKafka consumer group ID。启用 consumer 时必填。没有默认值;为空会启动失败。不同业务误共用一个 group,导致分区抢占和重平衡异常。
auto_commit是否自动提交 offset。所有消费路径。默认 true;为 false 时要由业务显式掌握提交时机。需要“下游成功后再提交”却仍保持 true
auto_commit_interval自动提交 offset 的时间间隔。只有 auto_commit: true 时。省略时默认 5sauto_commit 已经关闭了,还继续调它。
start_offset新 group 首次启动时的起始位点。group 还没有已保存 offset 时。默认 latest;允许值为 latestearliest在线上切成 earliest 后意外回放历史数据。
max_concurrency该 consumer 的最大处理并发。所有活跃 consumer group。默认 10;必须大于 0不评估幂等性和分区顺序要求就盲目拉高。
rebalance_timeoutgroup rebalance 的时间预算。consumer 重分配和扩缩容时。省略时默认 30s配得低于实际启动和分配耗时。
topics该 consumer 预期负责的 topic 列表。适合作为配置评审和运行说明。代码仍要在 SubscribeWith 再传一次 topics,所以两边必须同步。只改了 YAML,忘了同步订阅代码。

完整 YAML 示例

lynx:
kafka:
brokers:
- 127.0.0.1:9092 # producer 和 consumer 启动所需的 broker 种子地址
- 127.0.0.1:9093 # 可选的第二个 broker,用于提升启动冗余

tls:
enabled: true # 只有 broker 暴露 TLS listener 时才开启
ca_file: /etc/ssl/certs/kafka-ca.pem # broker 证书校验所用的自定义 CA 链
cert_file: /etc/ssl/certs/kafka-client.pem # 双向 TLS 所需的客户端证书
key_file: /etc/ssl/private/kafka-client.key # 双向 TLS 所需的客户端私钥
insecure_skip_verify: false # 仅限可控的本地调试环境才考虑设为 true
server_name: kafka.internal # 需要时覆盖 SNI / 主机名校验值

sasl:
enabled: false # 只有集群要求 SASL 时才开启
mechanism: PLAIN # PLAIN | SCRAM-SHA-256 | SCRAM-SHA-512
username: kafka-user # 开启 SASL 后必须提供
password: kafka-pass # 开启 SASL 后必须提供

dial_timeout: 5s # 建连超时;省略时默认 10s

producers:
- name: default # 业务代码使用的 producer 名
enabled: true # 禁用项会被忽略
required_acks: 1 # -1=全部 ISR,1=仅 leader,0=不等待 ACK
batch_size: 1000 # 省略时默认 1000
batch_timeout: 50ms # 省略时默认 1s;0s 基本等于关闭异步批处理
compression: snappy # none | gzip | snappy | lz4 | zstd
topics:
- topic_a # 该 producer 的评审提示 / topic 白名单
- topic_b

consumers:
- name: group_a # 业务代码使用的 consumer 名
enabled: true # 禁用项会被忽略
group_id: app-group-a # 启用 consumer 时必填
auto_commit: false # false 表示由 handler 自己控制提交时机
auto_commit_interval: 3s # 只有 auto_commit 为 true 时才使用
start_offset: latest # latest | earliest
max_concurrency: 8 # 省略时默认 10
rebalance_timeout: 45s # 省略时默认 30s
topics:
- topic_a # 需与应用代码中的 SubscribeWith topics 保持一致

最小可用 YAML 示例

lynx:
kafka:
brokers:
- 127.0.0.1:9092
producers:
- name: default
enabled: true
required_acks: 1

配置来源

  • lynx-kafka/conf/example_config.yml

如何使用

plugin := lynx.Lynx().GetPluginManager().GetPlugin("kafka.client")
kafkaClient := plugin.(*kafka.Client)

err := kafkaClient.ProduceWith(ctx, "default", "orders", key, value)
err = kafkaClient.SubscribeWith(ctx, "group_a", []string{"topic_a"}, handler)

相关页面