// RabbitMQNotificationPublisher.java

package com.reallifedeveloper.common.infrastructure.messaging;

import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import com.reallifedeveloper.common.application.notification.Notification;
import com.reallifedeveloper.common.application.notification.NotificationPublisher;
import com.reallifedeveloper.common.domain.ErrorHandling;
import com.reallifedeveloper.common.domain.ObjectSerializer;

/**
 * An implementation of the {@link NotificationPublisher} interface that uses <a href="https://www.rabbitmq.com/">RabbitMQ</a>.
 * <p>
 * Every call to {@link #publish(List, String)} with a non-empty list obtains a new connection and channel from the
 * configured {@code ConnectionFactory} and closes both when the call completes.
 *
 * @author RealLifeDeveloper
 */
public final class RabbitMQNotificationPublisher implements NotificationPublisher {

    private static final BasicProperties EMPTY_PROPERTIES = new BasicProperties();

    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQNotificationPublisher.class);

    private final ConnectionFactory connectionFactory;

    private final ObjectSerializer<String> objectSerializer;

    /**
     * Creates a new {@code RabbitMQNotificationPublisher} that connects to RabbitMQ using the given {@code ConnectionFactory}.
     *
     * @param connectionFactory the {@code ConnectionFactory} to use to create connections to RabbitMQ
     * @param objectSerializer  the {@code ObjectSerializer} to use to serialize notifications
     */
    @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "The ConnectionFactory is mutable, but that is OK")
    public RabbitMQNotificationPublisher(ConnectionFactory connectionFactory, ObjectSerializer<String> objectSerializer) {
        ErrorHandling.checkNull("Arguments must not be null: connectionFactory=%s, objectSerializer=%s", connectionFactory,
                objectSerializer);
        this.connectionFactory = connectionFactory;
        this.objectSerializer = objectSerializer;
    }

    /**
     * Serializes each notification with the configured {@code ObjectSerializer} and publishes the resulting string, encoded as
     * UTF-8, to the RabbitMQ exchange named by {@code publicationChannel}, using an empty routing key and empty message
     * properties.
     * <p>
     * If {@code notifications} is empty, no connection is opened and nothing is published.
     *
     * @param notifications      the notifications to publish, in order; must not be {@code null}
     * @param publicationChannel the name of the exchange to publish to; must not be {@code null}
     *
     * @throws IOException if connecting, publishing or closing fails, or if a timeout occurs (the {@code TimeoutException}
     *                     is attached as the cause)
     */
    @Override
    public void publish(List<Notification> notifications, String publicationChannel) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel));
        }
        ErrorHandling.checkNull("Arguments must not be null: notifications=%s, publicationChannel=%s", notifications, publicationChannel);
        if (notifications.isEmpty()) {
            // Nothing to send, so avoid the cost of opening a connection.
            return;
        }
        try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) {
            for (Notification notification : notifications) {
                String message = objectSerializer.serialize(notification);
                // Use the Charset constant rather than a charset name: no runtime lookup, no UnsupportedEncodingException.
                channel.basicPublish(publicationChannel, "", EMPTY_PROPERTIES, message.getBytes(StandardCharsets.UTF_8));
            }
        } catch (TimeoutException e) {
            // The interface only declares IOException, so wrap the timeout while preserving it as the cause.
            throw new IOException("Timeout occurred", e);
        }
    }
}