from __future__ import annotations
import json
import logging
import os
import re
import tempfile
import warnings
import zipfile
from collections.abc import Mapping
from pathlib import Path
from types import MappingProxyType
from typing import Any, Optional
import dask.array as da
import numpy as np
import packaging.version
import pandas as pd
import pyarrow.parquet as pq
import tifffile
import zarr
from anndata import AnnData
from dask.dataframe import DataFrame as DaskDataFrame, read_parquet
from dask_image.imread import imread
from geopandas import GeoDataFrame
from joblib import Parallel, delayed
from multiscale_spatial_image.multiscale_spatial_image import MultiscaleSpatialImage
from shapely import Polygon
from spatial_image import SpatialImage
from spatialdata import SpatialData
from spatialdata._core.query.relational_query import get_element_instances
from spatialdata._types import ArrayLike
from spatialdata.models import (
Image2DModel,
Labels2DModel,
PointsModel,
ShapesModel,
TableModel,
)
from spatialdata.transformations.transformations import Affine, Identity, Scale
from spatialdata_io._constants._constants import XeniumKeys
from spatialdata_io._docs import inject_docs
from spatialdata_io._utils import deprecation_alias
from spatialdata_io.readers._utils._read_10x_h5 import _read_10x_h5
from spatialdata_io.readers._utils._utils import _initialize_raster_models_kwargs
__all__ = ["xenium", "xenium_aligned_image", "xenium_explorer_selection"]
@deprecation_alias(cells_as_shapes="cells_as_circles", cell_boundaries="cells_boundaries", cell_labels="cells_labels")
@inject_docs(xx=XeniumKeys)
def xenium(
path: str | Path,
*,
cells_boundaries: bool = True,
nucleus_boundaries: bool = True,
cells_as_circles: bool = True,
cells_labels: bool = True,
nucleus_labels: bool = True,
transcripts: bool = True,
morphology_mip: bool = True,
morphology_focus: bool = True,
aligned_images: bool = True,
cells_table: bool = True,
n_jobs: int = 1,
imread_kwargs: Mapping[str, Any] = MappingProxyType({}),
image_models_kwargs: Mapping[str, Any] = MappingProxyType({}),
labels_models_kwargs: Mapping[str, Any] = MappingProxyType({}),
) -> SpatialData:
"""
Read a *10X Genomics Xenium* dataset into a SpatialData object.
This function reads the following files:
- ``{xx.XENIUM_SPECS!r}``: File containing specifications.
- ``{xx.NUCLEUS_BOUNDARIES_FILE!r}``: Polygons of nucleus boundaries.
- ``{xx.CELL_BOUNDARIES_FILE!r}``: Polygons of cell boundaries.
- ``{xx.TRANSCRIPTS_FILE!r}``: File containing transcripts.
- ``{xx.CELL_FEATURE_MATRIX_FILE!r}``: File containing cell feature matrix.
- ``{xx.CELL_METADATA_FILE!r}``: File containing cell metadata.
    - ``{xx.MORPHOLOGY_MIP_FILE!r}``: File containing the morphology maximum intensity projection (MIP) image.
- ``{xx.MORPHOLOGY_FOCUS_FILE!r}``: File containing morphology focus.
.. seealso::
- `10X Genomics Xenium file format <https://cf.10xgenomics.com/supp/xenium/xenium_documentation.html>`_.
Parameters
----------
path
Path to the dataset.
cells_boundaries
Whether to read cell boundaries (polygons).
nucleus_boundaries
Whether to read nucleus boundaries (polygons).
cells_as_circles
        Whether to read cells also as circles; useful for performant visualization. The radii of the nuclei,
        not those of the cells, are used; using the cell radii would make the visualization too cluttered
        (cell boundaries are computed as a maximum expansion of the nucleus location, so the corresponding
        circles would overlap considerably).
cells_labels
        Whether to read cell labels (raster). The polygonal version of the cell labels is simplified
        for visualization purposes; the raster version is recommended for analysis.
nucleus_labels
        Whether to read nucleus labels (raster). The polygonal version of the nucleus labels is simplified
        for visualization purposes; the raster version is recommended for analysis.
transcripts
Whether to read transcripts.
morphology_mip
        Whether to read the morphology maximum intensity projection (MIP) image (available in versions < 2.0.0).
morphology_focus
Whether to read the morphology focus image.
aligned_images
Whether to also parse, when available, additional H&E or IF aligned images.
cells_table
        Whether to read the cell annotations into an `AnnData` table.
n_jobs
Number of jobs to use for parallel processing.
imread_kwargs
Keyword arguments to pass to the image reader.
image_models_kwargs
Keyword arguments to pass to the image models.
labels_models_kwargs
Keyword arguments to pass to the labels models.
Returns
-------
:class:`spatialdata.SpatialData`
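    Examples
    --------
    A minimal usage sketch; ``"path/to/xenium_output"`` is a placeholder for the output directory of a Xenium run.

    >>> from spatialdata_io import xenium
    >>> sdata = xenium("path/to/xenium_output", n_jobs=4)  # doctest: +SKIP
    >>> sdata.write("xenium.zarr")  # doctest: +SKIP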
"""
image_models_kwargs, labels_models_kwargs = _initialize_raster_models_kwargs(
image_models_kwargs, labels_models_kwargs
)
path = Path(path)
with open(path / XeniumKeys.XENIUM_SPECS) as f:
specs = json.load(f)
# to trigger the warning if the version cannot be parsed
version = _parse_version_of_xenium_analyzer(specs, hide_warning=False)
specs["region"] = "cell_circles" if cells_as_circles else "cell_boundaries"
# the table is required in some cases
if not cells_table:
if cells_as_circles:
logging.info(
'When "cells_as_circles" is set to `True` reading the table is required; setting `cell_annotations` to '
"`True`."
)
cells_table = True
if cells_boundaries or nucleus_boundaries:
logging.info(
'When "cell_boundaries" or "nucleus_boundaries" is set to `True` reading the table is required; '
"setting `cell_annotations` to `True`."
)
cells_table = True
if cells_table:
return_values = _get_tables_and_circles(path, cells_as_circles, specs)
if cells_as_circles:
table, circles = return_values
else:
table = return_values
else:
table = None
if version is not None and version >= packaging.version.parse("2.0.0") and table is not None:
cell_summary_table = _get_cells_metadata_table_from_zarr(path, XeniumKeys.CELLS_ZARR, specs)
if not cell_summary_table[XeniumKeys.CELL_ID].equals(table.obs[XeniumKeys.CELL_ID]):
warnings.warn(
'The "cell_id" column in the cells metadata table does not match the "cell_id" column in the annotation'
" table. This could be due to trying to read a new version that is not supported yet. Please "
"report this issue.",
UserWarning,
stacklevel=2,
)
table.obs[XeniumKeys.Z_LEVEL] = cell_summary_table[XeniumKeys.Z_LEVEL]
table.obs[XeniumKeys.NUCLEUS_COUNT] = cell_summary_table[XeniumKeys.NUCLEUS_COUNT]
polygons = {}
labels = {}
tables = {}
points = {}
images = {}
# From the public release notes here:
# https://www.10xgenomics.com/support/software/xenium-onboard-analysis/latest/release-notes/release-notes-for-xoa
    # we see that the `label_id` column is used to distinguish between the nuclei of polynucleated cells.
    # This column is currently not found in the preview data, even though it appears to be needed to unambiguously
    # match nuclei to cells. Therefore, for the moment, we only link the table to the cell labels, and not to the
    # nucleus labels.
if nucleus_labels:
labels["nucleus_labels"], _ = _get_labels_and_indices_mapping(
path,
XeniumKeys.CELLS_ZARR,
specs,
mask_index=0,
labels_name="nucleus_labels",
labels_models_kwargs=labels_models_kwargs,
)
if cells_labels:
labels["cell_labels"], cell_labels_indices_mapping = _get_labels_and_indices_mapping(
path,
XeniumKeys.CELLS_ZARR,
specs,
mask_index=1,
labels_name="cell_labels",
labels_models_kwargs=labels_models_kwargs,
)
if cell_labels_indices_mapping is not None and table is not None:
            if not cell_labels_indices_mapping["cell_id"].equals(table.obs[str(XeniumKeys.CELL_ID)]):
warnings.warn(
"The cell_id column in the cell_labels_table does not match the cell_id column derived from the cell "
"labels data. This could be due to trying to read a new version that is not supported yet. Please "
"report this issue.",
UserWarning,
stacklevel=2,
)
else:
table.obs["cell_labels"] = cell_labels_indices_mapping["label_index"]
if nucleus_boundaries:
polygons["nucleus_boundaries"] = _get_polygons(
path,
XeniumKeys.NUCLEUS_BOUNDARIES_FILE,
specs,
n_jobs,
idx=table.obs[str(XeniumKeys.CELL_ID)].copy(),
)
if cells_boundaries:
polygons["cell_boundaries"] = _get_polygons(
path,
XeniumKeys.CELL_BOUNDARIES_FILE,
specs,
n_jobs,
idx=table.obs[str(XeniumKeys.CELL_ID)].copy(),
)
if transcripts:
points["transcripts"] = _get_points(path, specs)
if version is None or version < packaging.version.parse("2.0.0"):
if morphology_mip:
images["morphology_mip"] = _get_images(
path,
XeniumKeys.MORPHOLOGY_MIP_FILE,
imread_kwargs,
image_models_kwargs,
)
if morphology_focus:
images["morphology_focus"] = _get_images(
path,
XeniumKeys.MORPHOLOGY_FOCUS_FILE,
imread_kwargs,
image_models_kwargs,
)
else:
if morphology_focus:
morphology_focus_dir = path / XeniumKeys.MORPHOLOGY_FOCUS_DIR
files = {f for f in os.listdir(morphology_focus_dir) if f.endswith(".ome.tif")}
if len(files) not in [1, 4]:
raise ValueError(
"Expected 1 (no segmentation kit) or 4 (segmentation kit) files in the morphology focus directory, "
f"found {len(files)}: {files}"
)
if files != {XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_IMAGE.value.format(i) for i in range(len(files))}:
raise ValueError(
"Expected files in the morphology focus directory to be named as "
f"{XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_IMAGE.value.format(0)} to "
f"{XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_IMAGE.value.format(len(files) - 1)}, found {files}"
)
# the 'dummy' channel is a temporary workaround, see _get_images() for more details
if len(files) == 1:
channel_names = {
0: XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_0.value,
}
else:
channel_names = {
0: XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_0.value,
1: XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_1.value,
2: XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_2.value,
3: XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_3.value,
4: "dummy",
}
            # this reads scale 0 for all of the 1 or 4 channels (the other files are parsed automatically);
            # dask_image.imread calls tifffile.imread, which warns that reading multi-file pyramids is not
            # supported; since we read the full-scale image and reconstruct the pyramid ourselves, we can
            # ignore this warning
class IgnoreSpecificMessage(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
# Ignore specific log message
if "OME series cannot read multi-file pyramids" in record.getMessage():
return False
return True
logger = tifffile.logger()
logger.addFilter(IgnoreSpecificMessage())
image_models_kwargs = dict(image_models_kwargs)
assert (
"c_coords" not in image_models_kwargs
), "The channel names for the morphology focus images are handled internally"
image_models_kwargs["c_coords"] = list(channel_names.values())
images["morphology_focus"] = _get_images(
morphology_focus_dir,
XeniumKeys.MORPHOLOGY_FOCUS_CHANNEL_IMAGE.format(0),
imread_kwargs,
image_models_kwargs,
)
del image_models_kwargs["c_coords"]
logger.removeFilter(IgnoreSpecificMessage())
if table is not None:
tables["table"] = table
elements_dict = {"images": images, "labels": labels, "points": points, "tables": tables, "shapes": polygons}
if cells_as_circles:
elements_dict["shapes"][specs["region"]] = circles
sdata = SpatialData(**elements_dict)
# find and add additional aligned images
if aligned_images:
extra_images = _add_aligned_images(path, imread_kwargs, image_models_kwargs)
for key, value in extra_images.items():
sdata.images[key] = value
return sdata
def _decode_cell_id_column(cell_id_column: pd.Series) -> pd.Series:
if isinstance(cell_id_column.iloc[0], bytes):
return cell_id_column.apply(lambda x: x.decode("utf-8"))
return cell_id_column
def _get_polygons(
path: Path, file: str, specs: dict[str, Any], n_jobs: int, idx: Optional[ArrayLike] = None
) -> GeoDataFrame:
def _poly(arr: ArrayLike) -> Polygon:
return Polygon(arr[:-1])
# seems to be faster than pd.read_parquet
df = pq.read_table(path / file).to_pandas()
group_by = df.groupby(XeniumKeys.CELL_ID)
index = pd.Series(group_by.indices.keys())
index = _decode_cell_id_column(index)
out = Parallel(n_jobs=n_jobs)(
delayed(_poly)(i.to_numpy())
for _, i in group_by[[XeniumKeys.BOUNDARIES_VERTEX_X, XeniumKeys.BOUNDARIES_VERTEX_Y]]
)
geo_df = GeoDataFrame({"geometry": out})
version = _parse_version_of_xenium_analyzer(specs)
if version is not None and version < packaging.version.parse("2.0.0"):
assert idx is not None
assert len(idx) == len(geo_df)
assert np.unique(geo_df.index).size == len(geo_df)
assert index.equals(idx)
geo_df.index = idx
else:
geo_df.index = index
        if np.unique(geo_df.index).size != len(geo_df):
warnings.warn(
"Found non-unique polygon indices, this will be addressed in a future version of the reader. For the "
"time being please consider merging polygons with non-unique indices into single multi-polygons.",
UserWarning,
stacklevel=2,
)
scale = Scale([1.0 / specs["pixel_size"], 1.0 / specs["pixel_size"]], axes=("x", "y"))
return ShapesModel.parse(geo_df, transformations={"global": scale})
def _get_labels_and_indices_mapping(
path: Path,
file: str,
specs: dict[str, Any],
mask_index: int,
labels_name: str,
labels_models_kwargs: Mapping[str, Any] = MappingProxyType({}),
) -> tuple[GeoDataFrame, pd.DataFrame | None]:
if mask_index not in [0, 1]:
raise ValueError(f"mask_index must be 0 or 1, found {mask_index}.")
with tempfile.TemporaryDirectory() as tmpdir:
zip_file = path / XeniumKeys.CELLS_ZARR
with zipfile.ZipFile(zip_file, "r") as zip_ref:
zip_ref.extractall(tmpdir)
with zarr.open(str(tmpdir), mode="r") as z:
# get the labels
masks = z["masks"][f"{mask_index}"][...]
labels = Labels2DModel.parse(
masks, dims=("y", "x"), transformations={"global": Identity()}, **labels_models_kwargs
)
# build the matching table
version = _parse_version_of_xenium_analyzer(specs)
if mask_index == 0:
# nuclei currently not supported
return labels, None
            if version is None or version < packaging.version.parse("1.3.0"):
                # the mapping is supported in version 1.3.0 but not in version 1.0.2; conservatively, assume it is
                # not supported in versions < 1.3.0
return labels, None
cell_id, dataset_suffix = z["cell_id"][...].T
cell_id_str = cell_id_str_from_prefix_suffix_uint32(cell_id, dataset_suffix)
# this information will probably be available in the `label_id` column for version > 2.0.0 (see public
# release notes mentioned above)
real_label_index = get_element_instances(labels).values
# background removal
if real_label_index[0] == 0:
real_label_index = real_label_index[1:]
if version < packaging.version.parse("2.0.0"):
expected_label_index = z["seg_mask_value"][...]
if not np.array_equal(expected_label_index, real_label_index):
raise ValueError(
"The label indices from the labels differ from the ones from the input data. Please report "
f"this issue. Real label indices: {real_label_index}, expected label indices: "
f"{expected_label_index}."
)
else:
labels_positional_indices = z["polygon_sets"][mask_index]["cell_index"][...]
if not np.array_equal(labels_positional_indices, np.arange(len(labels_positional_indices))):
raise ValueError(
"The positional indices of the labels do not match the expected range. Please report this "
"issue."
)
            # label_index is a uint32; cast to np.int64 to avoid the risk of overflow on some systems
indices_mapping = pd.DataFrame(
{
"region": labels_name,
"cell_id": cell_id_str,
"label_index": real_label_index.astype(np.int64),
}
)
return labels, indices_mapping
@inject_docs(xx=XeniumKeys)
def _get_cells_metadata_table_from_zarr(
path: Path,
file: str,
specs: dict[str, Any],
) -> pd.DataFrame:
"""
Read cells metadata from ``{xx.CELLS_ZARR}``.
    Read the cells summary table, which contains the z_level information (for versions < 2.0.0) and also the
    nucleus_count (for versions >= 2.0.0).
"""
# for version >= 2.0.0, in this function we could also parse the segmentation method used to obtain the masks
with tempfile.TemporaryDirectory() as tmpdir:
zip_file = path / XeniumKeys.CELLS_ZARR
with zipfile.ZipFile(zip_file, "r") as zip_ref:
zip_ref.extractall(tmpdir)
with zarr.open(str(tmpdir), mode="r") as z:
x = z["cell_summary"][...]
column_names = z["cell_summary"].attrs["column_names"]
df = pd.DataFrame(x, columns=column_names)
cell_id_prefix = z["cell_id"][:, 0]
dataset_suffix = z["cell_id"][:, 1]
cell_id_str = cell_id_str_from_prefix_suffix_uint32(cell_id_prefix, dataset_suffix)
df[XeniumKeys.CELL_ID] = cell_id_str
return df
def _get_points(path: Path, specs: dict[str, Any]) -> DaskDataFrame:
table = read_parquet(path / XeniumKeys.TRANSCRIPTS_FILE)
table["feature_name"] = table["feature_name"].apply(
lambda x: x.decode("utf-8") if isinstance(x, bytes) else str(x), meta=("feature_name", "object")
)
transform = Scale([1.0 / specs["pixel_size"], 1.0 / specs["pixel_size"]], axes=("x", "y"))
points = PointsModel.parse(
table,
coordinates={"x": XeniumKeys.TRANSCRIPTS_X, "y": XeniumKeys.TRANSCRIPTS_Y, "z": XeniumKeys.TRANSCRIPTS_Z},
feature_key=XeniumKeys.FEATURE_NAME,
instance_key=XeniumKeys.CELL_ID,
transformations={"global": transform},
)
return points
def _get_tables_and_circles(
path: Path, cells_as_circles: bool, specs: dict[str, Any]
) -> AnnData | tuple[AnnData, AnnData]:
adata = _read_10x_h5(path / XeniumKeys.CELL_FEATURE_MATRIX_FILE)
metadata = pd.read_parquet(path / XeniumKeys.CELL_METADATA_FILE)
np.testing.assert_array_equal(metadata.cell_id.astype(str), adata.obs_names.values)
circ = metadata[[XeniumKeys.CELL_X, XeniumKeys.CELL_Y]].to_numpy()
adata.obsm["spatial"] = circ
metadata.drop([XeniumKeys.CELL_X, XeniumKeys.CELL_Y], axis=1, inplace=True)
adata.obs = metadata
adata.obs["region"] = specs["region"]
adata.obs["region"] = adata.obs["region"].astype("category")
adata.obs[XeniumKeys.CELL_ID] = _decode_cell_id_column(adata.obs[XeniumKeys.CELL_ID])
table = TableModel.parse(adata, region=specs["region"], region_key="region", instance_key=str(XeniumKeys.CELL_ID))
if cells_as_circles:
transform = Scale([1.0 / specs["pixel_size"], 1.0 / specs["pixel_size"]], axes=("x", "y"))
radii = np.sqrt(adata.obs[XeniumKeys.CELL_NUCLEUS_AREA].to_numpy() / np.pi)
circles = ShapesModel.parse(
circ,
geometry=0,
radius=radii,
transformations={"global": transform},
index=adata.obs[XeniumKeys.CELL_ID].copy(),
)
return table, circles
return table
def _get_images(
path: Path,
file: str,
imread_kwargs: Mapping[str, Any] = MappingProxyType({}),
image_models_kwargs: Mapping[str, Any] = MappingProxyType({}),
) -> SpatialImage | MultiscaleSpatialImage:
image = imread(path / file, **imread_kwargs)
if "c_coords" in image_models_kwargs and "dummy" in image_models_kwargs["c_coords"]:
        # Napari currently interprets 4-channel images as RGB; a series of PRs to fix this is in progress, but
        # they will not be merged soon.
        # Since the new data from the Xenium Analyzer version 2.0.0 gives 4-channel images that are not RGBA,
        # let's add a dummy channel as a temporary workaround.
image = da.concatenate([image, da.zeros_like(image[0:1])], axis=0)
return Image2DModel.parse(
image, transformations={"global": Identity()}, dims=("c", "y", "x"), **image_models_kwargs
)
def _add_aligned_images(
path: Path,
imread_kwargs: Mapping[str, Any] = MappingProxyType({}),
image_models_kwargs: Mapping[str, Any] = MappingProxyType({}),
) -> dict[str, MultiscaleSpatialImage]:
"""Discover and parse aligned images."""
images = {}
ome_tif_files = list(path.glob("*.ome.tif"))
csv_files = list(path.glob("*.csv"))
for file in ome_tif_files:
element_name = None
for suffix in [XeniumKeys.ALIGNED_HE_IMAGE_SUFFIX, XeniumKeys.ALIGNED_IF_IMAGE_SUFFIX]:
if file.name.endswith(suffix):
element_name = suffix.replace(XeniumKeys.ALIGNMENT_FILE_SUFFIX_TO_REMOVE, "")
break
if element_name is not None:
# check if an alignment file exists
expected_filename = file.name.replace(
XeniumKeys.ALIGNMENT_FILE_SUFFIX_TO_REMOVE, XeniumKeys.ALIGNMENT_FILE_SUFFIX_TO_ADD
)
alignment_files = [f for f in csv_files if f.name == expected_filename]
assert len(alignment_files) <= 1, f"Found more than one alignment file for {file.name}."
alignment_file = alignment_files[0] if alignment_files else None
# parse the image
image = xenium_aligned_image(file, alignment_file, imread_kwargs, image_models_kwargs)
images[element_name] = image
return images
def xenium_aligned_image(
image_path: str | Path,
alignment_file: str | Path | None,
imread_kwargs: Mapping[str, Any] = MappingProxyType({}),
image_models_kwargs: Mapping[str, Any] = MappingProxyType({}),
) -> MultiscaleSpatialImage:
"""
Read an image aligned to a Xenium dataset, with an optional alignment file.
Parameters
----------
image_path
Path to the image.
    alignment_file
        Path to the alignment file; if not passed, the image is assumed to be already aligned.
    imread_kwargs
        Keyword arguments to pass to the image reader.
    image_models_kwargs
        Keyword arguments to pass to the image models.
Returns
-------
The single-scale or multi-scale aligned image element.
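    Examples
    --------
    A minimal usage sketch; the file names are placeholders for an aligned H&E image and its alignment matrix.

    >>> image = xenium_aligned_image("he_image.ome.tif", "he_imagealignment.csv")  # doctest: +SKIP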
"""
image_path = Path(image_path)
assert image_path.exists(), f"File {image_path} does not exist."
image = imread(image_path, **imread_kwargs)
    # Depending on the version of the pipeline that was used, some images have shape (1, y, x, 3) and others
    # (3, y, x) or (4, y, x).
    # Since y and x always differ from 1, we can differentiate between the two cases here, independently of the
    # pipeline version.
    # Note that a more robust approach is to look at the XML metadata in the ome.tif; we should use this in a
    # future PR. In fact, it could be that the len(image.shape) == 4 case actually has dims (1, x, y, c) and not
    # (1, y, x, c). This is not a problem because the transformation is constructed to be consistent, but if that
    # is the case, the data orientation would be transposed compared to the original image, which is not ideal.
if len(image.shape) == 4:
assert image.shape[0] == 1
assert image.shape[-1] == 3
image = image.squeeze(0)
dims = ("y", "x", "c")
else:
assert len(image.shape) == 3
assert image.shape[0] in [3, 4]
if image.shape[0] == 4:
# as explained before in _get_images(), we need to add a dummy channel until we support 4-channel images as
# non-RGBA images in napari
image = da.concatenate([image, da.zeros_like(image[0:1])], axis=0)
dims = ("c", "y", "x")
if alignment_file is None:
transformation = Identity()
else:
alignment_file = Path(alignment_file)
assert alignment_file.exists(), f"File {alignment_file} does not exist."
alignment = pd.read_csv(alignment_file, header=None).values
transformation = Affine(alignment, input_axes=("x", "y"), output_axes=("x", "y"))
return Image2DModel.parse(
image,
dims=dims,
transformations={"global": transformation},
**image_models_kwargs,
)
def _selection_to_polygon(df: pd.DataFrame, pixel_size: float) -> Polygon:
xy_keys = [XeniumKeys.EXPLORER_SELECTION_X, XeniumKeys.EXPLORER_SELECTION_Y]
return Polygon(df[xy_keys].values / pixel_size)
def xenium_explorer_selection(
path: str | Path, pixel_size: float = 0.2125, return_list: bool = False
) -> Polygon | list[Polygon]:
"""Read the coordinates of a selection `.csv` file exported from the `Xenium Explorer <https://www.10xgenomics.com/support/software/xenium-explorer/latest>`_.
This file can be generated by the "Freehand Selection" or the "Rectangular Selection".
The output `Polygon` can be used for a polygon query on the pixel coordinate
system (by default, this is the `"global"` coordinate system for Xenium data).
If `spatialdata_xenium_explorer <https://github.com/quentinblampey/spatialdata_xenium_explorer>`_ was used,
the `pixel_size` argument must be set to the one used during conversion with `spatialdata_xenium_explorer`.
    In case multiple polygons were selected in the Explorer and exported into a single file, a list of polygons is
    returned.
Parameters
----------
path
Path to the `.csv` file containing the selection coordinates
pixel_size
Size of a pixel in microns. By default, the Xenium pixel size is used.
return_list
If `True`, returns a list of Polygon even if only one polygon was selected
Returns
-------
    :class:`shapely.geometry.polygon.Polygon`, or a list of polygons if multiple polygons are present in the file
    (or if `return_list` is `True`).
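    Examples
    --------
    A minimal usage sketch; the file name is a placeholder for a selection exported from the Xenium Explorer.

    >>> polygon = xenium_explorer_selection("Selection_1_coordinates.csv")  # doctest: +SKIP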
"""
df = pd.read_csv(path, skiprows=2)
if XeniumKeys.EXPLORER_SELECTION_KEY not in df:
polygon = _selection_to_polygon(df, pixel_size)
return [polygon] if return_list else polygon
return [_selection_to_polygon(sub_df, pixel_size) for _, sub_df in df.groupby(XeniumKeys.EXPLORER_SELECTION_KEY)]
def _parse_version_of_xenium_analyzer(
specs: dict[str, Any],
hide_warning: bool = True,
) -> packaging.version.Version | None:
string = specs[XeniumKeys.ANALYSIS_SW_VERSION]
pattern = r"^(?:x|X)enium-(\d+\.\d+\.\d+(\.\d+-\d+)?)"
result = re.search(pattern, string)
# Example
# Input: xenium-2.0.0.6-35-ga7e17149a
# Output: 2.0.0.6-35
warning_message = (
f"Could not parse the version of the Xenium Analyzer from the string: {string}. This may happen for "
"experimental version of the data. Please report in GitHub https://github.com/scverse/spatialdata-io/issues.\n"
"The reader will continue assuming the latest version of the Xenium Analyzer."
)
if result is None:
if not hide_warning:
warnings.warn(warning_message, stacklevel=2)
return None
group = result.groups()[0]
try:
return packaging.version.parse(group)
except packaging.version.InvalidVersion:
if not hide_warning:
warnings.warn(warning_message, stacklevel=2)
return None
def cell_id_str_from_prefix_suffix_uint32(cell_id_prefix: ArrayLike, dataset_suffix: ArrayLike) -> ArrayLike:
# explained here:
# https://www.10xgenomics.com/support/software/xenium-onboard-analysis/latest/analysis/xoa-output-zarr#cellID
# convert to hex, remove the 0x prefix
cell_id_prefix_hex = [hex(x)[2:] for x in cell_id_prefix]
# shift the hex values
hex_shift = {str(i): chr(ord("a") + i) for i in range(10)} | {
chr(ord("a") + i): chr(ord("a") + 10 + i) for i in range(6)
}
cell_id_prefix_hex_shifted = ["".join([hex_shift[c] for c in x]) for x in cell_id_prefix_hex]
# merge the prefix and the suffix
cell_id_str = [str(x[0]).rjust(8, "a") + f"-{x[1]}" for x in zip(cell_id_prefix_hex_shifted, dataset_suffix)]
return np.array(cell_id_str)
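# Worked example of the scheme above (values chosen for illustration, not taken from real data):
# cell_id_str_from_prefix_suffix_uint32(np.array([1]), np.array([1])) gives array(["aaaaaaab-1"]),
# since hex(1) is "1", which is shifted to "b" and left-padded with "a" to "aaaaaaab".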
def prefix_suffix_uint32_from_cell_id_str(cell_id_str: ArrayLike) -> tuple[ArrayLike, ArrayLike]:
# parse the string into the prefix and suffix
cell_id_prefix_str, dataset_suffix = zip(*[x.split("-") for x in cell_id_str])
dataset_suffix_int = [int(x) for x in dataset_suffix]
# reverse the shifted hex conversion
hex_unshift = {chr(ord("a") + i): str(i) for i in range(10)} | {
chr(ord("a") + 10 + i): chr(ord("a") + i) for i in range(6)
}
cell_id_prefix_hex = ["".join([hex_unshift[c] for c in x]) for x in cell_id_prefix_str]
    # convert from hex; int(x, 16) does not require the "0x" prefix
cell_id_prefix = [int(x, 16) for x in cell_id_prefix_hex]
return np.array(cell_id_prefix, dtype=np.uint32), np.array(dataset_suffix_int)
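# Round-trip example (illustrative): prefix_suffix_uint32_from_cell_id_str(np.array(["aaaaaaab-1"]))
# returns (array([1], dtype=uint32), array([1])), inverting the worked example above.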