| 1 | package com.reallifedeveloper.common.infrastructure.messaging; | |
| 2 | ||
| 3 | import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF; | |
| 4 | ||
| 5 | import java.io.IOException; | |
| 6 | import java.nio.charset.StandardCharsets; | |
| 7 | import java.util.List; | |
| 8 | import java.util.concurrent.TimeoutException; | |
| 9 | ||
| 10 | import org.slf4j.Logger; | |
| 11 | import org.slf4j.LoggerFactory; | |
| 12 | ||
| 13 | import com.rabbitmq.client.AMQP.BasicProperties; | |
| 14 | import com.rabbitmq.client.Channel; | |
| 15 | import com.rabbitmq.client.Connection; | |
| 16 | import com.rabbitmq.client.ConnectionFactory; | |
| 17 | import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; | |
| 18 | ||
| 19 | import com.reallifedeveloper.common.application.notification.Notification; | |
| 20 | import com.reallifedeveloper.common.application.notification.NotificationPublisher; | |
| 21 | import com.reallifedeveloper.common.domain.ErrorHandling; | |
| 22 | import com.reallifedeveloper.common.domain.ObjectSerializer; | |
| 23 | ||
| 24 | /** | |
| 25 | * An implementation of the {@link NotificationPublisher} interface that uses <a href="http://www.rabbitmq.com/">RabbitMQ</a>. | |
| 26 | * | |
| 27 | * @author RealLifeDeveloper | |
| 28 | */ | |
| 29 | public final class RabbitMQNotificationPublisher implements NotificationPublisher { | |
| 30 | ||
| 31 | private static final BasicProperties EMPTY_PROPERTIES = new BasicProperties(); | |
| 32 | ||
| 33 | private static final Logger LOG = LoggerFactory.getLogger(RabbitMQNotificationPublisher.class); | |
| 34 | ||
| 35 | private final ConnectionFactory connectionFactory; | |
| 36 | ||
| 37 | private final ObjectSerializer<String> objectSerializer; | |
| 38 | ||
| 39 | /** | |
| 40 | * Creates a new {@code RabbitMQNotificationPublisher} that connects to RabbitMQ using the given {@code ConnectionFactory}. | |
| 41 | * | |
| 42 | * @param connectionFactory the {@code ConnectionFactory} to use to create connections to RabbitMQ | |
| 43 | * @param objectSerializer the {@code ObjectSerializer} to use to serialize notifications | |
| 44 | */ | |
| 45 | @SuppressFBWarnings(value = { "EI_EXPOSE_REP2", | |
| 46 | "CRLF_INJECTION_LOGS" }, justification = "The ConnectionFactory is mutable, but that is OK; " | |
| 47 | + "Logging only of objects, not user data") | |
| 48 | public RabbitMQNotificationPublisher(ConnectionFactory connectionFactory, ObjectSerializer<String> objectSerializer) { | |
| 49 |
1
1. <init> : removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → KILLED |
ErrorHandling.checkNull("Arguments must not be null: connectionFactory=%s, objectSerializer=%s", connectionFactory, |
| 50 | objectSerializer); | |
| 51 | LOG.info("Creating new {}: connectionFactory={}, objectSerializer={}", getClass().getSimpleName(), connectionFactory, | |
| 52 | objectSerializer); | |
| 53 | this.connectionFactory = connectionFactory; | |
| 54 | this.objectSerializer = objectSerializer; | |
| 55 | } | |
| 56 | ||
| 57 | @Override | |
| 58 | public void publish(List<Notification> notifications, String publicationChannel) throws IOException { | |
| 59 | if (LOG.isTraceEnabled()) { | |
| 60 | LOG.trace("publish: notifications={}, publicationChannel={}", removeCRLF(notifications), removeCRLF(publicationChannel)); | |
| 61 | } | |
| 62 |
1
1. publish : removed call to com/reallifedeveloper/common/domain/ErrorHandling::checkNull → SURVIVED |
ErrorHandling.checkNull("Arguments must not be null: notifications=%s, publicationChannel=%s", notifications, publicationChannel); |
| 63 |
1
1. publish : negated conditional → KILLED |
if (!notifications.isEmpty()) { |
| 64 | try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { | |
| 65 | for (Notification notification : notifications) { | |
| 66 | String message = objectSerializer.serialize(notification); | |
| 67 |
1
1. publish : removed call to com/rabbitmq/client/Channel::basicPublish → KILLED |
channel.basicPublish(publicationChannel, "", EMPTY_PROPERTIES, message.getBytes(StandardCharsets.UTF_8)); |
| 68 | } | |
| 69 | } catch (TimeoutException e) { | |
| 70 | throw new IOException("Timeout occurred", e); | |
| 71 | } | |
| 72 | } | |
| 73 | } | |
| 74 | } | |
Mutations | ||
| 49 |
1.1 |
|
| 62 |
1.1 |
|
| 63 |
1.1 |
|
| 67 |
1.1 |