Each service within an aggregate defines a service descriptor in the repository’s services module.

A service descriptor defines the external resources a service uses and the API it exposes. The types of resources a descriptor can reference depend on the installed Creek extensions.

ProTip: Service descriptors are accessible by other services within the aggregate, but not by those outside. Services from other aggregates should only use the aggregate’s public API defined in its aggregate descriptor. More information on aggregate APIs and descriptors can be found in the Kafka Streams: aggregate API tutorial.

This demo will use the Kafka Streams extension. The handle-occurrence-service’s descriptor will define a twitter.tweet.text input topic, which the service will consume from, and a twitter.handle.usage output topic, which the service will produce to.

Note: To keep this tutorial self-contained, the service’s input topic is owned by the service. It would be more common for an upstream service or aggregate to own the topic and for the topic’s definition to be imported from there. The Kafka Streams: aggregate API tutorial covers how to define an aggregate descriptor to allow interacting with parts of an architecture that don’t use Creek.

Define the topic resources

The aggregate template used to bootstrap the repository provided a shell service descriptor in the repository named HandleOccurrenceServiceDescriptor.java. Add the following to the class to define the service’s input and output topics:

import static io.github.creek.service.basic.kafka.streams.demo.internal.TopicConfigBuilder.withPartitions;
import static io.github.creek.service.basic.kafka.streams.demo.internal.TopicDescriptors.inputTopic;
import static io.github.creek.service.basic.kafka.streams.demo.internal.TopicDescriptors.outputTopic;

import java.time.Duration;
import org.creekservice.api.kafka.metadata.OwnedKafkaTopicInput;
import org.creekservice.api.kafka.metadata.OwnedKafkaTopicOutput;

public final class HandleOccurrenceServiceDescriptor implements ServiceDescriptor {

    ...

    // Define the tweet-text input topic, conceptually owned by this service:
    public static final OwnedKafkaTopicInput<Long, String> TweetTextStream =
            register(
                    inputTopic(
                            "twitter.tweet.text", // Topic name
                            Long.class, // Topic key: Tweet id
                            String.class, // Topic value: Tweet text
                            withPartitions(5))); // Topic config

    // Define the output topic, again conceptually owned by this service:
    public static final OwnedKafkaTopicOutput<String, Integer> TweetHandleUsageStream =
            register(
                    outputTopic(
                            "twitter.handle.usage", // Topic name
                            String.class, // Topic key: Twitter handle
                            Integer.class, // Topic value: Usage count
                            withPartitions(6) // Topic config
                                    .withRetentionTime(Duration.ofHours(12))));

    ...
}

The two class constants define the input and output topics the service uses. These constants will be used later when building the Kafka Streams topology.

Each topic definition includes the topic name, the types stored in the topic’s records’ key and value, and the topic config.

In this instance, the topic config defines the number of partitions and, for the output topic, the retention time for records in the topic. If no retention time is set, as is the case for the input topic, the cluster default is used.

ProTip: Relying on the cluster’s default topic retention time can be useful, as it allows different clusters to define different defaults. For example, development, QA and staging environments can have much shorter retention times than production.

ProTip: The TopicConfigBuilder class, which defines the withPartitions and withRetentionTime methods used above, is part of the Git repository. It can be customised as your use-case requires.
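To illustrate how an explicit retention time differs from falling back to the cluster default, here is a minimal sketch of a builder with the same two method names. It is not the TopicConfigBuilder class from the repository: the class name TopicConfigSketch, its fields and its accessors are hypothetical and exist only for illustration. The one detail taken from Kafka itself is retention.ms, the topic-level setting that holds the retention time in milliseconds.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: NOT the TopicConfigBuilder shipped in the repository.
final class TopicConfigSketch {

    private final int partitions;
    // Only explicitly set topic-level overrides are stored here.
    private final Map<String, String> config = new HashMap<>();

    private TopicConfigSketch(final int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive: " + partitions);
        }
        this.partitions = partitions;
    }

    static TopicConfigSketch withPartitions(final int partitions) {
        return new TopicConfigSketch(partitions);
    }

    TopicConfigSketch withRetentionTime(final Duration retention) {
        // "retention.ms" is Kafka's topic-level retention setting, in milliseconds.
        config.put("retention.ms", String.valueOf(retention.toMillis()));
        return this;
    }

    int partitions() {
        return partitions;
    }

    Map<String, String> config() {
        return Map.copyOf(config);
    }

    public static void main(final String[] args) {
        // No retention set: no override is recorded, so the cluster default would apply.
        System.out.println(withPartitions(5).config());
        // Explicit 12 hour retention, matching the output topic defined above.
        System.out.println(withPartitions(6).withRetentionTime(Duration.ofHours(12)).config());
    }
}
```

In this sketch, leaving out withRetentionTime simply means no retention.ms override is recorded, which is what lets each cluster apply its own default.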

The register method wrapping each resource descriptor ensures it is registered with the outer service descriptor.
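The registration pattern can be sketched in plain Java as follows. This is an assumption about the general shape of such a method, not the code generated by the aggregate template: RegisterSketch, its nested Topic class and the resources() accessor are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only: the names below are illustrative, not Creek's API.
final class RegisterSketch {

    /** Hypothetical stand-in for a topic descriptor. */
    static final class Topic {
        final String name;

        Topic(final String name) {
            this.name = name;
        }
    }

    // Declared first: static initialisers run in textual order,
    // and register() needs the list to exist before the constants below are set.
    private static final List<Topic> RESOURCES = new ArrayList<>();

    static final Topic INPUT = register(new Topic("twitter.tweet.text"));
    static final Topic OUTPUT = register(new Topic("twitter.handle.usage"));

    // Records the resource and returns it unchanged,
    // so the call can wrap a constant's initialiser.
    private static <T extends Topic> T register(final T resource) {
        RESOURCES.add(resource);
        return resource;
    }

    // Everything registered so far, as an immutable snapshot.
    static List<Topic> resources() {
        return List.copyOf(RESOURCES);
    }

    public static void main(final String[] args) {
        for (final Topic topic : resources()) {
            System.out.println(topic.name);
        }
    }
}
```

Because register returns its argument, each constant keeps its precise type while the descriptor still ends up with a complete list of the resources it declares.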

Note: The system tests we’ll define later will use the service descriptor to discover the service metadata required to run the service, pipe in inputs and read outputs.
