| 1 | package com.reallifedeveloper.common.infrastructure.messaging; | |
| 2 | ||
| 3 | import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF; | |
| 4 | ||
| 5 | import java.io.IOException; | |
| 6 | import java.util.List; | |
| 7 | ||
| 8 | import org.slf4j.Logger; | |
| 9 | import org.slf4j.LoggerFactory; | |
| 10 | import org.springframework.kafka.core.KafkaTemplate; | |
| 11 | ||
| 12 | import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; | |
| 13 | ||
| 14 | import com.reallifedeveloper.common.application.notification.Notification; | |
| 15 | import com.reallifedeveloper.common.application.notification.NotificationPublisher; | |
| 16 | import com.reallifedeveloper.common.domain.ErrorHandling; | |
| 17 | import com.reallifedeveloper.common.domain.ObjectSerializer; | |
| 18 | ||
| 19 | /** | |
| 20 | * An implementation of the {@link NotificationPublisher} interface which uses <a href="https://kafka.apache.org/">Apache Kafka</a>. | |
| 21 | * | |
| 22 | * @author RealLifeDeveloper | |
| 23 | */ | |
| 24 | public final class KafkaNotificationPublisher implements NotificationPublisher { | |
| 25 | ||
| 26 | private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationPublisher.class); | |
| 27 | ||
| 28 | private final KafkaTemplate<String, String> kafkaTemplate; | |
| 29 | ||
| 30 | private final ObjectSerializer<String> objectSerializer; | |
| 31 | ||
| 32 | /** | |
| 33 | * Creates a new {@code KafkaNotificationPublisher} which uses the given {@code ObjectSerializer} to serialize notifications, and the | |
| 34 | * given {@code KafkaTemplate} to send them to Kafka. | |
| 35 | * | |
| 36 | * @param kafkaTemplate the {@code KafkaTemplate} to use | |
| 37 | * @param objectSerializer the {@code ObjectSerializer} to use | |
| 38 | */ | |
| 39 | @SuppressFBWarnings(value = { "EI_EXPOSE_REP2", | |
| 40 | "CRLF_INJECTION_LOGS" }, justification = "The KafkaTemplate is mutable, but that is OK; Logging only of objects, not user data") | |
| 41 | public KafkaNotificationPublisher(KafkaTemplate<String, String> kafkaTemplate, ObjectSerializer<String> objectSerializer) { | |
| 42 |
1
1. <init> : removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → KILLED |
ErrorHandling.checkNull("Arguments must not be null: kafkaTemplate=%s, objectSerializer=%s", kafkaTemplate, objectSerializer); |
| 43 | LOG.info("Creating new {}: kafkaTemplate={}, objectSerializer={}", getClass().getSimpleName(), kafkaTemplate, objectSerializer); | |
| 44 | this.kafkaTemplate = kafkaTemplate; | |
| 45 | this.objectSerializer = objectSerializer; | |
| 46 | } | |
| 47 | ||
| 48 | @Override | |
| 49 | @SuppressWarnings("FutureReturnValueIgnored") // TODO: Remove this | |
| 50 | @SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", justification = "Key can be null, but that is OK") | |
| 51 | public void publish(List<Notification> notifications, String publicationChannel) throws IOException { | |
| 52 |
1
1. publish : removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → SURVIVED |
ErrorHandling.checkNull("Arguments must not be null: notifications={}, publicationChannel={}", notifications, publicationChannel); |
| 53 | if (LOG.isTraceEnabled()) { | |
| 54 | LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel)); | |
| 55 | } | |
| 56 | for (Notification notification : notifications) { | |
| 57 | String key = notification.eventType(); | |
| 58 | String message = objectSerializer.serialize(notification); | |
| 59 | kafkaTemplate.send(publicationChannel, key, message); | |
| 60 | } | |
| 61 | } | |
| 62 | ||
| 63 | } | |
Mutations | ||
| 42 |
1.1 |
|
| 52 |
1.1 |