Skip to content

feat: DIA-2148: write exports using thread pool #7342

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions label_studio/io_storages/base_models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import base64
import concurrent.futures
import itertools
import json
import logging
import os
import traceback as tb
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Union
from urllib.parse import urljoin
Expand Down Expand Up @@ -526,10 +530,23 @@
storage.info_set_failed()


# note: this is available in python 3.12 , #TODO to switch to builtin function when we move to it.
def _batched(iterable, n):
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')

Check warning on line 537 in label_studio/io_storages/base_models.py

View check run for this annotation

Codecov / codecov/patch

label_studio/io_storages/base_models.py#L537

Added line #L537 was not covered by tests
it = iter(iterable)
while batch := tuple(itertools.islice(it, n)):
yield batch


class ExportStorage(Storage, ProjectStorageMixin):
    """Base model for storages that export project annotations to an external target."""

    # Whether this storage is allowed to delete objects in the external target.
    can_delete_objects = models.BooleanField(
        _('can_delete_objects'), null=True, blank=True, help_text='Deletion from storage enabled'
    )
    # Thread-pool size for parallel annotation export.
    # Use 8 threads, unless we know we only have a single core
    # (os.cpu_count() may return None — assume 2 cores in that case).
    # TODO from testing, more than 8 seems to cause problems. revisit to add more parallelism.
    max_workers = min(8, (os.cpu_count() or 2) * 4)

def _get_serialized_data(self, annotation):
user = self.project.organization.created_by
Expand Down Expand Up @@ -557,13 +574,24 @@
self.info_set_in_progress()
self.cached_user = self.project.organization.created_by

for annotation in annotations.iterator(chunk_size=settings.STORAGE_EXPORT_CHUNK_SIZE):
annotation.cached_user = self.cached_user
self.save_annotation(annotation)

# update progress counters
annotation_exported += 1
self.info_update_progress(last_sync_count=annotation_exported, total_annotations=total_annotations)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Batch annotations so that we update progress before having to submit every future.
# Updating progress in thread requires coordinating on count and db writes, so just
# batching to keep it simpler.
for annotation_batch in _batched(
Annotation.objects.filter(project=self.project).iterator(
chunk_size=settings.STORAGE_EXPORT_CHUNK_SIZE
),
settings.STORAGE_EXPORT_CHUNK_SIZE,
):
futures = []
for annotation in annotation_batch:
annotation.cached_user = self.cached_user
futures.append(executor.submit(self.save_annotation, annotation))

for future in concurrent.futures.as_completed(futures):
annotation_exported += 1
self.info_update_progress(last_sync_count=annotation_exported, total_annotations=total_annotations)

self.info_set_completed(last_sync_count=annotation_exported, total_annotations=total_annotations)

Expand Down
Loading