
Kafka Plugin

This page explains the YAML fields from `lynx-kafka/conf/example_config.yml`. The repository example uses a standalone `kafka:` block; when you merge it into a Lynx bootstrap file, the same fields live under `lynx.kafka`.

Runtime Facts

| Item | Value |
| --- | --- |
| Go module | `github.com/go-lynx/lynx-kafka` |
| Config prefix | `lynx.kafka` |
| Runtime plugin name | `kafka.client` |
| Main API shape | plugin instance methods such as `ProduceWith`, `ProduceBatchWith`, `SubscribeWith` |

YAML Walkthrough

Top-level kafka

| Field | What it controls | When it matters | Default / interaction | Common misconfig |
| --- | --- | --- | --- | --- |
| `brokers` | Seed broker address list shared by all producer and consumer instances. | Always. Startup validation fails when it is empty. | No safe default. Every runtime client reuses this list. | Pointing TLS or SASL traffic at a PLAINTEXT listener, or keeping only one stale broker address. |
| `tls` | TLS and mTLS settings for broker connections. | Only when the broker exposes TLS listeners. | Child fields are ignored unless `tls.enabled: true`. | Turning it on for plaintext brokers, or setting only `cert_file` without `key_file`. |
| `sasl` | SASL authentication settings. | Only when the Kafka cluster requires SASL. | Child fields are ignored unless `sasl.enabled: true`. SASL can be combined with TLS. | Filling `username` and `password` but leaving `enabled: false`, or choosing a mechanism the cluster does not support. |
| `dial_timeout` | Dial timeout for producer and consumer network bootstrapping. | Startup and reconnect paths. | Defaults to `10s` when omitted. | Lowering it too far in cross-AZ or cross-region environments and treating timeout noise as broker instability. |
| `producers` | Named producer definitions. | When the service publishes messages. | The first enabled producer becomes the implicit default producer for name-less calls. | Expecting disabled items to reserve names or health status at runtime. |
| `consumers` | Named consumer definitions. | When the service subscribes to topics. | Consumers are initialized lazily on `SubscribeWith`; they are not all created at boot. | Assuming config alone starts message consumption before application code subscribes. |

kafka.tls

| Field | What it controls | When it matters | Default / interaction | Common misconfig |
| --- | --- | --- | --- | --- |
| `tls.enabled` | Turns broker TLS on or off. | Required for TLS or mTLS clusters. | Defaults to `false`. When `false`, the rest of `tls.*` is ignored. | Enabling it against non-TLS ports. |
| `tls.ca_file` | Custom CA bundle path. | Needed when broker certs are not trusted by the host OS. | Optional for public CA chains. | Pointing at the client cert instead of the CA bundle. |
| `tls.cert_file` | Client certificate for mTLS. | Only for broker-side client cert authentication. | Optional for server-only TLS. Must be paired with `tls.key_file`. | Setting it alone and expecting mTLS to work. |
| `tls.key_file` | Client private key for mTLS. | Only for broker-side client cert authentication. | Optional for server-only TLS. Must be paired with `tls.cert_file`. | Mounting the wrong key or forgetting the matching cert. |
| `tls.insecure_skip_verify` | Skips broker certificate verification. | Only for tightly controlled local test clusters. | Defaults to `false`. It weakens both security review and hostname validation. | Leaving it on in shared or production environments. |
| `tls.server_name` | Explicit SNI and hostname verification name. | Only when the broker certificate SAN/CN does not match the address in `brokers`. | Empty by default. Used together with `tls.enabled: true`. | Setting it to a broker ID or IP that is not actually present in the certificate. |
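These fields map naturally onto Go's standard `crypto/tls.Config`. The sketch below shows that mapping and the pairing rule for `cert_file`/`key_file`; it illustrates the documented semantics and is not the plugin's own implementation.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// TLSConf is an illustrative struct mirroring the tls.* keys above.
type TLSConf struct {
	Enabled            bool
	CAFile             string
	CertFile, KeyFile  string
	InsecureSkipVerify bool
	ServerName         string
}

// buildTLS sketches how the tls.* fields typically translate into a
// crypto/tls.Config: nil when disabled, paired cert/key for mTLS, and an
// optional custom CA pool.
func buildTLS(c TLSConf) (*tls.Config, error) {
	if !c.Enabled {
		return nil, nil // all other tls.* fields are ignored
	}
	// cert_file and key_file must be set together for mTLS.
	if (c.CertFile == "") != (c.KeyFile == "") {
		return nil, errors.New("tls: cert_file and key_file must be set together")
	}
	cfg := &tls.Config{
		InsecureSkipVerify: c.InsecureSkipVerify,
		ServerName:         c.ServerName, // overrides SNI / hostname verification
	}
	if c.CAFile != "" {
		pem, err := os.ReadFile(c.CAFile)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("tls: no certificates found in ca_file")
		}
		cfg.RootCAs = pool
	}
	if c.CertFile != "" {
		cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
		if err != nil {
			return nil, err
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLS(TLSConf{Enabled: true, ServerName: "kafka.internal"})
	fmt.Println(cfg.ServerName, err)
}
```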

kafka.sasl

| Field | What it controls | When it matters | Default / interaction | Common misconfig |
| --- | --- | --- | --- | --- |
| `sasl.enabled` | Turns SASL auth on or off. | Required only for SASL-enabled clusters. | Defaults to `false`. When `false`, the rest of `sasl.*` is ignored. | Supplying credentials without enabling the block. |
| `sasl.mechanism` | SASL mechanism name. | When `sasl.enabled: true`. | Accepted values are `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`. | Picking `PLAIN` for a cluster that requires SCRAM, or using the right mechanism with the wrong port/security mode. |
| `sasl.username` | SASL username. | When `sasl.enabled: true`. | No default. Startup validation requires it once SASL is enabled. | Forgetting to inject the secret in non-local environments. |
| `sasl.password` | SASL password. | When `sasl.enabled: true`. | No default. Startup validation requires it once SASL is enabled. | Hard-coding it in the repo instead of secret management. |
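The rules above reduce to a small validation function, sketched below with illustrative names. Note how the common misconfiguration — credentials filled in but `enabled: false` — passes validation silently, because the whole block is ignored.

```go
package main

import (
	"errors"
	"fmt"
)

// SASLConf is an illustrative struct mirroring the sasl.* keys above.
type SASLConf struct {
	Enabled            bool
	Mechanism          string
	Username, Password string
}

// validateSASL sketches the documented startup rules: the block is ignored
// unless enabled, the mechanism must be one of the accepted values, and
// username/password become mandatory once SASL is on.
func validateSASL(c SASLConf) error {
	if !c.Enabled {
		return nil // mechanism and credentials are ignored
	}
	switch c.Mechanism {
	case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// accepted
	default:
		return fmt.Errorf("sasl: unsupported mechanism %q", c.Mechanism)
	}
	if c.Username == "" || c.Password == "" {
		return errors.New("sasl: username and password are required when enabled")
	}
	return nil
}

func main() {
	// Credentials present but enabled: false — validation passes, yet no
	// SASL handshake will happen. This is the misconfig called out above.
	fmt.Println(validateSASL(SASLConf{Username: "kafka-user", Password: "kafka-pass"}))
}
```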

kafka.producers[]

| Field | What it controls | When it matters | Default / interaction | Common misconfig |
| --- | --- | --- | --- | --- |
| `name` | Logical producer name used by application code. | Always for named publish calls. | Keep it stable because `ProduceWith` references it directly. | Renaming it in YAML without updating callers. |
| `enabled` | Enables or disables one producer instance. | When you want the instance to be available. | Disabled entries are ignored. | Leaving a template example enabled and accidentally creating an unused producer. |
| `required_acks` | Kafka ack policy. | Every publish path. | Allowed values are `-1`, `1`, `0`. The example sets `1`; keep it explicit because omitting an integer field can still collapse to `0` (no ack). | Forgetting to set it and accidentally turning a business topic into fire-and-forget semantics. |
| `batch_size` | Maximum messages buffered before a batch flush. | Throughput-sensitive producers. | Defaults to `1000` when omitted. Works together with `batch_timeout`. | Increasing it for low-latency traffic and then blaming Kafka for added tail latency. |
| `batch_timeout` | Maximum wait time before sending a partial batch. | Throughput-sensitive producers. | Defaults to `1s` when omitted. `batch_size: 1` or `batch_timeout: 0s` effectively disables async batching. | Keeping a long timeout on a latency-critical producer. |
| `compression` | Producer compression algorithm. | When publish bandwidth or broker storage matters. | Defaults to `snappy`. Valid values are `none`, `gzip`, `snappy`, `lz4`, `zstd`. | Choosing an algorithm unsupported by the broker/client toolchain or assuming compression is free for tiny messages. |
| `topics` | Topic allow-list and review hint for this producer. | Useful when one service owns multiple producers with distinct routing intent. | The publish call still passes the topic explicitly; keep the code path and config list aligned. | Assuming this list alone reroutes application publishes. |
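The `required_acks` pitfall above is worth seeing concretely: in Go, an omitted integer field decodes to the zero value, and for acks the zero value means "no ack". The struct and helper below are illustrative, not the plugin's types.

```go
package main

import "fmt"

// ProducerConf is illustrative; required_acks maps to an int field, so an
// omitted YAML value decodes to Go's zero value, 0.
type ProducerConf struct {
	RequiredAcks int `yaml:"required_acks"`
}

// ackMode names the semantics of each allowed required_acks value.
func ackMode(acks int) string {
	switch acks {
	case -1:
		return "all in-sync replicas"
	case 1:
		return "leader only"
	case 0:
		return "fire-and-forget (no ack)"
	default:
		return "invalid"
	}
}

func main() {
	var p ProducerConf // required_acks omitted in YAML
	fmt.Println(ackMode(p.RequiredAcks)) // prints "fire-and-forget (no ack)"
}
```

This is why the example config keeps `required_acks: 1` explicit even though it looks redundant.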

kafka.consumers[]

| Field | What it controls | When it matters | Default / interaction | Common misconfig |
| --- | --- | --- | --- | --- |
| `name` | Logical consumer instance name. | Always for `SubscribeWith`. | Keep it stable because code and metrics reference it directly. | Renaming it in config while handlers still subscribe with the old name. |
| `enabled` | Enables or disables one consumer definition. | When the service should be allowed to subscribe with that instance. | Disabled entries are ignored. | Assuming a disabled consumer can still be selected by code. |
| `group_id` | Kafka consumer group ID. | Required for enabled consumers. | No default. Startup validation fails when it is empty. | Reusing the same group for unrelated workloads and then wondering why partitions move unexpectedly. |
| `auto_commit` | Whether offsets are committed automatically. | Every consumer path. | Defaults to `true`. When `false`, your handler must own commit timing. | Leaving it `true` for handlers that should commit only after downstream success. |
| `auto_commit_interval` | Interval for automatic offset commits. | Only when `auto_commit: true`. | Defaults to `5s` when omitted. | Tuning it while `auto_commit` is already `false`. |
| `start_offset` | Initial offset selection for new group state. | When the group has no stored offsets yet. | Defaults to `latest`. Valid values are `latest` and `earliest`. | Switching it to `earliest` in production and unexpectedly replaying retained history. |
| `max_concurrency` | Maximum handler concurrency for that consumer instance. | Every active consumer group. | Defaults to `10`. Must stay greater than 0. | Cranking it up without checking downstream idempotency or partition-order requirements. |
| `rebalance_timeout` | Time budget for group rebalance work. | During consumer assignment and scale events. | Defaults to `30s` when omitted. | Setting it below the actual startup or assignment cost of the consumer. |
| `topics` | Intended topic set for the consumer definition. | Useful as configuration documentation and review guardrails. | Application code still passes topics again to `SubscribeWith`, so the two lists must stay aligned. | Updating YAML topics and forgetting to update the subscribe call. |

Complete YAML Example

```yaml
lynx:
  kafka:
    brokers:
      - 127.0.0.1:9092        # Required seed broker list for producer and consumer bootstrap
      - 127.0.0.1:9093        # Optional secondary broker for bootstrap resilience

    tls:
      enabled: true                               # Enable only when brokers expose TLS listeners
      ca_file: /etc/ssl/certs/kafka-ca.pem        # Custom CA bundle for broker certificate validation
      cert_file: /etc/ssl/certs/kafka-client.pem  # Client certificate for mutual TLS
      key_file: /etc/ssl/private/kafka-client.key # Client private key for mutual TLS
      insecure_skip_verify: false                 # Keep false outside controlled local debugging
      server_name: kafka.internal                 # Override SNI / hostname verification when needed

    sasl:
      enabled: false          # Turn on only when the cluster requires SASL
      mechanism: PLAIN        # PLAIN | SCRAM-SHA-256 | SCRAM-SHA-512
      username: kafka-user    # Required when SASL is enabled
      password: kafka-pass    # Required when SASL is enabled

    dial_timeout: 5s          # Connection timeout; defaults to 10s when omitted

    producers:
      - name: default         # Application-facing producer name
        enabled: true         # Disabled entries are ignored
        required_acks: 1      # -1=all ISR, 1=leader only, 0=no ack
        batch_size: 1000      # Defaults to 1000 when omitted
        batch_timeout: 50ms   # Defaults to 1s; 0s effectively disables async batching
        compression: snappy   # none | gzip | snappy | lz4 | zstd
        topics:
          - topic_a           # Review hint / allow-list for this producer
          - topic_b

    consumers:
      - name: group_a           # Application-facing consumer name
        enabled: true           # Disabled entries are ignored
        group_id: app-group-a   # Required for enabled consumers
        auto_commit: false      # false means the handler owns commit timing
        auto_commit_interval: 3s # Used only when auto_commit is true
        start_offset: latest    # latest | earliest
        max_concurrency: 8      # Defaults to 10 when omitted
        rebalance_timeout: 45s  # Defaults to 30s when omitted
        topics:
          - topic_a             # Keep aligned with SubscribeWith topics in application code
```

Minimum Viable YAML Example

```yaml
lynx:
  kafka:
    brokers:
      - 127.0.0.1:9092
    producers:
      - name: default
        enabled: true
        required_acks: 1
```

Source Template

- `lynx-kafka/conf/example_config.yml`

How To Consume It

```go
// Look up the runtime plugin instance by its registered name.
plugin := lynx.Lynx().GetPluginManager().GetPlugin("kafka.client")
kafkaClient, ok := plugin.(*kafka.Client)
if !ok {
	// Plugin not loaded or wrong type; handle before publishing.
	return
}

// Publish via the named producer, then subscribe with the named consumer.
err := kafkaClient.ProduceWith(ctx, "default", "orders", key, value)
err = kafkaClient.SubscribeWith(ctx, "group_a", []string{"topic_a"}, handler)
```