KafkaNotificationPublisher.java

1
package com.reallifedeveloper.common.infrastructure.messaging;
2
3
import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;
4
5
import java.io.IOException;
6
import java.util.List;
7
8
import org.slf4j.Logger;
9
import org.slf4j.LoggerFactory;
10
import org.springframework.kafka.core.KafkaTemplate;
11
12
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13
14
import com.reallifedeveloper.common.application.notification.Notification;
15
import com.reallifedeveloper.common.application.notification.NotificationPublisher;
16
import com.reallifedeveloper.common.domain.ErrorHandling;
17
import com.reallifedeveloper.common.domain.ObjectSerializer;
18
19
/**
20
 * An implementation of the {@link NotificationPublisher} interface which uses <a href="https://kafka.apache.org/">Apache Kafka</a>.
21
 *
22
 * @author RealLifeDeveloper
23
 */
24
public final class KafkaNotificationPublisher implements NotificationPublisher {
25
26
    private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationPublisher.class);
27
28
    private final KafkaTemplate<String, String> kafkaTemplate;
29
30
    private final ObjectSerializer<String> objectSerializer;
31
32
    /**
33
     * Creates a new {@code KafkaNotificationPublisher} which uses the given {@code ObjectSerializer} to serialize notifications, and the
34
     * given {@code KafkaTemplate} to send them to Kafka.
35
     *
36
     * @param kafkaTemplate    the {@code KafkaTemplate} to use
37
     * @param objectSerializer the {@code ObjectSerializer} to use
38
     */
39
    @SuppressFBWarnings(value = { "EI_EXPOSE_REP2",
40
            "CRLF_INJECTION_LOGS" }, justification = "The KafkaTemplate is mutable, but that is OK; Logging only of objects, not user data")
41
    public KafkaNotificationPublisher(KafkaTemplate<String, String> kafkaTemplate, ObjectSerializer<String> objectSerializer) {
42 1 1. <init> : removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → KILLED
        ErrorHandling.checkNull("Arguments must not be null: kafkaTemplate=%s, objectSerializer=%s", kafkaTemplate, objectSerializer);
43
        LOG.info("Creating new {}: kafkaTemplate={}, objectSerializer={}", getClass().getSimpleName(), kafkaTemplate, objectSerializer);
44
        this.kafkaTemplate = kafkaTemplate;
45
        this.objectSerializer = objectSerializer;
46
    }
47
48
    @Override
49
    @SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", justification = "Key can be null, but that is OK")
50
    public void publish(List<Notification> notifications, String publicationChannel) throws IOException {
51 1 1. publish : removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → SURVIVED
        ErrorHandling.checkNull("Arguments must not be null: notifications={}, publicationChannel={}", notifications, publicationChannel);
52
        if (LOG.isTraceEnabled()) {
53
            LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel));
54
        }
55
        for (Notification notification : notifications) {
56
            String key = notification.eventType();
57
            String message = objectSerializer.serialize(notification);
58
            kafkaTemplate.send(publicationChannel, key, message);
59
        }
60
    }
61
62
}

Mutations

42

1.1
Location : <init>
Killed by : com.reallifedeveloper.common.infrastructure.messaging.KafkaNotificationPublisherTest.[engine:junit-jupiter]/[class:com.reallifedeveloper.common.infrastructure.messaging.KafkaNotificationPublisherTest]/[method:creatingPublisherWithNullKafkaTemplateShouldFail()]
removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → KILLED

51

1.1
Location : publish
Killed by : none
removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → SURVIVED
Covering tests

Active mutators

Tests examined


Report generated by PIT 1.20.0