1 package com.reallifedeveloper.common.infrastructure.messaging;
2
3 import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;
4
5 import java.io.IOException;
6 import java.util.List;
7
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.springframework.kafka.core.KafkaTemplate;
11
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13
14 import com.reallifedeveloper.common.application.notification.Notification;
15 import com.reallifedeveloper.common.application.notification.NotificationPublisher;
16 import com.reallifedeveloper.common.domain.ErrorHandling;
17 import com.reallifedeveloper.common.domain.ObjectSerializer;
18
19
20
21
22
23
24 public final class KafkaNotificationPublisher implements NotificationPublisher {
25
26 private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationPublisher.class);
27
28 private final KafkaTemplate<String, String> kafkaTemplate;
29
30 private final ObjectSerializer<String> objectSerializer;
31
32
33
34
35
36
37
38
39 @SuppressFBWarnings(value = { "EI_EXPOSE_REP2",
40 "CRLF_INJECTION_LOGS" }, justification = "The KafkaTemplate is mutable, but that is OK; Logging only of objects, not user data")
41 public KafkaNotificationPublisher(KafkaTemplate<String, String> kafkaTemplate, ObjectSerializer<String> objectSerializer) {
42 ErrorHandling.checkNull("Arguments must not be null: kafkaTemplate=%s, objectSerializer=%s", kafkaTemplate, objectSerializer);
43 LOG.info("Creating new {}: kafkaTemplate={}, objectSerializer={}", getClass().getSimpleName(), kafkaTemplate, objectSerializer);
44 this.kafkaTemplate = kafkaTemplate;
45 this.objectSerializer = objectSerializer;
46 }
47
48 @Override
49 @SuppressWarnings("FutureReturnValueIgnored")
50 @SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", justification = "Key can be null, but that is OK")
51 public void publish(List<Notification> notifications, String publicationChannel) throws IOException {
52 ErrorHandling.checkNull("Arguments must not be null: notifications={}, publicationChannel={}", notifications, publicationChannel);
53 if (LOG.isTraceEnabled()) {
54 LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel));
55 }
56 for (Notification notification : notifications) {
57 String key = notification.eventType();
58 String message = objectSerializer.serialize(notification);
59 kafkaTemplate.send(publicationChannel, key, message);
60 }
61 }
62
63 }