KafkaNotificationPublisher.java

package com.reallifedeveloper.common.infrastructure.messaging;

import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;

import java.io.IOException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import com.reallifedeveloper.common.application.notification.Notification;
import com.reallifedeveloper.common.application.notification.NotificationPublisher;
import com.reallifedeveloper.common.domain.ErrorHandling;
import com.reallifedeveloper.common.domain.ObjectSerializer;

/**
 * An implementation of the {@link NotificationPublisher} interface which uses <a href="https://kafka.apache.org/">Apache Kafka</a>.
 *
 * @author RealLifeDeveloper
 */
public final class KafkaNotificationPublisher implements NotificationPublisher {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationPublisher.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final ObjectSerializer<String> objectSerializer;

    /**
     * Creates a new {@code KafkaNotificationPublisher} which uses the given {@code ObjectSerializer} to serialize notifications, and the
     * given {@code KafkaTemplate} to send them to Kafka.
     *
     * @param kafkaTemplate    the {@code KafkaTemplate} to use
     * @param objectSerializer the {@code ObjectSerializer} to use
     */
    @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "The KafkaTemplate is mutable, but that is OK")
    public KafkaNotificationPublisher(KafkaTemplate<String, String> kafkaTemplate, ObjectSerializer<String> objectSerializer) {
        ErrorHandling.checkNull("Arguments must not be null: kafkaTemplate=%s, objectSerializer=%s", kafkaTemplate, objectSerializer);
        this.kafkaTemplate = kafkaTemplate;
        this.objectSerializer = objectSerializer;
    }

    @Override
    @SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE") // TODO: Remove this
    public void publish(List<Notification> notifications, String publicationChannel) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel));
        }
        for (Notification notification : notifications) {
            String key = notification.eventType();
            String message = objectSerializer.serialize(notification);
            kafkaTemplate.send(publicationChannel, key, message);
        }
    }

}