Stream keys on pbc issues, with early close of the stream [JIRA: CLIENTS-1051] #507

Open
Guibod opened this issue Nov 29, 2016 · 3 comments

Comments

@Guibod

Guibod commented Nov 29, 2016

If you close a stream before it is finished, it still tries to drain the open socket of any incoming data.
Maybe you should close the resource instead of releasing it in such a case.

    def close(self):
        # We have to drain the socket to make sure that we don't get
        # weird responses when some other request comes after a
        # failed/prematurely-terminated one.
        try:
            while self.next():
                pass
        except StopIteration:
            pass
        self.resource.release()

Of course, you cannot achieve that with the client wrapper, which yields results through an iterator and is not closeable.
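To illustrate the suggestion, here is a minimal, self-contained sketch of a close() that discards the connection when the stream was abandoned early and releases it only when the stream was fully consumed. Every name in it (FakeResource, FakeStream, discard, chunks_read) is hypothetical and stands in for the client's real classes; it is not the client's actual code.

```python
class FakeResource:
    """Hypothetical stand-in for a pooled connection."""

    def __init__(self):
        self.released = False
        self.discarded = False

    def release(self):
        # Return the connection to the pool for reuse.
        self.released = True

    def discard(self):
        # Hypothetical: drop the connection so that it is never reused.
        self.discarded = True


class FakeStream:
    """Hypothetical stand-in for a PBC key stream."""

    def __init__(self, chunks, resource):
        self._chunks = iter(chunks)
        self.resource = resource
        self.finished = False
        self.chunks_read = 0

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._chunks)
        except StopIteration:
            self.finished = True
            raise
        self.chunks_read += 1
        return chunk

    def close(self):
        if self.finished:
            # Nothing is left on the wire, so the connection is safe to reuse.
            self.resource.release()
        else:
            # Unread data is still pending: discard instead of draining.
            self.resource.discard()


resource = FakeResource()
stream = FakeStream([["k1", "k2"], ["k3"], ["k4"]], resource)
first = next(stream)  # read one chunk only
stream.close()        # early close: no drain, the connection is dropped
```

With this shape, an early close costs one reconnect later instead of reading the rest of the key list from the socket.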

@Basho-JIRA Basho-JIRA changed the title from "Stream keys on pbc issues, with early close of the stream" to "Stream keys on pbc issues, with early close of the stream [JIRA: CLIENTS-1051]" Nov 29, 2016
@Guibod
Author

Guibod commented Nov 29, 2016

I also suggest that you update the documentation, because the stream_keys() wrapper cannot be closed manually as the documentation states. The wrapper does the try/finally for us.
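One caveat on closeability: if the stream_keys() wrapper is an ordinary Python generator function (an assumption, not checked against the client source), the generator object itself has a close() method, and calling it runs the wrapper's finally block. The sketch below shows only that language behaviour; stream_wrapper and closed are hypothetical names.

```python
closed = []


def stream_wrapper(chunks):
    """Hypothetical generator wrapper built around a try/finally."""
    try:
        for chunk in chunks:
            yield chunk
    finally:
        # Stands in for the stream.close() call in the real wrapper.
        closed.append(True)


gen = stream_wrapper([["k1"], ["k2"], ["k3"]])
first = next(gen)
# close() raises GeneratorExit at the paused yield, so the finally block runs.
gen.close()
```

Note that closing the generator this way would still go through the stream's close(), and therefore through its drain loop, so it does not avoid the problem described in this issue.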

@Guibod
Author

Guibod commented Nov 29, 2016

I've solved it by deleting my resource:

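    # Obviously this is a method in a class where self.riak is the client
    # and self.bucket is the bucket. bytes_to_str (presumably from riak.util)
    # and logger are assumed to be available at module level.
    def stream(self, limit=10, timeout=None):
        i = 0
        resource = self.riak._acquire()
        transport = resource.object
        # timeout was undefined in the original snippet; it is assumed here
        # to be a method parameter.
        stream = transport.stream_keys(self.bucket, timeout=timeout)
        stream.attach(resource)

        try:
            for keylist in stream:
                for key in keylist:
                    key = bytes_to_str(key)
                    i += 1
                    if limit and i > limit:
                        logger.debug('Stream limit reached (%d)', limit)
                        # Use return: raising StopIteration inside a
                        # generator is a RuntimeError since Python 3.7.
                        return
                    yield self.bucket.get(key)
        finally:
            # Delete the resource rather than releasing it, so that the
            # partially read socket is never reused.
            self.riak._choose_pool().delete_resource(stream.resource)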
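The limit logic in this workaround can also be written with the standard library, which avoids the manual counter. The following is a sketch only: key_chunks is a hypothetical stand-in for the key stream, and cleaned_up stands in for the delete_resource() call. It uses itertools.islice to cap the number of keys and contextlib.closing to guarantee cleanup.

```python
from contextlib import closing
from itertools import chain, islice

cleaned_up = []


def key_chunks():
    """Hypothetical stand-in for a key stream yielding lists of keys."""
    try:
        yield [b"k1", b"k2"]
        yield [b"k3", b"k4"]
        yield [b"k5"]
    finally:
        # Stands in for deleting the pooled resource.
        cleaned_up.append(True)


def first_keys(limit):
    # closing() calls chunks.close() on exit, which runs the finally above
    # even though the stream was not read to the end.
    with closing(key_chunks()) as chunks:
        keys = chain.from_iterable(chunks)  # flatten the lists of keys
        for key in islice(keys, limit):     # stop after `limit` keys
            yield key.decode("utf-8")


result = list(first_keys(3))
```

Here the third chunk is never requested, and the cleanup step still runs exactly once.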

@lukebakken lukebakken added this to the riak-python-client-2.7.0 milestone Nov 29, 2016
@lukebakken lukebakken self-assigned this Nov 29, 2016
@lukebakken
Contributor

Hello ... I think I get what's going on, but it would be more helpful to have a complete example if possible. Can you provide one?

@lukebakken lukebakken modified the milestones: riak-python-client-2.7.0, riak-python-client-2.7.1 Dec 12, 2016
@lukebakken lukebakken modified the milestones: riak-python-client-2.7.1, riak-python-client-3.0.0 Feb 22, 2017