About this guide
This guide covers developing plugins for Kroxylicious using the Java programming language. Other guides should be consulted if you want to deploy, configure or secure a Kroxylicious proxy.
1. Kroxylicious overview
Kroxylicious is an Apache Kafka protocol-aware ("Layer 7") proxy designed to enhance Kafka-based systems. Through its filter mechanism it allows additional behavior to be introduced into a Kafka-based system without requiring changes to either your applications or the Kafka cluster itself. Built-in filters are provided as part of the solution.
Functioning as an intermediary, Kroxylicious mediates communication between a Kafka cluster and its clients. It takes on the responsibility of receiving, filtering, and forwarding messages.
An API provides a convenient means for implementing custom logic within the proxy.
1.1. Compatibility
There are effectively two APIs a filter developer needs to care about:
-
The Filter API against which the filter is written. This is a contract between the Filter developer and the Kroxylicious runtime. It includes Filter and FilterFactory, which the developer is responsible for implementing, and FilterContext and FilterFactoryContext, which are provided by the Kroxylicious runtime for the developer to use.
-
The "configuration API" that your filter exposes. This is a contract between the Filter developer and Kroxylicious users.
1.1.1. Compatibility of the Filter API
The Kroxylicious project uses semantic versioning. For the filter API this means that you can compile your filter against the Kroxylicious API at version x.yc.zc and users will be able to use it with Kroxylicious runtimes at version x.yr.zr, provided the runtime version is not older than the compile-time version (that is, if yr > yc, or if yr = yc and zr ≥ zc).
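The rule above can be sketched as a small version check. This is an illustration only, not part of the Kroxylicious API: the class and method names are hypothetical, it handles plain x.y.z strings without qualifiers, and it reads "not older than" as the usual semantic-versioning ordering (compare the minor version first, and the patch version only when the minors are equal).

```java
// Hypothetical helper, for illustration only; not part of the Kroxylicious API.
class ApiCompatibility {

    /** A plain x.y.z version. Qualifiers such as -SNAPSHOT are not handled in this sketch. */
    record Version(int major, int minor, int patch) {
        static Version parse(String text) {
            String[] parts = text.split("\\.");
            if (parts.length != 3) {
                throw new IllegalArgumentException("expected x.y.z but got: " + text);
            }
            return new Version(Integer.parseInt(parts[0]),
                    Integer.parseInt(parts[1]),
                    Integer.parseInt(parts[2]));
        }
    }

    /**
     * Returns true when a filter compiled against compiledAgainst is covered by the
     * compatibility rule for a runtime at version runtime.
     */
    public static boolean runtimeCanLoad(String compiledAgainst, String runtime) {
        Version c = Version.parse(compiledAgainst);
        Version r = Version.parse(runtime);
        if (c.major() != r.major()) {
            return false; // the rule only covers runtimes with the same major version x
        }
        if (r.minor() != c.minor()) {
            return r.minor() > c.minor(); // a newer minor is fine, an older one is not
        }
        return r.patch() >= c.patch(); // same minor: the patch must not be older
    }

    public static void main(String[] args) {
        System.out.println(runtimeCanLoad("1.2.3", "1.3.0"));
        System.out.println(runtimeCanLoad("1.2.3", "1.2.2"));
    }
}
```

A filter author could use a check like this in release tooling to decide which runtime versions to list as supported.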
1.1.2. Compatibility of your Filter configuration
The Kroxylicious proxy isn’t able to provide or enforce any compatibility guarantees about the configuration API that your plugin offers to users. In other words, you are free to release your plugin at version a.b.c and later release a version a.d.e which doesn’t accept the same configuration syntax (JSON or YAML) that the original version did.
Doing this makes it more difficult for users to upgrade from older versions of your plugin, because they will have to rewrite and revalidate the configuration which worked with the old version.
For this reason filter developers are strongly encouraged to adopt semantic versioning as the way to communicate compatibility of the configuration API they offer to users.
2. Custom filters
Custom filters can be written in the Java programming language. Kroxylicious supports Java 17. Knowledge of the Kafka protocol is generally required to write a protocol filter.
There is currently one class of Custom Filters users can implement:
-
Protocol filters: allow customisation of how protocol messages are handled on their way to, or from, the Cluster.
The following sections explain in more detail how to write your own filters.
2.1. Sample Custom Filter Project
A collection of sample filters is available within the Kroxylicious repository for you to download, try out, and customise. You can find them here for a hands-on introduction to creating your own custom filters.
2.2. API docs
Custom filters are built by implementing interfaces supplied by the kroxylicious-api module (io.kroxylicious:kroxylicious-api on Maven Central). You can view the javadoc here.
2.3. Dependencies
How filter classes are loaded is not currently defined by the filter contract. In other words, filters might be loaded using a classloader-per-filter model, or using a single class loader. This doesn’t really make a difference to filter authors except where they want to make use of libraries as dependencies. Because those dependencies might be loaded by the same classloader as the dependencies of other filters there is the possibility of collision. Filter A and Filter B might both want to use Library C, and they might want to use different versions of Library C.
For common things like logging and metric facade APIs it is recommended to use the facade APIs which are also used by the proxy core.
2.4. Protocol filters
A protocol filter is a public, top-level, concrete class with a particular public constructor, which implements one or more protocol filter interfaces. You can implement two distinct types of Custom Protocol Filter:
-
Specific Message Protocol Filters
-
Request/Response Protocol Filters
Note that these types are mutually exclusive. For example, a Filter is not allowed to implement both RequestFilter and MetadataRequestFilter. This is to prevent ambiguity: if we received a MetadataRequest, would it be dispatched to the onMetadataRequest(..) method of MetadataRequestFilter, to the onRequest method of RequestFilter, or to both?
Instead, we disallow these combinations, throwing an exception at runtime if your Filter implements incompatible interfaces.
2.4.1. Specific Message Protocol Filters
A filter may wish to intercept specific types of Kafka messages. For example, intercept all Produce Requests, or intercept all Fetch Responses. To support this case Kroxylicious provides an interface for each request type and response type supported by Kafka (at the version of Kafka Kroxylicious depends on). A filter implementation can implement any combination of these interfaces.
There is no requirement that a Filter handles both the request and response halves of an RPC. A Filter can choose to intercept only the request, or only the response, or both the request and response.
Examples
To intercept all Fetch Requests your class would implement FetchRequestFilter:
public class FetchRequestClientIdFilter implements FetchRequestFilter {
    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        header.setClientId("fetch-client!");
        return context.forwardRequest(header, request);
    }
}
To intercept all Fetch Responses your class would implement FetchResponseFilter:
public class FetchRequestClientIdFilter implements FetchResponseFilter {
    @Override
    public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion,
                                                                 ResponseHeaderData header,
                                                                 FetchResponseData response,
                                                                 FilterContext context) {
        // mutateResponse is a helper you would write yourself; it is not shown here.
        mutateResponse(response);
        return context.forwardResponse(header, response);
    }
}
To intercept all Fetch Requests and all Fetch Responses your class would implement FetchRequestFilter and FetchResponseFilter:
public class FetchRequestClientIdFilter implements FetchRequestFilter, FetchResponseFilter {
    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        header.setClientId("fetch-client!");
        return context.forwardRequest(header, request);
    }

    @Override
    public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion,
                                                                 ResponseHeaderData header,
                                                                 FetchResponseData response,
                                                                 FilterContext context) {
        // mutateResponse is a helper you would write yourself; it is not shown here.
        mutateResponse(response);
        return context.forwardResponse(header, response);
    }
}
Specific Message Filter interfaces are mutually exclusive with Request/Response. Kroxylicious will reject invalid combinations of interfaces.
2.4.2. Request/Response Protocol Filters
A filter may wish to intercept every message being sent from the Client to the Cluster or from the Cluster to the Client. To do this your custom filter will implement:
-
RequestFilter to intercept all requests.
-
ResponseFilter to intercept all responses.
Custom filters are free to implement either interface or both interfaces to intercept all messages.
For example:
public class FixedClientIdFilter implements RequestFilter {
    @Override
    public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey,
                                                          RequestHeaderData header,
                                                          ApiMessage body,
                                                          FilterContext filterContext) {
        header.setClientId("example!");
        return filterContext.forwardRequest(header, body);
    }
}
Request/Response Filter interfaces are mutually exclusive with Specific Message interfaces. Kroxylicious will reject invalid combinations of interfaces.
2.4.3. The Filter Result
As seen above, filter methods (onXyz[Request|Response]) must return a CompletionStage<FilterResult> object.
It is the job of FilterResult to convey what message is to be forwarded to the next filter in the chain (or to the broker/client if at the chain’s beginning or end). It is also used to carry instructions such as indicating that the connection must be closed, or a message dropped.
If the filter returns a CompletionStage that is already completed normally, Kroxylicious will immediately perform the action described by the FilterResult.
The filter may return a CompletionStage that is not yet completed. When this happens, Kroxylicious will pause reading from the downstream (the Client writes will eventually block), and it begins to queue up in-flight requests/responses arriving at the filter. This is done so that message order is maintained. Once the CompletionStage completes, the action described by the FilterResult is performed, reading from the downstream resumes and any queued up requests/responses are processed.
The pausing of reads from the downstream is a relatively costly operation. To maintain optimal performance, filter implementations should minimise the occasions on which an incomplete CompletionStage is returned.
If the CompletionStage completes exceptionally, the connection is closed. This also applies if the CompletionStage does not complete within a timeout (20000 milliseconds).
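The three outcomes described above (already completed, completed later, completed exceptionally) can be illustrated with the JDK's own java.util.concurrent classes. This is a minimal sketch that uses no Kroxylicious types: a String stands in for the FilterResult, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Illustration only: a String stands in for the FilterResult.
class StageCompletionSketch {

    /** The result is known up front, so the returned stage is already complete. */
    public static CompletionStage<String> alreadyCompleted(String result) {
        return CompletableFuture.completedFuture(result);
    }

    /** The stage completes later, on another thread, after a short delay. */
    public static CompletionStage<String> completedLater(String result) {
        return CompletableFuture.supplyAsync(() -> result,
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    /** A stage that completes exceptionally; per the text above, the proxy closes the connection. */
    public static CompletionStage<String> failed() {
        return CompletableFuture.failedFuture(new IllegalStateException("lookup failed"));
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = alreadyCompleted("forward").toCompletableFuture();
        CompletableFuture<String> slow = completedLater("forward").toCompletableFuture();
        System.out.println("fast is done on return: " + fast.isDone());
        System.out.println("slow result after waiting: " + slow.join());
        System.out.println("failed exceptionally: "
                + failed().toCompletableFuture().isCompletedExceptionally());
    }
}
```

In a real filter, the first form corresponds to the cheap path where nothing is paused, while the second form is the one that causes reads to be paused until the stage completes.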
Creating a Filter Result
The FilterContext is the factory for the FilterResult objects.
There are two convenience methods that simply allow a filter to forward a result to the next filter. We’ve already seen these in action above.
-
context.forwardRequest(header, request), used by a request filter to forward a request.
-
context.forwardResponse(header, response), used by a response filter to forward a response.
To access richer features, use the filter result builders context.requestFilterResultBuilder() and context.responseFilterResultBuilder().
Filter result builders allow you to:
-
forward a request/response: .forward(header, request).
-
signal that a connection is to be closed: .withCloseConnection().
-
signal that a message is to be dropped (i.e. not forwarded): .drop().
-
for requests only, send a short-circuit response: .shortCircuitResponse(header, response).
The builder lets you combine legal behaviours together. For instance, to close the connection after forwarding a response to a client, a response filter could use:
return context.responseFilterResultBuilder()
        .forward(header, response)
        .withCloseConnection()
        .completed();
The builders yield either a completed CompletionStage<FilterResult>, which can be returned directly from the filter method, or a bare FilterResult. The latter exists to support asynchronous programming styles, allowing you to use your own Futures.
The drop behaviour can be legally used only in very specific circumstances. The Kafka Protocol is, for the most part, strictly request/response with responses expected in the order the requests were sent. The client will fail if the contract isn’t upheld. The exception is Produce where acks=0. Filters may drop these requests without introducing a protocol error.
2.4.4. The protocol filter lifecycle
Instances of the filter class are created on demand when a protocol message is first sent by a client. Instances are specific to the channel between a single client and a single broker.
An instance exists for as long as the client remains connected.
2.4.5. Handling state
The simplest way of managing per-client state is to use member fields. The proxy guarantees that all methods of a given filter instance will always be invoked on the same thread (this is also true of CompletionStage completion, as described in Sending asynchronous requests to the Cluster). Therefore, there is no need to use synchronization when accessing such fields.
See the io.kroxylicious.proxy.filter package javadoc for more information on thread-safety.
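The following sketch illustrates why a plain member field is enough when every call arrives on the same thread. It does not use Kroxylicious at all: a single-threaded executor stands in for the proxy's per-connection thread, and CountingFilter is a hypothetical stand-in for a filter instance.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: a single-threaded executor plays the role of the proxy's thread.
class ThreadConfinedStateSketch {

    /** Stand-in for a filter instance: a plain field, no locks and no atomics. */
    static final class CountingFilter {
        private int requestsSeen; // only ever touched from the single executor thread

        void onRequest() {
            requestsSeen++;
        }

        int requestsSeen() {
            return requestsSeen;
        }
    }

    /** Runs the given number of onRequest calls on one thread and returns the final count. */
    public static int countOnSingleThread(int requests) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            CountingFilter filter = new CountingFilter();
            for (int i = 0; i < requests; i++) {
                eventLoop.execute(filter::onRequest);
            }
            // Read the field on the same thread that wrote it.
            Callable<Integer> read = filter::requestsSeen;
            return eventLoop.submit(read).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countOnSingleThread(1000));
    }
}
```

Because every increment runs on the same thread, no update is lost. The same field accessed from several threads at once would need synchronization, which is exactly what the proxy's threading guarantee lets a filter avoid.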
2.4.6. Filter Patterns
Kroxylicious Protocol Filters support several patterns:
Intercepting Requests and Responses
This is a common pattern: we want to inspect or modify a message. For example:
public class SampleFetchResponseFilter implements FetchResponseFilter {
    @Override
    public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion,
                                                                 ResponseHeaderData header,
                                                                 FetchResponseData response,
                                                                 FilterContext context) {
        mutateResponse(response, context); // (1)
        return context.forwardResponse(header, response); // (2)
    }
}
(1) We mutate the response object. For example, you could alter the records that have been fetched.
(2) We forward the response, sending it towards the client, invoking Filters downstream of this one.
We can only forward the response and header objects passed into onFetchResponse. New instances are not supported.
Sending Response messages from a Request Filter towards the Client (Short-circuit responses)
In some cases we may wish to not forward a request from the client to the Cluster. Instead, we want to intercept that request and generate a response message in a Kroxylicious Protocol Filter and send it towards the client. This is called a short-circuit response.
For example:
public class CreateTopicRejectFilter implements CreateTopicsRequestFilter {
    // ERROR_MESSAGE is a String constant defined elsewhere in the filter.
    @Override
    public CompletionStage<RequestFilterResult> onCreateTopicsRequest(short apiVersion,
                                                                      RequestHeaderData header,
                                                                      CreateTopicsRequestData request,
                                                                      FilterContext context) {
        CreateTopicsResponseData response = new CreateTopicsResponseData();
        CreateTopicsResponseData.CreatableTopicResultCollection topics = new CreateTopicsResponseData.CreatableTopicResultCollection(); // (1)
        request.topics().forEach(creatableTopic -> {
            CreateTopicsResponseData.CreatableTopicResult result = new CreateTopicsResponseData.CreatableTopicResult();
            result.setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code()).setErrorMessage(ERROR_MESSAGE);
            result.setName(creatableTopic.name());
            topics.add(result);
        });
        response.setTopics(topics);
        return context.requestFilterResultBuilder().shortCircuitResponse(response).completed(); // (2)
    }
}
(1) Create a new instance of the corresponding response data and populate it. Note you may need to use the apiVersion to check which fields can be set at this request’s API version.
(2) We generate a short-circuit response that is sent towards the client, invoking Filters downstream of this one.
This will respond to all Create Topic requests with an error response without forwarding any of those requests to the Cluster.
Closing the connections
There is a useful variation on the pattern above, where the filter not only sends an error response but also causes the connection to close. This is useful where the filter wishes to disallow certain client behaviours.
public class DisallowAlterConfigs implements AlterConfigsRequestFilter {
    @Override
    public CompletionStage<RequestFilterResult> onAlterConfigsRequest(short apiVersion,
                                                                      RequestHeaderData header,
                                                                      AlterConfigsRequestData request,
                                                                      FilterContext context) {
        var response = new AlterConfigsResponseData();
        // One error entry per requested resource.
        response.setResponses(request.resources().stream()
                .map(a -> new AlterConfigsResourceResponse()
                        .setErrorCode(Errors.INVALID_CONFIG.code())
                        .setErrorMessage("This service does not allow this operation - closing connection"))
                .toList());
        return context.requestFilterResultBuilder()
                .shortCircuitResponse(response)
                .withCloseConnection() // (1)
                .completed();
    }
}
(1) We enable the close connection option on the builder. This will cause Kroxylicious to close the connection after the response is sent to the client.
Sending asynchronous requests to the Cluster
Filters can make additional asynchronous requests to the Cluster. This is useful if the Filter needs additional information from the Cluster in order to know how to mutate the filtered request/response.
The Filter can make use of CompletionStage chaining features (thenApply() etc.) to arrange for actions to be performed once the asynchronous request completes. For example, it could chain an action that mutates the filtered request/response using the asynchronous response, and finally, chain an action to forward the request/response to the next filter.
The asynchronous request/response will be intercepted by Filters upstream of this Filter. Filters downstream of this Filter (and the Client) do not see the asynchronous response.
Let’s take a look at an example. We’ll send an asynchronous request towards the Cluster for topic metadata while handling a FetchRequest and use the response to mutate the FetchRequest before passing it to the next filter in the chain.
public class FetchFilter implements FetchRequestFilter {
    public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12;

    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        var metadataRequestHeader = new RequestHeaderData().setRequestApiVersion(METADATA_VERSION_SUPPORTING_TOPIC_IDS); // (1)
        var metadataRequest = new MetadataRequestData(); // (2)
        var topic = new MetadataRequestData.MetadataRequestTopic();
        topic.setTopicId(Uuid.randomUuid());
        metadataRequest.topics().add(topic);
        var stage = context.sendRequest(metadataRequestHeader, metadataRequest); // (3)
        // mutateFetchRequest is a helper you would write yourself; it is not shown here.
        return stage.thenApply(metadataResponse -> mutateFetchRequest(metadataResponse, request)) // (4)
                .thenCompose(mutatedFetchRequest -> context.forwardRequest(header, mutatedFetchRequest)); // (5)
    }
}
(1) We construct a header object for the asynchronous request. It is important to specify the API version of the request that is to be used. The version chosen must be a version known to the Kafka Client used by Kroxylicious and must be an API version supported by the Target Cluster.
(2) We construct a new request object. When constructing the request object, care needs to be taken to ensure the request is populated with the structure which matches the API version you have chosen. Refer to the Kafka Protocol Guide for more details.
(3) We asynchronously send the request towards the Cluster and obtain a CompletionStage which will contain the response.
(4) We use a computation stage to mutate the filtered fetch request using the response from the request sent at (3).
(5) We use another computation stage to forward the mutated request.
As you have read above, we need to know the API version we want our request to be encoded at. Your filter can discover what versions of an API the Kafka Cluster supports. To do this, use the ApiVersionsService available from the FilterContext to determine programmatically what versions of an API are supported, and then write code to make a suitable request object.
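Once the supported range is known, picking a version is a matter of intersecting two ranges. The sketch below shows only that step. How the range is obtained from ApiVersionsService is not shown, and the class name, method name, and the four integer parameters are assumptions made for illustration.

```java
import java.util.OptionalInt;

// Illustration only: the inputs would come from your filter's own requirements and
// from whatever the cluster reports as supported for the API in question.
class RequestVersionChooser {

    /**
     * Returns the highest version contained in both inclusive ranges,
     * or an empty OptionalInt when the ranges do not overlap.
     */
    public static OptionalInt highestCommonVersion(int filterMin, int filterMax,
                                                   int clusterMin, int clusterMax) {
        int low = Math.max(filterMin, clusterMin);
        int high = Math.min(filterMax, clusterMax);
        return low <= high ? OptionalInt.of(high) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // The filter needs Metadata version 12 or later (topic ids) and knows how to build up to 13.
        // The cluster range 0..12 is a made-up example value.
        OptionalInt version = highestCommonVersion(12, 13, 0, 12);
        System.out.println(version.isPresent()
                ? "use version " + version.getAsInt()
                : "no usable version; fall back or fail");
    }
}
```

When no common version exists, the filter has to decide what to do, for example forwarding the original request unchanged or returning an error.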
Kroxylicious provides the guarantee that computation stages chained using the default execution methods are executed on the same thread as the rest of the Filter work, so we can safely mutate Filter members without synchronising. See the io.kroxylicious.proxy.filter package javadoc for more information on thread-safety.
Filtering specific API Versions
Kafka has a "bidirectional" client compatibility policy. In other words, new clients can talk to old servers, and old clients can talk to new servers. This allows users to upgrade either clients or servers without experiencing any downtime.
Since the Kafka protocol has changed over time, clients and servers need to agree on the schema of the message that they are sending over the wire. This is done through API versioning.
Before each request is sent, the client sends the API key and the API version. These two 16-bit numbers, when taken together, uniquely identify the schema of the message to follow.
You may wish to restrict your Filter to only apply to specific versions of an API. For example, "intercept all FetchRequest messages with an API version greater than 7". To do this you can override a method named shouldHandleXyz[Request|Response] on your filter like:
public class FetchFilter implements FetchRequestFilter {
    @Override
    public boolean shouldHandleFetchRequest(short apiVersion) {
        return apiVersion > 7;
    }

    @Override
    public CompletionStage<RequestFilterResult> onFetchRequest(short apiVersion,
                                                               RequestHeaderData header,
                                                               FetchRequestData request,
                                                               FilterContext context) {
        // Only invoked for Fetch requests with an API version greater than 7.
        return context.forwardRequest(header, request);
    }
}
2.4.7. Filter Construction and Configuration
For Kroxylicious to instantiate and configure your custom filter we use Java’s ServiceLoader API.
Each Custom Filter should provide a corresponding FilterFactory implementation that can create an instance of your custom Filter. The factory can optionally declare a configuration class that Kroxylicious will populate (using Jackson) when loading your custom Filter. The module must package a META-INF/services/io.kroxylicious.proxy.filter.FilterFactory file, containing the class name of each filter factory implementation, into the JAR file.
For example, in kroxylicious-samples we have the SampleFilterConfig class. This is used by the SampleFetchResponseFilter. The configuration is routed to the Filter instance via the SampleFetchResponse filter factory.
Then, when we configure a filter in Kroxylicious configuration like:
filters:
  - type: SampleFetchResponse
    config:
      findValue: a
      replacementValue: b
Kroxylicious will deserialize the config object into a SampleFilterConfig and use it to construct a SampleFetchResponseFilter, passing the SampleFilterConfig instance as a constructor argument.
2.5. Packaging filters
Filters are packaged as standard .jar files. A typical Custom Filter jar contains:
-
Filter implementation classes
-
A FilterFactory implementation per Filter and service metadata (see Filter Construction and Configuration)
3. Trademark notice
-
HashiCorp Vault is a registered trademark of HashiCorp, Inc.
-
AWS Key Management Service is a trademark of Amazon.com, Inc. or its affiliates.
-
Fortanix and Data Security Manager are trademarks of Fortanix, Inc.
-
Apache Kafka is a registered trademark of The Apache Software Foundation.