Adds the ability to add record interceptors instead of override them #3542

Open
Nyamiou opened this issue Oct 8, 2024 · 5 comments

@Nyamiou

Nyamiou commented Oct 8, 2024

Expected Behavior

I'm using a Spring Cloud Stream ListenerContainerCustomizer to customize an AbstractMessageListenerContainer in order to add a record interceptor. I would expect to be able either to add a record interceptor in the last position, or to get the current record interceptor, wrap it together with mine in a CompositeRecordInterceptor, and then set the result (either is fine).

Current Behavior

The issue is that only setRecordInterceptor(...) is public; getRecordInterceptor() is not, and there is no addRecordInterceptor(...) either. I would like to keep the existing record interceptors and just add mine in the last position, but there seems to be no easy way to do this.

Context

I have code similar to this:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> listenerContainerCustomizer() {
	return (container, destinationName, group) -> container.setRecordInterceptor(new MyRecordInterceptor());
}

I want to use this in a small company-internal library, but I don't want to take away other projects' ability to use their own record interceptors.

I tried to use a Spring Cloud Stream ChannelInterceptor (with GlobalChannelInterceptor) to achieve this, but afterSendCompletion(...) is called after each attempt, while the record interceptor's afterRecord(...) is called after all attempts. Channel interceptors also do not allow modifying headers.
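
To make the second option from "Expected Behavior" concrete, here is a minimal, self-contained sketch of the get, wrap, and set pattern. It deliberately does not use Spring Kafka types: `SketchContainer`, `AppendInterceptorSketch`, `chain` and `appendLast` are hypothetical names, a record is modeled as a plain `String`, and the public getter is exactly the part that the issue says is missing today.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a listener container; NOT a Spring Kafka class.
// An interceptor is modeled as UnaryOperator<String>; returning null means "skip the record".
final class SketchContainer {
    private UnaryOperator<String> recordInterceptor;

    // Assumed public getter: this is what the issue asks for.
    UnaryOperator<String> getRecordInterceptor() {
        return recordInterceptor;
    }

    void setRecordInterceptor(UnaryOperator<String> interceptor) {
        this.recordInterceptor = interceptor;
    }
}

final class AppendInterceptorSketch {

    // Runs 'first', then 'second'; stops early if 'first' skipped the record.
    static UnaryOperator<String> chain(UnaryOperator<String> first, UnaryOperator<String> second) {
        return record -> {
            String result = first.apply(record);
            return result == null ? null : second.apply(result);
        };
    }

    // Keeps whatever interceptor is already configured and puts 'mine' in last position.
    static void appendLast(SketchContainer container, UnaryOperator<String> mine) {
        UnaryOperator<String> existing = container.getRecordInterceptor();
        container.setRecordInterceptor(existing == null ? mine : chain(existing, mine));
    }

    public static void main(String[] args) {
        SketchContainer container = new SketchContainer();
        container.setRecordInterceptor(r -> r + "|app"); // configured by the application
        appendLast(container, r -> r + "|lib");          // added by the internal library
        System.out.println(container.getRecordInterceptor().apply("record")); // record|app|lib
    }
}
```

With a getter like this, a library-provided customizer could append its interceptor without discarding the one the application configured.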

@artembilan
Member

Thanks for the report!
We don't see this as an easy fix, since there would be too many moving parts with add/remove([index]) in both the CompositeRecordInterceptor and the AbstractMessageListenerContainer.
So, unless you see a way to fix it quickly, we are going to look into it for the next version.

@Nyamiou
Author

Nyamiou commented Oct 8, 2024

No problem, this is not urgent. I just wanted this to hopefully be fixed in the future. Thank you for answering so quickly.

@chickenchickenlove
Contributor

chickenchickenlove commented Oct 9, 2024

@Nyamiou
I have an idea for you; maybe it could be a short-term solution.
How about using this class for your case?

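import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;

// Delegates intercept() to each child in the order they were added.
// Note: only intercept() is delegated here; the other callbacks keep their defaults.
public class TestRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

    private final List<RecordInterceptor<K, V>> children = new ArrayList<>();

    public void addRecordInterceptor(RecordInterceptor<K, V> interceptor) {
        this.children.add(interceptor);
    }

    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
        ConsumerRecord<K, V> invokedRecord = record;
        for (RecordInterceptor<K, V> child : children) {
            invokedRecord = child.intercept(invokedRecord, consumer);
            if (invokedRecord == null) {
                // A child asked to skip this record; do not pass null to the next child.
                return null;
            }
        }
        return invokedRecord;
    }

}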

@artembilan
Member

Right, @chickenchickenlove, that will work for some use cases.
But I believe @Nyamiou's request is more about the AbstractMessageListenerContainer internals, where an addRecordInterceptor() API would be exposed to deal with a CompositeRecordInterceptor behind the scenes.
That's why this is a request for the framework rather than a workaround in the target project.
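
For illustration only, the container-side idea described above could look roughly like the sketch below. It is not the Spring Kafka implementation: `ComposingContainerSketch` is a hypothetical stand-in, a record is a plain `String`, an interceptor is a `UnaryOperator<String>`, and `addRecordInterceptor()` is the requested API that does not exist yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical container that keeps a composite of interceptors behind the scenes.
final class ComposingContainerSketch {

    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();

    // Existing-style setter: replaces whatever was configured before.
    void setRecordInterceptor(UnaryOperator<String> interceptor) {
        interceptors.clear();
        if (interceptor != null) {
            interceptors.add(interceptor);
        }
    }

    // Requested API (assumption): appends in last position, keeping earlier interceptors.
    void addRecordInterceptor(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
    }

    // Applies the interceptors in order; a null result means the record is skipped.
    String intercept(String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        ComposingContainerSketch container = new ComposingContainerSketch();
        container.setRecordInterceptor(String::toUpperCase); // application's interceptor
        container.addRecordInterceptor(r -> r + "!");        // library's interceptor, added last
        System.out.println(container.intercept("a"));        // A!
    }
}
```

Keeping the list inside the container means callers never need to read the current interceptor back, which is the difference from a workaround composite built in the target project.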

@chickenchickenlove
Contributor

I see!
Thank you for the explanation 🙇‍♂️
