This step will add the business logic for the new service, in the form of a simple Kafka Streams topology.

As this is not the focus of this tutorial, the logic is kept deliberately trivial: it filters the input topic and produces the resulting records to the service’s output topic. The filter excludes any record whose Twitter handle is not in a hardcoded list of accounts linked to presidents of the USA.

As a reminder, the input records store the Twitter handle in the record’s key and the number of occurrences in the record’s value. The service’s output topic will follow the same schema.

Define the stream topology

The Add service workflow provided a shell TopologyBuilder class in the new handle-occurrence-filtering-service module. Flesh out the class’s build method to match what’s below:

public Topology build() {
    final StreamsBuilder builder = new StreamsBuilder();

    // Pass a topic descriptor to the Kafka Streams extension to
    // obtain a typed `KafkaTopic` instance, which provides access to the topic's serdes:
    final KafkaTopic<String, Integer> input = ext.topic(HandleUsageStream);
    final KafkaTopic<String, Integer> output = ext.topic(HandleUsagePresidentsStream);

    // Build a simple topology:
    // Consume input topic:
    builder.stream(
                    input.name(),
                    Consumed.with(input.keySerde(), input.valueSerde())
                            .withName(name.name("ingest-" + input.name())))
            // filter the input, allowing only USA presidents:
            .filter(this::presidentsOnly, name.named("filter-out-non-presidents"))
            // Finally, produce to output:
            .to(
                    output.name(),
                    Produced.with(output.keySerde(), output.valueSerde())
                            .withName(name.name("egress-" + output.name())));

    // Grab the cluster properties from Creek to build and return the Topology:
    return builder.build(ext.properties(DEFAULT_CLUSTER_NAME));
}

ProTip: The example code deliberately names each step in the topology. This is good practice: relying on default naming can cause topology evolution issues in the future. Internal store and topic names incorporate the step name. With default naming, a step’s name, and hence the store or topic name, can change when steps are added or removed. This can lead to unintentional changes to internal topic names. If such a change were deployed, any unprocessed messages in the old topics would be skipped.
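To see why default, index-based node names are fragile, consider this simplified sketch of auto-numbered naming. This is not Kafka Streams code; the name format here is illustrative, but it mirrors the underlying problem: generated names embed a counter, so inserting a step shifts the names of every step after it.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class DefaultNamingSketch {

    // Mimics auto-generated node names of the form OP-<index>:
    static List<String> nameNodes(final List<String> ops) {
        final AtomicInteger counter = new AtomicInteger();
        return ops.stream()
                .map(op -> op + "-" + String.format("%010d", counter.getAndIncrement()))
                .toList();
    }

    public static void main(final String[] args) {
        // Original topology: source -> filter -> sink.
        System.out.println(nameNodes(List.of("SOURCE", "FILTER", "SINK")));
        // Insert a mapValues before the filter: the filter's generated name
        // shifts, and so would any internal store or topic derived from it.
        System.out.println(nameNodes(List.of("SOURCE", "MAPVALUES", "FILTER", "SINK")));
    }
}
```

Running this shows the filter named `FILTER-0000000001` before the insertion and `FILTER-0000000002` after, which is exactly the kind of silent rename that explicit step names prevent.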

The above topology consumes the HandleUsageStream defined in the service’s descriptor, filters it using the presidentsOnly method, and produces any output to the HandleUsagePresidentsStream.

The Creek Kafka Streams extension provides type-safe access to topic metadata, serdes, and Kafka cluster properties, allowing the code to focus on the business logic.

The details of the presidentsOnly method aren’t particularly important in the context of this tutorial. A simple solution might look like this:

private static final Set<String> PRESIDENT_HANDLES =
        Set.of("@POTUS", "@JoeBiden", "@realDonaldTrump", "@BarackObama");

private boolean presidentsOnly(final String twitterHandle, final Object ignored) {
    return PRESIDENT_HANDLES.contains(twitterHandle);
}
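Because the predicate is a pure function, it is easy to sanity-check outside of Kafka. The following standalone sketch wraps it in a hypothetical class (the class and main method are illustrative, not part of the service):

```java
import java.util.Set;

public class PresidentsFilterSketch {

    private static final Set<String> PRESIDENT_HANDLES =
            Set.of("@POTUS", "@JoeBiden", "@realDonaldTrump", "@BarackObama");

    // Mirrors the service's predicate: the key holds the Twitter handle,
    // and the record's value is not needed to make the decision.
    static boolean presidentsOnly(final String twitterHandle, final Object ignored) {
        return PRESIDENT_HANDLES.contains(twitterHandle);
    }

    public static void main(final String[] args) {
        System.out.println(presidentsOnly("@POTUS", 42));      // true
        System.out.println(presidentsOnly("@someoneElse", 7)); // false
    }
}
```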

…and that’s the production code of the service complete!

ProTip: The Name instance defined in the TopologyBuilder doesn’t add much in this example, but as topologies get more complex, getting broken down into multiple builder classes, it really comes into its own. Check out its JavaDoc to see how it can be used to help avoid topology node name clashes.

Topology builder unit test

Unit testing is not the focus of this tutorial. However, as it stands, the unit tests will fail. For completeness, this is addressed below.

Unit testing of Kafka Streams topologies is covered in more detail in the Basic Kafka Streams Tutorial.

The Add service workflow added a new TopologyBuilderTest for the new service’s topology. This comes with a shouldNotChangeTheTopologyUnintentionally test which, as its JavaDoc states, is there to capture unintentional changes to the topology. Unintentional changes could, if deployed, introduce the possibility of data loss.

The test compares the current topology with the last known topology and fails if they differ. If the change is intentional, the handle-occurrence-filtering-service/src/test/resources/kafka/streams/expected_topology.txt file can be updated to reflect the latest topology.

For this tutorial, the test can simply be disabled or deleted.
