diff --git a/parfive/downloader.py b/parfive/downloader.py index 3967203..546bb1d 100644 --- a/parfive/downloader.py +++ b/parfive/downloader.py @@ -284,7 +284,7 @@ async def run_download(self): def _format_results(self, retvals, main_pb): # Squash all nested lists into a single flat list - if retvals: + if retvals and isinstance(retvals[0], list): retvals = list(reduce(list.__add__, retvals)) errors = sum([isinstance(i, FailedDownload) for i in retvals]) if errors: @@ -435,23 +435,28 @@ async def _run_ftp_download(self, main_pb): async def _run_from_queue(self, queue, tokens, main_pb, *, session=None): futures = [] - while not queue.empty(): - get_file = await queue.get() - token = await tokens.get() - file_pb = self.tqdm if self.config.file_progress else False - future = asyncio.create_task(get_file(session, token=token, file_pb=file_pb)) - - def callback(token, future, main_pb): - try: - tokens.put_nowait(token) - # Update the main progressbar - if main_pb and not future.exception(): - main_pb.update(1) - except asyncio.CancelledError: - return - - future.add_done_callback(partial(callback, token, main_pb=main_pb)) - futures.append(future) + try: + while not queue.empty(): + get_file = await queue.get() + token = await tokens.get() + file_pb = self.tqdm if self.config.file_progress else False + future = asyncio.create_task(get_file(session, token=token, file_pb=file_pb)) + + def callback(token, future, main_pb): + try: + tokens.put_nowait(token) + # Update the main progressbar + if main_pb and not future.exception(): + main_pb.update(1) + except asyncio.CancelledError: + return + + future.add_done_callback(partial(callback, token, main_pb=main_pb)) + futures.append(future) + + except asyncio.CancelledError: + for task in futures: + task.cancel() return futures