schemaValidationConfig:
apicurioGlobalId: 1001 (1)
apicurioRegistryUrl: http://registry.local:8080 (2)
allowNulls: true (3)
allowEmpty: true (4)
About this guide
This guide covers using the Kroxylicious Record Validation Filter to validate records sent by Kafka client to Kafka brokers. Refer to other Kroxylicious guides for information on running the proxy or for advanced topics such as plugin development.
The Record Validation filter validates records sent by a producer. Only records that pass the validation are sent to the broker. This filter can be used to prevent poison messages—such as those containing corrupted data or invalid formats—from entering the Kafka system, which may otherwise lead to consumer failure.
The filter currently supports two modes of operation:
-
Schema Validation ensures the content of the record conforms to a schema stored in an Apicurio Registry.
-
JSON Syntax Validation ensures the content of the record contains syntactically valid JSON.
Validation rules can be applied to check the content of the Kafka record key or value.
If the validation fails, the product request is rejected and the producing application receives an error response. The broker will not receive the rejected records.
This filter is currently in incubation and available as a preview. We would not recommend using it in a production environment. |
1. (Preview) Setting up the Record Validation filter
This procedure describes how to set up the Record Validation filter. Provide the filter configuration and rules that the filter uses to check against Kafka record keys and values.
-
An instance of Kroxylicious. For information on deploying Kroxylicious, see the Kroxylicious Proxy guide or Kroxylicious Operator for Kubernetes guide.
-
A config map for Kroxylicious that includes the configuration for creating a virtual cluster.
-
Apicurio Registry (if wanting to use Schema validation).
-
Configure a
RecordValidation
type filter.-
In a standalone proxy deployment. See Example proxy configuration file
-
In a Kubernetes deployment using a
KafkaProcotolFilter
resource. See ExampleKafkaProtocolFilter
resource
-
Replace the token <rule definition>
in the YAML configuration with either a Schema Validation rule or a JSON Syntax Validation rule depending on your requirements.
The Schema Validation rule validates that the key or value matches a schema identified by its global ID within an Apicurio Schema Registry.
If the key or value does not adhere to the schema, the record will be rejected.
Additionally, if the kafka producer has embedded a global ID within the record it will be validated against the global ID defined by the rule. If they do not match, the record will be rejected. See the Apicurio documentation for details on how the global ID could be embedded into the record. The filter supports extracting ID’s from either the Apicurio globalId
record header or from the initial bytes of the serialized content itself.
1 | Apicurio registry global ID identifying the schema that will be enforced. |
2 | Apicurio Registry endpoint. |
3 | if true , the validator allows keys and or values to be null . The default is false . |
4 | if true , the validator allows keys and or values to be empty. The default is false . |
Schema validation mode currently has the capability to enforce only JSON schemas (issue) |
The JSON Syntax Validation rule validates that the key or value contains only syntactically correct JSON.
syntacticallyCorrectJson:
validateObjectKeysUnique: true (1)
allowNulls: true (2)
allowEmpty: true (3)
1 | If true , the validator enforces that objects keys must be unique. The default is false . |
2 | if true , the validator allows keys and or values to be null . The default is false . |
3 | if true , the validator allows keys and or values to be empty. The default is false . |
1.1. Example proxy configuration file
If your instance of the Kroxylicious Proxy runs directly on an operating system, provide the filter configuration in the filterDefinitions
list of your proxy configuration. Here’s a complete example of a filterDefinitions
entry configured for record validation:
filterDefinitions
configuration
filterDefinitions:
- name: my-record-validation
type: RecordValidation
config:
rules:
- topicNames: (1)
- <topic name>
keyRule:
<rule definition> (2)
valueRule:
<rule definition> (3)
defaultRule: (4)
keyRule:
<rule definition> (2)
valueRule:
<rule definition> (3)
1 | List of topic names to which the validation rules will be applied. |
2 | Validation rules that are applied to the record’s key. |
3 | Validation rules that are applied to the record’s value. |
4 | (Optional) Default rule that is applied to any topics for which there is no explict rule defined. |
Refer to the Kroxylicious Proxy guide for more information about configuring the proxy.
1.2. Example KafkaProtocolFilter
resource
If your instance of Kroxylicious runs on Kubernetes, you must use a KafkaProcotolFilter
resource to contain the filter configuration.
Here’s a complete example of a KafkaProtocolFilter
resource configured for record validation:
KafkaProtocolFilter
resource for record validation
kind: KafkaProtocolFilter
metadata:
name: my-validation-filter
spec:
type: RecordValidation
configTemplate:
rules:
- topicNames: (1)
- <topic name>
keyRule:
<rule definition> (2)
valueRule:
<rule definition> (3)
defaultRule: (4)
keyRule:
<rule definition> (2)
valueRule:
<rule definition> (3)
1 | List of topic names to which the validation rules will be applied. |
2 | Validation rules that are applied to the record’s key. |
3 | Validation rules that are applied to the record’s value. |
4 | (Optional) Default rule that is applied to any topics for which there is no explict rule defined. |
Refer to the Kroxylicious Operator for Kubernetes guide for more information about configuration on Kubernetes.
2. Trademark notice
-
Apache Kafka is a registered trademark of The Apache Software Foundation.
-
Kubernetes is a registered trademark of The Linux Foundation.
-
Prometheus is a registered trademark of The Linux Foundation.
-
Strimzi is a trademark of The Linux Foundation.
-
Hashicorp Vault is a registered trademark of HashiCorp, Inc.
-
AWS Key Management Service is a trademark of Amazon.com, Inc. or its affiliates.
-
Fortanix and Data Security Manager are trademarks of Fortanix, Inc.