| 1 | package com.reallifedeveloper.tools.rabbitmq; | |
| 2 | ||
| 3 | import java.io.IOException; | |
| 4 | import java.util.concurrent.TimeoutException; | |
| 5 | ||
| 6 | import com.rabbitmq.client.Channel; | |
| 7 | import com.rabbitmq.client.Connection; | |
| 8 | import com.rabbitmq.client.ConnectionFactory; | |
| 9 | import com.rabbitmq.client.Envelope; | |
| 10 | import com.rabbitmq.client.GetResponse; | |
| 11 | ||
| 12 | /** | |
| 13 | * Moves messages from a queue, e.g., a DLQ, to an exchange. This can be useful for redelivering messages that have been dead-lettered. | |
| 14 | * | |
| 15 | * @author RealLifeDeveloper | |
| 16 | */ | |
| 17 | public final class MoveMessages { | |
| 18 | ||
| 19 | private final ConnectionFactory connectionFactory; | |
| 20 | ||
| 21 | /** | |
| 22 | * Creates a new {@code MoveMessages} instance that uses the given {@code ConnectionFactory} to connect to RabbitMQ. | |
| 23 | * | |
| 24 | * @param connectionFactory the {@code ConnectionFactory} to use to connect to Rabb8itMQ | |
| 25 | */ | |
| 26 | public MoveMessages(ConnectionFactory connectionFactory) { | |
| 27 |
1
1. <init> : negated conditional → RUN_ERROR |
if (connectionFactory == null) { |
| 28 | throw new IllegalArgumentException("connectionFactory must not be null"); | |
| 29 | } | |
| 30 | this.connectionFactory = connectionFactory.clone(); | |
| 31 | } | |
| 32 | ||
| 33 | /** | |
| 34 | * Factory method that creates a new {@code MoveMessages} object that connects to RabbitMQ on the given host and vhost, using the given | |
| 35 | * username and password. | |
| 36 | * | |
| 37 | * @param host the name of the host on which RabbitMQ is running | |
| 38 | * @param username the username to use for authentication | |
| 39 | * @param password the password to use for authentication | |
| 40 | * @param virtualHost the vhost to connect to | |
| 41 | * | |
| 42 | * @return the new {@code MoveMessages} instance | |
| 43 | */ | |
| 44 | @SuppressWarnings("PMD.UseObjectForClearerAPI") | |
| 45 | public static MoveMessages createInstance(String host, String username, String password, String virtualHost) { | |
| 46 | ConnectionFactory factory = new ConnectionFactory(); | |
| 47 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setHost → KILLED |
factory.setHost(host); |
| 48 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setUsername → KILLED |
factory.setUsername(username); |
| 49 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setPassword → KILLED |
factory.setPassword(password); |
| 50 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setVirtualHost → KILLED |
factory.setVirtualHost(virtualHost); |
| 51 |
1
1. createInstance : replaced return value with null for com/reallifedeveloper/tools/rabbitmq/MoveMessages::createInstance → KILLED |
return new MoveMessages(factory); |
| 52 | } | |
| 53 | ||
| 54 | /** | |
| 55 | * Moves all messages from a queue to an exchange. | |
| 56 | * | |
| 57 | * @param fromQueue the queue from which to read messages | |
| 58 | * @param toExchange the exchange to which to send messages | |
| 59 | * | |
| 60 | * @throws IOException if moving the messages failed because of an I/O problem | |
| 61 | * @throws TimeoutException if connecting to the broker timed out | |
| 62 | */ | |
| 63 | public void moveAllMessagesToExchange(String fromQueue, String toExchange) throws IOException, TimeoutException { | |
| 64 | try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { | |
| 65 | while (true) { | |
| 66 | GetResponse response = channel.basicGet(fromQueue, false); | |
| 67 |
1
1. moveAllMessagesToExchange : negated conditional → KILLED |
if (response == null) { |
| 68 | return; | |
| 69 | } | |
| 70 | Envelope envelope = response.getEnvelope(); | |
| 71 | String routingKey = envelope.getRoutingKey(); | |
| 72 |
1
1. moveAllMessagesToExchange : removed call to com/rabbitmq/client/Channel::basicPublish → KILLED |
channel.basicPublish(toExchange, routingKey, response.getProps(), response.getBody()); |
| 73 |
1
1. moveAllMessagesToExchange : removed call to com/rabbitmq/client/Channel::basicAck → SURVIVED |
channel.basicAck(envelope.getDeliveryTag(), false); |
| 74 | } | |
| 75 | } | |
| 76 | } | |
| 77 | ||
| 78 | } | |
Mutations | ||
| 27 |
1.1 |
|
| 47 |
1.1 |
|
| 48 |
1.1 |
|
| 49 |
1.1 |
|
| 50 |
1.1 |
|
| 51 |
1.1 |
|
| 67 |
1.1 |
|
| 72 |
1.1 |
|
| 73 |
1.1 |