
Update domain type to uint64_t #458

Draft · wants to merge 14 commits into main
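In short: this draft moves the dimension domains of the index arrays from int32 (upper bound MAX_INT32) to uint64, so an index can address more than 2**31 - 1 vectors, and adds temporary debug prints. A minimal sketch of the new dimension pattern, assuming the tiledb-py `Dim` API; `MAX_UINT64` stands in for the constant in `vector_search.utils`, and the headroom subtraction is an inference from the flat_index.py hunks below:

```python
import numpy as np
import tiledb

MAX_UINT64 = np.iinfo(np.uint64).max  # 2**64 - 1, as in vector_search.utils

tile_size = 1000
# TileDB validates that the domain upper bound, once expanded to a multiple
# of the tile extent, still fits in the dimension dtype. Leaving `tile_size`
# of headroom below 2**64 - 1 keeps the expanded domain representable in
# uint64 (presumably why flat_index.py uses MAX_UINT64 - 1000 and
# MAX_UINT64 - tile_size below).
rows = tiledb.Dim(
    name="rows",
    domain=(0, MAX_UINT64 - tile_size),
    tile=tile_size,
    dtype=np.dtype(np.uint64),
)
```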
14 changes: 6 additions & 8 deletions apis/python/src/tiledb/vector_search/flat_index.py
@@ -14,7 +14,6 @@
 from tiledb.vector_search.storage_formats import storage_formats
 from tiledb.vector_search.storage_formats import validate_storage_version
 from tiledb.vector_search.utils import MAX_FLOAT32
-from tiledb.vector_search.utils import MAX_INT32
 from tiledb.vector_search.utils import MAX_UINT64
 from tiledb.vector_search.utils import add_to_group

@@ -182,7 +181,6 @@ def create(

     index.create_metadata(
         uri=uri,
-        dimensions=dimensions,
         vector_type=vector_type,
         index_type=INDEX_TYPE,
         storage_version=storage_version,
@@ -202,9 +200,9 @@

     ids_array_rows_dim = tiledb.Dim(
         name="rows",
-        domain=(0, MAX_INT32),
-        tile=tile_size,
-        dtype=np.dtype(np.int32),
+        domain=(0, MAX_UINT64 - 1000),
+        tile=1000,
+        dtype=np.dtype(np.uint64),
     )
     ids_array_dom = tiledb.Domain(ids_array_rows_dim)
     ids_attr = tiledb.Attr(
@@ -226,13 +224,13 @@
         name="rows",
         domain=(0, dimensions - 1),
         tile=dimensions,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     parts_array_cols_dim = tiledb.Dim(
         name="cols",
-        domain=(0, MAX_INT32),
+        domain=(0, MAX_UINT64 - tile_size),
         tile=tile_size,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     parts_array_dom = tiledb.Domain(parts_array_rows_dim, parts_array_cols_dim)
     parts_attr = tiledb.Attr(
1 change: 0 additions & 1 deletion apis/python/src/tiledb/vector_search/index.py
@@ -814,7 +814,6 @@ def _open_updates_array(self, timestamp: int = None):

 def create_metadata(
     uri: str,
-    dimensions: int,
     vector_type: np.dtype,
     index_type: str,
     storage_version: str,
63 changes: 46 additions & 17 deletions apis/python/src/tiledb/vector_search/ingestion.py
@@ -23,6 +23,8 @@
 from tiledb.vector_search._tiledbvspy import *
 from tiledb.vector_search.storage_formats import STORAGE_VERSION
 from tiledb.vector_search.storage_formats import validate_storage_version
+from tiledb.vector_search.utils import MAX_INT32
+from tiledb.vector_search.utils import MAX_UINT64
 from tiledb.vector_search.utils import add_to_group
 from tiledb.vector_search.utils import is_type_erased_index
 from tiledb.vector_search.utils import to_temporal_policy
@@ -342,7 +344,6 @@ def ingest(
     CENTRALISED_KMEANS_MAX_SAMPLE_SIZE = 1000000

     DEFAULT_IMG_NAME = "3.9-vectorsearch"
-    MAX_INT32 = 2**31 - 1

     class SourceType(enum.Enum):
         """SourceType of input vectors"""
@@ -405,8 +406,15 @@ def read_source_metadata(
     ) -> Tuple[int, int, np.dtype]:
         if source_type == "TILEDB_ARRAY":
             schema = tiledb.ArraySchema.load(source_uri)
-            size = schema.domain.dim(1).domain[1] + 1
-            dimensions = schema.domain.dim(0).domain[1] + 1
+            print("[ingestion@read_source_metadata@TILEDB_ARRAY] schema", schema)
+            size = int(schema.domain.dim(1).domain[1] + 1)
+            print("[ingestion@read_source_metadata@TILEDB_ARRAY] size", size, type(size))
+            dimensions = int(schema.domain.dim(0).domain[1] + 1)
+            print(
+                "[ingestion@read_source_metadata@TILEDB_ARRAY] dimensions",
+                dimensions,
+                type(dimensions),
+            )
             return size, dimensions, schema.attr(0).dtype
         if source_type == "TILEDB_SPARSE_ARRAY":
             schema = tiledb.ArraySchema.load(source_uri)
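A note on the `int(...)` casts in the hunk above: with uint64 dimension domains, tiledb-py hands back the domain bounds as `np.uint64` scalars, and under NumPy's pre-NEP 50 promotion rules (NumPy < 2.0) adding a Python int to a `np.uint64` yields `np.float64`, which silently loses precision near 2**64. A minimal sketch of the hazard the casts guard against:

```python
import numpy as np

bound = np.uint64(2**64 - 1001)  # e.g. a domain upper bound read from a schema
size = bound + 1                 # NumPy < 2.0: uint64 + int promotes to float64
print(type(size))                # <class 'numpy.float64'>, precision lost
size = int(bound) + 1            # exact: plain Python ints are arbitrary precision
```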
@@ -494,13 +502,13 @@ def create_array(
             name="rows",
             domain=(0, dimensions - 1),
             tile=dimensions,
-            dtype=np.dtype(np.int32),
+            dtype=np.dtype(np.uint64),
         )
         input_vectors_array_cols_dim = tiledb.Dim(
             name="cols",
             domain=(0, size - 1),
             tile=tile_size,
-            dtype=np.dtype(np.int32),
+            dtype=np.dtype(np.uint64),
         )
         input_vectors_array_dom = tiledb.Domain(
             input_vectors_array_rows_dim, input_vectors_array_cols_dim
@@ -560,7 +568,7 @@ def write_external_ids(
             name="rows",
             domain=(0, size - 1),
             tile=int(size / partitions),
-            dtype=np.dtype(np.int32),
+            dtype=np.dtype(np.uint64),
         )
         ids_array_dom = tiledb.Domain(ids_array_rows_dim)
         ids_attr = tiledb.Attr(
@@ -647,9 +655,9 @@ def create_partial_write_array_group(
         logger.debug("Creating temp ids array")
         ids_array_rows_dim = tiledb.Dim(
             name="rows",
-            domain=(0, MAX_INT32),
+            domain=(0, MAX_UINT64),
             tile=tile_size,
-            dtype=np.dtype(np.int32),
+            dtype=np.dtype(np.uint64),
         )
         ids_array_dom = tiledb.Domain(ids_array_rows_dim)
         ids_attr = tiledb.Attr(
Expand Down Expand Up @@ -679,13 +687,13 @@ def create_partial_write_array_group(
name="rows",
domain=(0, dimensions - 1),
tile=dimensions,
dtype=np.dtype(np.int32),
dtype=np.dtype(np.uint64),
)
parts_array_cols_dim = tiledb.Dim(
name="cols",
domain=(0, MAX_INT32),
domain=(0, MAX_UINT64),
tile=tile_size,
dtype=np.dtype(np.int32),
dtype=np.dtype(np.uint64),
)
parts_array_dom = tiledb.Domain(parts_array_rows_dim, parts_array_cols_dim)
parts_attr = tiledb.Attr(name="values", dtype=vector_type, filters=filters)
@@ -1421,6 +1429,10 @@ def ingest_flat(

         import tiledb.cloud

+        print("[ingestion@ingest_flat] dimensions", dimensions, type(dimensions))
+        print("[ingestion@ingest_flat] size", size, type(size))
+        print("[ingestion@ingest_flat] batch", batch, type(batch))
+
         logger = setup(config, verbose)
         with tiledb.scope_ctx(ctx_or_config=config):
             updated_ids = read_updated_ids(
@@ -1478,18 +1490,32 @@ def ingest_flat(
                logger.debug("Writing input data to array %s", ids_array_uri)
                ids_array[write_offset:end_offset] = external_ids
                write_offset = end_offset

+           print(
+               "[ingestion@ingest_flat] write_offset", write_offset, type(write_offset)
+           )
            # Ingest additions
            additions_vectors, additions_external_ids = read_additions(
                updates_uri=updates_uri,
                config=config,
                verbose=verbose,
                trace_id=trace_id,
            )
+           print(
+               "[ingestion@ingest_flat] write_offset", write_offset, type(write_offset)
+           )
            end = write_offset
            if additions_vectors is not None:
                end += len(additions_external_ids)
                logger.debug("Writing additions data to array %s", parts_array_uri)
+               print(
+                   "[ingestion@ingest_flat] dimensions", dimensions, type(dimensions)
+               )
+               print(
+                   "[ingestion@ingest_flat] write_offset",
+                   write_offset,
+                   type(write_offset),
+               )
+               print("[ingestion@ingest_flat] end", end, type(end))
                parts_array[0:dimensions, write_offset:end] = np.transpose(
                    additions_vectors
                )
@@ -1510,7 +1536,7 @@ def ingest_type_erased(
         vector_type: np.dtype,
         external_ids_uri: str,
         external_ids_type: str,
-        dimensions: int,
+        dimensions: np.uint64,
         size: int,
         batch: int,
         partitions: int,
@@ -1645,7 +1671,7 @@ def write_centroids(
         centroids: np.ndarray,
         index_group_uri: str,
         partitions: int,
-        dimensions: int,
+        dimensions: np.uint64,
         config: Optional[Mapping[str, Any]] = None,
         verbose: bool = False,
         trace_id: Optional[str] = None,
@@ -1669,7 +1695,7 @@ def ingest_vectors_udf(
         external_ids_uri: str,
         external_ids_type: str,
         partitions: int,
-        dimensions: int,
+        dimensions: np.uint64,
         start: int,
         end: int,
         batch: int,
@@ -1972,7 +1998,7 @@ def consolidate_partition_udf(
         partition_id_start: int,
         partition_id_end: int,
         batch: int,
-        dimensions: int,
+        dimensions: np.uint64,
         config: Optional[Mapping[str, Any]] = None,
         verbose: bool = False,
         trace_id: Optional[str] = None,
@@ -2093,7 +2119,7 @@ def create_ingestion_dag(
         external_ids_type: str,
         size: int,
         partitions: int,
-        dimensions: int,
+        dimensions: np.uint64,
         copy_centroids_uri: str,
         training_sample_size: int,
         training_source_uri: Optional[str],
@@ -2695,6 +2721,9 @@ def consolidate_and_vacuum(
         in_size, dimensions, vector_type = read_source_metadata(
             source_uri=source_uri, source_type=source_type
         )
+        print("[ingestion@ingest] in_size", in_size, type(in_size))
+        print("[ingestion@ingest] dimensions", dimensions, type(dimensions))
+        print("[ingestion@ingest] vector_type", vector_type, type(vector_type))
         logger.debug("Ingesting Vectors into %r", index_group_uri)
         arrays_created = False
         if is_type_erased_index(index_type):
22 changes: 10 additions & 12 deletions apis/python/src/tiledb/vector_search/ivf_flat_index.py
@@ -36,7 +36,6 @@
 from tiledb.vector_search.storage_formats import storage_formats
 from tiledb.vector_search.storage_formats import validate_storage_version
 from tiledb.vector_search.utils import MAX_FLOAT32
-from tiledb.vector_search.utils import MAX_INT32
 from tiledb.vector_search.utils import MAX_UINT64
 from tiledb.vector_search.utils import add_to_group

@@ -527,7 +526,6 @@ def create(

     index.create_metadata(
         uri=uri,
-        dimensions=dimensions,
         vector_type=vector_type,
         index_type=INDEX_TYPE,
         storage_version=storage_version,
@@ -554,13 +552,13 @@ def create(
         name="rows",
         domain=(0, dimensions - 1),
         tile=dimensions,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     centroids_array_cols_dim = tiledb.Dim(
         name="cols",
-        domain=(0, MAX_INT32),
+        domain=(0, MAX_UINT64),
         tile=100000,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     centroids_array_dom = tiledb.Domain(
         centroids_array_rows_dim, centroids_array_cols_dim
@@ -582,9 +580,9 @@

     index_array_rows_dim = tiledb.Dim(
         name="rows",
-        domain=(0, MAX_INT32),
+        domain=(0, MAX_UINT64),
         tile=100000,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     index_array_dom = tiledb.Domain(index_array_rows_dim)
     index_attr = tiledb.Attr(
@@ -604,9 +602,9 @@

     ids_array_rows_dim = tiledb.Dim(
         name="rows",
-        domain=(0, MAX_INT32),
+        domain=(0, MAX_UINT64),
         tile=tile_size,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     ids_array_dom = tiledb.Domain(ids_array_rows_dim)
     ids_attr = tiledb.Attr(
@@ -628,13 +626,13 @@
         name="rows",
         domain=(0, dimensions - 1),
         tile=dimensions,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     parts_array_cols_dim = tiledb.Dim(
         name="cols",
-        domain=(0, MAX_INT32),
+        domain=(0, MAX_UINT64),
         tile=tile_size,
-        dtype=np.dtype(np.int32),
+        dtype=np.dtype(np.uint64),
     )
     parts_array_dom = tiledb.Domain(parts_array_rows_dim, parts_array_cols_dim)
     parts_attr = tiledb.Attr(
13 changes: 7 additions & 6 deletions apis/python/src/tiledb/vector_search/module.cc
@@ -106,8 +106,9 @@ static void declareColMajorMatrix(py::module& mod, std::string const& suffix) {
         py::format_descriptor<T>::format(), /* Python struct-style format
                                                descriptor */
         2,                                  /* Number of dimensions */
-        {m.num_rows(), m.num_cols()},       /* Buffer dimensions */
-        {sizeof(T), sizeof(T) * m.num_rows()});
+        {static_cast<size_t>(m.num_rows()),
+         static_cast<size_t>(m.num_cols())}, /* Buffer dimensions */
+        {sizeof(T), sizeof(T) * static_cast<size_t>(m.num_rows())});
   });
 }
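The casts keep the brace-initialized shape and stride lists homogeneous now that the matrix extents can be 64-bit. The stride pattern itself is standard column-major layout; a quick NumPy cross-check (illustrative only):

```python
import numpy as np

# Column-major (Fortran-order) storage: consecutive elements of a column are
# adjacent, so strides are (itemsize, itemsize * num_rows), the same values
# declareColMajorMatrix passes as {sizeof(T), sizeof(T) * m.num_rows()}.
m = np.zeros((4, 7), dtype=np.float32, order="F")
assert m.strides == (m.itemsize, m.itemsize * 4)
```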

@@ -669,8 +670,8 @@ PYBIND11_MODULE(_tiledbvspy, m) {
       "read_vector_u32",
       [](const tiledb::Context& ctx,
          const std::string& uri,
-         size_t start_pos,
-         size_t end_pos,
+         uint64_t start_pos,
+         uint64_t end_pos,
          uint64_t timestamp) -> std::vector<uint32_t> {
        TemporalPolicy temporal_policy =
            (timestamp == 0) ? TemporalPolicy() :
@@ -683,8 +684,8 @@ PYBIND11_MODULE(_tiledbvspy, m) {
       "read_vector_u64",
       [](const tiledb::Context& ctx,
          const std::string& uri,
-         size_t start_pos,
-         size_t end_pos,
+         uint64_t start_pos,
+         uint64_t end_pos,
          uint64_t timestamp) -> std::vector<uint64_t> {
        TemporalPolicy temporal_policy =
            (timestamp == 0) ? TemporalPolicy() :
3 changes: 3 additions & 0 deletions apis/python/src/tiledb/vector_search/module.py
@@ -31,11 +31,14 @@ def load_as_matrix(
     if isinstance(config, tiledb.Config):
         config = dict(config)

+    print("[module@load_as_matrix] size", size, type(size))
+
     if ctx is None:
         ctx = vspy.Ctx(config)

     a = tiledb.ArraySchema.load(path, ctx=tiledb.Ctx(config))
     dtype = a.attr(0).dtype
+    print("[module@load_as_matrix] dtype", dtype)
     # Read all rows from column 0 -> `size`. Set no upper_bound. Note that if `size` is None then
     # we'll read to the column domain length.
     if dtype == np.float32:
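For context, a hedged usage sketch of `load_as_matrix` based only on the parameters visible in this hunk (`path`, `size`, `config`); the URI and config values are hypothetical:

```python
# Hypothetical call: read the first `size` columns of a dense vector array.
cfg = {"vfs.s3.region": "us-east-1"}  # any TileDB config mapping
matrix = load_as_matrix(
    "s3://example-bucket/index/shuffled_vectors",  # hypothetical URI
    size=1_000_000,
    config=cfg,
)
```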
21 changes: 13 additions & 8 deletions apis/python/src/tiledb/vector_search/type_erased_module.cc
@@ -265,15 +265,20 @@ void init_type_erased_module(py::module_& m) {
       .def("ids_type_string", &FeatureVectorArray::ids_type_string)
       .def_buffer([](FeatureVectorArray& v) -> py::buffer_info {
         return py::buffer_info(
-            v.data(), /* Pointer to buffer */
-            datatype_to_size(v.feature_type()), /* Size of one scalar */
-            datatype_to_format(
-                v.feature_type()), /* Python struct-style format descriptor */
-            2, /* Number of dimensions */
-            {v.num_vectors(),
-             v.dimensions()}, /* Buffer dimensions -- row major */
+            /* Pointer to buffer */
+            v.data(),
+            /* Size of one scalar */
+            datatype_to_size(v.feature_type()),
+            /* Python struct-style format descriptor */
+            datatype_to_format(v.feature_type()),
+            /* Number of dimensions */
+            2,
+            /* Buffer dimensions -- row major */
+            {static_cast<size_t>(v.num_vectors()),
+             static_cast<size_t>(v.dimensions())},
+            /* Strides (in bytes) for each index */
             {datatype_to_size(v.feature_type()) *
-                 v.dimensions(), /* Strides (in bytes) for each index */
+                 static_cast<size_t>(v.dimensions()),
             datatype_to_size(v.feature_type())});
       })
       .def(
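The buffer now spells out its strides explicitly: for a row-major `num_vectors` x `dimensions` array, the row stride is `itemsize * dimensions` and the column stride is `itemsize`. A NumPy cross-check (illustrative only):

```python
import numpy as np

# Row-major (C-order) storage of num_vectors rows x dimensions columns:
# strides are (itemsize * dimensions, itemsize), matching the buffer_info above.
num_vectors, dimensions = 3, 5
a = np.zeros((num_vectors, dimensions), dtype=np.float32)
assert a.strides == (a.itemsize * dimensions, a.itemsize)
```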