1 package com.reallifedeveloper.common.infrastructure.messaging;
2
3 import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;
4
5 import java.io.IOException;
6 import java.util.List;
7
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.springframework.kafka.core.KafkaTemplate;
11
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13
14 import com.reallifedeveloper.common.application.notification.Notification;
15 import com.reallifedeveloper.common.application.notification.NotificationPublisher;
16 import com.reallifedeveloper.common.domain.ErrorHandling;
17 import com.reallifedeveloper.common.domain.ObjectSerializer;
18
19
20
21
22
23
24 public final class KafkaNotificationPublisher implements NotificationPublisher {
25
26 private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationPublisher.class);
27
28 private final KafkaTemplate<String, String> kafkaTemplate;
29
30 private final ObjectSerializer<String> objectSerializer;
31
32
33
34
35
36
37
38
39 @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "The KafkaTemplate is mutable, but that is OK")
40 public KafkaNotificationPublisher(KafkaTemplate<String, String> kafkaTemplate, ObjectSerializer<String> objectSerializer) {
41 ErrorHandling.checkNull("Arguments must not be null: kafkaTemplate=%s, objectSerializer=%s", kafkaTemplate, objectSerializer);
42 this.kafkaTemplate = kafkaTemplate;
43 this.objectSerializer = objectSerializer;
44 }
45
46 @Override
47 @SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
48 public void publish(List<Notification> notifications, String publicationChannel) throws IOException {
49 if (LOG.isTraceEnabled()) {
50 LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel));
51 }
52 for (Notification notification : notifications) {
53 String key = notification.eventType();
54 String message = objectSerializer.serialize(notification);
55 kafkaTemplate.send(publicationChannel, key, message);
56 }
57 }
58
59 }