View Javadoc
1   package com.reallifedeveloper.common.infrastructure.messaging;
2   
3   import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;
4   
5   import java.io.IOException;
6   import java.util.List;
7   
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  import org.springframework.kafka.core.KafkaTemplate;
11  
12  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13  
14  import com.reallifedeveloper.common.application.notification.Notification;
15  import com.reallifedeveloper.common.application.notification.NotificationPublisher;
16  import com.reallifedeveloper.common.domain.ErrorHandling;
17  import com.reallifedeveloper.common.domain.ObjectSerializer;
18  
19  /**
20   * An implementation of the {@link NotificationPublisher} interface which uses <a href="https://kafka.apache.org/">Apache Kafka</a>.
21   *
22   * @author RealLifeDeveloper
23   */
24  public final class KafkaNotificationPublisher implements NotificationPublisher {
25  
26      private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationPublisher.class);
27  
28      private final KafkaTemplate<String, String> kafkaTemplate;
29  
30      private final ObjectSerializer<String> objectSerializer;
31  
32      /**
33       * Creates a new {@code KafkaNotificationPublisher} which uses the given {@code ObjectSerializer} to serialize notifications, and the
34       * given {@code KafkaTemplate} to send them to Kafka.
35       *
36       * @param kafkaTemplate    the {@code KafkaTemplate} to use
37       * @param objectSerializer the {@code ObjectSerializer} to use
38       */
39      @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "The KafkaTemplate is mutable, but that is OK")
40      public KafkaNotificationPublisher(KafkaTemplate<String, String> kafkaTemplate, ObjectSerializer<String> objectSerializer) {
41          ErrorHandling.checkNull("Arguments must not be null: kafkaTemplate=%s, objectSerializer=%s", kafkaTemplate, objectSerializer);
42          this.kafkaTemplate = kafkaTemplate;
43          this.objectSerializer = objectSerializer;
44      }
45  
46      @Override
47      @SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE") // TODO: Remove this
48      public void publish(List<Notification> notifications, String publicationChannel) throws IOException {
49          if (LOG.isTraceEnabled()) {
50              LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel));
51          }
52          for (Notification notification : notifications) {
53              String key = notification.eventType();
54              String message = objectSerializer.serialize(notification);
55              kafkaTemplate.send(publicationChannel, key, message);
56          }
57      }
58  
59  }