Simple example of stream with Asyncio operations #429

Open
massyah opened this issue Sep 11, 2021 · 2 comments

Comments

@massyah

massyah commented Sep 11, 2021

Hi,

I've been following the docs and reading through the tests, and I cannot get streamz working with Asyncio :/

Here's a very minimal example of a stream made up of two async operations and one sync operation:

  1. we retrieve content via an aiohttp call
  2. return content length
  3. simulate DB write with an Asyncio sleep
  4. sink to stdout
```python
import asyncio

import aiohttp
from streamz import Stream


async def fetch(url):
    print("fetching url {}".format(url))
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            body = await resp.text()
    print("Finished w/ url {}".format(url))
    return body


def count(x):
    print("I", x)
    return len(x)


async def write(x):
    await asyncio.sleep(0.2)
    print("O", x)
    return x


async def f():
    print("Starting stream")
    source = Stream(asynchronous=True)
    source.map(fetch).map(count).map(write).sink(print)
    urls = [
        'https://httpstatus.io/?i=1',
        'https://httpstatus.io/?i=2',
        'https://httpstatus.io/?i=3',
        'https://httpstatus.io/?i=4',
        'https://httpstatus.io/?i=5',
        'https://httpstatus.io/?i=6',
    ]
    for u in urls:
        await source.emit(u)


if __name__ == '__main__':
    asyncio.run(f())
```
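A likely reason the chain breaks can be reproduced in plain Python, assuming `map` simply calls the function and passes its return value downstream without awaiting it: calling an async function only creates a coroutine object, so the sync `count` step receives that object instead of the page body. In the sketch below, `fetch` is a hypothetical offline stand-in for the aiohttp version (no network access).

```python
import asyncio


async def fetch(url):
    # hypothetical offline stand-in for the aiohttp request
    await asyncio.sleep(0)
    return "body of " + url


def count(x):
    return len(x)


# Calling an async function without awaiting it only creates a coroutine object.
coro = fetch("https://httpstatus.io/?i=1")
print(type(coro).__name__)  # coroutine

try:
    count(coro)  # roughly what the sync map step would receive
except TypeError as exc:
    print("TypeError:", exc)
finally:
    coro.close()  # avoid a "coroutine was never awaited" warning

# Awaiting first yields the actual body, which count can handle.
body = asyncio.run(fetch("https://httpstatus.io/?i=1"))
print(count(body))
```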

I've tried many combinations, including the Tornado event loop, but couldn't get anything working.
Is this supposed to be possible, or is asyncio support not there yet?
Am I missing something obvious?

Thanks for the help

massyah changed the title "Simple exemple of stream with Asyncio operations" to "Simple example of stream with Asyncio operations" on Sep 11, 2021
@martindurant
Member

Streamz uses asyncio/coroutines as a way to manage backpressure, i.e., the emitting process must wait until there is space in the pipeline before adding more data. Your model is the opposite: you wait on some async process as a way to put data into the pipeline. We could very well have a source that does what you want, something like:

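```python
import asyncio
import inspect
from streamz.sources import Source

class from_coroutines(Source):
    def __init__(self, coroutines, **kwargs):
        self.coros = list(coroutines)
        super().__init__(**kwargs)

    async def _run(self):
        # emit each result as soon as its coroutine finishes (completion order)
        for coro in asyncio.as_completed(self.coros):
            res = await coro
            # wait on any futures returned downstream (backpressure)
            await asyncio.gather(*[r for r in self._emit(res) if inspect.isawaitable(r)])
        self.stopped = True  # all inputs consumed; don't run again
```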
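To make the idea concrete without depending on streamz internals, here is a plain-asyncio sketch of what such a source would do: results are taken in completion order via `asyncio.as_completed`, and each one is handed to a bounded `asyncio.Queue`, whose `put()` waits for free space and so plays the role of streamz's backpressure. The names `job`, `source` and `slow_sink` and the delays are made up for illustration; this is not how streamz implements it.

```python
import asyncio


async def job(name, delay):
    # hypothetical stand-in for one of the coroutines handed to the source
    await asyncio.sleep(delay)
    return name


async def source(coros, queue):
    # like _run above: take results in the order the coroutines finish
    for coro in asyncio.as_completed(coros):
        res = await coro
        # put() suspends while the queue is full; this plays the role of backpressure
        await queue.put(res)
    await queue.put(None)  # sentinel: no more data


async def slow_sink(queue, out):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.05)  # simulate a slow downstream step
        out.append(item)


async def main():
    queue = asyncio.Queue(maxsize=1)  # room for one waiting item
    coros = [job("slow", 0.3), job("medium", 0.2), job("fast", 0.1)]
    out = []
    await asyncio.gather(source(coros, queue), slow_sink(queue, out))
    return out


print(asyncio.run(main()))
```

This should print `['fast', 'medium', 'slow']`: the order of `coros` in the list does not matter, only when each one finishes.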

@martindurant
Member

@massyah, did you get a chance to try my suggestion? It would make a nice example for the docs, I think, although you don't really need streamz for this particular linear workflow.
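Since streamz isn't strictly needed for a linear chain like this one, here is a minimal plain-asyncio sketch of the same fetch, count, write steps, awaiting each async step explicitly. `fetch` is a hypothetical offline stand-in for the aiohttp request, and running the URLs concurrently with `asyncio.gather` is a design choice not made in the thread (the original emits them one at a time).

```python
import asyncio


async def fetch(url):
    # hypothetical offline stand-in for the aiohttp request
    await asyncio.sleep(0.01)
    return "<html>" + url + "</html>"


def count(body):
    return len(body)


async def write(n):
    await asyncio.sleep(0.01)  # simulated DB write, as in the original post
    return n


async def process(url):
    # the same fetch -> count -> write chain, awaiting each async step
    body = await fetch(url)
    return await write(count(body))


async def main(urls):
    # run one chain per URL concurrently; gather keeps the input order
    return await asyncio.gather(*(process(u) for u in urls))


urls = ["https://httpstatus.io/?i={}".format(i) for i in range(1, 7)]
for n in asyncio.run(main(urls)):
    print(n)  # stands in for .sink(print)
```

Replacing `asyncio.gather` with a plain `for` loop that awaits `process(u)` one URL at a time would reproduce the sequential behaviour of the original `emit` loop.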
