1 | package com.reallifedeveloper.tools.rabbitmq; | |
2 | ||
3 | import java.io.IOException; | |
4 | import java.util.concurrent.TimeoutException; | |
5 | ||
6 | import com.rabbitmq.client.Channel; | |
7 | import com.rabbitmq.client.Connection; | |
8 | import com.rabbitmq.client.ConnectionFactory; | |
9 | import com.rabbitmq.client.Envelope; | |
10 | import com.rabbitmq.client.GetResponse; | |
11 | ||
12 | /** | |
13 | * Moves messages from a queue, e.g., a DLQ, to an exchange. This can be useful for redelivering messages that have been dead-lettered. | |
14 | * | |
15 | * @author RealLifeDeveloper | |
16 | */ | |
17 | public final class MoveMessages { | |
18 | ||
19 | private final ConnectionFactory connectionFactory; | |
20 | ||
21 | /** | |
22 | * Creates a new {@code MoveMessages} instance that uses the given {@code ConnectionFactory} to connect to RabbitMQ. | |
23 | * | |
24 | * @param connectionFactory the {@code ConnectionFactory} to use to connect to RabbitMQ | |
25 | */ | |
26 | public MoveMessages(ConnectionFactory connectionFactory) { | |
27 |
1
1. <init> : negated conditional → RUN_ERROR |
if (connectionFactory == null) { |
28 | throw new IllegalArgumentException("connectionFactory must not be null"); | |
29 | } | |
30 | this.connectionFactory = connectionFactory.clone(); | |
31 | } | |
32 | ||
33 | /** | |
34 | * Factory method that creates a new {@code MoveMessages} object that connects to RabbitMQ on the given host and vhost, using the given | |
35 | * username and password. | |
36 | * | |
37 | * @param host the name of the host on which RabbitMQ is running | |
38 | * @param username the username to use for authentication | |
39 | * @param password the password to use for authentication | |
40 | * @param virtualHost the vhost to connect to | |
41 | * | |
42 | * @return the new {@code MoveMessages} instance | |
43 | */ | |
44 | @SuppressWarnings("PMD.UseObjectForClearerAPI") | |
45 | public static MoveMessages createInstance(String host, String username, String password, String virtualHost) { | |
46 | ConnectionFactory factory = new ConnectionFactory(); | |
47 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setHost → KILLED |
factory.setHost(host); |
48 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setUsername → KILLED |
factory.setUsername(username); |
49 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setPassword → KILLED |
factory.setPassword(password); |
50 |
1
1. createInstance : removed call to com/rabbitmq/client/ConnectionFactory::setVirtualHost → KILLED |
factory.setVirtualHost(virtualHost); |
51 |
1
1. createInstance : replaced return value with null for com/reallifedeveloper/tools/rabbitmq/MoveMessages::createInstance → KILLED |
return new MoveMessages(factory); |
52 | } | |
53 | ||
54 | /** | |
55 | * Moves all messages from a queue to an exchange. | |
56 | * | |
57 | * @param fromQueue the queue from which to read messages | |
58 | * @param toExchange the exchange to which to send messages | |
59 | * | |
60 | * @throws IOException if moving the messages failed because of an I/O problem | |
61 | * @throws TimeoutException if connecting to the broker timed out | |
62 | */ | |
63 | public void moveAllMessagesToExchange(String fromQueue, String toExchange) throws IOException, TimeoutException { | |
64 | try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { | |
65 | while (true) { | |
66 | GetResponse response = channel.basicGet(fromQueue, false); | |
67 |
1
1. moveAllMessagesToExchange : negated conditional → KILLED |
if (response == null) { |
68 | return; | |
69 | } | |
70 | Envelope envelope = response.getEnvelope(); | |
71 | String routingKey = envelope.getRoutingKey(); | |
72 |
1
1. moveAllMessagesToExchange : removed call to com/rabbitmq/client/Channel::basicPublish → KILLED |
channel.basicPublish(toExchange, routingKey, response.getProps(), response.getBody()); |
73 |
1
1. moveAllMessagesToExchange : removed call to com/rabbitmq/client/Channel::basicAck → SURVIVED |
channel.basicAck(envelope.getDeliveryTag(), false); |
74 | } | |
75 | } | |
76 | } | |
77 | ||
78 | } | |
Mutations | ||
27 |
1.1 |
|
47 |
1.1 |
|
48 |
1.1 |
|
49 |
1.1 |
|
50 |
1.1 |
|
51 |
1.1 |
|
67 |
1.1 |
|
72 |
1.1 |
|
73 |
1.1 |