The Kafka extensions to Creek make writing Kafka Client and Kafka Streams based microservices easy.

The source code is available on GitHub:   View on GitHub

An example of how to write Kafka Streams based microservices using this Creek extension can be found in the aptly named Basic Kafka Streams Tutorial.

Kafka client compatability

The extentions themselves are compiled with the latest versions of the Kafka clients and Kafka streams libraries. However, they are compatible with older versions, as set out in the table below. The tested version column details the exact version of Kafka libraries testing covers.

Kafka version Tested version Notes
< 2.8   Not compatible due to API changes in Streams
2.8.+ 2.8.2 Supported & tested
3.0.+ 3.0.2 Supported & tested
3.1.+ 3.1.2 Supported & tested
3.2.+ 3.2.3 Supported & tested
3.3.+ 3.3.2 Supported & tested
3.4.+ 3.4.0 Supported & tested
> 3.4   Not currently tested / released. Should work…

In Gradle, it is possible to force the use of an older Kafka client, if you wish, using a resolution strategy. In Kotlin, this looks like:

configurations.all {
    resolutionStrategy.eachDependency {
        if (requested.group == "org.apache.kafka") {
            useVersion("2.8.2")
        }
    }
}

Metadata types

The creek-kafka-metadata.jar contains metadata types that can be used in aggregate and service descriptor to define Kafka resources the component uses or exposes.

To use the metadata, simply the creek-kafka-metadata.jar as a dependency to the api or services module.

dependencies {
    implementation("org.creekservice:creek-kafka-metadata:0.4.1")
}

Input types

Input topics are topics a service/aggregate consumes from:

  • OwnedKafkaTopicInput: an input topic that is conceptually owned by the component.
  • KafkaTopicInput: an input topic that is not conceptually owned by the component, i.e. it is an owned output topic of another service.

Internal types

Internal topics are things like changelog topics for internal state stores and repartition topics:

  • KafkaTopicInternal: used to represent an internal topic that is implicitly created, e.g. a changelog or repartition topic.
  • CreatableKafkaTopicInternal: used to represent an internal topic that is not implicitly created and hence should be created during deployment.

Output types

Output topics are topics that a service/aggregate produce to:

  • OwnedKafkaTopicOutput: an output topic that is conceptually owned by the component.
  • KafkaTopicOutput: an output topic that is not conceptually owned by the component, i.e. it is an owned input topic of another service.

Extensions

Kafka Clients Extension

Provides an extension to Creek to allow it to work with Kafka resources using standard Kafka Clients, e.g. producers, consumers and the admin client.

By default, if the creek-kafka-extension.jar is on the class or module path, Creek will load the extension and use it to handle any topic resources.

To use the extension, simply the creek-kafka-extension.jar as a dependency to a service.

dependencies {
    implementation("org.creekservice:creek-kafka-client-extension:0.4.1")
}

Basic usage

public final class ServiceMain {

    public static void main(String... args) {
        // Initialize Creek in the main application entry point:
        CreekContext ctx = CreekServices.context(new MyServiceDescriptor());

        // Access the extension, and use to help create the application:
        new ServiceMain(ctx.extension(KafkaClientsExtension.class)).run();
    }

    private final KafkaClientsExtension ext;
    private final KafkaTopic<Long, String> inputTopic;
    private final KafkaTopic<Long, String> outputTopic;

    private ServiceMain(KafkaClientsExtension ext) {
        this.ext = ext;
        // Retrieve type-safe topic metadata and serde:
        this.inputTopic = ext.topic(MyServiceDescriptor.InputTopic);
        this.outputTopic = ext.topic(MyServiceDescriptor.OutputTopic);
    }

    private void run() {
        // Acquire Kafka consumers and producers:
        try (Consumer<byte[], byte[]> consumer = ext.consumer();
             Producer<byte[], byte[]> producer = ext.producer()) {

            consumer.subscribe(List.of(inputTopic.name()));

            while (running()) {
                consumer.poll(Duration.ofSeconds(1))
                        .records(inputTopic.name())
                        .forEach(r -> processInput(r, producer));
            }
        }
    }

    private void processInput(final ConsumerRecord<byte[], byte[]> input,
                              final Producer<byte[], byte[]> producer) {

        // Access type-safe topic deserializers:
        long key = inputTopic.deserializeKey(input.key());
        String value = inputTopic.deserializeValue(input.value());

        // ... do stuff with key & value.

        producer.send(new ProducerRecord<>(outputTopic.name(),
                // Access type-safe topic serializers:
                outputTopic.serializeKey(key),
                outputTopic.serializeValue(value)
        ));
    }
}

Configuration

Extension options

Note: It’s common to use System environment variables to configure Kafka for settings such as boostrap.servers and authentication information.

The extension can be configured by passing an instance of KafkaClientsExtensionOptions when creating the Creek context. For example,

CreekContext ctx = CreekServices.builder(new MyServiceDescriptor())
        .with(
                KafkaClientsExtensionOptions.builder()
                        .withKafkaProperty(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 200L)
                        .build()
        )
        .build();
}

See KafkaClientsExtensionOptions for more info.

System environment variables

An alternative to using KafkaClientsExtensionOptions to configure Kafka client properties is to use environment variables. By default, any environment variable prefixed with KAFKA_ will be passed to the Kafka clients.

It is common to pass bootstrap.servers and authentication information to the service in this way, so that different values can be passed in different environments. For example, bootstrap.servers cam be passed by setting a KAFKA_BOOTSTRAP_SERVERS environment variable. This is how the system tests pass the Kafka bootstrap to your service.

See SystemEnvPropertyOverrides for more info, including multi-cluster support.

This behaviour is customizable. See KafkaClientsExtensionOptions.Builder.withKafkaPropertiesOverrides for more info.

Unit testing

ProTip: Creek recommends focusing on system-tests for testing the combined business functionality of a service, or services. Of course, unit testing still has its place.

Creek-kafka can be configured to not require an actual Kafka cluster during unit testing. This is most commonly achieved by configuring creek with the options built from KafkaClientsExtensionOptions.testBuilder():

class ClientTest {

  private static KafkaClientsExtension ext;

  @BeforeAll
  public static void classSetup() {
    CreekContext ctx = CreekServices.builder(new MyServiceDescriptor())
            // Configure Creek to work without an actual cluster:
            .with(KafkaClientsExtensionOptions.testBuilder().build())
            .build();

    ext = ctx.extension(KafkaClientsExtension.class);
  }
  
  // Tests are free to get serde from ext...
}

…alternatively, a custom client can be installed. The CustomTopicClient type used below can implement MockTopicClient or TopicClient:

class ClientTest {

  private static KafkaClientsExtension ext;

  @BeforeAll
  public static void classSetup() {
    CreekContext ctx = CreekServices.builder(new MyServiceDescriptor())
            // Configure Creek to work without an actual cluster:
            .with(KafkaClientsExtensionOptions.builder()
                    .withTypeOverride(TopicClient.Factory.class, CustomTopicClient::new)
                    .build())
            .build();

    ext = ctx.extension(KafkaClientsExtension.class);
  }
  
  // Tests are free to get serde from ext...
}

ProTip: The serde being used may also need configuring in unit tests to work without external services, e.g. a Schema Registry. Consult the documentation for the serde in use.

Kafka Streams extension

Provides an extension to Creek to allow it to work with Kafka Streams and Kafka resources. The extension extends the functionality provided by the Kafka client extension, adding Streams specific functionality.

By default, if the creek-kafka-streams-extension.jar is on the class or module path, Creek will load the extension and use it to handle any topic resources and provide functionality for Kafka Streams based apps.

To use the extension, simply the creek-kafka-streams-extension.jar as a dependency to a service.

dependencies {
    implementation("org.creekservice:creek-kafka-streams-extension:0.4.1")
}

Basic usage

@SuppressWarnings("unused")
public final class ServiceMain {

    private static final Name name = Name.root();

    public static void main(String... args) {
        // Initialize Creek in the main application entry point:
        CreekContext ctx = CreekServices.context(new MyServiceDescriptor());

        // Access the Kafka Streams extension:
        final KafkaStreamsExtension ext = ctx.extension(KafkaStreamsExtension.class);
        // Use it to help build the topology:
        final Topology topology = new TopologyBuilder(ext).build();
        // And execute it:
        ext.execute(topology);
    }
}

public final class TopologyBuilder {

    private final KafkaStreamsExtension ext;
    private final Name name = Name.root();

    public TopologyBuilder(final KafkaStreamsExtension ext) {
        this.ext = ext;
    }

    public Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Retrieve type-safe topic metadata and serde:
        final KafkaTopic<Long, String> input = ext.topic(MyServiceDescriptor.InputTopic);
        final KafkaTopic<Long, String> output = ext.topic(MyServiceDescriptor.OutputTopic);

        builder.stream(
                        input.name(),
                        // Type-safe access to topic serde:
                        Consumed.with(input.keySerde(), input.valueSerde())
                                .withName(name.name("ingest-" + input.name())))
                // ... perform business logic
                .to(
                        output.name(),
                        // Type-safe access to topic serde:
                        Produced.with(output.keySerde(), output.valueSerde())
                                .withName(name.name("egress-" + output.name())));

        // Grab the cluster properties from Creek to build and return the Topology:
        return builder.build(ext.properties(DEFAULT_CLUSTER_NAME));
    }
}

Configuration

Extension options

Note: It’s common to use System environment variables to configure Kafka for settings such as boostrap.servers and authentication information.

The extension can be configured by passing an instance of KafkaStreamsExtensionOptions when creating the Creek context. For example,

// Initialize Creek in the main application entry point:
CreekContext ctx = CreekServices.builder(new MyServiceDescriptor())
        // Optionally, override default extension options:
        .with(
                KafkaStreamsExtensionOptions.builder()
                        .withKafkaProperty(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 200L)
                        .withMetricsPublishing(
                                KafkaMetricsPublisherOptions.builder()
                                        .withPublishPeriod(Duration.ofMinutes(5))
                        )
                        .build()
        )
        .build();

See KafkaStreamsExtensionOptions for more info.

System environment variables

An alternative to using KafkaStreamsExtensionOptions to configure Kafka client properties is to use environment variables. By default, any environment variable prefixed with KAFKA_ will be passed to the Kafka clients.

It is common to pass bootstrap.servers and authentication information to the service in this way, so that different values can be passed in different environments. For example, bootstrap.servers cam be passed by setting a KAFKA_BOOTSTRAP_SERVERS environment variable. This is how the system tests pass the Kafka bootstrap to your service.

See SystemEnvPropertyOverrides for more info, including multi-cluster support.

This behaviour is customizable. See KafkaStreamsExtensionOptions.Builder.withKafkaPropertiesOverrides for more info.

Unit testing topologies

ProTip: Creek recommends focusing on system-tests for testing the combined business functionality of a service, or services. Of course, unit testing still has its place.

The creek-kafka-streams-test.jar contains test helpers, that can be added as a test dependency, to help with unit testing Kafka Streams topologies.

dependencies {
    testImplementation("org.creekservice:creek-kafka-streams-test:0.4.1")
}
Writing topology unit tests

This module provides classes to make it easier to write unit tests for Kafka Streams’ topologies:

For example:

class TopologyBuilderTest {

    private static CreekContext ctx;

    private TopologyTestDriver testDriver;
    private TestInputTopic<Long, String> inputTopic;
    private TestOutputTopic<Long, String> outputTopic;

    @BeforeAll
    public static void classSetup() {
        ctx = CreekServices.builder(new MyServiceDescriptor())
                // Configure Creek to work without an actual cluster:
                .with(TestKafkaStreamsExtensionOptions.defaults())
                .build();
    }

    @BeforeEach
    public void setUp() {
        final KafkaStreamsExtension ext = ctx.extension(KafkaStreamsExtension.class);

        // Build topology using the extension:
        final Topology topology = new TopologyBuilder(ext).build();

        testDriver = new TopologyTestDriver(topology, ext.properties(DEFAULT_CLUSTER_NAME));

        // Use Creek's `TestTopics` to build topics:
        inputTopic = TestTopics.inputTopic(InputTopic, ctx, testDriver);
        outputTopic = TestTopics.outputTopic(OutputTopic, ctx, testDriver);
    }

    @AfterEach
    public void tearDown() {
        testDriver.close();
    }

    @Test
    void shouldTestSomething() {
        // When:
        inputTopic.pipeInput(1L, "a");

        // Then:
        assertThat(outputTopic.readKeyValuesToList(), contains(pair(1L, "a")));
    }
}
Using with JPMS

The Java platform module system will complain about split packages because both the Kafka Streams and Kafka Streams Test jars expose classes in the same packages:

error: module some.module reads package org.apache.kafka.streams from both kafka.streams and kafka.streams.test.utils

This is tracked by Apache Kafka in issue KAFKA-7491.

Until the issue is fixed, or if using an older version of Kafka, any module using both jars under JPMS will need to patch the streams test jar into the Kafka streams jar.

The module patching can be achieved using the org.javamodularity.moduleplugin plugin:

plugins {
    id("org.javamodularity.moduleplugin") version "1.8.15"
}

// Patch Kafka Streams test jar into main Kafka Streams module to avoid split packages:
modularity.patchModule("kafka.streams", "kafka-streams-test-utils-2.8.2.jar")

System test extension

The creek-kafka-test-extension.jar is a Creek system test extension, allowing system tests that seed Kafka clusters with test data, produce inputs to Kafka topics, and assert expectations of records in Kafka topics.

Configuring the extension

The test extension can be added to any module that runs system tests. How this is done will depend on the build plugin being used to run system tests. For the gradle plugin the text extension can be added using the systemTestExtension dependency configuration:

dependencies {
    systemTestExtension("org.creekservice:creek-kafka-test-extension:0.4.1")
}

ProTip: The systemTestExtension dependency configuration is added by the org.creekservice.system.test Gradle plugin.

Test model

The extension registers the following model subtypes to support system testing of Kafka based microservices:

Option model extensions

The behaviour of the Kafka test extension can be controlled via the creek/kafka-option@1 option type.
This option type defines the following:

Property name Property type Description
outputOrdering Enum (NONE, BY_KEY) (Optional) Controls the ordering requirements for the expected output records on the same topic. Valid values are:
None: records can be in any order.
BY_KEY: record expectations that share the same key must be received in the order defined.
Default: BY_KEY
verifierTimeout Duration (String/Number) (Optional) Overrides the global verifier timeout. Can be set to number of seconds, e.g. 60 or a string that can be parsed by Java Duration type, e.g. PT2M.
extraTimeout Duration (String/Number) (Optional) Sets the time the tests will wait for extra, unexpected, records to be produced. Can be set to number of seconds, e.g. 60 or a string that can be parsed by Java Duration type, e.g. PT2M. Default: 1 second.
kafkaDockerImage String (Optional) Override the default docker image used for the Kafka server in the tests. Default: confluentinc/cp-kafka:7.3.1.
notes String (Optional) A notes field. Ignored by the system tests. Can be used to document intent.

For example, the following defines a suite that turns off ordering requirements for expectation records:

---
name: Test suite with expectation ordering disabled
options:
  - !creek/kafka-options@1
    outputOrdering: NONE
    notes: ordering turned off because blah blah.
services:
  - some-service
tests:
  - name: test 1
    inputs:
      - some_input
    expectations:
      - unordered_output

Input model extensions

The Kafka test extension registers a creek/kafka-topic@1 input model extension. This can be used to define seed and input records to be produced to Kafka. It supports the following properties:

Property Name Property Type Description
topic String (Optional) A default topic name that will be used for any records that do not define their own. If not set, any records without a topic set will result in an error.
cluster String (Optional) A default cluster name that will be used for any records that do not define their own. If not set, any records will default to the default cluster name.
notes String (Optional) A notes field. Ignored by the system tests. Can be used to document intent.
records Array of TopicRecords (Required) The records to produce to Kafka.

Each TopicRecord supports the following properties:

Property Name Property Type Description
topic String (Optional) The topic to produce the record to. If not set, the file level default topic will be used. Neither being set will result in an error.
cluster String (Optional) The cluster to produce the record to. If not set, the file level default cluster will be used. If neither are set the default cluster will be used.
key Any (Optional) The key of the record to produce. The type can be any type supported by the topic’s key serde. If not set, the produced record will have a null key.
value Any (Optional) The value of the record to produce. The type can be any type supported by the topic’s value serde. If not set, the produced record will have a null value.
notes String (Optional) An optional notes field. Ignored by the system tests. Can be used to document intent.

For example, the following defines an input that will produce two records to an input topic on the default cluster:

---
!creek/kafka-topic@1
topic: input
records:
  - key: 1
    value: foo
  - notes: this record has no value set. The record produced to kafka will therefore have a null value.
    key: 2    

Expectation model extensions

The Kafka test extension registers a creek/kafka-topic@1 expectation model extension. This can be used to define the records services are expected to produce to Kafka. It supports the following properties:

Property Name Property Type Description
topic String (Optional) A default topic name that will be used for any records that do not define their own. If not set, any records without a topic set will result in an error.
cluster String (Optional) A default cluster name that will be used for any records that do not define their own. If not set, any records will default to the default cluster name.
notes String (Optional) A notes field. Ignored by the system tests. Can be used to document intent.
records Array of TopicRecords (Required) The records to produce to Kafka.

Each TopicRecord supports the following properties:

Property Name Property Type Description
topic String (Optional) The topic to consume the record from. If not set, the file level default topic will be used. Neither being set will result in an error.
cluster String (Optional) The cluster to consume the record from. If not set, the file level default cluster will be used. If neither are set the default cluster will be used.
key Any (Optional) The expected key of the record. If not set, the consumed record’s key will be ignored.
value Any (Optional) The expected value of the record. If not set, the consumed record’s value will be ignored.
notes String (Optional) An optional notes field. Ignored by the system tests. Can be used to document intent.

For example, the following defines an expectation that two records will be produced to the output topic on the primary cluster:

---
!creek/kafka-topic@1
topic: input
cluster: primary
records:
  - notes: this record expectation does not define any value, meaning the value is ignored, i.e. it can hold any value.
    key: 1    
  - notes: this record expectation explicitly requires the value to be null
    key: 2
    value: ~

Kafka serialization formats

The creek-kafka-serde.jar provides the base types used to define and register a serde provider for Creek Kafka.

Serialization formats are pluggable. allowing users to plug in their own custom serialization formats, should they want. Each format a component uses must have exactly one matching KafkaSerdeProvider implementation available at runtime, on the class-path or module-path.

Currently supported serialization formats:

Serialization format Notes
kafka Serialization using the Kafka clients serializers.
json Schema validated JSON serialization

…or extend Creek with a custom format.

kafka format

The creek-kafka-serde.jar also comes with an in-built kafka serialization format, which supports the standard set of Kafka serializers. The serializer handles the following types:

Java type Serde class
UUID Serdes.UUIDSerde
long Serdes.LongSerde
Long Serdes.LongSerde
int Serdes.IntegerSerde
Integer Serdes.IntegerSerde
short Serdes.ShortSerde
Short Serdes.ShortSerde
float Serdes.FloatSerde
Float Serdes.FloatSerde
double Serdes.DoubleSerde
Double Serdes.DoubleSerde
String Serdes.StringSerde
ByteBuffer Serdes.ByteBufferSerde
Bytes Serdes.BytesSerde
byte[] Serdes.ByteArraySerde
Void Serdes.VoidSerde

This format can be used by having KafkaTopicDescriptor.PartDescriptor.format() method return kafka as the serialization format.

For an example of using the kafka format in resource descriptors see the TopicDescriptors class, or the basic Kafka Streams tutorial.

json-schema format

This serialization format is still under development. See issue #25 for remaining tasks.

The creek-kafka-json-serde.jar provides a json-schema serialization format. This format supports serializing Java types as JSON, where the JSON payload is validated against a schema.

The format supports per-topic key and value schemas. It stores and loads JSON schemas from Confluent’s own Schema Registry. Producers load their schemas from the classpath at runtime and ensure they are registered in the Schema Registry. Consumers load their schemas from the classpath, and require the schema to already be registered in the Schema Registry, i.e. by the producing application. See Capturing schema in the Schema Registry for more information on why only producers register schemas in the Schema Registry.

It is recommended that schemas are generated from Java classes using the Creek JSON Schema Gradle plugin. This plugin will, by default, create the closed content model JSON schemas that this serde requires.

Confluent compatability

Note, the JSON serde is not currently compatible with Confluent’s own JSON serde, as Confluent’s serde prefixes the serialized JSON with the schema-id. This is not necessary with Creek’s JSON format. However, there is a task to track optionally enabling Confluent JSON serde compatability

Note, this serde does not use the standard JSON Schema compatability checks defined in the Confluent Schema Registry. We think Confluent’s checks are not fit for purpose. See this article series to understand why, and how Creek implements schema compatibility checks for JSON.

In its current form, the JSON serde does not persist the schema id used to serialize the key or value in the Kafka record. This is because the schema id is not needed, as there are checks to ensure all consuming schemas are backwards compatible with producing schemas, i.e. all consumers can consume all date produced by producers.

Why did we choose to not use the Confluent JSON schema serde? In our view the current Confluent’s current JSON schema serde is not fit for purpose. Hence, coming up with our own.

Let’s look at the pros and cons between the two:

  Confluent Serde Creek Serde
1. Broken schema evolution Usable schema evolution.
2. Generates schema at runtime. Generates schema at compile-time.
3. Schemas published on first use. Schemas published on startup.
4. Supports per-record & per-topic schemas. Supports only per-topic schemas.
5. Compatible with Confluent UI Unsure if compatible with UI
6. Hard to evolve a key schema Key schemas can be evolved.

Let’s look at each of these in more detail:

  1. Probably the biggest difference is how the two serde handle schema compatability. In our view Confluent’s currently model just doesn’t work, and we think ours is better.
  2. Generating schemas at compile-time reduces service startup times, and allows engineers the freedom to inspect schemas, and even test they are as expected or don’t change unexpectedly, if they wish
  3. Publishing schemas on first use has a few downsides, especially on a topic that doesn’t see much traffic.
    1. Schema changes that break evolvability rules are not detected on startup. In contrast, publishing & validating schemas on service startup ensures services fail eagerly if there are issues.
    2. The set of schema versions for a topic become less deterministic across environments, as service needs to have started and produced messages. In contrast, publishing on start-up allows the schema versions in an environment to be derived from the versions of the services deployed.
  4. Per-record schemas is, in our opinion, hard to manage in organisations and doesn’t lend itself to have self-service data-products in Kafka. Publishing a new record schema to a topic isn’t a compatible change and can break downstream consumers if things aren’t managed correctly. Yet, with per-record schemas its very easy to publish a message with a new schema. For these reasons, we see per-record schemas as an anti-pattern, and therefore only support per-topic schemas. Defining the explicit type or types that can be found in a topic defines a clear contract with users. Multiple types can be better supported and polymorphism can be achieved via subtyping and JSON schema’s anyOf.
  5. Obviously, the Confluent JSON serde is compatible with Confluent’s own UIs and therefore likely other UIs built by others around the schema store. We’ve not actually checked, but it’s certainly possible the UI expects JSON key and values to be prefixed with the schema id, and balks if that’s not the case. Personally, we prefer the payload being actual JSON, though we’ve a planned enhancement to support Confluent’s format to allow interoperability.
  6. One of the implications of prefixing the payload with the schema id, as Confluent’s serde do, is that its impossible to evolve the schema of a topic’s key, unless using a custom partitioning strategy. This is because the schema id forms part of the binary key. Evolving the schema means a new schema id, which changes the serialised form of a specific key, meaning it may be produced to a different partition. By not prefixing with the schema id, the Creek serde allows the key schema to be evolved. For example, there’s no reason why a new optional property can’t be added.

Dependencies

The creek-kafka-json-serde.jar module has dependencies not stored in maven central. To use the module add Confluent’s and JitPack’s repositories to your build scripts.

For example, in Gradle build.gradle.kts:

repositories {
    maven {
        url = uri("https://jitpack.io")
        // Optionally limit the scope artefacts:
        mavenContent {
            includeGroup("net.jimblackler.jsonschemafriend")
        }
    }

    maven {
        url = uri("https://packages.confluent.io/maven/")
        // Optionally limit the scope artefacts:
        mavenContent {
            includeGroup("io.confluent")
        }
    }
}

Options

The format supports customisation via the JsonSerdeExtensionOptions type.

For example, it is possible to register subtypes of polymorphic base types:

public final class ServiceMain {

    public static void main(String... args) {
        CreekContext ctx = CreekServices.builder(new MyServiceDescriptor())
                .with(
                        JsonSerdeExtensionOptions.builder()
                                // Register subtypes:
                                .withSubtypes(
                                        SubType1.class,
                                        SubType2.class)
                                // Register subtype with specific logical name:
                                .withSubtype(SubType3.class, "type-3")
                                .build()                        
                )
                .build();

        new ServiceMain(ctx.extension(KafkaClientsExtension.class)).run();
    }
    
    private ServiceMain(KafkaClientsExtension ext) {
      //...
    }
    
    private void run() {
      //...
    }
}

Manually registering subtypes is only necessary when this information is not available to Jackson already, i.e. when a base type is annotated with @JsonTypeInfo, but not with @JsonSubTypes.

Writing tests

If you are writing unit or functional tests that require JSON serde, you can configure the serde provider to use a mock Schema Registry client.

This is most commonly achieved using the testBuilder method on the options class:

class UnitTest {
    
  private static CreekContext ctx;
 
  @BeforeAll
  public static void classSetup() {
    ctx = CreekServices.builder(new TestServiceDescriptor())
            // Configure JSON Serde for testing:
            .with(JsonSerdeExtensionOptions.testBuilder().build())
            // Configure Kafka clients for testing:
            .with(KafkaClientsExtensionOptions.testBuilder().build())
            .build();
   }

   // Tests are free to get serde from ext...
}

…alternatively, a custom schema client can be installed. The CustomSchemaClient type used below can implement MockJsonSchemaStoreClient or JsonSchemaStoreClient:

class TopologyTest {
    
  private static CreekContext ctx;
 
  @BeforeAll
  public static void classSetup() {
    ctx = CreekServices.builder(new TestServiceDescriptor())
            .with(TestKafkaStreamsExtensionOptions.defaults())
            .with(JsonSerdeExtensionOptions.builder()
                    // Install custom client:
                    .withTypeOverride(
                            JsonSchemaStoreClient.Factory.class,
                            (schemaRegistryName, endpoints) ->
                                    new CustomSchemaClient(
                                            schemaRegistryName,
                                            new MockSchemaRegistryClient(
                                                    List.of(new JsonSchemaProvider()))))
                    // Install custom endpoint loader:
                    .withTypeOverride(SchemaStoreEndpoints.Loader.class, new MockEndpointsLoader() {}))
            .build();
   }
}

Custom formats

Creek Kafka has a pluggable serialization format, allowing new formats to be added easily. New formats should implement the KafkaSerdeProvider interface:

/**
 * Base type for extensions that provide Kafka serde
 *
 * <p>Creek loads extensions using the standard {@link java.util.ServiceLoader}. To be loaded by
 * Creek the provider must be registered in either the {@code module-info.java} file as a {@code
 * provider} of {@link KafkaSerdeProvider} and/or have a suitable entry in the {@code
 * META-INFO.services} directory.
 */
public interface KafkaSerdeProvider {

    /**
     * @return the <i>unique</i> serialization format the serde provides.
     */
    SerializationFormat format();

    SerdeFactory initialize(CreekService api);

    interface SerdeFactory {

        /**
         * Get the serde for the supplied Kafka topic {@code part}.
         *
         * <p>{@link Serde#configure} will be called on the returned serde.
         *
         * @param <T> the type of the part.
         * @param part the descriptor for the topic part.
         * @return the serde to use to serialize and deserialize the part.
         */
        <T> Serde<T> createSerde(PartDescriptor<T> part);
    }
}

The provider will be used to create Serde for any topic keys or values that use the provider’s format(). The provider can query the part, passed to create, to obtain the class of the key or value part.

Registering custom formats

Serialization formats are discovered from the class-path and module-path using the standard Java ServiceLoader.

How to make a serialization format discoverable by Creek will depend on whether it is on the JVM’s class or module path.

ProTip: We suggest registering component descriptors for use on both the class-path and module-path, ensuring they work today and tomorrow.

Formats on the module path

If the format resides in a module it needs to be declared in the module’s descriptor, i.e. in the module-info.java file, as a provider of the KafkaSerdeProvider type:

module custom.serde.format {
    requires creek.kafka.serde;
    provides KafkaSerdeProvider with CustomSerdeFormatProvider;
}
Formats on the class path

If the format does not reside in a module, or the jar is on the class-path, it is registered by placing a provider-configuration file in the META-INF/services resource directory.

This is a plain text file named org.creekservice.api.kafka.serde.provider.KafkaSerdeProvider. Add the fully-qualified name of the type implementing the KafkaSerdeProvider type to this file:

com.acme.examples.serde.CustomSerdeFormatProvider

Testing format registration

The creek-kafka-serde-test jar contains a test utility that will test a serialization format is registered correctly:

KafkaSerdeProviderTester.tester(ExampleTestSerdeProvider.class)
        .withExpectedFormat(serializationFormat("example"))
        .test();