Kafka

API group/version: crabka.io/v1alpha1

Spec

FieldTypeRequiredDefaultDescription
authorizationobjectnoCluster-level authorizer selection. When None, the broker uses the default AllowAll authorizer (no ACL checks). When Some, the operator renders the [authorization] TOML section so the broker builds the matching Arc<dyn Authorizer> (SimpleAclAuthorizer for type: simple, OpaAuthorizer for type: opa). With simple or opa selected, the operator's inter-broker principal MUST appear in super_users (no implicit ANONYMOUS allow); operators opt in explicitly.
authorization.allowOnErrorbooleanno
authorization.expireAfterMsintegerno
authorization.initialCacheCapacityintegerno
authorization.maximumCacheSizeintegerno
authorization.superUsersarrayno
authorization.typestringyes
authorization.urlstringno
clientsCaobjectnoPer-cluster CA used to sign KafkaUser TLS certs. Absent → fully-defaulted CertificateAuthority.
clientsCa.generateCertificateAuthoritybooleannotrueWhen true (default), the operator generates and renews this CA. When false, the operator expects the CA Secret pair to be pre-created by the cluster admin and refuses to overwrite them. Renewal of BYO CAs is the admin's responsibility; the CronJob skips them and emits an Event when they're nearing expiry.
clientsCa.renewalDaysintegerno30Window before notAfter in which the renewal CronJob will reissue leaf certs. Default 30.
clientsCa.validityDaysintegerno365Cert validity in days. Default 365.
clusterCaobjectnoPer-cluster CA used for inter-broker mTLS + broker certs. Absent → fully-defaulted CertificateAuthority (operator-generated, 365/30 days).
clusterCa.generateCertificateAuthoritybooleannotrueWhen true (default), the operator generates and renews this CA. When false, the operator expects the CA Secret pair to be pre-created by the cluster admin and refuses to overwrite them. Renewal of BYO CAs is the admin's responsibility; the CronJob skips them and emits an Event when they're nearing expiry.
clusterCa.renewalDaysintegerno30Window before notAfter in which the renewal CronJob will reissue leaf certs. Default 30.
clusterCa.validityDaysintegerno365Cert validity in days. Default 365.
configobjectnoOpaque broker properties (server.properties-style key/value pairs). These are passed through to the broker's [server_properties] TOML table; the broker currently treats them as inert. Changes propagate through the config hash.
delegationTokenobjectnoDelegation-token master HMAC key source. When None, the broker rejects all KIP-48 delegation-token RPCs with err 61 DELEGATION_TOKEN_AUTH_DISABLED. When Some, the operator injects CRABKA_DELEGATION_TOKEN_SECRET_KEY into each broker pod via a valueFrom.secretKeyRef, baking the key into the rendered StatefulSet so the SSA reconcile doesn't race with out-of-band kubectl set env patches.
delegationToken.secretKeyRefobjectyesReference to a Kubernetes Secret (same namespace as the Kafka CR) whose data.<key> value is the broker's master HMAC key for KIP-48 delegation tokens.
delegationToken.secretKeyRef.keystringnoKey within the Secret's data. Defaults to secret-key.
delegationToken.secretKeyRef.namestringyesSecret name in the same namespace as the Kafka CR.
interBrokerKerberosobjectnoInter-broker Kerberos initiate config. Required when interBrokerListenerName resolves to a type: gssapi listener; supplies the shared client principal + KDC. The keytab is reused from that listener's keytabSecretRef.
interBrokerKerberos.clientPrincipalstringyesPrincipal every broker authenticates as when dialing peers, e.g. kafka@EXAMPLE.COM. Must exist in the shared keytab.
interBrokerKerberos.kdcUrlstringyesKDC endpoint, e.g. tcp://kdc:88.
interBrokerKerberos.serviceNamestringnoTarget SPN primary. Defaults to kafka.
interBrokerListenerNamestringnoName of the listener used for inter-broker traffic. When None, the operator picks the first internal listener; when listeners is empty, the synthesized default "PLAIN".
kafkaVersionstringyesCrabka version label, propagated to all pool pods via the app.kubernetes.io/version label.
krb5ConfSecretRefobjectnoOptional process-wide krb5.conf. Mounted into broker pods and pointed at via KRB5_CONFIG; serves both accept and initiate paths.
krb5ConfSecretRef.keystringyesKey within the Secret whose value is the krb5.conf contents.
krb5ConfSecretRef.secretNamestringyesName of the Secret holding the krb5.conf.
listenersarraynoNamed listeners. Empty (or absent) synthesizes a single internal PLAIN listener on port 9092.
loggingobjectnoBroker log configuration. When None, brokers use their built-in default RUST_LOG filter. When Some, the operator composes (inline) or reads (external) a tracing env-filter string, renders it into the broker ConfigMap (rust.log key), wires it into each broker pod's RUST_LOG env, and rolls the cluster on change via the config hash.
logging.loggersobjectnoInline loggers: tracing target → level. The key root (case-insensitive) sets the global default level (a bare env-filter directive); any other key is a tracing target (a Rust module path, e.g. crabka_broker). Levels are trace|debug|info|warn|error|off (case-insensitive; fatal is accepted as an alias for error).
logging.typestringnoinline
logging.valueFromobjectnoExternal logging source. Required when type: external; the referenced ConfigMap key's value is used verbatim as the broker's RUST_LOG filter.
logging.valueFrom.configMapKeyRefobjectyes
logging.valueFrom.configMapKeyRef.keystringyes
logging.valueFrom.configMapKeyRef.namestringyes
metadataVersionstringnoKRaft metadata version (the runtime analog of inter.broker.protocol.version). When unset, tracks kafkaVersion's major.minor; when set, pins the metadata version for the safe two-step upgrade. Validated against kafkaVersion and the finalized status.metadataVersion — an invalid value surfaces KafkaVersionValid=False and blocks the roll.
metricsConfigobjectnoPrometheus scrape configuration. When None, brokers do not bind /metrics and no PodMonitor / ServiceMonitor is rendered. When Some, the broker StatefulSet gains a metrics container port (TCP 9404) and the resources requested by pod_monitor / service_monitor are SSA-applied.
metricsConfig.podMonitorobjectno
metricsConfig.podMonitor.intervalstringno
metricsConfig.podMonitor.labelsobjectno
metricsConfig.podMonitor.scrapeTimeoutstringno
metricsConfig.serviceMonitorobjectno
metricsConfig.serviceMonitor.intervalstringno
metricsConfig.serviceMonitor.labelsobjectno
metricsConfig.serviceMonitor.scrapeTimeoutstringno
metricsConfig.typestringnoprometheus
networkPolicyobjectnoOpt-in NetworkPolicy generation. When None, no NetworkPolicy is generated. When Some (even {}), the operator renders a cluster-level NetworkPolicy gating ingress to broker / controller pods.
tieredStorageobjectnoKIP-405: cluster-wide tiered storage. When Some, every broker pod boots with the local-tier RSM enabled, an emptyDir mounted at /var/lib/crabka/remote (the broker's remote_log_storage_dir), and [remote_storage] rendered in the broker TOML. Per-topic enablement is unchanged (KafkaTopic.spec.config["remote.storage.enable"] = "true"). The emptyDir default with InmemoryRemoteLogMetadataManager as the only RLMM means tier data does not survive pod restarts. PVC support pairs with the production RLMM.
tieredStorage.metadataManagerobjectnoKIP-405: pick the RemoteLogMetadataManager the broker pods run. When absent (or set to type: Topic), the broker activates the durable crabka_remote_storage_topic::TopicBasedRemoteLogMetadataManager against the internal __remote_log_metadata topic, so tier-segment metadata survives pod restarts and is consistent across brokers in the cluster. The in-memory fixture is selected only by an explicit type: InMemory (test/dev only).
tieredStorage.metadataManager.topicobjectnoTopic-backed tuning. Optional when kind == Topic (broker fills defaults for bootstrap and topic parameters), must be absent otherwise.
tieredStorage.metadataManager.topic.bootstrapstringyeshost:port the broker pod dials to reach its own listener for publishing / consuming __remote_log_metadata. Typically the pod's loopback inter-broker listener (e.g. 127.0.0.1:9094).
tieredStorage.metadataManager.topic.numPartitionsintegernoPartition count for __remote_log_metadata on first creation. Defaults to 50 (Kafka's remote.log.metadata.topic.num.partitions).
tieredStorage.metadataManager.topic.replicationintegernoReplication factor for __remote_log_metadata on first creation. Defaults to 3 (Kafka's remote.log.metadata.topic.replication.factor).
tieredStorage.metadataManager.typestringyesImplementation selector.
tieredStorage.persistenceobjectnoKIP-405: durable storage for the local-tier directory. Only valid with type=Local. When absent (default), the operator renders an emptyDir for tier-storage. When Some, the operator renders a volumeClaimTemplate of the configured size / class so tier data survives pod restarts — pairing with the topic-backed RLMM, this closes the "tier data is lost on pod restart" caveat.
tieredStorage.persistence.classstringnoStorage class name. None = cluster default.
tieredStorage.persistence.deleteClaimbooleannofalsetruepersistentVolumeClaimRetentionPolicy.whenDeleted: Delete. Must match the parent KafkaNodePool.spec.storage.deleteClaim when both PVCs are present (K8s StatefulSets have a single set-wide retention policy with no per-template override). Validated at reconcile time; mismatch surfaces as TieredStorageInvalid.
tieredStorage.persistence.sizestringyesK8s Quantity (e.g., "50Gi", "500Mi"). Non-empty; resource-quantity well-formedness is validated by the Kubernetes API server at SSA time.
tieredStorage.s3objectnoS3-backend tuning. Required when kind == S3, must be absent otherwise. The struct mirrors crabka_remote_storage::S3Config — non-credential fields are rendered verbatim into the broker TOML's [remote_storage.s3] block; credentials are sourced from Kubernetes Secrets and injected as broker-pod env (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY).
tieredStorage.s3.allowHttpbooleannoAllow plaintext HTTP. Off by default; flip on for MinIO running without TLS. AWS S3 itself never needs this.
tieredStorage.s3.bucketstringyesS3 bucket name. Required.
tieredStorage.s3.credentialsobjectnoOptional explicit credentials. When None, the broker falls back to the AWS credential chain (IRSA on EKS, instance profile on EC2, etc.).
tieredStorage.s3.credentials.accessKeyIdobjectyesReference to the Secret holding the AWS_ACCESS_KEY_ID value.
tieredStorage.s3.credentials.accessKeyId.keystringnoKey within the Secret's data. Defaults to secret-key.
tieredStorage.s3.credentials.accessKeyId.namestringyesSecret name in the same namespace as the Kafka CR.
tieredStorage.s3.credentials.secretAccessKeyobjectyesReference to the Secret holding the AWS_SECRET_ACCESS_KEY value.
tieredStorage.s3.credentials.secretAccessKey.keystringnoKey within the Secret's data. Defaults to secret-key.
tieredStorage.s3.credentials.secretAccessKey.namestringyesSecret name in the same namespace as the Kafka CR.
tieredStorage.s3.endpointstringnoOptional custom endpoint URL (e.g. http://minio:9000 for MinIO, https://<account>.r2.cloudflarestorage.com for Cloudflare R2). When None, the AWS S3 endpoint for the configured region is used.
tieredStorage.s3.multipartChunkSizeintegernoOverride the per-part size for multipart uploads (bytes). When unset, the broker uses crabka_remote_storage::DEFAULT_MULTIPART_CHUNK_SIZE (16 MiB).
tieredStorage.s3.multipartThresholdintegernoOverride the single-PUT / multipart cutoff (bytes). When unset, the broker uses crabka_remote_storage::DEFAULT_MULTIPART_THRESHOLD (100 MiB). Lower in tests to exercise the multipart path on small fixtures.
tieredStorage.s3.prefixstringnoOptional key prefix inside the bucket. Lets multiple Crabka clusters share a bucket without colliding.
tieredStorage.s3.regionstringyesAWS region. Required even for non-AWS endpoints (MinIO, R2) — object_store's AmazonS3Builder rejects an empty region.
tieredStorage.typestringyesBackend kind selector.
tracingobjectnoDistributed-tracing wiring for the broker pods. When Some, the operator renders the matching CRABKA_OTLP_* env vars onto every broker pod — the broker's telemetry pipeline reads them via TelemetryConfig::from_env and installs the OTLP tracer at startup. When None, no OTLP env vars are emitted and the broker leaves tracing off (the default).
tracing.otlpobjectnoOTLP-backend tuning. Required when kind == Otlp.
tracing.otlp.endpointstringyesRequired. OTLP collector endpoint (scheme://host:port). Rendered as CRABKA_OTLP_ENDPOINT; turning the field on implicitly sets CRABKA_OTLP_ENABLED=true as well.
tracing.otlp.protocolstringnoOTLP wire protocol selector. Mirrors the broker's internal OtlpProtocol enum and the OTEL_EXPORTER_OTLP_PROTOCOL spec values.
tracing.otlp.sampleRationumbernoOptional sampling ratio in [0.0, 1.0]. Rendered as CRABKA_OTLP_SAMPLE_RATIO. Defaults to the broker's 1.0 (sample every trace).
tracing.otlp.serviceNamestringnoOptional service.name resource attribute. Rendered as OTEL_SERVICE_NAME. Defaults to the broker's "crabka-broker".
tracing.otlp.timeoutSecsintegernoOptional export timeout in seconds. Rendered as CRABKA_OTLP_TIMEOUT_SECS. Defaults to the broker's 10.
tracing.typestringyesTracing backend selector.

Status

FieldTypeRequiredDefaultDescription
clientsCaobjectnoStatus surface for a single CA. Populated by the reconciler from the parsed CA cert + the CRD spec.
clientsCa.certGenerationintegerno0Monotonic generation of the active signing cert (bumped on same-key renewal and on key promotion).
clientsCa.generatedbooleanyestrue when the operator generated this CA (i.e. generateCertificateAuthority == true); false for BYO.
clientsCa.keyGenerationintegerno0Monotonic generation of the active signing key (bumped only on key replacement).
clientsCa.notAfterstringyesRFC3339 notAfter of the current (signing) CA cert.
clientsCa.rotationPhasestringnoStaged key-replacement phase (idle | key-replace-trust | key-replace-promote).
clientsCa.trustAnchorsintegernoNumber of CA certs currently in the trust bundle.
clusterCaobjectnoStatus surface for a single CA. Populated by the reconciler from the parsed CA cert + the CRD spec.
clusterCa.certGenerationintegerno0Monotonic generation of the active signing cert (bumped on same-key renewal and on key promotion).
clusterCa.generatedbooleanyestrue when the operator generated this CA (i.e. generateCertificateAuthority == true); false for BYO.
clusterCa.keyGenerationintegerno0Monotonic generation of the active signing key (bumped only on key replacement).
clusterCa.notAfterstringyesRFC3339 notAfter of the current (signing) CA cert.
clusterCa.rotationPhasestringnoStaged key-replacement phase (idle | key-replace-trust | key-replace-promote).
clusterCa.trustAnchorsintegernoNumber of CA certs currently in the trust bundle.
conditionsarrayno[]Standard Kubernetes-style condition list. Surfaces Ready, ListenersValid, ListenersReady.
kafkaVersionstringnoEcho of spec.kafkaVersion, for observability.
listenersarraynoPer-listener resolved addresses. Populated once ListenersReady=True.
metadataVersionstringnoThe operator-finalized metadata version. Advances only when version validation passes; drives the downgrade-window check on the next reconcile.
readyReplicasintegernoMirrors StatefulSet.status.readyReplicas.
replicasintegernoMirrors StatefulSet.status.replicas.