Skip to content

Rabbitmq Consumers Stops consuming #1018

@HootanKhadem

Description

@HootanKhadem

Describe the bug

I tried to find the answer to my question in the old issues that were mentioned here but I could not find anything like my problem. I have asked the same question in StackOverflow but I did not get an answer from anyone. sorry if this is long and sorry if I am mentioning this in the wrong place, but I needed help.

I am using rabbitmq in a microservice environment, and I use it to send messages and events to other services.
my projects are java spring boot and I am using rabbitmq java client's latest version.
The problem is that some consumers stop consuming at random times, and the only way to get them going again is to restart the application. I have not been able to find a pattern. here is what I have done and know:

  • some consumers stop working not all of them.
  • I have enabled auto-recovery option
  • publishers of the same application have no problems.
  • This is my rabbitmq client: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.17.0
  • I have monitored both java applications and rabbimq memory and CPU usages, they do not have memory or CPU shortages
  • I have read the following links and they did not solve my problem:

https://stackoverflow.com/questions/22941644/rabbitmq-surviving-a-consumer-disconnection-using-defaultconsumer

https://stackoverflow.com/questions/49338247/rabbitmq-connect-disconnect-notifications

https://stackoverflow.com/questions/17811786/rabbitmq-disconnect-me-after-some-time

https://stackoverflow.com/questions/38771664/rabbitmq-java-client-stops-consuming-messages

https://stackoverflow.com/questions/48033118/temporarily-stop-consuming-rabbitmq-messages-and-resume-later

https://stackoverflow.com/questions/6732936/rabbitmq-consumer-stops-receiving-messages

https://stackoverflow.com/questions/3284731/rabbitmq-message-consumers-stop-consuming-messages

  • this is how I have coded my base consumer :
public abstract class BaseConsumer implements Consumer {
    private final Logger log = LoggerFactory.getLogger(BaseConsumer.class);

    protected Connection connection;
    protected Channel channel;
    protected String exchangeType;
    protected String exchangeName;
    protected String queueName;
    protected String routingKey;
    protected String consumerTag;
    protected String deadLetterExchangeName;
    protected String deadLetterQueueName;
    protected String deadLetterRoutingKey;
    protected boolean autoDelete;
    protected boolean durableQueue;
    protected boolean exclusiveQueue;
    protected boolean requiresConfirm;

    protected BaseConsumer(Connection connection) {
        this.connection = connection;
        this.requiresConfirm = false;
    }

    @Override
    public void consume() {
        consumeMessage();
    }

    protected void consumeMessage() {
        try {
            Channel channel = createChannel();
            declareChannel(channel);
            log.info(String.format("%s: [*] Waiting for %s. To exit press CTRL+C", getClassName(), queueName));
            channel.basicConsume(queueName, !requiresConfirm, consumerTag, getConsumer(channel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private DefaultConsumer getConsumer(Channel channel) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                handleMessageProcess(body, envelope);
            }
        };
    }

    private void handleMessageProcess(byte[] body, Envelope envelope) {
        String message = new String(body);
        log.info(String.format("%s: [x] received '%s'", getClassName(), message));
        long deliveryTag = envelope.getDeliveryTag();
        if (validateMessage(message)) {
            log.info("validation was Successful");
            if (requiresConfirm) {
                log.info("******** handling delivery");
                try {
                    basicConsume(message);
                    ackMessage(channel, deliveryTag);
                } catch (Exception e) {
                    log.error(String.format("Consume Failed Moving Message To DLQ. Error Is: \n%s", e));
                    nackMessage(channel, deliveryTag);
                }
            } else {
                basicConsume(message);
            }
        } else {
            log.info("validation was not Successful");
            messageNotValidProcess(message, deliveryTag);
        }
    }

    protected void ackMessage(Channel channel, long deliveryTag) {
        try {
            log.info("Message was acknowledged");
            channel.basicAck(deliveryTag, false);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    protected void nackMessage(Channel channel, long deliveryTag) {
        try {
            log.info("Message Not Acknowledged");
            channel.basicNack(deliveryTag, false, false);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    protected boolean validateMessage(String message) {
        return ValidationUtils.isJsonObject(message);
    }

    protected void basicConsume(String message) {
        log.info(" [x] consumed '" + message + "' (do nothing - not implemented)");
    }

    protected void messageNotValidProcess(String message, long deliveryTag) {
        log.info(String.format("[x] %d consuming was unsuccessful. %s is not valid %s", deliveryTag, queueName, message));
    }

    private Channel createChannel() throws IOException {
        if (this.channel == null || !this.channel.isOpen()) {
            this.channel = connection.createChannel();
            try {
                declareChannel(channel);
            } catch (IOException | ShutdownSignalException exception) {
                this.channel = connection.createChannel();
                log.warn("Queue parameters had changed, trying recreating them...");
                deleteQueueIfEmpty(channel, queueName);
                if (deadLetterQueueName != null) {
                    deleteQueueIfEmpty(channel, deadLetterQueueName);
                }
                declareChannel(channel);
            }
        }
        return channel;
    }

    private void declareChannel(Channel channel) throws IOException {
        channel.exchangeDeclare(exchangeName, exchangeType);
        Map<String, Object> args = new HashMap<>();
        if (deadLetterExchangeName != null && deadLetterRoutingKey != null && deadLetterQueueName != null) {
            args.put("x-dead-letter-exchange", deadLetterExchangeName);
            args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
            channel.exchangeDeclare(deadLetterExchangeName, exchangeType);
            channel.queueDeclare(deadLetterQueueName, false, false, false, null);
            channel.queueBind(deadLetterQueueName, deadLetterExchangeName, deadLetterRoutingKey);
        }
        channel.queueDeclare(queueName, durableQueue, autoDelete, exclusiveQueue, args);
        channel.queueBind(queueName, exchangeName, routingKey);

        if (requiresConfirm) {
            channel.confirmSelect();
            channel.addConfirmListener(this::confirmed, this::notConfirmed);
        }
    }

    private void deleteQueueIfEmpty(Channel channel, String queueName) {
        log.info(String.format("Deleting Queue: %s", queueName));
        try {
            channel.queueDelete(queueName, false, true);
        } catch (Exception e) {
            log.error(String.format("Couldn't delete queue with name: %s", queueName));
            e.printStackTrace();
        }
    }

    protected void confirmed(long sequenceNumber, boolean multiple) {
        // code when message is confirmed
        log.info(String.format("Message with sequenceNumber: %d is confirmed.", sequenceNumber));
    }

    protected void notConfirmed(long sequenceNumber, boolean multiple) {
        // code when message is nack-ed
        log.info(String.format("Message with sequenceNumber: %d is NOT confirmed!!!", sequenceNumber));
    }

    private String getClassName() {
        return this.getClass().getSimpleName();
    }
}

in addition to the code above I have set these parameters as followed:

ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        if (password != null) {
            factory.setPassword(password);
        }
        if (username != null) {
            factory.setUsername(username);
        }
factory.setAutomaticRecoveryEnabled(true);
        factory.setRequestedHeartbeat(30);

lowering the default heartbeat made this problem happen less frequently, but it still happens. also, the documents recommend to not set this parameter too low, so I did not lower it from 30.
I have many classes extending this abstract class and implementing the parts that I need to. this is a serious problem for me because some features of our application do not work correctly.

thanks in advance.

Reproduction steps

As I explained in the description, there are no usual steps to this bug and I cannot figure out what is wrong.

Expected behavior

I expected that consumers would not stop listening, or recover if they ever get disconnected

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions