MoveMessages.java
package com.reallifedeveloper.tools.rabbitmq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
/**
* Moves messages from a queue, e.g., a DLQ, to an exchange. This can be useful for redelivering messages that have been dead-lettered.
*
* @author RealLifeDeveloper
*/
public final class MoveMessages {
private final ConnectionFactory connectionFactory;
/**
* Creates a new {@code MoveMessages} instance that uses the given {@code ConnectionFactory} to connect to RabbitMQ.
*
* @param connectionFactory the {@code ConnectionFactory} to use to connect to Rabb8itMQ
*/
public MoveMessages(ConnectionFactory connectionFactory) {
if (connectionFactory == null) {
throw new IllegalArgumentException("connectionFactory must not be null");
}
this.connectionFactory = connectionFactory.clone();
}
/**
* Factory method that creates a new {@code MoveMessages} object that connects to RabbitMQ on the given host and vhost, using the given
* username and password.
*
* @param host the name of the host on which RabbitMQ is running
* @param username the username to use for authentication
* @param password the password to use for authentication
* @param virtualHost the vhost to connect to
*
* @return the new {@code MoveMessages} instance
*/
@SuppressWarnings("PMD.UseObjectForClearerAPI")
public static MoveMessages createInstance(String host, String username, String password, String virtualHost) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return new MoveMessages(factory);
}
/**
* Moves all messages from a queue to an exchange.
*
* @param fromQueue the queue from which to read messages
* @param toExchange the exchange to which to send messages
*
* @throws IOException if moving the messages failed because of an I/O problem
* @throws TimeoutException if connecting to the broker timed out
*/
public void moveAllMessagesToExchange(String fromQueue, String toExchange) throws IOException, TimeoutException {
try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) {
while (true) {
GetResponse response = channel.basicGet(fromQueue, false);
if (response == null) {
return;
}
Envelope envelope = response.getEnvelope();
String routingKey = envelope.getRoutingKey();
channel.basicPublish(toExchange, routingKey, response.getProps(), response.getBody());
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
}