An aggregate’s API defines its inputs and outputs. The types of inputs and outputs the API exposes will depend on the Creek extensions installed. The quick-start tutorial series focuses on the Kafka Streams extension, and the aggregate’s API will be defined by the Kafka topics it exposes.

ProTip: In Data Mesh terminology, each aggregate output topic, and to a lesser extent, input topic, may define a data product: a defined data set, with a documented schema, the aggregate manages and exposes to the rest of the architecture and organisation.

In this case, the aggregate will expose a single output: the twitter.handle.usage output topic, defined during the first quick-start tutorial.

An aggregate’s API is defined in an aggregate descriptor: a Java class that captures the metadata about the aggregate, including its inputs and outputs.

The aggregate template repository, used to bootstrap a new repository during the Basic Kafka Streams tutorial, provides an empty aggregate descriptor. So far, this descriptor has been left untouched by quick-start tutorial series. The aggregate descriptor can be found in the api module.

Note: To avoid descriptor name clashes, the name of the aggregate descriptor is derived from the aggregate name, i.e. the repository name.

For example, a repository named basic-kafka-streams-demo would contain an aggregate descriptor named BasicKafkaStreamsDemoAggregateDescriptor. Whereas, in a repository named ks-aggregate-api-demo would contain a KsAggregateApiDemoAggregateDescriptor.

The steps below define the API of the tutorial’s aggregate:

Define a Creek aggregate API

Locate the aggregate’s descriptor: this is a class named <something>AggregateDescriptor in the api module.

To keep things consistent, rename the class to OccurrenceAggregateDescriptor. This should help avoid confusion later due to potentially different class names.

Locate the handle-occurrence-service’s descriptor: declared in the HandleOccurrenceServiceDescriptor class, within the services module. Copy it’s TweetHandleUsageStream declaration into OccurrenceAggregateDescriptor. It should look like the following:

public final class OccurrenceAggregateDescriptor implements AggregateDescriptor {

    ...

    public static final OwnedKafkaTopicOutput<String, Integer> TweetHandleUsageStream =
            register(outputTopic(
                    "twitter.handle.usage",
                    String.class, // Twitter handle
                    Integer.class,  // Usage count
                    withPartitions(6)
                            .withRetentionTime(Duration.ofHours(12))
            ));

    ...
}

This adds an output topic and registers it with the descriptor.

Update the service descriptor

There are now two definitions of the same topic: one in the aggregate descriptor and one in the service descriptor. This code duplication is to be avoided.

Update the TweetHandleUsageStream declaration in the HandleOccurrenceServiceDescriptor class to use the aggregate’s topic declaration:

public final class HandleOccurrenceServiceDescriptor implements ServiceDescriptor {

    ...

    // Define the service's output topic, which is part of this aggregate's API:
    public static final OwnedKafkaTopicOutput<String, Integer> TweetHandleUsageStream =
            register(OccurrenceAggregateDescriptor.TweetHandleUsageStream);
    ...
}

Referencing the aggregate’s topic descriptor, defines in code, that the service’s output topic is part of the aggregate’s api.

Testing the changes

To ensure that changes are correct, run the build by running:

./gradlew

This will compile the changes and run the tests. The build should be green.

A word about dependencies

Before moving on, its worth having a quiet word about the dependencies on the api module. If you were to look at the Gradle build file in the api module: api/build.gradle.kts. The dependencies block looks like the following:

dependencies {
    api("org.creekservice:creek-kafka-metadata:$creekVersion")

    // To avoid dependency hell downstream, avoid adding any more dependencies except Creek metadata jars and test dependencies.

    testImplementation("org.apache.kafka:kafka-clients:$kafkaVersion")
}

The module has a single direct production dependency: the creek-kafka-metadata that contains the topic descriptor and config types used within the aggregate’s descriptor.

As the API module is shared code, as the comment states, it is strongly advised to avoid adding production dependencies to this module, other than other metadata jars for specific Creek extensions.

The Creek extension metadata jars themselves deliberately do not provide implementations for the types they define. Instead, the aggregate template repository provides a default implementation, which aggregates are free to customise as needed, without any risk of causing dependency conflicts in projects that include api jars from multiple aggregates.

ProTip: Creek deliberately minimises the surface area of the shared code used in api jars. To avoid dependency hell, it is strongly recommended that you do the same. There’s nothing worst that not being able to patch a production issue due to a dependency conflict!

Updated: