View Javadoc
1   package com.reallifedeveloper.common.infrastructure.messaging;
2   
3   import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;
4   
5   import java.io.IOException;
6   import java.util.List;
7   
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  import org.springframework.kafka.core.KafkaTemplate;
11  
12  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13  
14  import com.reallifedeveloper.common.application.notification.Notification;
15  import com.reallifedeveloper.common.application.notification.NotificationPublisher;
16  import com.reallifedeveloper.common.domain.ErrorHandling;
17  import com.reallifedeveloper.common.domain.ObjectSerializer;
18  
19  /**
20   * An implementation of the {@link NotificationPublisher} interface which uses <a href="https://kafka.apache.org/">Apache Kafka</a>.
21   *
22   * @author RealLifeDeveloper
23   */
24  public final class KafkaNotificationPublisher implements NotificationPublisher {
25  
26      private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationPublisher.class);
27  
28      private final KafkaTemplate<String, String> kafkaTemplate;
29  
30      private final ObjectSerializer<String> objectSerializer;
31  
32      /**
33       * Creates a new {@code KafkaNotificationPublisher} which uses the given {@code ObjectSerializer} to serialize notifications, and the
34       * given {@code KafkaTemplate} to send them to Kafka.
35       *
36       * @param kafkaTemplate    the {@code KafkaTemplate} to use
37       * @param objectSerializer the {@code ObjectSerializer} to use
38       */
39      @SuppressFBWarnings(value = { "EI_EXPOSE_REP2",
40              "CRLF_INJECTION_LOGS" }, justification = "The KafkaTemplate is mutable, but that is OK; Logging only of objects, not user data")
41      public KafkaNotificationPublisher(KafkaTemplate<String, String> kafkaTemplate, ObjectSerializer<String> objectSerializer) {
42          ErrorHandling.checkNull("Arguments must not be null: kafkaTemplate=%s, objectSerializer=%s", kafkaTemplate, objectSerializer);
43          LOG.info("Creating new {}: kafkaTemplate={}, objectSerializer={}", getClass().getSimpleName(), kafkaTemplate, objectSerializer);
44          this.kafkaTemplate = kafkaTemplate;
45          this.objectSerializer = objectSerializer;
46      }
47  
48      @Override
49      @SuppressWarnings("FutureReturnValueIgnored") // TODO: Remove this
50      @SuppressFBWarnings(value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", justification = "Key can be null, but that is OK")
51      public void publish(List<Notification> notifications, String publicationChannel) throws IOException {
52          ErrorHandling.checkNull("Arguments must not be null: notifications={}, publicationChannel={}", notifications, publicationChannel);
53          if (LOG.isTraceEnabled()) {
54              LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel));
55          }
56          for (Notification notification : notifications) {
57              String key = notification.eventType();
58              String message = objectSerializer.serialize(notification);
59              kafkaTemplate.send(publicationChannel, key, message);
60          }
61      }
62  
63  }