With the topic resources defined in the last step, it’s time to write a simple Kafka Streams topology to perform the business logic of this service.

The service will search each tweet’s text for occurrences of Twitter handles, e.g. @katyperry. For each handle found, it will produce a record mapping the Twitter handle to its number of occurrences. For example, if a tweet contained the handle @katyperry twice, then it would produce a record with a key of @katyperry and a value of 2.

Define the stream topology

The aggregate template provided a shell TopologyBuilder class. Flesh out the class’s build method to match what’s below:

public Topology build() {
    final StreamsBuilder builder = new StreamsBuilder();

    // Pass a topic descriptor to the Kafka Streams extension to
    // obtain a typed `KafkaTopic` instance, which provides access to serde:
    final KafkaTopic<Long, String> input = ext.topic(TweetTextStream);
    final KafkaTopic<String, Integer> output = ext.topic(TweetHandleUsageStream);

    // Build a simple topology:
    // Consume input topic:
    builder.stream(
                    input.name(),
                    Consumed.with(input.keySerde(), input.valueSerde())
                            .withName(name.name("ingest-" + input.name())))
            // extract any Twitter handles in the tweet text:
            .flatMap(this::extractHandles, name.named("extract-handles"))
            // Finally, produce to output:
            .to(
                    output.name(),
                    Produced.with(output.keySerde(), output.valueSerde())
                            .withName(name.name("egress-" + output.name())));

    // Grab the cluster properties from Creek to build and return the Topology:
    return builder.build(ext.properties(DEFAULT_CLUSTER_NAME));
}

ProTip: The example code deliberately names each step in the topology. This is good practice: relying on default naming can cause topology evolution issues in the future. Internal store and topic names incorporate the step name. With default naming, the name of a step, and hence the name of its store or topic, can change when steps are added or removed. This can lead to unintentional changes in internal topic names. If such a change were deployed, any unprocessed messages in the old topics would be skipped.
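To make the naming problem concrete, the sketch below is a deliberately simplified toy model of index-based default naming. It is not Kafka Streams code: the class `DefaultNamingSketch` and the method `defaultNames` are hypothetical, and real default names are generated by Kafka Streams itself with more nuance. The point is only that, when a name embeds a step's position, inserting a step renames everything after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical toy model, for illustration only: not part of Kafka Streams or Creek.
class DefaultNamingSketch {

    // Assumption: each unnamed step is named "KSTREAM-<TYPE>-<zero-padded index>",
    // where the index is the step's position in the topology.
    static List<String> defaultNames(final List<String> stepTypes) {
        final List<String> names = new ArrayList<>();
        for (int i = 0; i < stepTypes.size(); i++) {
            names.add(String.format(Locale.ROOT, "KSTREAM-%s-%010d", stepTypes.get(i), i));
        }
        return names;
    }

    public static void main(final String[] args) {
        // Version 1 of an imaginary topology:
        System.out.println(defaultNames(List.of("SOURCE", "AGGREGATE", "SINK")));
        // prints [KSTREAM-SOURCE-0000000000, KSTREAM-AGGREGATE-0000000001, KSTREAM-SINK-0000000002]

        // Version 2 adds a filter step before the aggregate:
        System.out.println(defaultNames(List.of("SOURCE", "FILTER", "AGGREGATE", "SINK")));
        // prints [KSTREAM-SOURCE-0000000000, KSTREAM-FILTER-0000000001,
        //         KSTREAM-AGGREGATE-0000000002, KSTREAM-SINK-0000000003]
    }
}
```

In this model the aggregate step's name changes from ending in `0000000001` to ending in `0000000002`, so any store or internal topic named after it would change too. An explicit name, such as the "extract-handles" name used in the topology, stays the same regardless of the step's position.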

The topology in the build method consumes the TweetTextStream we defined in the service’s descriptor, transforms it in the extractHandles method, and produces any output to the TweetHandleUsageStream.

The Creek Kafka Streams extension provides type-safe access to the input and output topic metadata, serde, and Kafka cluster properties, allowing engineers and the code to focus on the business logic.

As a single input record can result in zero or more output records, depending on the occurrences of Twitter handles in the tweet text, we use the flatMap method to invoke the extractHandles method.

The details of the extractHandles method aren’t particularly important in the context of demonstrating Creek’s functionality. A simple solution might look like this:

private static final Pattern TWEET_HANDLE = Pattern.compile("(?<handle>@[a-zA-Z0-9_]*)");

private Iterable<KeyValue<String, Integer>> extractHandles(
        final long tweetId, final String tweetText) {
    final Map<String, Integer> counts = new HashMap<>();
    final Matcher matcher = TWEET_HANDLE.matcher(tweetText);
    while (matcher.find()) {
        final String handle = matcher.group("handle");
        counts.compute(handle, (h, count) -> count == null ? 1 : count + 1);
    }

    return counts.entrySet().stream()
            .map(e -> KeyValue.pair(e.getKey(), e.getValue()))
            .collect(Collectors.toList());
}
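To try the counting logic in isolation, the following standalone sketch applies the same regular expression without any Kafka dependency. The class `HandleCountSketch` and the method `countHandles` are hypothetical names introduced for illustration, and the method returns a plain `Map` rather than Kafka Streams `KeyValue` pairs. It shows how one tweet can yield zero, one, or several output entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone sketch: mirrors extractHandles, but returns a Map.
class HandleCountSketch {

    // Same pattern as used by extractHandles:
    private static final Pattern TWEET_HANDLE = Pattern.compile("(?<handle>@[a-zA-Z0-9_]*)");

    // Count how many times each handle occurs in the tweet text.
    // LinkedHashMap keeps handles in order of first occurrence.
    static Map<String, Integer> countHandles(final String tweetText) {
        final Map<String, Integer> counts = new LinkedHashMap<>();
        final Matcher matcher = TWEET_HANDLE.matcher(tweetText);
        while (matcher.find()) {
            counts.merge(matcher.group("handle"), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(final String[] args) {
        // No handles: no output entries.
        System.out.println(countHandles("no handles here"));
        // prints {}

        // A repeated handle is counted once per occurrence.
        System.out.println(
                countHandles("@katyperry sings, and @katyperry dances with @taylorswift13"));
        // prints {@katyperry=2, @taylorswift13=1}
    }
}
```

Each entry in the returned map corresponds to one record the topology would produce for that tweet, keyed by handle.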

…and that’s the production code of the service complete!

ProTip: The Name instance defined in the TopologyBuilder doesn’t add much in this example, but it really comes into its own as topologies grow more complex and are broken down into multiple builder classes. Check out its JavaDoc to see how it can be used to help avoid topology node name clashes.
