Administration functionality for [Simple|Direct]MessageListenerContainer via JMX #1521

Open
garyrussell opened this issue Oct 18, 2022 Discussed in #1520 · 2 comments

@garyrussell
Contributor

Discussed in #1520

Originally posted by rfelgent October 18, 2022
Hi @ALL,

At my company we have created code that exposes basic administration functionality for [Simple|Direct]MessageListenerContainer: the so-called MessageListenerContainerAdmin.

The current feature set comprises:

  • start|stop of listener containers for queue x
  • setting concurrency like maxConsumer, minConsumer for queue x
  • setting batch like batchSize, receiveTimeout for queue x

The idea of a MessageListenerContainerAdmin is highly inspired by AmqpAdmin.
My company would like to contribute such code to the core spring-amqp framework.

If you (especially @garyrussell as lead developer of spring-amqp) are interested in this feature, I could open a PR.

@garyrussell garyrussell added this to the 3.0.0 milestone Oct 18, 2022
@rfelgent
Contributor

rfelgent commented Nov 5, 2022

Hi @garyrussell ,

I would like some guidance on how and where to start.

Here is the (not yet cleaned up) code to serve as a basis for discussion:

@Component
@ConditionalOnProperty(prefix = "app.amqp", name = "enable-listener", havingValue = "true")
@ManagedResource(description = "Administration functionality for SimpleMessageListenerContainer")
@RequiredArgsConstructor
@Slf4j
public class SimpleMessageListenerContainerAdmin implements ContainerCustomizer<SimpleMessageListenerContainer> {

  private final RabbitMqProperties rabbitMqProperties;
  // invocation of ContainerCustomizer#configure() happens before endpoint registry bean is available => some kind of "lazy" mode is required
  private final ObjectProvider<RabbitListenerEndpointRegistry> rabbitListenerEndpointRegistry;

  @Override
  public void configure(SimpleMessageListenerContainer container) {
    Arrays.stream(container.getQueueNames()).forEach(queue -> {
      QueueSettings queueSettings = findQueueSettingsByQueue(queue);
      applyQueueSettings(container, queueSettings);
    });
  }

  @ManagedOperation(description = "Concurrency configuration of the consumers for given queue")
  @ManagedOperationParameters({
      @ManagedOperationParameter(name = "queue", description = "name of the queue"),
      @ManagedOperationParameter(name = "concurrency", description = "the number of concurrent consumer to set. Leave empty if no change is desired"),
      @ManagedOperationParameter(name = "maxConcurrency", description = "the number of maximal concurrent consumer to set. Leave empty if no change is desired")
  })
  public void setConcurrencyForQueue(String queue, Integer concurrency, Integer maxConcurrency) {
    QueueSettings queueSettings = new QueueSettings(queue, maxConcurrency, concurrency, null, null);
    SimpleMessageListenerContainer container = findContainerByQueue(queue);
    applyQueueSettings(container, queueSettings);
  }

  @ManagedOperation(description = "Batching configuration of the consumers for given queue")
  @ManagedOperationParameters({
      @ManagedOperationParameter(name = "queue", description = "name of the queue"),
      @ManagedOperationParameter(name = "receiveTimeout", description = "the receive timeout in textual format (like PT20s) to set. Leave empty if no change is desired"),
      @ManagedOperationParameter(name = "batchSize", description = "the batch size to set. Leave empty if no change is desired")
  })
  public void setBatchingForQueue(String queue, Duration receiveTimeout, Integer batchSize) {
    QueueSettings queueSettings = new QueueSettings(queue, null, null, receiveTimeout, batchSize);
    SimpleMessageListenerContainer container = findContainerByQueue(queue);
    applyQueueSettings(container, queueSettings);
  }

  @ManagedOperation(description = "Starting or stopping all consumers for given queue")
  @ManagedOperationParameters({
      @ManagedOperationParameter(name = "queue", description = "name of the queue"),
      @ManagedOperationParameter(name = "start", description = "if <b>true</b> the consumers get started otherwise stopped")
  })
  public void startStopConsumers(String queue, boolean start) {
    SimpleMessageListenerContainer container = findContainerByQueue(queue);
    if (start) {
      container.start();
    } else {
      container.stop();
    }
  }

  @ManagedOperation(description = "Returns a list of queues all SimpleMessageListeners are configured for")
  public List<String> getQueueNames() {
    return Objects.requireNonNull(rabbitListenerEndpointRegistry.getIfUnique()).getListenerContainers().stream()
        .filter(SimpleMessageListenerContainer.class::isInstance)
        .map(SimpleMessageListenerContainer.class::cast)
        .map(AbstractMessageListenerContainer::getQueueNames)
        .flatMap(Arrays::stream)
        .collect(Collectors.toList());
  }

  private void applyQueueSettings(SimpleMessageListenerContainer container, QueueSettings queueSettings) {
    Optional.ofNullable(queueSettings.getMaxConcurrency()).ifPresent(container::setMaxConcurrentConsumers);
    Optional.ofNullable(queueSettings.getConcurrency()).ifPresent(container::setConcurrentConsumers);
    Optional.ofNullable(queueSettings.getBatchSize()).ifPresent(container::setBatchSize);
    Optional.ofNullable(queueSettings.getReceiveTimeout()).map(Duration::toMillis).ifPresent(container::setReceiveTimeout);
    Optional.ofNullable(queueSettings.getAutoStartup()).ifPresent(container::setAutoStartup);
  }

  private QueueSettings findQueueSettingsByQueue(String queue) {
    return Stream.concat(rabbitMqProperties.getUpload().queueSettings(), rabbitMqProperties.getDownload().queueSettings())
        .filter(queueSetting -> queueSetting.getQueue().equalsIgnoreCase(queue))
        .findFirst()
        .orElseThrow();
  }

  private SimpleMessageListenerContainer findContainerByQueue(String queue) {
    return Objects.requireNonNull(rabbitListenerEndpointRegistry.getIfUnique()).getListenerContainers().stream()
        .filter(SimpleMessageListenerContainer.class::isInstance)
        .map(SimpleMessageListenerContainer.class::cast)
        .filter(c -> Arrays.asList(c.getQueueNames()).contains(queue))
        .findFirst()
        .orElseThrow();
  }
}

Some explanations:

  • it supports both build-time (via Customizer) and runtime (via JMX) configuration
  • the mechanism that resolves the concurrency settings requires some refactoring, as it is tightly coupled to Spring Boot configuration properties (see QueueSettings and RabbitMqProperties)
  • the Lombok annotations (like @RequiredArgsConstructor) must be refactored or removed
  • the configuration is "queue"-driven, not "container"-driven; knowing the queue name (and not the container id) is enough to configure everything
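The Lombok and Boot coupling listed above could be removed along the following lines. This is only a minimal sketch, not the code from the post: `PlainQueueSettings`, `TunableContainer`, and `RecordingContainer` are hypothetical names, and `TunableContainer` merely stands in for the four SimpleMessageListenerContainer setters the post calls, so that the example runs without spring-amqp on the classpath.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the container setters used in the post;
// real code would call SimpleMessageListenerContainer directly.
interface TunableContainer {
    void setConcurrentConsumers(int value);
    void setMaxConcurrentConsumers(int value);
    void setBatchSize(int value);
    void setReceiveTimeout(long millis);
}

// Plain-Java value object: no Lombok, no Boot binding.
// A null field means "leave this setting unchanged".
final class PlainQueueSettings {
    private final String queue;
    private final Integer maxConcurrency;
    private final Integer concurrency;
    private final Duration receiveTimeout;
    private final Integer batchSize;

    PlainQueueSettings(String queue, Integer maxConcurrency, Integer concurrency,
            Duration receiveTimeout, Integer batchSize) {
        this.queue = Objects.requireNonNull(queue, "queue");
        this.maxConcurrency = maxConcurrency;
        this.concurrency = concurrency;
        this.receiveTimeout = receiveTimeout;
        this.batchSize = batchSize;
    }

    String getQueue() {
        return queue;
    }

    // Applies only the values that were provided, in the same order as the post.
    void applyTo(TunableContainer container) {
        if (maxConcurrency != null) container.setMaxConcurrentConsumers(maxConcurrency);
        if (concurrency != null) container.setConcurrentConsumers(concurrency);
        if (batchSize != null) container.setBatchSize(batchSize);
        if (receiveTimeout != null) container.setReceiveTimeout(receiveTimeout.toMillis());
    }
}

// Records setter calls so the effect of applyTo can be inspected without a broker.
final class RecordingContainer implements TunableContainer {
    final List<String> calls = new ArrayList<>();

    @Override public void setConcurrentConsumers(int value) { calls.add("concurrentConsumers=" + value); }
    @Override public void setMaxConcurrentConsumers(int value) { calls.add("maxConcurrentConsumers=" + value); }
    @Override public void setBatchSize(int value) { calls.add("batchSize=" + value); }
    @Override public void setReceiveTimeout(long millis) { calls.add("receiveTimeout=" + millis); }
}
```

With this shape, `new PlainQueueSettings("orders", null, 4, Duration.ofSeconds(20), null).applyTo(container)` touches only the consumer count and the receive timeout, which mirrors the "leave empty if no change is desired" wording of the JMX parameter descriptions.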

Some questions about the concept:

  • This solution handles several MessageListenerContainer instances. Putting @ManagedResource(description = "Administration functionality for SimpleMessageListenerContainer") on the SimpleMessageListenerContainer itself is not applicable, so a "manager" class is required, right?
  • Do you want to separate the runtime and build-time configuration?

@garyrussell
Contributor Author

@rfelgent

You are correct; we can't use Lombok in the framework.

We also can't use Boot @Conditional... annotations.

Yes, I prefer a separate class rather than annotating the container.

I am not sure what you mean by "build time configuration" - isn't that already handled by Boot?
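To make the points above concrete, here is a rough sketch of what a separate manager class without Lombok or Boot annotations might look like. It is an illustration under stated assumptions, not a proposed API: `ListenerContainerManager`, `QueueBoundContainer`, and `FakeContainer` are hypothetical names, `QueueBoundContainer` stands in for the listener container so the example runs without spring-amqp, and the `Supplier` plays the role that `ObjectProvider<RabbitListenerEndpointRegistry>` plays in the posted code (deferring the lookup until an operation is invoked).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical minimal view of a listener container; a stand-in for
// SimpleMessageListenerContainer so the sketch needs no broker.
interface QueueBoundContainer {
    String[] getQueueNames();
    void start();
    void stop();
    boolean isRunning();
}

// Separate "manager" class: one instance administers many containers, keyed by queue name.
final class ListenerContainerManager {
    private final Supplier<Collection<QueueBoundContainer>> containers;

    // Explicit constructor instead of @RequiredArgsConstructor.
    // The Supplier is only called when an operation runs, not at construction time.
    ListenerContainerManager(Supplier<Collection<QueueBoundContainer>> containers) {
        this.containers = containers;
    }

    void startStopConsumers(String queue, boolean start) {
        QueueBoundContainer container = findContainerByQueue(queue);
        if (start) {
            container.start();
        } else {
            container.stop();
        }
    }

    private QueueBoundContainer findContainerByQueue(String queue) {
        return containers.get().stream()
                .filter(c -> Arrays.asList(c.getQueueNames()).contains(queue))
                .findFirst()
                // A descriptive message helps when the operation is called from a JMX console.
                .orElseThrow(() -> new IllegalArgumentException(
                        "No listener container for queue '" + queue + "'"));
    }
}

// Test double that only tracks its running state.
final class FakeContainer implements QueueBoundContainer {
    private final String[] queues;
    private boolean running;

    FakeContainer(String... queues) {
        this.queues = queues;
    }

    @Override public String[] getQueueNames() { return queues; }
    @Override public void start() { running = true; }
    @Override public void stop() { running = false; }
    @Override public boolean isRunning() { return running; }
}
```

In a real implementation the supplier would presumably delegate to the endpoint registry's `getListenerContainers()`, and the public operations would carry the `@ManagedOperation` annotations from the posted code.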
