The CachingConnectionFactory findOpenChannel message lock contention is severe #2542

Open
missence opened this issue Oct 12, 2023 · 3 comments


@missence

I get **22,900/second** with this app...
@SpringBootApplication
public class Gh644Application implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(Gh644Application.class, args).close();
	}

	@Autowired
	private RabbitTemplate template;

	@Autowired
	private RabbitListenerEndpointRegistry registry;

	private final CountDownLatch latch = new CountDownLatch(1_000_000);

	@Override
	public void run(String... arg0) throws Exception {
		for (int i = 0; i < 1_000_000; i++) {
			template.convertAndSend("perf", "foo");
		}
		StopWatch watch = new StopWatch();
		watch.start();
		this.registry.start();
		this.latch.await();
		watch.stop();
		System.out.println(watch.getTotalTimeMillis() + " rate: " + 1_000_000_000.0 / watch.getTotalTimeMillis());
	}

	@Bean
	public Queue perf() {
		return new Queue("perf", false, false, true);
	}

	@RabbitListener(queues = "perf")
	public void listen(Message message) {
		this.latch.countDown();
	}

}

and

spring.rabbitmq.listener.simple.prefetch=100
spring.rabbitmq.listener.simple.transaction-size=50
spring.rabbitmq.listener.simple.auto-startup=false

When I remove the transaction-size property (so we send an ack per message), I got 17,500/sec.

When I used the native consumer, I got very similar results (17k/sec).

...
//		this.registry.start();

		ConnectionFactory cf = new ConnectionFactory();
		Connection conn = cf.newConnection();
		Channel channel = conn.createChannel();
		channel.basicQos(100);
		channel.basicConsume("perf", new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				latch.countDown();
				channel.basicAck(envelope.getDeliveryTag(), false);
			}

		});
		this.latch.await();
		watch.stop();
		System.out.println(watch.getTotalTimeMillis() + " rate: " + 1_000_000_000.0 / watch.getTotalTimeMillis());
		channel.close();
		conn.close();

So there must be something else going on in your code.

Originally posted by @garyrussell in #644 (comment)

Your example is single-threaded. If you change it to multi-threaded, the threads contend for a lock: calls to the CachingConnectionFactory findOpenChannel method have to wait on it, and the higher the concurrency, the longer the wait.
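
To make the claim concrete, here is a minimal, self-contained sketch of the kind of contention being described. It does not use Spring AMQP and is not the CachingConnectionFactory source. `SingleLockCache`, `checkout`, `release` and `totalCheckoutNanos` are hypothetical names, and the sketch assumes a channel cache guarded by one monitor, with several threads checking entries out and returning them. It only measures how long threads spend acquiring an entry as the thread count grows.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ChannelCacheContentionSketch {

    /** Hypothetical stand-in for a channel cache guarded by a single monitor. */
    static final class SingleLockCache {
        private final Deque<Object> idle = new ArrayDeque<>();

        /** Returns a cached entry if one is idle, otherwise creates a new one. */
        Object checkout() {
            synchronized (this.idle) {
                Object cached = this.idle.pollFirst();
                return cached != null ? cached : new Object();
            }
        }

        /** Puts an entry back so that the next checkout can reuse it. */
        void release(Object channel) {
            synchronized (this.idle) {
                this.idle.addFirst(channel);
            }
        }
    }

    /**
     * Runs the given number of worker threads, each doing opsPerThread
     * checkout/release pairs, and returns the total nanoseconds spent
     * inside checkout() summed over all threads.
     */
    static long totalCheckoutNanos(int threads, int opsPerThread) throws InterruptedException {
        SingleLockCache cache = new SingleLockCache();
        AtomicLong nanos = new AtomicLong();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all workers at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                long local = 0;
                for (int i = 0; i < opsPerThread; i++) {
                    long begin = System.nanoTime();
                    Object channel = cache.checkout();
                    local += System.nanoTime() - begin;
                    cache.release(channel);
                }
                nanos.addAndGet(local);
            });
        }
        start.countDown();
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish");
        }
        return nanos.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int ops = 200_000;
        for (int threads : new int[] {1, 4, 16}) {
            long total = totalCheckoutNanos(threads, ops);
            System.out.println(threads + " thread(s): avg checkout ns = " + total / ((long) threads * ops));
        }
    }
}
```

The numbers printed depend on the machine and JVM, so none are quoted here. The sketch also says nothing about how much of the real publish path is spent on this lock compared with network I/O.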

@garyrussell
Contributor

I am not sure what you are looking for here; of course there will be lock contention - what are you suggesting?

@missence
Author

Lock waits occur and channel acquisition takes time, so the stress-test results are lower than expected, and MQ cluster copies are lost.
[Image: WeCom screenshot]

@garyrussell
Contributor

So, what do you suggest?
