Overview

What is Kroxylicious Proxy?

Kroxylicious Proxy provides a pluggable, protocol-aware ("Layer 7") proxy for Apache Kafka® brokers and clusters, together with an API for conveniently implementing custom logic within such a proxy.

Why?

Proxies are a powerful and flexible architectural pattern. For Kafka, they can be used to add functionality to Kafka clusters which is not available out-of-the-box with Apache Kafka. In an ideal world, such functionality would be implemented directly in Apache Kafka. But there are numerous practical reasons that can prevent this, for example:

  • Organizations having very niche requirements which are unsuitable for implementation directly in Apache Kafka.

  • Functionality which requires changes to Kafka’s public API and which the Apache Kafka project is unwilling to implement. This is the case for broker interceptors, for example.

  • Experimental functionality which might end up being implemented in Apache Kafka eventually. For example, with Kroxylicious Proxy it’s easier to experiment with alternative transport protocols, such as QUIC, or operating system APIs, such as io_uring, because there is already support for these in Netty, the networking framework on which Kroxylicious is built.

How it works

First let’s define the concepts in the landscape surrounding Kroxylicious.

  1. Kafka Client, or Client, refers to any client application using a Kafka Client library to talk to a Kafka Cluster.

  2. Kafka Cluster, or Cluster, refers to a cluster comprising one or more Kafka Brokers.

  3. Downstream refers to the area between Kafka Client and Kroxylicious.

  4. Upstream refers to the area between Kroxylicious and a Kafka Cluster.

Diagram
Figure 1. Kroxylicious landscape

Now let’s define some concepts used within Kroxylicious itself.

Virtual Cluster

The Virtual Cluster is the downstream representation of a Kafka Cluster. At the conceptual level, a Kafka Client connects to a Virtual Cluster. Kroxylicious proxies all communications made to the Virtual Cluster through to a (physical) Kafka Cluster, passing them through the Filter Chain.

So far, this explanation has elided the detail of Kafka Brokers. Let’s talk about that now.

Each Virtual Cluster automatically exposes a bootstrap endpoint. This is the address that the Kafka Client must specify as the bootstrap.servers property in its client configuration.

In addition to the bootstrap endpoint, Kroxylicious automatically exposes broker endpoints. There is one broker endpoint for each broker of the physical cluster. When the Client connects to a broker endpoint, Kroxylicious proxies all communications to the corresponding broker of the (physical) Kafka Cluster.

Kroxylicious automatically intercepts all the Kafka RPC responses that contain a broker address. It rewrites the address so that it refers to the corresponding broker endpoint of the Virtual Cluster. This means that when the Kafka Client goes to connect to, say, broker 0, it does so through the Virtual Cluster.
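The address rewriting described above can be pictured with a small sketch. This is not Kroxylicious code: the Broker type, the rewrite_brokers function, the host names and the port scheme (one port per broker, offset from a base port) are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Broker:
    node_id: int
    host: str
    port: int


def rewrite_brokers(upstream, virtual_host, base_port):
    """Map each upstream broker to a per-broker endpoint of the virtual cluster.

    Assumption: broker endpoints share one host and use base_port + node_id.
    """
    return [Broker(b.node_id, virtual_host, base_port + b.node_id) for b in upstream]


# Broker addresses as the physical cluster might report them in a Metadata response.
upstream = [Broker(0, "kafka-0.internal", 9092), Broker(1, "kafka-1.internal", 9092)]
downstream = rewrite_brokers(upstream, "proxy.example.com", 9193)
for b in downstream:
    print(b.node_id, f"{b.host}:{b.port}")
```

The client only ever sees the rewritten addresses, so every later connection to a broker also goes through the proxy.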

Target Cluster

The Target Cluster is the definition of the physical Kafka Cluster within Kroxylicious itself.

A Virtual Cluster has exactly one Target Cluster.

There can be a one-to-one relationship between Virtual Clusters and Target Clusters. The other possibility is many-to-one, where many Virtual Clusters point to the same Target Cluster. The many-to-one pattern is exploited by filters such as the Multi-tenancy filter.

Diagram
Figure 2. One-to-One relationship between Virtual Cluster and Target Cluster
Diagram
Figure 3. Many-to-one between Virtual Cluster and Target Cluster

A one-to-many pattern, where one Virtual Cluster points to many Target Clusters (providing amalgamation), is not a supported use-case.

Filter Chain

A Filter Chain consists of an ordered list of pluggable protocol filters.

A protocol filter implements some logic for intercepting, inspecting and/or manipulating Kafka protocol messages. Kafka protocol requests (such as Produce requests) pass sequentially through each of the protocol filters in the chain, beginning with the 1st filter in the chain and then following with the subsequent filters, before being forwarded to the broker.

When the broker returns a response (such as a Produce response) the protocol filters in the chain are invoked in the reverse order (that is, beginning with the nth filter in the chain, then the n-1th and so on) with each having the opportunity to inspect and/or manipulate the response. Eventually a response is returned to the client.
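The ordering of requests and responses can be sketched as follows. The TagFilter class and the round_trip function are hypothetical names used only to show the traversal order; they are not part of the Kroxylicious filter API.

```python
class TagFilter:
    """Hypothetical filter that records its name on every message it sees."""

    def __init__(self, name):
        self.name = name

    def on_request(self, message):
        return message + [f"{self.name}:request"]

    def on_response(self, message):
        return message + [f"{self.name}:response"]


def round_trip(filters, request, broker):
    # Requests traverse the chain from the first filter to the last.
    for f in filters:
        request = f.on_request(request)
    response = broker(request)
    # Responses traverse the chain in reverse, from the last filter to the first.
    for f in reversed(filters):
        response = f.on_response(response)
    return response


chain = [TagFilter("A"), TagFilter("B")]
trace = round_trip(chain, [], lambda req: req + ["broker"])
print(trace)
```

The printed trace lists the stages in the order they ran: filter A, filter B, the broker, then filter B and filter A again on the way back.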

The description above describes only the basic capabilities of the protocol filter. Richer features of filters are described later.

Diagram
Figure 4. Illustration of a request and response being manipulated by filters in a chain

As mentioned above, Kroxylicious takes the responsibility to rewrite the Kafka RPC responses that carry broker address information so that they reflect the broker addresses exposed by the Virtual Cluster. These are the Metadata, DescribeCluster and FindCoordinator responses. This processing is entirely transparent to the work of the protocol filters. Filter authors are free to write their own filters that intercept these responses too.

Filter composition

An important principle for the protocol filter API is that filters should compose nicely. That means that filters generally don’t know what other filters might be present in the chain, and what they might be doing to messages. When a filter forwards a request or response it doesn’t know whether the message is being sent to the next filter in the chain, or straight back to the client.

Such composition is important because it means a proxy user can configure multiple filters (possibly written by several filter authors) and expect to get the combined effect of all of them.

It’s never quite that simple, of course. In practice proxy users will often need to understand what each filter does in some detail in order to be able to operate their proxy properly, for example by understanding whatever metrics each filter is emitting.

Implementation

The proxy is written in Java, on top of Netty. The usual ChannelHandlers provided by the Netty project are used where appropriate (e.g. SSL support uses SslHandler), and Kroxylicious provides Kafka-specific handlers of its own.

The Kafka-aware parts use the Apache Kafka project’s own classes for serialization and deserialization.

Protocol filters get executed using a handler-per-filter model.

Deployment topologies

The proxy supports a range of possible deployment topologies. Which style is used depends on what the proxy is meant to achieve, architecturally speaking. Broadly speaking a proxy instance can be deployed:

As a forward proxy

Proxying the access of one or more clients to a particular cluster/broker that might also be accessible (to other clients) directly.

Topic-level encryption provides one example use case for a forward proxy-style deployment. This might be applicable when using clients that don’t support interceptors, or if an organisation wants to apply the same encryption policy in a single place, securing access to the keys within their network.

As a reverse proxy

Proxying access for all clients trying to reach a particular cluster/broker.

Transparent multi-tenancy provides an example use case for a reverse proxy. While Apache Kafka itself has some features that enable multi-tenancy, they rely on topic name prefixing as the primary mechanism for ensuring namespace isolation. Tenants have to adhere to the naming policy and know they’re a tenant of a larger shared cluster.

Transparent multi-tenancy means each tenant has the illusion of having their own cluster, with almost complete freedom over topic and group naming, while still actually sharing a cluster.

We can further classify deployment topologies by how many proxy instances are used. For example:

  • Single proxy instance

  • Proxy pool

Filters

Built-in filters

The following filters are provided built-in as part of the distribution.

Record Encryption

What is it?

A filter that transparently provides an encryption-at-rest solution for Apache Kafka.

At a high level, the filter works in the following way. First, let’s consider the produce side:

  1. The filter intercepts produce requests sent from producing Kafka Clients.

  2. The filter disassembles the produce request.

  3. Each record within it is encrypted.

  4. The produce request is reassembled, replacing the original records with their encrypted counterparts.

  5. The filter forwards the modified produce request onto the Kafka Broker.

  6. The broker handles the records in the normal way (writing them to the topic’s log etc). The broker has no knowledge that the records are encrypted - to it, they are just opaque bytes.

Now, let’s consider the consume side:

  1. The filter intercepts the fetch responses sent by the Kafka Broker to the consuming Kafka Client.

  2. The filter disassembles the fetch response.

  3. Each record is decrypted.

  4. The fetch response is reassembled, replacing the encrypted records with their unencrypted counterparts.

  5. The filter forwards the modified fetch response onto the Kafka Client. The client has no knowledge that the record was encrypted.

The entire process is transparent from the point of view of both Kafka Client and Kafka Broker. Neither is aware that the records are being encrypted. Neither client nor broker has any access to the encryption keys or any influence on the ciphering process.

The filter encrypts the records using symmetric encryption keys. The encryption technique employed is known as Envelope Encryption, which is a technique suited for encrypting large volumes of data in an efficient manner. Envelope Encryption is described in the next section.

The filter integrates with a Key Management Service (KMS). It is the KMS that has ultimate responsibility for the safe storage of key material. The role of the KMS is discussed later.

Envelope Encryption

Envelope Encryption is a technique described by NIST Special Publication 800-57 Part 1 Revision 5. It is the practice of encrypting the data with a Data Encryption Key (DEK), then wrapping (encrypting) the DEK with a Key Encryption Key (KEK). In this section we discuss how Envelope Encryption is employed by the filter to encrypt Kafka records. For more detail, refer to the design.

On the produce path, the filter is encrypting records. The filter uses a selector to determine which KEK to apply. It then requests that the KMS generates a DEK for the KEK. It is the DEK that is used to encrypt the record. The original record in the produce request is replaced by a cipher record which comprises the encrypted record, the encrypted DEK and some other metadata.

On the fetch path, the filter is decrypting records. The filter receives the cipher record from the Kafka Broker. The filter reverses the process used to construct the cipher record and uses the KMS to decrypt the DEK. The decrypted DEK is used to decrypt the encrypted record. The cipher record in the fetch response is then replaced with the decrypted record.
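The produce and fetch paths can be sketched end to end as follows. Everything here is illustrative: FakeKms is an in-memory stand-in for a real KMS, the layout of the cipher record is an assumption, and toy_cipher is a deliberately simple XOR keystream used in place of a real authenticated cipher. It is not secure and is not the cipher used by the filter.

```python
import hashlib
import os


def toy_cipher(key, data):
    """XOR data with a SHA-256 keystream. Toy stand-in for a real cipher; NOT secure."""
    stream = b""
    counter = 0
    while len(stream) < len(data):
        stream += hashlib.sha256(key + counter.to_bytes(8, "big")).digest()
        counter += 1
    return bytes(a ^ b for a, b in zip(data, stream))


class FakeKms:
    """In-memory stand-in for a KMS: KEK material never leaves this object."""

    def __init__(self):
        self._keks = {}

    def create_kek(self, name):
        self._keks[name] = os.urandom(32)

    def generate_dek(self, kek_name):
        # Returns the plaintext DEK together with the DEK wrapped by the KEK.
        dek = os.urandom(32)
        return dek, toy_cipher(self._keks[kek_name], dek)

    def decrypt_dek(self, kek_name, wrapped_dek):
        return toy_cipher(self._keks[kek_name], wrapped_dek)


def encrypt_value(kms, kek_name, value):
    dek, wrapped_dek = kms.generate_dek(kek_name)
    # The cipher record carries the KEK name, the wrapped DEK and the ciphertext.
    return {"kek": kek_name, "wrapped_dek": wrapped_dek, "ciphertext": toy_cipher(dek, value)}


def decrypt_value(kms, cipher_record):
    dek = kms.decrypt_dek(cipher_record["kek"], cipher_record["wrapped_dek"])
    return toy_cipher(dek, cipher_record["ciphertext"])


kms = FakeKms()
kms.create_kek("KEK_trades")
cipher_record = encrypt_value(kms, "KEK_trades", b"buy 100 ACME")
plain = decrypt_value(kms, cipher_record)
print(plain)
```

Note that only the wrapped DEK travels with the record. Recovering the plaintext DEK always requires a call to the KMS that holds the KEK.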

On the produce path, the filter employs a DEK reuse strategy. This means that records sent by a single connection to the same topic will be encrypted using the same DEK (until a time-out or encryption operations limit is reached, whichever occurs first). This prevents key exhaustion and prevents excessive interactions with the KMS.
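A minimal sketch of such a reuse strategy is shown below, for a single connection. The DekReuse class, its parameter names and the limits used are assumptions chosen for illustration; the filter's real data structures and defaults may differ.

```python
import time


class DekReuse:
    """Hand out the same DEK per topic until it is too old or too heavily used."""

    def __init__(self, generate_dek, max_age_seconds, max_encryptions, clock=time.monotonic):
        self._generate_dek = generate_dek
        self._max_age = max_age_seconds
        self._max_encryptions = max_encryptions
        self._clock = clock
        self._entries = {}  # topic -> [dek, created_at, uses]

    def dek_for(self, topic):
        entry = self._entries.get(topic)
        expired = entry is not None and (
            self._clock() - entry[1] >= self._max_age or entry[2] >= self._max_encryptions
        )
        if entry is None or expired:
            # Only here does the sketch call out to the (hypothetical) KMS.
            entry = [self._generate_dek(topic), self._clock(), 0]
            self._entries[topic] = entry
        entry[2] += 1
        return entry[0]


kms_calls = []


def fake_generate_dek(topic):
    kms_calls.append(topic)
    return f"dek-{len(kms_calls)}"


reuse = DekReuse(fake_generate_dek, max_age_seconds=3600, max_encryptions=2)
deks = [reuse.dek_for("trades") for _ in range(3)]
print(deks, len(kms_calls))
```

With a limit of two encryptions per DEK, three records need only two KMS calls: the third record triggers generation of a fresh DEK.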

On the fetch path, the filter employs an LRU strategy to keep recently encountered DEKs decrypted in memory. This prevents excessive interactions with the KMS.
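The LRU idea can be sketched with an ordered dictionary. The DecryptedDekCache class, its capacity and the fake decrypt function are hypothetical and serve only to show how cache hits avoid KMS calls.

```python
from collections import OrderedDict


class DecryptedDekCache:
    """Small LRU cache mapping wrapped (encrypted) DEKs to decrypted DEKs."""

    def __init__(self, decrypt_dek, capacity):
        self._decrypt_dek = decrypt_dek
        self._capacity = capacity
        self._cache = OrderedDict()

    def get(self, wrapped_dek):
        if wrapped_dek in self._cache:
            self._cache.move_to_end(wrapped_dek)  # mark as most recently used
            return self._cache[wrapped_dek]
        dek = self._decrypt_dek(wrapped_dek)  # cache miss: ask the KMS
        self._cache[wrapped_dek] = dek
        if len(self._cache) > self._capacity:
            self._cache.popitem(last=False)  # evict the least recently used entry
        return dek


kms_decrypts = []


def fake_decrypt(wrapped_dek):
    kms_decrypts.append(wrapped_dek)
    return wrapped_dek.upper()


cache = DecryptedDekCache(fake_decrypt, capacity=2)
for wrapped in ["a", "b", "a", "c", "b"]:
    cache.get(wrapped)
print(kms_decrypts)
```

The second lookup of "a" is served from memory, while "b" has to be decrypted again because it was evicted when "c" arrived.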

Role of the KMS

The Key Management Service (KMS) is the secure repository for Key Encryption Keys (KEKs). The key material of the KEK never leaves the confines of the KMS. The KMS exposes cryptographic functions that operate against the KEKs stored within it.

The KMS is also the source of Data Encryption Keys (DEKs). When the KMS generates a DEK for a given KEK, it returns the DEK (which is securely generated random data), together with the encrypted DEK (which is the same data, encrypted using the KEK). Note that the KMS does not store the DEKs - DEKs are stored encrypted as part of the Kafka record held by the broker.

The filter uses the services of the KMS to generate DEKs, and decrypt encrypted DEKs.

The KMS must be available at runtime. If the KMS is unavailable, it will become impossible to produce or consume records through the filter until the KMS service is restored.

The filter currently has KMS integrations with HashiCorp Vault® and AWS Key Management Service. More KMS integrations are planned. It is also possible for a user to implement their own KMS integration. The implementation must implement the KMS public API and be made available on the classpath through the service loader mechanism.

It is recommended to use a KMS in a highly available configuration.

Key rotation

Key rotation is the practice of periodically replacing cryptographic keys with new ones. Using key rotation is considered cryptographic best-practice.

The filter allows for the rotation of KEKs within the KMS. When a KEK is rotated, the new key material will be applied to newly produced records. Existing records (which were encrypted with older versions of the KEK) remain decryptable as long as the previous KEK version remains present in the KMS.

If the previous KEK version is removed from the KMS, the records encrypted with that key version will become un-consumable (that is, the fetch will fail). In this case, the consumer offset must be advanced beyond those records.

What exactly gets encrypted?

The filter currently encrypts only record values. Record keys, headers, and timestamps are not encrypted.

Null record values (which represent deletes, or tombstones, in compacted topics) sent by producers are passed through to the broker unencrypted. This means encryption may be applied to compacted topics too. Deletes will function normally.

The presence of unencrypted records on a topic configured for encryption is allowed. The unencrypted records will get passed through to consumer as normal. This supports the use-case where encryption is introduced into a system where topics are already populated with unencrypted content.

The filter supports use-cases where some Kafka topics are configured for encryption while others are left to be unencrypted.

Transactions producing to both encrypted and unencrypted topics are supported.
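The rules above for a single record can be summarised in a short sketch. The record layout (a dictionary with key, value and headers) and the reverse_bytes placeholder are assumptions for illustration only; real encryption is replaced by a trivial reversible transformation.

```python
def encrypt_record(record, encrypt_value):
    """Return a copy of the record in which only a non-null value is encrypted."""
    if record["value"] is None:
        # Tombstone: forwarded as-is so that deletes on compacted topics still work.
        return dict(record)
    return {**record, "value": encrypt_value(record["value"])}


def reverse_bytes(value):
    # Placeholder transformation standing in for real encryption.
    return value[::-1]


record = {"key": b"order-1", "value": b"buy 100 ACME", "headers": {"source": b"app"}}
tombstone = {"key": b"order-1", "value": None, "headers": {}}

encrypted = encrypt_record(record, reverse_bytes)
print(encrypted["key"], encrypted["value"])
print(encrypt_record(tombstone, reverse_bytes)["value"])
```

The key and headers come out unchanged, and the tombstone keeps its null value.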

How to use the filter

There are three steps to using the filter.

  1. Setting up the KMS.

  2. Configuring the filter within Kroxylicious.

  3. Establishing the encryption key(s) within the KMS that will be used to encrypt the topics.

These steps are described in the next sections.

Setting up the KMS

In order to set up the KMS provider ready to use with the filter, follow these KMS provider specific steps.

HashiCorp Vault

Enable the Transit Engine

The filter integrates with the HashiCorp Vault Transit Engine. Vault does not enable the Transit Engine by default. It must be enabled before it can be used with the filter.

Vault Transit Engine URL

The Vault Transit Engine URL is required so the filter knows the location of the Transit Engine within the Vault instance.

The URL is formed from the concatenation of the Api Address (reported by Vault during startup) with the complete path to the Transit Engine, including the name of the engine itself. If Namespacing is used on the Vault instance, the path needs to include the namespace(s). The URL will end with /transit unless the -path parameter was used when enabling the engine.

If namespacing is not in use, the URL will look like this:

https://myvaultinstance:8200/v1/transit

If namespacing is in use, the path must include the namespaces. For example, if there is a parent namespace a and a child namespace b, the URL will look like this:

https://myvaultinstance:8200/v1/a/b/transit

If the name of the Transit engine was changed (using the -path argument to the vault secrets enable transit command) the URL will look like this:

https://myvaultinstance:8200/v1/mytransit
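
The three URL forms above follow one pattern, which can be sketched as a small helper. The function name transit_engine_url and its parameters are hypothetical; the helper only joins strings and does not contact Vault.

```python
def transit_engine_url(api_address, namespaces=(), engine_path="transit"):
    """Join the Vault API address, optional namespaces and the engine path."""
    segments = ["v1", *namespaces, engine_path]
    return api_address.rstrip("/") + "/" + "/".join(segments)


print(transit_engine_url("https://myvaultinstance:8200"))
print(transit_engine_url("https://myvaultinstance:8200", namespaces=("a", "b")))
print(transit_engine_url("https://myvaultinstance:8200", engine_path="mytransit"))
```

Each call reproduces one of the example URLs shown above, in the same order.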

Establish the naming convention for keys within Vault hierarchy

It is necessary to determine a naming convention for the KEKs within Vault. This will allow the keys used by the filter to be kept separate from any keys used by other systems. This document assumes that the naming convention will be to prefix the keys used by the filter with the word KEK_. If a different naming convention is used, adapt the instructions accordingly.

Administrator Actor

To use the filter, there must be an administrative actor established. This actor, which is likely to be a human, has the responsibility to create keys within Vault for use by the filter.

The Administrator must have permissions to log in to Vault and create keys beneath transit/keys/KEK_* in the Vault hierarchy.

The exact steps required to establish an administrative actor will depend on the way the Vault instance has been set up.

A minimal Vault policy required by the Administrator is as follows:

path "transit/keys/KEK_*" {
  capabilities = ["read", "write"]
}

Filter Actor

To use the filter, there must be a Vault identity established for the filter itself. This identity must have permissions to perform the operations needed for envelope encryption (generating and decrypting DEKs).

Create a Vault policy for the filter actor:

vault policy write kroxylicious_encryption_filter_policy - << EOF
path "transit/keys/KEK_*" {
  capabilities = ["read"]
}
path "transit/datakey/plaintext/KEK_*" {
  capabilities = ["update"]
}
path "transit/decrypt/KEK_*" {
  capabilities = ["update"]
}
EOF

Create a Periodic (long-lived) Vault Token for the filter[1]:

vault token create -display-name "kroxylicious encryption filter" \
                   -policy=kroxylicious_encryption_filter_policy \
                   -period=768h \                                     (1)
                   -no-default-policy \                               (2)
                   -orphan                                            (3)
1 Causes the token to be periodic (with every renewal using the given period).
2 Detach the "default" policy from the policy set for this token. This is done so the token has least-privilege.
3 Create the token with no parent. This is done so that expiration of a parent won’t expire the token used by the filter.

The token create command yields the token. The token value is required later when configuring the vault within the filter.

token              hvs.CAESIFJ_HHo0VnnW6DSbioJ80NqmuYm2WlON-QxAPmiJScZUGh4KHGh2cy5KdkdFZUJMZmhDY0JCSVhnY2JrbUNEWnE
token_accessor     4uQZJbEnxW4YtbDBaW6yVzwP
token_policies     [kroxylicious_encryption_filter_policy]

The token must be renewed before expiration. It is the responsibility of the Administrator to do this.

This can be done with a command like:

vault token renew --accessor <token_accessor>

Testing the Kroxylicious Vault Token using the CLI

To test whether the Kroxylicious Vault Token and the policy are working correctly, a script can be used.

First, as an Administrator, create a KEK in the hierarchy at this path transit/keys/KEK_testkey.

VAULT_TOKEN=<kroxylicious encryption filter token> validate_vault_token.sh <kek path>

The script should respond 'Ok'. If errors are reported check the policy/token configuration.

transit/keys/KEK_testkey can now be removed.

AWS Key Management Service

The filter is able to integrate with AWS Key Management Service.

Follow the instructions below to prepare your AWS account. You’ll need a privileged AWS user that is capable of creating users and policies to perform the set-up.

Establish an aliasing convention for keys within AWS KMS

The filter references KEKs within AWS via an AWS key alias.

It is necessary to establish a naming convention for the alias names. This will allow the keys used by the filter to be kept separate from any keys used by other systems. This document assumes that the naming convention will be to prefix the alias used by the filter with the word KEK_. If a different naming convention is used, adapt the instructions accordingly.

Administrator Actor

To use the filter, there must be an administrative identity established within AWS IAM. This user, which is likely to be a human, has the responsibility to manage keys and aliases within AWS KMS for use by the filter.

Use AWS IAM to create a kroxylicious-admin user. Attach the policies AWSKeyManagementServicePowerUser and IAMUserChangePassword to the user. You may wish to attach the policy AWSCloudShellFullAccess so the Administrator can use the CLI from AWS CloudShell to manage the KEKs. Grant the user access to the Console.

If using the CLI, the following commands can be used to establish the Administrator user. This example illustrates using the user-name kroxylicious-admin. The choice of name is not significant. If a different user-name is used, adapt the instructions accordingly.

ADMIN=kroxylicious-admin
INITIAL_PASSWORD=$(tr -dc 'A-Za-z0-9!?%=' < /dev/urandom | head -c 10)
CONSOLE_URL=https://$(aws sts get-caller-identity --query Account --output text).signin.aws.amazon.com/console
aws iam create-user --user-name ${ADMIN}
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/AWSKeyManagementServicePowerUser
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/IAMUserChangePassword
aws iam attach-user-policy --user-name ${ADMIN} --policy-arn arn:aws:iam::aws:policy/AWSCloudShellFullAccess
aws iam create-login-profile --user-name ${ADMIN} --password ${INITIAL_PASSWORD} --password-reset-required
echo Now log in at ${CONSOLE_URL}  with user name ${ADMIN} password ${INITIAL_PASSWORD} and change the password.

Filter Actor

The Record Encryption Filter needs to be able to log in to AWS itself. For this there needs to be a service account identity established within AWS IAM.

Use AWS IAM to create the kroxylicious user. Create an Access Key for this user. The Access Key/Secret Key pair will be used by the Filter. Do not enable the Console for this user.

If using the CLI, these commands can be used to establish the Filter Actor. This example illustrates using the user-name kroxylicious. The choice of name is not significant. If a different user-name is used, adapt the instructions accordingly.

aws iam create-user --user-name kroxylicious
aws iam create-access-key --user-name kroxylicious

Create Alias Based Policy

Create an alias based policy granting permissions to use keys aliased by the established alias naming convention.

AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
cat > /tmp/policy << EOF
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "AliasBasedIAMPolicy",
			"Effect": "Allow",
			"Action": [
				"kms:Encrypt",
				"kms:Decrypt",
				"kms:GenerateDataKey*",
				"kms:DescribeKey"
			],
			"Resource": [
				"arn:aws:kms:*:${AWS_ACCOUNT_ID}:key/*"
			],
			"Condition": {
				"ForAnyValue:StringLike": {
					"kms:ResourceAliases": "alias/KEK_*"
				}
			}
		}
	]
}
EOF
aws iam create-policy --policy-name KroxyliciousRecordEncryption --policy-document file:///tmp/policy

Apply Alias Based Policy to Filter Actor

Attach the alias policy to the Filter Actor user. This will allow the Filter actor to invoke the necessary key operations on all KEKs with aliases of the prescribed form.

AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
aws iam attach-user-policy --user-name kroxylicious --policy-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:policy/KroxyliciousRecordEncryption"

Filter Configuration

The filter is configured as part of the filter chain in the following way:

filters:
- type: RecordEncryption                                        (1)
  config:
    kms: <KMS service name>                                       (2)
    kmsConfig:                                                    (3)
      ..:
    selector: <KEK selector service name>                         (4)
    selectorConfig:                                               (5)
      ..:
1 The name of the filter. This must be RecordEncryption. The filter was previously known as EnvelopeEncryption.
2 The KMS service name.
3 Object providing configuration understood by KMS provider.
4 The KEK selector service name.
5 Object providing configuration understood by key selector.

KMS Service configuration

In order to configure the KMS Service, follow these KMS provider specific steps.

HashiCorp Vault

For HashiCorp Vault, the KMS configuration looks like this. Use the Vault Token and Vault Transit Engine URL values that you gathered above.

kms: VaultKmsService                                          (1)
kmsConfig:
  vaultTransitEngineUrl: <vault transit engine service url>   (2)
  tls:                                                        (3)
  vaultToken:                                                 (4)
    passwordFile: /opt/vault/token
1 Name of the KMS provider. This must be VaultKmsService.
2 Vault Transit Engine URL including the protocol part, i.e. https: or http:
3 (Optional) TLS trust configuration.
4 File containing the Vault Token

For TLS trust and TLS client authentication configuration, the filter accepts the same TLS parameters as Upstream TLS except the PEM store type is currently not supported.

AWS Key Management Service

For AWS KMS, the KMS configuration looks like this.

kms: AwsKmsService                                            (1)
kmsConfig:
  endpointUrl: https://kms.<region>.amazonaws.com             (2)
  tls:                                                        (3)
  accessKey:
    passwordFile: /opt/aws/accessKey                          (4)
  secretKey:
    passwordFile: /opt/aws/secretKey                          (5)
  region: <region>                                            (6)
1 Name of the KMS provider. This must be AwsKmsService.
2 AWS Endpoint URL. This must include the https:// scheme part.
3 (Optional) TLS trust configuration.
4 File containing the AWS Access Key
5 File containing the AWS Secret Key
6 AWS region identifier (e.g. us-east-1)

For TLS trust and TLS client authentication configuration, the filter accepts the same TLS parameters as Upstream TLS except the PEM store type is currently not supported.

KEK selector configuration

The role of the KEK selector is to map from the topic name to key name. The filter looks up the resulting key name in the KMS.

If the filter is unable to find the key in the KMS, the filter will pass through the records belonging to that topic in the produce request without encrypting them.

Template KEK Selector

The TemplateKekSelector maps from topic name to key name. The template understands the substitution token ${topicName} which is replaced by the name of the topic. It can be used to build key names that include the topic name being encrypted.

Use of ${topicName} is optional. It is possible to pass a literal string. This will result in all topics being encrypted using the same key.

selector: TemplateKekSelector                                 (1)
selectorConfig:
  template: "KEK_${topicName}"                                (2)
1 The name of the KEK selector. This must be TemplateKekSelector.
2 Template used to build the key name from the topic name.
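
The substitution performed by the template can be sketched with Python's string.Template, which happens to use the same ${name} token syntax. The select_kek function is a hypothetical illustration and not the TemplateKekSelector implementation.

```python
from string import Template


def select_kek(template, topic_name):
    """Resolve a key name from a template using the ${topicName} token."""
    return Template(template).substitute(topicName=topic_name)


per_topic = select_kek("KEK_${topicName}", "trades")  # key name includes the topic
shared = select_kek("KEK_shared", "trades")           # literal template: one key for all topics
print(per_topic, shared)
```

The first call yields a key name specific to the trades topic, while the literal template resolves to the same key name regardless of topic.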

Establishing the keys in the KMS

It is the role of the Administrator to create KEKs in the KMS that will be used to encrypt the records. This must be done using whatever management interface the KMS provides.

The names (or aliases) of the encryption keys must match the naming conventions established above. If the selector generates a key name that doesn’t exist within the KMS, records will be sent to the topic without encryption.

For example, if using the TemplateKekSelector with the template KEK_${topicName}, create a key for every topic that is to be encrypted with the key name matching the topic name, prefixed by the string KEK_.

HashiCorp Vault

Using the Administrator actor, use either the Vault UI or CLI to create AES-256 symmetric keys following your key naming convention. The key type must be aes256-gcm96, which is Vault’s default key type.

It is recommended to use a key rotation policy.

If using the Vault CLI, the command will look like:

vault write -f transit/keys/KEK_trades type=aes256-gcm96 auto_rotate_period=90d

AWS Key Management Service

As the Administrator, use either the AWS Console or CLI to create a Symmetric key with Encrypt and decrypt usage. Multi-region keys are supported. It is not possible to make use of keys from other AWS accounts[2].

Give the key an alias following the alias naming convention established above.

If using the CLI, this can be done with commands like this:

KEY_ALIAS="KEK_<name>"
# The create-key command produces JSON output including the KeyId.
KEY_ID=$(aws kms create-key | jq -r '.KeyMetadata.KeyId')
aws kms create-alias --alias-name alias/${KEY_ALIAS} --target-key-id ${KEY_ID}

Once the key is created, it is recommended to use a key rotation policy.

aws kms enable-key-rotation --key-id ${KEY_ID} --rotation-period-in-days 180

Verifying that encryption is occurring

To verify that records sent to topics are indeed being encrypted, use kafka-console-consumer to consume the records directly from the target Kafka Cluster. Verify that encrypted text is seen rather than the plain text that was sent by the producer.

kafka-console-consumer --bootstrap-server mycluster:8092 --topic trades --from-beginning

The record values seen will look something like this:

tradesvault:v1:+EfJ977UG1XkjI9yh7vxpgN2E1DKaIkDuxE+eCprVTKr+sskFuChcTe/KpR/c8ZDyP76W3itExmEzLOl����x)�Ũ�z�:S�������tБ��v���

Multi-tenancy

What is it?

The Multi-tenancy filter isolates tenants by prefixing the names of the resources (such as topics and groups) that pass through a virtual cluster on their way to the target cluster. See Apache Kafka multi-tenancy documentation for more information.

How to use the filter

There are two steps to using the filter.

  1. Configuring virtual clusters

  2. Configuring the filter within Kroxylicious.

Configuring the filter within Kroxylicious.

filters:
  - type: MultiTenantTransformationFilterFactory
    config:
      prefixResourceNameSeparator: "." (1)
1 The separator placed between the tenant prefix and the resource name. The default, used if no configuration is provided, is the hyphen (-).

With the configuration above, if the virtual cluster name is demo, the created prefix will be "demo." (the virtual cluster name followed by the separator).

Currently, only the prefix with separator is validated.
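
The effect of the prefix can be sketched as a pair of mappings between the names a tenant sees and the names stored on the shared cluster. The function names are hypothetical, the hyphen default mirrors the default separator mentioned above, and only topic names are shown.

```python
def tenant_prefix(virtual_cluster, separator="-"):
    return virtual_cluster + separator


def to_upstream(virtual_cluster, topic, separator="-"):
    """Name used on the shared target cluster."""
    return tenant_prefix(virtual_cluster, separator) + topic


def to_downstream(virtual_cluster, upstream_topics, separator="-"):
    """Topics visible to one tenant, with the tenant prefix removed."""
    prefix = tenant_prefix(virtual_cluster, separator)
    return [t[len(prefix):] for t in upstream_topics if t.startswith(prefix)]


shared = [to_upstream("devenv1", "first-topic", "."), to_upstream("devenv2", "second-topic", ".")]
print(shared)
print(to_downstream("devenv1", shared, "."))
```

Each tenant sees only its own topics, under the names it chose, even though both topics live side by side on the target cluster.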

Verifying that multi-tenancy is occurring

Set up two virtual clusters devenv1 & devenv2. See this example.

Create a topic on devenv1

kafka-topics.sh --bootstrap-server devenv1:9392 --create --if-not-exists --topic first-topic

Create a topic on devenv2

kafka-topics.sh --bootstrap-server devenv2:9392 --create --if-not-exists --topic second-topic

List topics from devenv1

kafka-topics.sh --bootstrap-server devenv1:9392 --list

List topics from devenv2

kafka-topics.sh --bootstrap-server devenv2:9392 --list

A full example with Kubernetes is available here.

Schema Validation

What is it?

The Schema Validation filter validates records against an existing schema in Apicurio Registry.

How to use the filter

There are two steps to using the filter.

  1. Configuring virtual clusters

  2. Configuring the filter within Kroxylicious.

Configuring the filter within Kroxylicious.

filters:
  - type: ProduceValidationFilterFactory
    config:
        rules:
        - topicNames:
            - test-topic
          valueRule:
              schemaValidationConfig:
                  apicurioGlobalId: 1
                  apicurioRegistryUrl: http://localhost:8080

The apicurioGlobalId parameter is the global identifier of the schema that will be used to validate the value of the records in test-topic. The second parameter, apicurioRegistryUrl, is the URL of your Apicurio Registry instance.

With this configuration, the values of all the records you produce to test-topic will be validated using the schema present in Apicurio Registry with global identifier 1.

OAUTHBEARER validation

What is it?

The OauthBearerValidation filter validates the JWT token received from a client before forwarding it to the cluster.

If the token is not validated, then the request is short-circuited. It reduces resource consumption on the cluster when a client sends too many invalid SASL requests.

Diagram

How to use the filter

There are two steps to using the filter.

  1. Configuring virtual clusters

  2. Configuring the filter within Kroxylicious.

Configuring the filter within Kroxylicious.
filters:
  - type: OauthBearerValidation
    config:
      jwksEndpointUrl: https://oauth/JWKS   (1)
      jwksEndpointRefreshMs: 3600000        (2)
      jwksEndpointRetryBackoffMs: 100       (3)
      jwksEndpointRetryBackoffMaxMs: 10000  (4)
      scopeClaimName: scope                 (5)
      subClaimName: sub                     (6)
      authenticateBackOffMaxMs: 60000       (7)
      authenticateCacheMaxSize: 1000        (8)
      expectedAudience: https://first.audience, https://second.audience (9)
      expectedIssuer: https://your-domain.auth/ (10)
1 The OAuth/OIDC provider URL from which the provider’s JWKS (JSON Web Key Set) can be retrieved.
2 The (optional) value in milliseconds for the broker to wait between refreshing its JWKS (JSON Web Key Set) cache that contains the keys to verify the signature of the JWT.
3 The (optional) value in milliseconds for the initial wait between JWKS (JSON Web Key Set) retrieval attempts from the external authentication provider.
4 The (optional) value in milliseconds for the maximum wait between attempts to retrieve the JWKS (JSON Web Key Set) from the external authentication provider.
5 This (optional) setting can provide a different name to use for the scope included in the JWT payload’s claims.
6 This (optional) setting can provide a different name to use for the subject included in the JWT payload’s claims.
7 The (optional) maximum delay in milliseconds used to limit how often a client can send authenticate requests. A value of 0 means the client is never limited. Otherwise, an exponentially increasing delay is added to each authenticate request until authenticateBackOffMaxMs has been reached.
8 The (optional) maximum number of failed tokens kept in cache.
9 The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences.
10 The (optional) setting for the broker to use to verify that the JWT was created by the expected issuer.

Note: The OauthBearer configuration follows Kafka’s properties.
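To make the effect of authenticateBackOffMaxMs concrete, here is a rough Java sketch of a capped exponential delay. It is not the filter's implementation: the starting delay of 500 ms, the doubling factor, and the idea of counting attempts per client are all assumptions made for illustration. Only the cap and the "0 disables the limit" rule come from the description above.

```java
// Illustrative only: the base delay and doubling factor are assumptions.
final class AuthenticateBackOffSketch {

    // Assumption: the document does not state the real starting delay.
    static final long ASSUMED_BASE_DELAY_MS = 500;

    /**
     * Delay applied before handling the given attempt (1 = first attempt counted).
     * A maximum of 0 means the client is never delayed.
     */
    static long delayMs(int attempt, long authenticateBackOffMaxMs) {
        if (authenticateBackOffMaxMs == 0 || attempt <= 0) {
            return 0;
        }
        long delay = ASSUMED_BASE_DELAY_MS;
        // Double once per additional attempt, stopping as soon as the cap is reached.
        for (int i = 1; i < attempt && delay < authenticateBackOffMaxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, authenticateBackOffMaxMs);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 9; attempt++) {
            System.out.println(attempt + " -> " + delayMs(attempt, 60_000) + " ms");
        }
    }
}
```

Under these assumptions the delay grows 500, 1000, 2000 and so on, and stays at 60000 ms once the cap is hit.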

Community filters

Community contributed filters are showcased in the Community Gallery.

Custom Filters

Custom filters can be written in the Java programming language. Knowledge of the Kafka protocol is generally required to write a protocol filter.

There is currently one class of Custom Filters users can implement:

Protocol filters

Allow customisation of how protocol messages are handled on their way to, or from, the Cluster.

The following sections explain in more detail how to write your own filters.

Sample Custom Filter Project

A collection of sample filters is available within the Kroxylicious repository for you to download, try out, and customise. You can find them here for a hands-on introduction to creating your own custom filters.

API docs

Custom Protocol Filters are built by implementing interfaces supplied by the kroxylicious-api module (io.kroxylicious:kroxylicious-api on maven central). You can view the javadoc here.

Dependencies

How filter classes are loaded is not currently defined by the filter contract. In other words, filters might be loaded using a classloader-per-filter model, or using a single class loader. This doesn’t really make a difference to filter authors except where they want to make use of libraries as dependencies. Because those dependencies might be loaded by the same classloader as the dependencies of other filters there is the possibility of collision. Filter A and Filter B might both want to use Library C, and they might want to use different versions of Library C.

For common things like logging and metric facade APIs it is recommended to use the facade APIs which are also used by the proxy core.

Protocol filters

A protocol filter is a public top-level, concrete class with a particular public constructor and which implements one or more protocol filter interfaces. You can implement two distinct types of Custom Protocol Filter:

  • Specific Message Protocol Filters

  • Request/Response Protocol Filters

Note that these types are mutually exclusive, for example a Filter is not allowed to implement both RequestFilter and MetadataRequestFilter. This is to prevent ambiguity. If we received a MetadataRequest, would it be dispatched to the onMetadataRequest(..) method of MetadataRequestFilter or the onRequest method of RequestFilter, or both? Instead, we disallow these combinations, throwing an exception at runtime if your Filter implements incompatible interfaces.

Specific Message Protocol Filters

A filter may wish to intercept specific types of Kafka messages. For example, intercept all Produce Requests, or intercept all Fetch Responses. To support this case Kroxylicious provides an interface for each request type and response type supported by Kafka (at the version of Kafka Kroxylicious depends on). A filter implementation can implement any combination of these interfaces.

There is no requirement that a Filter handles both the request and response halves of an RPC. A Filter can choose to intercept only the request, or only the response, or both the request and response.

Examples

To intercept all Fetch Requests your class would implement FetchRequestFilter:

public class FetchRequestClientIdFilter implements FetchRequestFilter {

    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        header.setClientId("fetch-client!");
        return context.forwardRequest(header, request);
    }
}

To intercept all Fetch Responses your class would implement FetchResponseFilter:

public class FetchRequestClientIdFilter implements FetchResponseFilter {

    @Override
    public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion,
                                                                 ResponseHeaderData header,
                                                                 FetchResponseData response,
                                                                 FilterContext context) {
        mutateResponse(response);
        return context.forwardResponse(header, response);
    }
}

To intercept all Fetch Requests and all Fetch Responses your class would implement FetchRequestFilter and FetchResponseFilter:

public class FetchRequestClientIdFilter implements FetchRequestFilter, FetchResponseFilter {

    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        header.setClientId("fetch-client!");
        return context.forwardRequest(header, request);
    }

    @Override
    public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion,
                                                                 ResponseHeaderData header,
                                                                 FetchResponseData response,
                                                                 FilterContext context) {
        mutateResponse(response);
        return context.forwardResponse(header, response);
    }
}

Specific Message Filter interfaces are mutually exclusive with Request/Response. Kroxylicious will reject invalid combinations of interfaces.

Request/Response Protocol Filters

A filter may wish to intercept every message being sent from the Client to the Cluster or from the Cluster to the Client. To do this your custom filter will implement:

  • RequestFilter

  • ResponseFilter

Custom filters are free to implement either interface or both interfaces to intercept all messages.

For example:

public class FixedClientIdFilter implements RequestFilter {

    @Override
    public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey,
                                                          RequestHeaderData header,
                                                          ApiMessage body,
                                                          FilterContext filterContext) {
        header.setClientId("example!");
        return filterContext.forwardRequest(header, body);
    }

}

Request/Response Filter interfaces are mutually exclusive with Specific Message interfaces. Kroxylicious will reject invalid combinations of interfaces.

The Filter Result

As seen above, filter methods (onXyz[Request|Response]) must return a CompletionStage<FilterResult> object. It is the job of FilterResult to convey what message is to be forwarded to the next filter in the chain (or to the broker/client if at the chain’s beginning or end). It is also used to carry instructions such as indicating that the connection must be closed, or a message dropped.

If the filter returns a CompletionStage that is already completed normally, Kroxylicious will immediately perform the action described by the FilterResult.

The filter may return a CompletionStage that is not yet completed. When this happens, Kroxylicious will pause reading from the downstream (the Client writes will eventually block), and it begins to queue up in-flight requests/responses arriving at the filter. This is done so that message order is maintained. Once the CompletionStage completes, the action described by the FilterResult is performed, reading from the downstream resumes and any queued up requests/responses are processed.

The pausing of reads from the downstream is a relatively costly operation. To maintain optimal performance filter implementations should minimise the occasions on which an incomplete CompletionStage is returned.

If the CompletionStage completes exceptionally, the connection is closed. This also applies if the CompletionStage does not complete within a timeout (20000 milliseconds).
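The ordering behaviour described above can be modelled with plain java.util.concurrent types. The following toy dispatcher is not how Kroxylicious is implemented, and the OrderedDispatchSketch name is hypothetical. It handles messages immediately while the "filter" returns completed stages, and queues later messages while a stage is still pending so that order is preserved. It is single-threaded by design and ignores exceptional completion and timeouts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Toy model only: hypothetical class, not part of Kroxylicious.
final class OrderedDispatchSketch {
    private final Function<String, CompletionStage<String>> filter;
    private final Queue<String> queued = new ArrayDeque<>();
    private final List<String> forwarded = new ArrayList<>();
    private boolean paused = false;

    OrderedDispatchSketch(Function<String, CompletionStage<String>> filter) {
        this.filter = filter;
    }

    /** Called for each message arriving from the (simulated) downstream. */
    void onMessage(String message) {
        queued.add(message);
        drain();
    }

    private void drain() {
        while (!paused && !queued.isEmpty()) {
            CompletableFuture<String> stage = filter.apply(queued.poll()).toCompletableFuture();
            if (stage.isDone()) {
                // Already completed: act on the result immediately.
                forwarded.add(stage.join());
            } else {
                // Not yet completed: stop handling new messages until it is.
                paused = true;
                stage.thenAccept(result -> {
                    forwarded.add(result);
                    paused = false;
                    drain(); // resume with whatever queued up in the meantime
                });
            }
        }
    }

    List<String> forwarded() {
        return forwarded;
    }
}
```

If a filter function returns a pending future for message "b", messages arriving after "b" wait in the queue and are only forwarded once that future completes, which is why returning incomplete stages sparingly keeps throughput high.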

Creating a Filter Result

The FilterContext is the factory for the FilterResult objects.

There are two convenience methods[3] that simply allow a filter to forward a result to the next filter. We’ve already seen these in action above.

  • context.forwardRequest(header, request) used by a request filter to forward a request.

  • context.forwardResponse(header, response) used by a response filter to forward a response.

To access richer features, use the filter result builders context.requestFilterResultBuilder() and responseFilterResultBuilder().

Filter result builders allow you to:

  1. forward a request/response: .forward(header, request).

  2. signal that a connection is to be closed: .withCloseConnection().

  3. signal that a message is to be dropped (i.e. not forwarded): .drop().

  4. for requests only, send a short-circuit response: .shortCircuitResponse(header, response)

The builder lets you combine legal behaviours together. For instance, to close the connection after forwarding a response to a client, a response filter could use:

return context.responseFilterResultBuilder()
        .forward(header, response)
        .withCloseConnection()
        .completed();

The builders yield either a completed CompletionStage<FilterResult>, which can be returned directly from the filter method, or a bare FilterResult. The latter exists to support asynchronous programming styles, allowing you to use your own Futures.

The drop behaviour can be legally used only in very specific circumstances. The Kafka Protocol is, for the most part, strictly request/response with responses expected in the order the requests were sent. The client will fail if the contract isn’t upheld. The exception is Produce where acks=0. Filters may drop these requests without introducing a protocol error.

The protocol filter lifecycle

Instances of the filter class are created on demand when a protocol message is first sent by a client. Instances are specific to the channel between a single client and a single broker.

An instance exists for as long as the client remains connected.

Handling state

The simplest way of managing per-client state is to use member fields. The proxy guarantees that all methods of a given filter instance will always be invoked on the same thread (also true of the CompletionStage completion in the case of Sending asynchronous requests to the Cluster). Therefore, there is no need to use synchronization when accessing such fields.

See the io.kroxylicious.proxy.filter package javadoc for more information on thread-safety.

Filter Patterns

Kroxylicious Protocol Filters support several patterns:

Intercepting Requests and Responses

This is a common pattern where we want to inspect or modify a message. For example:

public class SampleFetchResponseFilter implements FetchResponseFilter {
    @Override
    public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion,
                                                                 ResponseHeaderData header,
                                                                 FetchResponseData response,
                                                                 FilterContext context) {
        mutateResponse(response, context); (1)
        return context.forwardResponse(header, response); (2)
    }
}
1 We mutate the response object. For example, you could alter the records that have been fetched.
2 We forward the response, sending it towards the client, invoking Filters downstream of this one.
We can only forward the response and header objects passed into onFetchResponse. New instances are not supported.

Sending Response messages from a Request Filter towards the Client (Short-circuit responses)

In some cases we may wish to not forward a request from the client to the Cluster. Instead, we want to intercept that request and generate a response message in a Kroxylicious Protocol Filter and send it towards the client. This is called a short-circuit response.

Diagram
Figure 5. Illustration of responding without proxying

For example:

public class CreateTopicRejectFilter implements CreateTopicsRequestFilter {

    public CompletionStage<RequestFilterResult> onCreateTopicsRequest(short apiVersion, RequestHeaderData header, CreateTopicsRequestData request,
                                                                      FilterContext context) {
        CreateTopicsResponseData response = new CreateTopicsResponseData();
        CreateTopicsResponseData.CreatableTopicResultCollection topics = new CreateTopicsResponseData.CreatableTopicResultCollection(); (1)
        request.topics().forEach(creatableTopic -> {
            CreateTopicsResponseData.CreatableTopicResult result = new CreateTopicsResponseData.CreatableTopicResult();
            result.setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code()).setErrorMessage(ERROR_MESSAGE);
            result.setName(creatableTopic.name());
            topics.add(result);
        });
        response.setTopics(topics);
        return context.requestFilterResultBuilder().shortCircuitResponse(response).completed(); (2)
    }
}
1 Create a new instance of the corresponding response data and populate it. Note you may need to use the apiVersion to check which fields can be set at this request’s API version.
2 We build a short-circuit response, which is sent towards the client, invoking Filters downstream of this one.

This will respond to all Create Topic requests with an error response without forwarding any of those requests to the Cluster.

Closing the connections

There is a useful variation on the pattern above, where the filter, in addition to sending an error response, also needs to cause the connection to close. This is useful where the filter wishes to disallow certain client behaviours.

public class DisallowAlterConfigs implements AlterConfigsRequestFilter {

    @Override
    public CompletionStage<RequestFilterResult> onAlterConfigsRequest(short apiVersion, RequestHeaderData header, AlterConfigsRequestData request,
                                                                      FilterContext context) {
        var response = new AlterConfigsResponseData();
        response.setResponses(request.resources().stream()
                .map(a -> new AlterConfigsResourceResponse()
                        .setErrorCode(Errors.INVALID_CONFIG.code())
                        .setErrorMessage("This service does not allow this operation - closing connection"))
                .toList());
        return context.requestFilterResultBuilder()
                         .shortCircuitResponse(response)
                         .withCloseConnection() (1)
                         .completed();
    }
}
1 We enable the close connection option on the builder. This will cause Kroxylicious to close the connection after the response is sent to the client.

Sending asynchronous requests to the Cluster

Filters can make additional asynchronous requests to the Cluster. This is useful if the Filter needs additional information from the Cluster in order to know how to mutate the filtered request/response.

The Filter can make use of CompletionStage chaining features (thenApply() etc.) to arrange for actions to be done once the asynchronous request completes. For example, it could chain an action that mutates the filtered request/response using the asynchronous response, and finally, chain an action to forward the request/response to the next filter.

The asynchronous request/response will be intercepted by Filters upstream of this Filter. Filters downstream of this Filter (and the Client) do not see the asynchronous response.

Let’s take a look at an example. We’ll send an asynchronous request towards the Cluster for topic metadata while handling a FetchRequest and use the response to mutate the FetchRequest before passing it to the next filter in the chain.

public class FetchFilter implements FetchRequestFilter {
    public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12;

    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        var metadataRequestHeader = new RequestHeaderData().setRequestApiVersion(METADATA_VERSION_SUPPORTING_TOPIC_IDS); (1)
        var metadataRequest = new MetadataRequestData(); (2)
        var topic = new MetadataRequestData.MetadataRequestTopic();
        topic.setTopicId(Uuid.randomUuid());
        metadataRequest.topics().add(topic);
        var stage = context.sendRequest(metadataRequestHeader, metadataRequest); (3)
        return stage.thenApply(metadataResponse -> mutateFetchRequest(metadataResponse, request)) (4)
                    .thenCompose(mutatedFetchRequest -> context.forwardRequest(header, mutatedFetchRequest)); (5)
    }
}
1 We construct a header object for the asynchronous request. It is important to specify the API version of the request that is to be used. The version chosen must be a version known to the Kafka Client used by Kroxylicious and must be an API version supported by the Target Cluster.
2 We construct a new request object. When constructing the request object, care needs to be taken to ensure the request is populated with the structure which matches the API version you have chosen. Refer to the Kafka Protocol Guide for more details.
3 We asynchronously send the request towards the Cluster and obtain a CompletionStage which will contain the response.
4 We use a computation stage to mutate the filtered fetch request using the response from the request sent at (3).
5 We use another computation stage to forward the mutated request.

As you have read above, we need to know the API version we want our request to be encoded at. Your filter can discover what versions of an API the Kafka Cluster supports. To do this use the ApiVersionsService available from the FilterContext to determine programmatically what versions of an API are supported and then write code to make a suitable request object.
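Once the supported range is known, picking a version is simple arithmetic. The sketch below deliberately does not call the real ApiVersionsService, whose methods are not shown in this document. Instead it takes the version ranges as plain numbers, and the ApiVersionChoiceSketch name and its parameters are hypothetical. It assumes the filter wants the highest version that both it and the Cluster understand.

```java
import java.util.OptionalInt;

// Illustrative only: hypothetical helper, not part of Kroxylicious.
final class ApiVersionChoiceSketch {

    /**
     * Returns the highest API version inside both ranges (inclusive bounds),
     * or empty if the ranges do not overlap.
     */
    static OptionalInt highestCommonVersion(short filterMin, short filterMax,
                                            short clusterMin, short clusterMax) {
        int lowest = Math.max(filterMin, clusterMin);
        int highest = Math.min(filterMax, clusterMax);
        return lowest <= highest ? OptionalInt.of(highest) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Filter code written for versions 0..12, Cluster supports 4..9.
        System.out.println(highestCommonVersion((short) 0, (short) 12, (short) 4, (short) 9)); // OptionalInt[9]
    }
}
```

When the result is empty there is no version the filter can safely use, so the filter should fall back to forwarding the original message unchanged or fail in a controlled way.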

Kroxylicious provides the guarantee that computation stages chained using the default execution methods are executed on the same thread as the rest of the Filter work, so we can safely mutate Filter members without synchronising. See the io.kroxylicious.proxy.filter package javadoc for more information on thread-safety.

Filtering specific API Versions

Kafka has a "bidirectional" client compatibility policy. In other words, new clients can talk to old servers, and old clients can talk to new servers. This allows users to upgrade either clients or servers without experiencing any downtime.

Since the Kafka protocol has changed over time, clients and servers need to agree on the schema of the message that they are sending over the wire. This is done through API versioning.

Before each request is sent, the client sends the API key and the API version. These two 16-bit numbers, when taken together, uniquely identify the schema of the message to follow.
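In the Kafka wire protocol these two numbers are the first two fields of every request header, each encoded as a big-endian signed 16-bit integer (following the frame's 4-byte length prefix). A minimal Java sketch of writing and reading just those two fields is shown below. The RequestHeaderPrefixSketch class is hypothetical and ignores the rest of the header (correlation id, client id and so on).

```java
import java.nio.ByteBuffer;

// Illustrative only: handles just the first four bytes of a request header.
final class RequestHeaderPrefixSketch {

    /** Encodes the API key followed by the API version (ByteBuffer is big-endian by default). */
    static ByteBuffer encode(short apiKey, short apiVersion) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putShort(apiKey).putShort(apiVersion);
        buffer.flip();
        return buffer;
    }

    /** Reads the API key without moving the buffer's position. */
    static short apiKey(ByteBuffer header) {
        return header.getShort(header.position());
    }

    /** Reads the API version, which follows the two-byte API key. */
    static short apiVersion(ByteBuffer header) {
        return header.getShort(header.position() + 2);
    }

    public static void main(String[] args) {
        ByteBuffer header = encode((short) 1, (short) 12); // API key 1 is Fetch
        System.out.println(apiKey(header) + " / " + apiVersion(header)); // 1 / 12
    }
}
```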

You may wish to restrict your Filter to only apply to specific versions of an API. For example, "intercept all FetchRequest messages greater than api version 7". To do this you can override a method named shouldHandleXyz[Request|Response] on your filter like:

public class FetchFilter implements FetchRequestFilter {

    @Override
    public boolean shouldHandleFetchRequest(short apiVersion) {
        // Only intercept Fetch requests newer than API version 7.
        return apiVersion > 7;
    }

    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        return context.forwardRequest(header, request);
    }
}

Filter Construction and Configuration

For Kroxylicious to instantiate and configure your custom filter we use Java’s ServiceLoader API. Each Custom Filter should provide a corresponding FilterFactory implementation that can create an instance of your custom Filter. The factory can optionally declare a configuration class that Kroxylicious will populate (using Jackson) when loading your custom Filter. The module must package a META-INF/services/io.kroxylicious.proxy.filter.FilterFactory file containing the classnames of each filter factory implementation into the JAR file.

For example, in the kroxylicious-samples we have the SampleFilterConfig class. This is used in the SampleFetchResponseFilter. The configuration is routed to the Filter instance via the SampleFetchResponseFilterFactory.

Then, when we configure a filter in Kroxylicious configuration like:

filters:
- type: SampleFetchResponseFilterFactory
  config:
    findValue: a
    replacementValue: b

Kroxylicious will deserialize the config object into a SampleFilterConfig and use it to construct a SampleFetchResponseFilter passing the SampleFilterConfig instance as a constructor argument.

Packaging filters

Filters are packaged as standard .jar files. A typical Custom Filter jar contains:

  1. Filter implementation classes

  2. A FilterFactory implementation per Filter and service metadata (see Filter Construction and Configuration)

Deploying proxies

Topologies etc

Selecting plugins

Put the filter jars in some directory

Configuring virtual clusters

As described earlier, the Virtual Cluster is the downstream representation of a Kafka Cluster. Kafka Clients connect to the Virtual Cluster.

You must define at least one Virtual Cluster.

Let’s look at how that is done by first considering a simple example. Afterwards, we will look at more advanced options including TLS.

virtualClusters:
  demo:                                         (1)
    targetCluster:
      bootstrap_servers: myprivatecluster:9092  (2)
    clusterNetworkAddressConfigProvider:
      type: PortPerBrokerClusterNetworkAddressConfigProvider                       (3)
      config:
        bootstrapAddress: mypublickroxylicious:9192    (4)
1 The name of the virtual cluster.
2 The bootstrap of the (physical) Kafka Cluster. This is the Kafka Cluster being proxied.
3 The name of a cluster network address config provider. The built-in types are PortPerBrokerClusterNetworkAddressConfigProvider and SniRoutingClusterNetworkAddressConfigProvider.
4 The hostname and port of the bootstrap that will be used by the Kafka Clients. The hostname must be resolved by the clients.

This configuration declares a virtual cluster called demo. The physical Kafka Cluster being proxied is defined by the targetCluster element. In this example, the PortPerBroker scheme is used by Kroxylicious to present the virtual cluster to the clients. Under this scheme, Kroxylicious will open a port for each broker of the target cluster, with port numbers beginning at 9193 (the bootstrap port plus 1). So, if the target cluster has three brokers, Kroxylicious will bind 9192 for bootstrap and 9193-9195 inclusive to allow the clients to connect to each broker.

Cluster Network Address Config Providers

The Cluster Network Address Config Provider controls how the virtual cluster is presented to the network. Two alternatives are supported: PortPerBroker and SniRouting which have different characteristics which make each suitable for different use-cases. They are described next.

PortPerBroker scheme

In the PortPerBroker scheme, Kroxylicious automatically opens a port for each virtual cluster’s bootstrap and one port per broker of each target cluster. So, if you have two virtual clusters, each targeting a Kafka Cluster of three brokers, Kroxylicious will bind eight ports in total.

PortPerBroker is designed to work best with simple configurations. It is preferable if the target cluster has sequential, stable broker ids and a known minimum broker id (like 0,1,2 for a cluster of 3 brokers). It can work with non-sequential broker ids, but you would have to expose maxBrokerId - minBrokerId + 1 ports, which could be a huge number if your cluster included broker ids 0 and 20000.

Kroxylicious monitors the broker topology of the target cluster at runtime. It will adjust the number of open ports dynamically. If another broker is added to the Kafka Cluster, Kroxylicious will automatically open a port for it. Similarly, if a broker is removed from the Kafka Cluster, Kroxylicious will automatically close the port that was assigned to it.

The PortPerBroker scheme can be used with either clear text or TLS downstream connections.

clusterNetworkAddressConfigProvider:
  type: PortPerBrokerClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192                   (1)
    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com (2)
    brokerStartPort: 9193                                        (3)
    numberOfBrokerPorts: 3                                       (4)
    lowestTargetBrokerId: 1000                                   (5)
    bindAddress: 192.168.0.1                                     (6)
1 The hostname and port of the bootstrap that will be used by the Kafka Clients.
2 (Optional) The broker address pattern used to form the broker addresses. If not defined, it defaults to the hostname part of the bootstrapAddress and the port number allocated to the broker.
3 (Optional) The starting number for broker port range. Defaults to the port of the bootstrapAddress plus 1.
4 (Optional) The maximum number of broker ports that are permitted. Defaults to 3.
5 (Optional) The lowest broker id of the target cluster. Defaults to 0. This should be the lowest node.id (or broker.id) defined in the target cluster.
6 (Optional) The bind address used when binding the ports. If undefined, all network interfaces will be bound.

The brokerAddressPattern configuration parameter understands the replacement token $(nodeId). It is optional. If present, it will be replaced by the node.id (or broker.id) assigned to the broker of the target cluster.

For example, if your configuration looks like the above (but with lowestTargetBrokerId left at its default of 0) and your cluster has three brokers with node ids 0, 1 and 2, your Kafka Client will receive broker address information like this:

0.  mybroker-0.mycluster.kafka.com:9193
1.  mybroker-1.mycluster.kafka.com:9194
2.  mybroker-2.mycluster.kafka.com:9195

It is the responsibility of the deployer of Kroxylicious to ensure that the bootstrap and generated broker DNS names are resolvable and routable by the Kafka Client.
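The token replacement itself amounts to a string substitution. The following Java sketch shows the idea. BrokerAddressPatternSketch and advertisedAddress are hypothetical names, and the sketch assumes the port has already been chosen by the port mapping.

```java
// Illustrative only: hypothetical helper, not part of Kroxylicious.
final class BrokerAddressPatternSketch {

    static final String NODE_ID_TOKEN = "$(nodeId)";

    /** Substitutes the node id into the pattern and appends the port allocated to that broker. */
    static String advertisedAddress(String brokerAddressPattern, int nodeId, int port) {
        return brokerAddressPattern.replace(NODE_ID_TOKEN, Integer.toString(nodeId)) + ":" + port;
    }

    public static void main(String[] args) {
        String pattern = "mybroker-$(nodeId).mycluster.kafka.com";
        for (int nodeId = 0; nodeId < 3; nodeId++) {
            // Assumes brokerStartPort 9193 and lowestTargetBrokerId 0.
            System.out.println(advertisedAddress(pattern, nodeId, 9193 + nodeId));
        }
    }
}
```

A pattern without the token yields the same hostname for every broker, in which case only the port distinguishes them.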

The numberOfBrokerPorts imposes a maximum on the number of brokers that a Kafka Cluster can have. Set this value to be as high as the maximum number of brokers that your operational rules allow for a Kafka Cluster.

Note that each broker’s id must be greater than or equal to lowestTargetBrokerId, and less than lowestTargetBrokerId + numberOfBrokerPorts. The current strategy for mapping node ids to ports is nodeId → brokerStartPort + nodeId - lowestTargetBrokerId. So a configuration like:

clusterNetworkAddressConfigProvider:
  type: PortPerBrokerClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192
    brokerStartPort: 9193
    numberOfBrokerPorts: 3
    lowestTargetBrokerId: 1000

can only map broker ids 1000, 1001 and 1002 to ports 9193, 9194 and 9195 respectively. You would have to reconfigure numberOfBrokerPorts to accommodate new brokers being added to the cluster.
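The mapping rule and its bounds check can be written out as a short Java sketch. The class below is hypothetical and only restates the formula nodeId → brokerStartPort + nodeId - lowestTargetBrokerId given above. It is not the provider's actual code.

```java
// Illustrative only: restates the documented formula, not Kroxylicious code.
final class PortPerBrokerMappingSketch {

    /**
     * Maps a node id to its port, rejecting ids outside
     * [lowestTargetBrokerId, lowestTargetBrokerId + numberOfBrokerPorts).
     */
    static int portFor(int nodeId, int brokerStartPort, int lowestTargetBrokerId, int numberOfBrokerPorts) {
        if (nodeId < lowestTargetBrokerId || nodeId >= lowestTargetBrokerId + numberOfBrokerPorts) {
            throw new IllegalArgumentException("node id " + nodeId + " is outside the configured range");
        }
        return brokerStartPort + nodeId - lowestTargetBrokerId;
    }

    public static void main(String[] args) {
        System.out.println(portFor(1000, 9193, 1000, 3)); // 9193
        System.out.println(portFor(1002, 9193, 1000, 3)); // 9195
        // portFor(1003, 9193, 1000, 3) would throw: a fourth broker needs numberOfBrokerPorts >= 4.
    }
}
```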

RangeAwarePortPerNode scheme

The original PortPerBroker scheme has the limitation that we only control the lowest target brokerId and a maximum number of brokers. We then expect all brokerIds to fall into the range [lowestBrokerId, lowestBrokerId + maxBrokerCount). We must be able to map every possible broker id to a unique port so that, in a cluster of Kroxylicious proxies, all members will deterministically map nodeId X to port Y. This means that we may need to allocate many ports that are not required if there are large gaps between nodeIds in the target cluster.

The Range Aware Port Per Node scheme introduces the idea of Node ID Ranges, allowing you to model what nodeId ranges exist in the target cluster so that the proxy can expose a more compact number of ports but still retain this deterministic mapping from nodeId to port.

Aside from how it maps nodeIds to ports it behaves the same as Port-Per-Broker.

clusterNetworkAddressConfigProvider:
  type: RangeAwarePortPerNodeClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192                   (1)
    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com (2)
    brokerStartPort: 9193                                        (3)
    nodeIdRanges:                                                (4)
      - name: brokers                                            (5)
        range:
          startInclusive: 0                                      (6)
          endExclusive: 3                                        (7)
1 The hostname and port of the bootstrap that will be used by the Kafka Clients.
2 (Optional) The broker address pattern used to form the broker addresses. If not defined, it defaults to the hostname part of the bootstrapAddress and the port number allocated to the broker.
3 (Optional) The starting number for broker port range. Defaults to the port of the bootstrapAddress plus 1.
4 The list of Node ID ranges, must be non-empty.
5 Name of the range, must be unique within the nodeIdRanges list.
6 Start of the range (inclusive)
7 End of the range (exclusive). Must be greater than startInclusive, empty ranges are not allowed.

Node ID ranges must not overlap: a nodeId cannot be part of more than one range.

The brokerAddressPattern configuration parameter understands the replacement token $(nodeId). It is optional. If present, it will be replaced by the node.id (or broker.id) assigned to the broker of the target cluster.

For example, consider a target cluster using KRaft that looks like this:

  • nodeId: 0, roles: controller

  • nodeId: 1, roles: controller

  • nodeId: 2, roles: controller

  • nodeId: 1000, roles: broker

  • nodeId: 1001, roles: broker

  • nodeId: 1002, roles: broker

  • nodeId: 99999, roles: broker

We can model this as three Node ID Ranges:

    clusterNetworkAddressConfigProvider:
      type: RangeAwarePortPerNodeClusterNetworkAddressConfigProvider
      config:
        bootstrapAddress: mycluster.kafka.com:9192
        nodeIdRanges:
          - name: controller
            range:
              startInclusive: 0
              endExclusive: 3
          - name: brokers
            range:
              startInclusive: 1000
              endExclusive: 1003
          - name: broker-outlier
            range:
              startInclusive: 99999
              endExclusive: 100000

This results in the following mapping from nodeId to port:

  • nodeId: 0 → port 9193

  • nodeId: 1 → port 9194

  • nodeId: 2 → port 9195

  • nodeId: 1000 → port 9196

  • nodeId: 1001 → port 9197

  • nodeId: 1002 → port 9198

  • nodeId: 99999 → port 9199
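
The mapping above can be reproduced with a few lines of arithmetic. The following is a minimal sketch, not the Kroxylicious implementation: the class and method names are hypothetical, and it assumes that ports are handed out consecutively from brokerStartPort in the order the ranges are listed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: hypothetical names, not the Kroxylicious implementation. */
class RangeAwarePortSketch {

    /** A half-open node ID range [startInclusive, endExclusive). */
    record NodeIdRange(String name, int startInclusive, int endExclusive) {
        NodeIdRange {
            // Mirrors the documented rule that empty ranges are not allowed.
            if (endExclusive <= startInclusive) {
                throw new IllegalArgumentException("empty range: " + name);
            }
        }

        boolean overlaps(NodeIdRange other) {
            return startInclusive < other.endExclusive && other.startInclusive < endExclusive;
        }
    }

    /** Assigns consecutive ports, starting at brokerStartPort, to node IDs in range order (an assumption). */
    static Map<Integer, Integer> portsByNodeId(int brokerStartPort, List<NodeIdRange> ranges) {
        // Mirrors the documented rule that a nodeId cannot be part of more than one range.
        for (int i = 0; i < ranges.size(); i++) {
            for (int j = i + 1; j < ranges.size(); j++) {
                if (ranges.get(i).overlaps(ranges.get(j))) {
                    throw new IllegalArgumentException(
                            "overlapping ranges: " + ranges.get(i).name() + ", " + ranges.get(j).name());
                }
            }
        }
        Map<Integer, Integer> ports = new LinkedHashMap<>();
        int nextPort = brokerStartPort;
        for (NodeIdRange range : ranges) {
            for (int nodeId = range.startInclusive(); nodeId < range.endExclusive(); nodeId++) {
                ports.put(nodeId, nextPort++);
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        // The three ranges from the example; 9193 is the bootstrap port (9192) plus 1.
        Map<Integer, Integer> ports = portsByNodeId(9193, List.of(
                new NodeIdRange("controller", 0, 3),
                new NodeIdRange("brokers", 1000, 1003),
                new NodeIdRange("broker-outlier", 99999, 100000)));
        ports.forEach((nodeId, port) -> System.out.println("nodeId " + nodeId + " -> port " + port));
    }
}
```

Because the result depends only on the configured ranges and the start port, every proxy instance given the same configuration computes the same nodeId to port assignment, and only seven ports are needed for the seven node IDs.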

SniRouting scheme

In the SniRouting scheme, Kroxylicious uses SNI information to route traffic to either the bootstrap or individual brokers. As this relies on SNI (Server Name Indication), the use of Downstream TLS is required.

With this scheme, you have the choice to share a single port across all virtual clusters, or you can assign a different port to each. Single port operation may have cost advantages when using load balancers of public clouds, as it allows a single cloud provider load balancer to be shared across all virtual clusters.

clusterNetworkAddressConfigProvider:
  type: SniRoutingClusterNetworkAddressConfigProvider
  config:
    bootstrapAddress: mycluster.kafka.com:9192                    (1)
    brokerAddressPattern: mybroker-$(nodeId).mycluster.kafka.com  (2)
    bindAddress: 192.168.0.1                                      (3)
1 The hostname and port of the bootstrap that will be used by the Kafka Clients.
2 The broker address pattern used to form the broker addresses. The $(nodeId) token must be present.
3 (Optional) The bind address used when binding the port. If undefined, all network interfaces will be bound.

The brokerAddressPattern configuration parameter requires that the $(nodeId) token is present within it. This is replaced by the node.id (or broker.id) assigned to the broker of the target cluster. This allows Kroxylicious to generate separate routes for each broker.

It is the responsibility of the deployer of Kroxylicious to ensure that the bootstrap and generated broker DNS names are resolvable and routable by the Kafka Client.
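
To work out which DNS names need to be resolvable (and covered by the downstream TLS certificate), the pattern can be expanded for each node ID of the target cluster. The following is a minimal sketch of that substitution under the assumption that it is a plain textual replacement of the $(nodeId) token; the class and method names are hypothetical and not part of Kroxylicious.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: hypothetical names, not the Kroxylicious implementation. */
class SniBrokerNamesSketch {

    static final String NODE_ID_TOKEN = "$(nodeId)";

    /** Replaces the $(nodeId) token in the pattern with the given node ID. */
    static String brokerHost(String brokerAddressPattern, int nodeId) {
        if (!brokerAddressPattern.contains(NODE_ID_TOKEN)) {
            throw new IllegalArgumentException("pattern must contain " + NODE_ID_TOKEN);
        }
        // String.replace performs a literal (non-regex) replacement.
        return brokerAddressPattern.replace(NODE_ID_TOKEN, Integer.toString(nodeId));
    }

    /** Lists the bootstrap hostname followed by one hostname per node ID. */
    static List<String> requiredHostnames(String bootstrapAddress, String brokerAddressPattern, List<Integer> nodeIds) {
        List<String> names = new ArrayList<>();
        // Drop the ":port" suffix of the bootstrap address.
        names.add(bootstrapAddress.substring(0, bootstrapAddress.lastIndexOf(':')));
        for (int nodeId : nodeIds) {
            names.add(brokerHost(brokerAddressPattern, nodeId));
        }
        return names;
    }

    public static void main(String[] args) {
        requiredHostnames("mycluster.kafka.com:9192", "mybroker-$(nodeId).mycluster.kafka.com", List.of(0, 1, 2))
                .forEach(System.out::println);
    }
}
```

The resulting list is a reasonable starting point for the DNS records to create and for the Subject Alternative Names to request, although a wildcard certificate may cover the broker names more conveniently.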

Transport Layer Security (TLS)

In this section we look at how to enable TLS for the downstream, the upstream, or both. There is no interdependency: you can configure TLS for the downstream and use clear text communications for the upstream, or vice versa.

TLS is recommended for both upstream and downstream for production configurations.

Downstream TLS

Here’s how to enable TLS for the downstream side. This means the Kafka Client will connect to the virtual cluster over TLS rather than clear text. For this, you will need to obtain a TLS certificate for the virtual cluster from your Certificate Authority.

When requesting the certificate ensure that the certificate will match the names of the virtual cluster’s bootstrap and broker addresses. This may mean making use of wildcard certificates and/or Subject Alternative Names (SANs).

Kroxylicious accepts key material in PKCS12 or JKS keystore format, or as PEM formatted file(s). The following example illustrates configuration with a PKCS12 keystore.

virtualClusters:
  demo:
    tls:
        key:
          storeFile: /opt/cert/server.p12               (1)
          storePassword:
            passwordFile: /opt/cert/store.password      (2)
          keyPassword:
            passwordFile: /opt/cert/key.password        (3)
          storeType: PKCS12                             (4)
    clusterNetworkAddressConfigProvider:
      ...
1 File system location of a keystore (or in the case of PEM format a text file containing the concatenation of the private key, certificate, and intermediates).
2 File system location of a file containing the key store’s password.
3 (Optional) File system location of a file containing the key’s password. If omitted the key store’s password is used to decrypt the key too.
4 (Optional) Store type. Supported types are: PKCS12, JKS and PEM. Defaults to Java default key store type which is usually PKCS12.
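
Before pointing the proxy at a keystore, it can be useful to confirm that the store file opens with the password held in the password file. The following is a minimal sketch using only the JDK KeyStore API. The class name is hypothetical, and stripping surrounding whitespace from the password file is an assumption of this sketch, not a statement about how Kroxylicious reads the file.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

/** Illustrative only: a standalone check, not part of Kroxylicious. */
class KeystoreCheckSketch {

    /** Opens the keystore with the password from passwordFile and returns its aliases. */
    static List<String> aliases(Path storeFile, Path passwordFile, String storeType) throws Exception {
        // Assumption: the file holds just the password, possibly followed by a newline.
        char[] password = Files.readString(passwordFile).strip().toCharArray();
        KeyStore keyStore = KeyStore.getInstance(storeType);
        try (InputStream in = Files.newInputStream(storeFile)) {
            // Throws if the file is not a valid store or the password does not match.
            keyStore.load(in, password);
        }
        return Collections.list(keyStore.aliases());
    }

    public static void main(String[] args) throws Exception {
        // Usage: java KeystoreCheckSketch.java /opt/cert/server.p12 /opt/cert/store.password
        System.out.println(aliases(Path.of(args[0]), Path.of(args[1]), "PKCS12"));
    }
}
```

If the store loads and lists the expected alias, the storeFile, storePassword and storeType values are at least consistent with each other.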

Alternatively, if your key material is in separate PEM files (private key, and certificate/intermediates), the following configuration may be used:

virtualClusters:
  demo:
    tls:
        key:
          privateKeyFile: /opt/cert/server.key          (1)
          certificateFile: /opt/cert/server.crt         (2)
          keyPassword:
            passwordFile: /opt/cert/key.password        (3)
    clusterNetworkAddressConfigProvider:
      ...
1 File system location of the server private key.
2 File system location of the server certificate and intermediate(s).
3 (Optional) File system location of a file containing the key’s password.

For the private key, PKCS-8 keys are supported by default. For PKCS-1 keys, Bouncy Castle libraries must be added to the Kroxylicious classpath. See https://github.com/netty/netty/issues/7323 for more details.

Upstream TLS

Here’s how to enable TLS for the upstream side. This means that Kroxylicious connects to the (physical) Kafka Cluster over TLS. For this, your Kafka Cluster must have already been configured to use TLS.

By default, Kroxylicious inherits what it trusts from the platform it is running on and uses this to determine whether the Kafka Cluster is trusted or not.

To support cases where trust must be overridden (such as use-cases involving the use of private CAs or self-signed certificates), Kroxylicious accepts override trust material in PKCS12 or JKS keystore format, or PEM formatted certificates.

The following illustrates enabling TLS, inheriting platform trust:

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls: {}                                         (1)
      #...
1 Use an empty object to enable TLS inheriting trust from the platform.

The following illustrates enabling TLS but with trust coming from a PKCS12 trust store instead of the platform:

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        trust:
          storeFile: /opt/cert/trust.p12                (1)
          storePassword:
            passwordFile: /opt/cert/store.password      (2)
          storeType: PKCS12                             (3)
      #...
1 File system location of a truststore (or in the case of PEM format a text file containing the certificates).
2 File system location of a file containing the trust store’s password.
3 (Optional) Trust store type. Supported types are: PKCS12, JKS and PEM. Defaults to Java default key store type (PKCS12).

The following illustrates connecting to the physical cluster using TLS client authentication (also known as Mutual TLS):

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        key:
          privateKeyFile: /opt/cert/client.key
          certificateFile: /opt/cert/client.cert
        trust:
          storeFile: /opt/cert/client/server.cer
          storeType: PEM

It is also possible to disable trust so that Kroxylicious will connect to any Kafka Cluster regardless of its certificate validity.

This option is not recommended for production use.

virtualClusters:
  demo:
    targetCluster:
      bootstrap_servers: myprivatecluster:9092
      tls:
        trust:
          insecure: true                                (1)
      #...
1 Enables insecure TLS.

YAML Proxy level configuration

Configuring proxy plugins

Filter level configuration

Operating proxies

Logging

The Kroxylicious binary distributions ship with log4j2 as a logging backend. To customize the logging configuration:

1. Use an Alternative Log4j configuration file

When using bin/kroxylicious-start.sh from the binary distribution, you can optionally set an environment variable:

KROXYLICIOUS_LOGGING_OPTIONS="-Dlog4j2.configurationFile=/path/to/custom/log4j2.yaml"

to load an alternative log4j2 configuration.

2. Change the Root Log level

When using bin/kroxylicious-start.sh from the binary distribution and using the default logging configuration file, you can set an environment variable:

KROXYLICIOUS_ROOT_LOG_LEVEL="DEBUG"

to configure the root log level. Note that the output will be very verbose at DEBUG or TRACE.

Monitoring and observability

Kroxylicious uses Micrometer as a facade for gathering metrics. A Prometheus backend is the only supported implementation so far.

Admin HTTP Endpoint

Kroxylicious offers a configurable, insecure, HTTP endpoint for administration purposes. It can offer:

  • Prometheus scrape endpoint at /metrics

minimal configuration example
adminHttp:
  endpoints:
    prometheus: {}

By default, the admin HTTP server binds to 0.0.0.0:9190. If no endpoints are configured, the admin HTTP server is not created.

complete configuration example
adminHttp:
  host: localhost   (1)
  port: 9999        (2)
  endpoints:
    prometheus: {}  (3)
1 Bind address for the server specified using either a hostname or interface address. If omitted, it will bind to all interfaces (i.e. 0.0.0.0).
2 Port number to be bound. If omitted port 9190 will be bound.
3 Enables the Prometheus endpoint.

Micrometer Metrics

Kroxylicious integrates with Micrometer.

Micrometer provides a simple facade over the instrumentation clients for the most popular observability systems, allowing you to instrument your JVM-based application code without vendor lock-in.

complete configuration example
adminHttp:
  endpoints:
    prometheus: {}
micrometer:
  - type: "CommonTagsHook"
    config:
      commonTags:
        zone: "euc-1a" (1)
  - type: "StandardBindersHook"
    config:
      binderNames:
      - "JvmGcMetrics" (2)

This configuration:

1 Configures a common tag, zone: euc-1a, on the global Micrometer registry. The tag is added to all metrics and becomes a label in Prometheus.
2 Registers the JvmGcMetrics binder (shipped with Micrometer) with the global registry.

Prometheus is connected to the micrometer global registry so Filters can record metrics against it, and they will be available as part of the Prometheus scrape data.

With this configuration the admin endpoint listens on the default port, so if you execute curl localhost:9190/metrics you should see metrics like:

jvm_gc_memory_allocated_bytes_total{zone="euc-1a",} 0.0

Common Tags

We can add common tags that will be added to all metrics. These will be available as labels from the Prometheus scrape.

To configure common tags, use configuration like:

micrometer:
  - type: "CommonTagsHook"
    config:
      commonTags:
        zone: "euc-1a"
        owner: "becky"

Standard Binders

Micrometer has a concept of MeterBinder:

Binders register one or more metrics to provide information about the state of some aspect of the application or its container.

By registering some of the standard binders shipped with Micrometer, you can expose metrics about the JVM and the system, such as JVM memory usage, garbage collection and other behaviour.

To configure multiple binders you can use configuration like:

micrometer:
  - type: "StandardBindersHook"
    config:
      binderNames:
      - "JvmGcMetrics"
      - "JvmHeapPressureMetrics"

The named binders are then bound to the global meter registry.

Table 1. Available Binders

name                    micrometer class

ClassLoaderMetrics      io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics
JvmCompilationMetrics   io.micrometer.core.instrument.binder.jvm.JvmCompilationMetrics
JvmGcMetrics            io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
JvmHeapPressureMetrics  io.micrometer.core.instrument.binder.jvm.JvmHeapPressureMetrics
JvmInfoMetrics          io.micrometer.core.instrument.binder.jvm.JvmInfoMetrics
JvmMemoryMetrics        io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
JvmThreadMetrics        io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics
FileDescriptorMetrics   io.micrometer.core.instrument.binder.system.FileDescriptorMetrics
ProcessorMetrics        io.micrometer.core.instrument.binder.system.ProcessorMetrics
UptimeMetrics           io.micrometer.core.instrument.binder.system.UptimeMetrics

Micrometer Usage from Filters

Filters can use the static methods of Metrics to register metrics with the global registry, or use Metrics.globalRegistry to get a reference to the global registry. Metrics registered this way are automatically available through the Prometheus scrape endpoint.
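
As an illustration, the two approaches might look like the following sketch. It assumes micrometer-core is on the classpath (as it is for code running inside the proxy). The class name, metric names and tag values are hypothetical, and the sketch deliberately leaves out the Filter API itself: a real Filter would call a helper like this from its request handling code.

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;

/** Illustrative only: a hypothetical helper that a Filter could call. */
class FilterMetricsSketch {

    // Static method of Metrics: registers (or looks up) a counter in the global registry.
    private static final Counter REQUESTS =
            Metrics.counter("my.filter.requests", "filter", "my-filter");

    // Builder form, registering explicitly against Metrics.globalRegistry.
    private static final Timer HANDLE_TIME = Timer.builder("my.filter.handle.time")
            .description("Time spent handling a request in the filter")
            .tag("filter", "my-filter")
            .register(Metrics.globalRegistry);

    /** Counts one request and times the supplied work. */
    static void recordRequest(Runnable work) {
        REQUESTS.increment();
        HANDLE_TIME.record(work);
    }
}
```

Any common tags configured through CommonTagsHook would be expected to apply to these meters as well, since they live in the same global registry.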


1. The example token create command illustrates the use of -no-default-policy and -orphan. The use of these flags is not functionally important. You may adapt the configuration of the token to suit the standards required by your organization
2. https://github.com/kroxylicious/kroxylicious/issues/1217
3. The context.forward*() methods behave exactly as the builder form .forward(header, message).complete()