Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take the score into account when moving chunks #2107

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 50 additions & 14 deletions oio/content/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from oio.blob.client import BlobClient
from oio.container.client import ContainerClient, extract_chunk_qualities
from oio.common.constants import OIO_VERSION
from oio.common.exceptions import UnrecoverableContent
from oio.common.fullpath import encode_fullpath
from oio.common.storage_functions import _get_weighted_random_score


def compare_chunk_quality(current, candidate):
Expand Down Expand Up @@ -262,18 +264,41 @@ def move_chunk(self, chunk_id, check_quality=False, dry_run=False,
other_chunks, [current_chunk], check_quality=check_quality,
max_attempts=max_attempts, **kwargs)

# Sort chunks by score to try to copy with higher score.
# When scores are close together (e.g. [95, 94, 94, 93, 50]),
# don't always start with the highest element.
duplicate_chunks = self.chunks \
.filter(pos=current_chunk.pos) \
.sort(key=lambda chunk: _get_weighted_random_score(chunk.raw()),
reverse=True) \
.all()
if dry_run:
self.logger.info("Dry-run: would copy chunk from %s to %s",
current_chunk.url, spare_urls[0])
self.logger.info(
'Dry-run: would copy chunk from %s to %s',
duplicate_chunks[0].url, spare_urls[0])
else:
self.logger.info("Copying chunk from %s to %s",
current_chunk.url, spare_urls[0])
# TODO(FVE): retry to copy (max_attempts times)
self.blob_client.chunk_copy(
current_chunk.url, spare_urls[0], chunk_id=chunk_id,
fullpath=self.full_path, cid=self.container_id,
path=self.path, version=self.version,
content_id=self.content_id, **kwargs)
# To reduce the load on the rawx to decommission,
# use one of the rawx with a copy of the chunk to move.
for src in duplicate_chunks:
try:
self.logger.info(
'Copying chunk from %s to %s', src.url, spare_urls[0])
# TODO(FVE): retry to copy (max_attempts times)
self.blob_client.chunk_copy(
src.url, spare_urls[0], chunk_id=chunk_id,
fullpath=self.full_path, cid=self.container_id,
path=self.path, version=self.version,
content_id=self.content_id, **kwargs)
break
except Exception as err:
self.logger.warn(
'Failed to copy chunk from %s to %s: %s', src.url,
spare_urls[0], err)
if len(duplicate_chunks) == 1:
raise
else:
raise UnrecoverableContent(
'No copy available of chunk to move')

self._update_spare_chunk(current_chunk, spare_urls[0])

Expand Down Expand Up @@ -432,12 +457,15 @@ def __cmp__(self, other):


class ChunksHelper(object):
def __init__(self, chunks, raw_chunk=True):
def __init__(self, chunks, raw_chunk=True,
sort_key=None, sort_reverse=False):
if raw_chunk:
self.chunks = [Chunk(c) for c in chunks]
else:
self.chunks = chunks
self.chunks.sort()
self.sort_key = sort_key
self.sort_reverse = sort_reverse
self.chunks.sort(key=self.sort_key, reverse=self.sort_reverse)

def filter(self, id=None, pos=None, metapos=None, subpos=None, host=None,
url=None):
Expand All @@ -456,7 +484,9 @@ def filter(self, id=None, pos=None, metapos=None, subpos=None, host=None,
if url is not None and c.url != url:
continue
found.append(c)
return ChunksHelper(found, False)
return ChunksHelper(found, False,
sort_key=self.sort_key,
sort_reverse=self.sort_reverse)

def exclude(self, id=None, pos=None, metapos=None, subpos=None, host=None,
url=None):
Expand All @@ -475,7 +505,13 @@ def exclude(self, id=None, pos=None, metapos=None, subpos=None, host=None,
if url is not None and c.url == url:
continue
found.append(c)
return ChunksHelper(found, False)
return ChunksHelper(found, False,
sort_key=self.sort_key,
sort_reverse=self.sort_reverse)

def sort(self, key=None, reverse=False):
    """Return a new ChunksHelper whose chunks are ordered by *key*.

    The receiver is left untouched; the requested ordering is stored on
    the new helper so that derived helpers (e.g. from filter/exclude)
    keep sorting the same way.

    :param key: one-argument callable extracting the sort key from a
        chunk, or None for the chunks' natural ordering.
    :param reverse: when True, sort in descending order.
    """
    chunks_copy = self.chunks[:]
    return ChunksHelper(chunks_copy, False,
                        sort_key=key, sort_reverse=reverse)

def one(self):
if len(self.chunks) != 1:
Expand Down
11 changes: 9 additions & 2 deletions oio/content/ec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from oio.common.storage_functions import _sort_chunks, fetch_stream_ec
from oio.common.utils import GeneratorIO
from oio.common.constants import OIO_VERSION
from oio.common.storage_functions import _get_weighted_random_score


class ECContent(Content):
Expand All @@ -39,8 +40,14 @@ def rebuild_chunk(self, chunk_id, allow_same_rawx=False, chunk_pos=None,
self.logger.debug('Chunk at pos %s has id %s',
chunk_pos, chunk_id)

chunks = self.chunks.filter(metapos=current_chunk.metapos)\
.exclude(id=chunk_id, pos=chunk_pos)
# Sort chunks by score to try to rebuild with higher score.
# When scores are close together (e.g. [95, 94, 94, 93, 50]),
# don't always start with the highest element.
chunks = self.chunks \
.filter(metapos=current_chunk.metapos) \
.exclude(id=chunk_id, pos=chunk_pos) \
.sort(key=lambda chunk: _get_weighted_random_score(chunk.raw()),
reverse=True)

if chunk_id is None:
current_chunk.size = chunks[0].size
Expand Down
16 changes: 11 additions & 5 deletions oio/content/plain.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from oio.content.content import Content, Chunk
from oio.common import exceptions as exc
from oio.common.exceptions import UnrecoverableContent
from oio.common.storage_functions import _get_weighted_random_score


class PlainContent(Content):
Expand Down Expand Up @@ -57,8 +58,15 @@ def rebuild_chunk(self, chunk_id, allow_same_rawx=False, chunk_pos=None,
elif chunk_pos is None:
chunk_pos = current_chunk.pos

duplicate_chunks = self.chunks.filter(
pos=chunk_pos).exclude(id=chunk_id).all()
# Sort chunks by score to try to copy with higher score.
# When scores are close together (e.g. [95, 94, 94, 93, 50]),
# don't always start with the highest element.
duplicate_chunks = self.chunks \
.filter(pos=chunk_pos) \
.exclude(id=chunk_id) \
.sort(key=lambda chunk: _get_weighted_random_score(chunk.raw()),
reverse=True) \
.all()
if len(duplicate_chunks) == 0:
raise UnrecoverableContent("No copy of missing chunk")

Expand All @@ -79,7 +87,6 @@ def rebuild_chunk(self, chunk_id, allow_same_rawx=False, chunk_pos=None,
spare_url = spare_urls[0]

# Actually create the spare chunk, by duplicating a good one
uploaded = False
for src in duplicate_chunks:
try:
self.blob_client.chunk_copy(
Expand All @@ -89,13 +96,12 @@ def rebuild_chunk(self, chunk_id, allow_same_rawx=False, chunk_pos=None,
content_id=self.content_id)
self.logger.debug('Chunk copied from %s to %s, registering it',
src.url, spare_url)
uploaded = True
break
except Exception as err:
self.logger.warn(
"Failed to copy chunk from %s to %s: %s %s", src.url,
spare_url, type(err), str(err.message))
if not uploaded:
else:
raise UnrecoverableContent("No copy available of missing chunk")

# Register the spare chunk in object's metadata
Expand Down