[V8] fix instance id with insert/retrieve dataframe #2116

Draft
wants to merge 6 commits into base: v8-release
94 changes: 61 additions & 33 deletions cognite/client/_api/datapoints.py
@@ -17,13 +17,14 @@
Any,
Literal,
NamedTuple,
TypedDict,
TypeGuard,
TypeVar,
cast,
overload,
)

from typing_extensions import Self
from typing_extensions import NotRequired, Self

from cognite.client._api.datapoint_tasks import (
BaseDpsFetchSubtask,
@@ -87,6 +88,13 @@
_TResLst = TypeVar("_TResLst", DatapointsList, DatapointsArrayList)


class DatapointsInsertType(TypedDict):
datapoints: list[tuple | dict] | Datapoints | DatapointsArray
id: NotRequired[int]
external_id: NotRequired[str]
instance_id: NotRequired[NodeId]


class DpsFetchStrategy(ABC):
def __init__(self, dps_client: DatapointsAPI, all_queries: list[DatapointsQuery], max_workers: int) -> None:
self.dps_client = dps_client
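For readers skimming the diff, here is a rough sketch of what the new `DatapointsInsertType` items look like when handed to `insert_multiple`; the identifiers, timestamps and values below are purely hypothetical, and a configured `CogniteClient` plus already existing time series are assumed:

```python
from datetime import datetime, timezone

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import NodeId

client = CogniteClient()

# Each item holds the datapoints plus exactly one identifier key:
# id, external_id or instance_id (all values below are made up).
to_insert = [
    {"id": 123, "datapoints": [(datetime(2024, 1, 1, tzinfo=timezone.utc), 42.0)]},
    {"external_id": "my-ts-xid", "datapoints": [(1704067200000, 13.37)]},
    {"instance_id": NodeId("my-space", "my-ts"), "datapoints": [(1704067200000, 2.5)]},
]
client.time_series.data.insert_multiple(to_insert)
```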
@@ -1128,13 +1136,18 @@ def retrieve_dataframe(
include_status (bool): Also return the status code, an integer, for each datapoint in the response. Only relevant for raw datapoint queries, not aggregates.
ignore_bad_datapoints (bool): Treat datapoints with a bad status code as if they do not exist. If set to false, raw queries will include bad datapoints in the response, and aggregates will in general omit the time period between a bad datapoint and the next good datapoint. Also, the period between a bad datapoint and the previous good datapoint will be considered constant. Default: True.
treat_uncertain_as_bad (bool): Treat datapoints with uncertain status codes as bad. If false, treat datapoints with uncertain status codes as good. Used for both raw queries and aggregates. Default: True.
uniform_index (bool): If only querying aggregates AND a single granularity is used AND no limit is used, specifying `uniform_index=True` will return a dataframe with an equidistant datetime index from the earliest `start` to the latest `end` (missing values will be NaNs). If these requirements are not met, a ValueError is raised. Default: False
uniform_index (bool): If only querying aggregates AND a single granularity is used (that's NOT a calendar granularity like month/quarter/year) AND no limit is used AND no timezone is used, specifying `uniform_index=True` will return a dataframe with an equidistant datetime index from the earliest `start` to the latest `end` (missing values will be NaNs). If these requirements are not met, a ValueError is raised. Default: False
include_aggregate_name (bool): Include 'aggregate' in the column name, e.g. `my-ts|average`. Ignored for raw time series. Default: True
include_granularity_name (bool): Include 'granularity' in the column name, e.g. `my-ts|12h`. Added after 'aggregate' when present. Ignored for raw time series. Default: False
column_names (Literal['id', 'external_id', 'instance_id']): Use either instance IDs, external IDs or IDs as column names. Time series missing instance ID will use external ID if it exists, then ID as backup. Default: "instance_id"

Returns:
pd.DataFrame: A pandas DataFrame containing the requested time series. The ordering of columns is ids first, then external_ids. For time series with multiple aggregates, they will be sorted in alphabetical order ("average" before "max").
pd.DataFrame: A pandas DataFrame containing the requested time series. The ordering of columns is ids first, then external_ids, and lastly instance_ids. For time series with multiple aggregates, they will be sorted in alphabetical order ("average" before "max").

Tip:
Pandas DataFrames have one shared index, so when you fetch datapoints from multiple time series, the final index will be
the union of all the timestamps. Thus, unless all time series have the exact same timestamps, the various columns will contain
NaNs to fill the "missing" values. For lower memory usage on unaligned data, use the :py:meth:`~DatapointsAPI.retrieve_arrays` method.

Warning:
If you have duplicated time series in your query, the dataframe columns will also contain duplicates.
@@ -1170,20 +1183,21 @@ def retrieve_dataframe(
... end=datetime(2020, 12, 31, tzinfo=timezone.utc),
... uniform_index=True)

Get a pandas dataframe containing the 'average' aggregate for two time series using a 30-day granularity,
Get a pandas dataframe containing the 'average' aggregate for two time series using a monthly granularity,
starting Jan 1, 1970 all the way up to present, without having the aggregate name in the column names:

>>> df = client.time_series.data.retrieve_dataframe(
... external_id=["foo", "bar"],
... aggregates="average",
... granularity="30d",
... granularity="1mo",
... include_aggregate_name=False)

You may also use ``pandas.Timestamp`` to define start and end:
You may also use ``pandas.Timestamp`` to define start and end. Here we fetch using instance_id:

>>> import pandas as pd
>>> from cognite.client.data_classes.data_modeling import NodeId
>>> df = client.time_series.data.retrieve_dataframe(
... external_id="foo",
... instance_id=NodeId("my-space", "my-ts-xid"),
... start=pd.Timestamp("2023-01-01"),
... end=pd.Timestamp("2023-02-01"))
"""
@@ -1574,12 +1588,13 @@ def insert(
... )
>>> client.time_series.data.insert(data, external_id="foo")
"""
to_insert = DatapointsInsertType(
datapoints=datapoints,
**Identifier.of_either(id, external_id, instance_id).as_dict(), # type: ignore [typeddict-item]
)
DatapointsPoster(self).insert([to_insert])

post_dps_object = Identifier.of_either(id, external_id, instance_id).as_dict()
post_dps_object["datapoints"] = datapoints
DatapointsPoster(self).insert([post_dps_object])

def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoints | DatapointsArray]]) -> None:
def insert_multiple(self, datapoints: list[DatapointsInsertType]) -> None:
"""`Insert datapoints into multiple time series <https://developer.cognite.com/api#tag/Time-series/operation/postMultiTimeSeriesDatapoints>`_

Timestamps can be represented as milliseconds since epoch or datetime objects. Note that naive datetimes
@@ -1589,7 +1604,7 @@ def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoin
`status codes. <https://developer.cognite.com/dev/concepts/reference/quality_codes/>`_

Args:
datapoints (list[dict[str, str | int | list | Datapoints | DatapointsArray]]): The datapoints you wish to insert along with the ids of the time series. See examples below.
datapoints (list[DatapointsInsertType]): The datapoints you wish to insert along with the ids of the time series. See examples below.

Note:
All datapoints inserted without a status code (or symbol) are assumed to be good (code 0). To mark a value, pass
@@ -1607,7 +1622,6 @@ def insert_multiple(self, datapoints: list[dict[str, str | int | list | Datapoin

When passing tuples, the third element is optional and may contain the status code for the datapoint. To pass by symbol, a dictionary must be used.


>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling import NodeId
>>> from cognite.client.data_classes import StatusCode
@@ -1718,34 +1732,43 @@ def delete_ranges(self, ranges: list[dict[str, Any]]) -> None:
def _delete_datapoints_ranges(self, delete_range_objects: list[dict]) -> None:
self._post(url_path=self._RESOURCE_PATH + "/delete", json={"items": delete_range_objects})

def insert_dataframe(self, df: pd.DataFrame, external_id_headers: bool = True, dropna: bool = True) -> None:
"""Insert a dataframe (columns must be unique).
def insert_dataframe(self, df: pd.DataFrame, dropna: bool = True) -> None:
"""Insert a dataframe containing datapoints to one or more time series.

The index of the dataframe must contain the timestamps (pd.DatetimeIndex). The names of the columns specify
the ids or external ids of the time series to which the datapoints will be written.
The index of the dataframe must contain the timestamps (pd.DatetimeIndex). The column identifiers
must contain the IDs (int), external IDs (str) or instance IDs (NodeId) of the already existing
time series to which the datapoints from that particular column will be written.

Said time series must already exist.
Note:
The column identifiers must be unique.

Args:
df (pd.DataFrame): Pandas DataFrame object containing the time series.
external_id_headers (bool): Interpret the column names as external id. Pass False if using ids. Default: True.
dropna (bool): Set to True to ignore NaNs in the given DataFrame, applied per column. Default: True.
dropna (bool): Set to True to ignore NaNs in the given DataFrame, applied per individual column. Default: True.

Warning:
You can not insert datapoints with status codes using this method (``insert_dataframe``), you'll need
to use the :py:meth:`~DatapointsAPI.insert` method instead (or :py:meth:`~DatapointsAPI.insert_multiple`)!

Examples:
Post a dataframe with white noise:

Post a dataframe with white noise to three time series, one using ID, one using external ID
and one using instance ID:

>>> import numpy as np
>>> import pandas as pd
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling import NodeId
>>> client = CogniteClient()
>>> ts_xid = "my-foo-ts"
>>> idx = pd.date_range(start="2018-01-01", periods=100, freq="1d")
>>> noise = np.random.normal(0, 1, 100)
>>> df = pd.DataFrame({ts_xid: noise}, index=idx)
>>> node_id = NodeId("my-space", "my-ts-xid")
>>> df = pd.DataFrame(
... {
... 123: np.random.normal(0, 1, 100),
... "foo": np.random.normal(0, 1, 100),
... node_id: np.random.normal(0, 1, 100),
... },
... index=pd.date_range(start="2018-01-01", periods=100, freq="1d")
... )
>>> client.time_series.data.insert_dataframe(df)
"""
np, pd = local_import("numpy", "pandas")
@@ -1762,13 +1785,18 @@ def insert_dataframe(self, df: pd.DataFrame, external_id_headers: bool = True, d
idx = df.index.to_numpy("datetime64[ms]").astype(np.int64)
for column_id, col in df.items():
mask = col.notna()
datapoints = list(map(_InsertDatapoint, idx[mask], col[mask]))
datapoints: list[Any] = list(map(_InsertDatapoint, idx[mask], col[mask]))
if not datapoints:
continue
if external_id_headers:
dps.append({"datapoints": datapoints, "externalId": column_id})
else:
dps.append({"datapoints": datapoints, "id": int(column_id)})
match column_id:
case int():
dps.append(DatapointsInsertType(datapoints=datapoints, id=column_id))
case str():
dps.append(DatapointsInsertType(datapoints=datapoints, external_id=column_id))
case NodeId():
dps.append(DatapointsInsertType(datapoints=datapoints, instance_id=column_id))
case _:
raise ValueError(f"Column identifiers must be either int, str or NodeId, not {type(column_id)}")
self.insert_multiple(dps)

def _select_dps_fetch_strategy(self, queries: list[DatapointsQuery]) -> type[DpsFetchStrategy]:
@@ -1811,7 +1839,7 @@ def __init__(self, dps_client: DatapointsAPI) -> None:
self.ts_limit = self.dps_client._POST_DPS_OBJECTS_LIMIT
self.max_workers = self.dps_client._config.max_workers

def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:
def insert(self, dps_object_lst: list[DatapointsInsertType]) -> None:
to_insert = self._verify_and_prepare_dps_objects(dps_object_lst)
# To ensure we stay below the max limit on objects per request, we first chunk based on it:
# (with 10k limit this is almost always just one chunk)
Expand All @@ -1827,7 +1855,7 @@ def insert(self, dps_object_lst: list[dict[str, Any]]) -> None:
)

def _verify_and_prepare_dps_objects(
self, dps_object_lst: list[dict[str, Any]]
self, dps_object_lst: list[DatapointsInsertType]
) -> list[tuple[Identifier, list[_InsertDatapoint]]]:
dps_to_insert: dict[Identifier, list[_InsertDatapoint]] = defaultdict(list)
for obj in dps_object_lst:
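To make the new per-column behaviour of `insert_dataframe` concrete, here is a small sketch under the same assumptions as the docstring example (pre-existing time series, hypothetical identifiers). It also shows the per-column NaN handling and the error raised for unsupported column labels:

```python
import numpy as np
import pandas as pd

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import NodeId

client = CogniteClient()
idx = pd.date_range(start="2024-01-01", periods=4, freq="1h")

df = pd.DataFrame(
    {
        "my-ts-xid": [1.0, np.nan, 3.0, 4.0],       # str -> external_id; the NaN row is dropped for this column only
        NodeId("my-space", "my-ts"): [np.nan] * 4,  # all-NaN column -> no datapoints, silently skipped
    },
    index=idx,
)
client.time_series.data.insert_dataframe(df)  # dropna=True is the default

# Column labels must be int, str or NodeId; anything else raises:
bad = pd.DataFrame({3.14: [1.0]}, index=idx[:1])
try:
    client.time_series.data.insert_dataframe(bad)
except ValueError as err:
    print(err)  # Column identifiers must be either int, str or NodeId, not <class 'float'>
```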
36 changes: 24 additions & 12 deletions cognite/client/data_classes/datapoints.py
@@ -835,9 +835,14 @@ def to_pandas( # type: ignore [override]
if tz is not None:
idx = pd.to_datetime(idx, utc=True).tz_convert(convert_tz_for_pandas(tz))

identifier = resolve_ts_identifier_as_df_column_name(self, column_names)
identifier = resolve_ts_identifier_as_df_column_name(
self,
column_names,
include_aggregate_name,
include_granularity_name,
)
if self.value is not None:
raw_columns: dict[str, npt.NDArray] = {identifier: self.value}
raw_columns: dict[int | str | NodeId, npt.NDArray] = {identifier: self.value}
if include_status:
if self.status_code is not None:
raw_columns[f"{identifier}|status_code"] = self.status_code
@@ -846,10 +851,15 @@
return pd.DataFrame(raw_columns, index=idx, copy=False)

(_, *agg_names), (_, *arrays) = self._data_fields()
aggregate_columns = [
identifier + include_aggregate_name * f"|{agg}" + include_granularity_name * f"|{self.granularity}"
for agg in agg_names
]
if include_aggregate_name or include_granularity_name:
# User wants to include info in the column names, so we convert to strings:
aggregate_columns: list[Any] = [
str(identifier) + include_aggregate_name * f"|{agg}" + include_granularity_name * f"|{self.granularity}"
for agg in agg_names
]
else:
# Keep id as int, or instance_id as NodeId (presumably only a single aggregate, but we support more):
aggregate_columns = [identifier for _ in agg_names]
# Since columns might contain duplicates, we can't instantiate from dict as only the
# last key (array/column) would be kept:
(df := pd.DataFrame(dict(enumerate(arrays)), index=idx, copy=False)).columns = aggregate_columns
@@ -1054,10 +1064,12 @@ def to_pandas( # type: ignore [override]
pandas.DataFrame: The dataframe.
"""
pd = local_import("pandas")
if column_names == "externalId":
column_names = "external_id" # Camel case for backwards compatibility
identifier = resolve_ts_identifier_as_df_column_name(self, column_names)

identifier = resolve_ts_identifier_as_df_column_name(
self,
column_names,
include_aggregate_name,
include_granularity_name,
)
if include_errors and self.error is None:
raise ValueError("Unable to 'include_errors', only available for data from synthetic datapoint queries")

@@ -1082,9 +1094,9 @@ def to_pandas( # type: ignore [override]
data_lists.append(self.status_symbol)
continue
if include_aggregate_name:
id_col_name += f"|{attr}"
id_col_name = f"{id_col_name}|{attr}"
if include_granularity_name and self.granularity is not None:
id_col_name += f"|{self.granularity}"
id_col_name = f"{id_col_name}|{self.granularity}"
field_names.append(id_col_name)
if attr == "error":
data_lists.append(data)
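The net effect of the `to_pandas` changes above is easiest to see on an aggregate query. A rough sketch, assuming a hypothetical time series with id 123 that has datapoints in the requested window:

```python
from cognite.client import CogniteClient

client = CogniteClient()

dps = client.time_series.data.retrieve(
    id=123, aggregates="average", granularity="1h", start="24h-ago", end="now"
)

# Requesting the aggregate and/or granularity suffix forces string labels,
# since the identifier must be concatenated with '|average', '|1h', etc.:
print(dps.to_pandas(column_names="id", include_aggregate_name=True).columns.tolist())
# -> ['123|average']

# Without any suffixes the label keeps its native type (int here; NodeId for instance ids):
print(dps.to_pandas(column_names="id", include_aggregate_name=False).columns.tolist())
# -> [123]
```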
25 changes: 18 additions & 7 deletions cognite/client/utils/_pandas_helpers.py
@@ -10,6 +10,7 @@
from typing import TYPE_CHECKING, Any, Literal

from cognite.client.exceptions import CogniteImportError
from cognite.client.utils._auxiliary import no_op
from cognite.client.utils._importing import local_import
from cognite.client.utils._text import to_camel_case
from cognite.client.utils._time import TIME_ATTRIBUTES, ZoneInfo
@@ -19,6 +20,7 @@

from cognite.client.data_classes import Datapoints, DatapointsArray, DatapointsArrayList, DatapointsList
from cognite.client.data_classes._base import T_CogniteResource, T_CogniteResourceList
from cognite.client.data_classes.data_modeling.ids import NodeId


NULLABLE_INT_COLS = {
@@ -150,10 +152,20 @@ def concat_dataframes_with_nullable_int_cols(dfs: Sequence[pd.DataFrame]) -> pd.


def resolve_ts_identifier_as_df_column_name(
dps: Datapoints | DatapointsArray, column_names: Literal["id", "external_id", "instance_id"]
) -> str:
# Note: Although pandas columns can support numbers and objects like tuples, we may need to pad with
# e.g. aggregate info, so we always ensure the column names are strings.
dps: Datapoints | DatapointsArray,
column_names: Literal["id", "external_id", "instance_id"],
include_aggregate_name: bool,
include_granularity_name: bool,
) -> int | str | NodeId:
"""
When we need to pad the identifier with additional info, e.g. aggregate name or granularity, we
must convert non-string identifiers like id (int) and instance_id (NodeId) to strings.

Otherwise, we can use the identifier as is (always done for raw datapoints).
"""
is_raw = dps.value is not None
maybe_string_fn = no_op if is_raw or (not include_aggregate_name and not include_granularity_name) else str

if column_names not in {"id", "external_id", "instance_id"}:
# Don't raise here, the user may have waited a long time for datapoints fetching
warnings.warn(
@@ -166,8 +178,7 @@ def resolve_ts_identifier_as_df_column_name(
original = column_names
if column_names == "instance_id":
if dps.instance_id:
# Keep column name as short as possible, default repr adds "space=..." and "external_id=..."
return "NodeId({}, {})".format(*dps.instance_id.as_tuple())
return maybe_string_fn(dps.instance_id)
column_names = "external_id" # Fallback to external_id

if column_names == "external_id":
Expand All @@ -177,7 +188,7 @@ def resolve_ts_identifier_as_df_column_name(

if column_names == "id":
if dps.id:
return str(dps.id)
return maybe_string_fn(dps.id)

fallbacks = {"instance_id": ["external_id", "id"], "external_id": ["id"], "id": []}
raise ValueError(
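Finally, the resolution order that `resolve_ts_identifier_as_df_column_name` implements (instance_id, then external_id, then id, stringified only when aggregate/granularity suffixes must be appended) can be summarised in a standalone sketch. This mirrors the diff for illustration only and deliberately avoids calling the private helper:

```python
from typing import Literal


def pick_column_label(
    instance_id: object | None,
    external_id: str | None,
    id: int | None,
    column_names: Literal["id", "external_id", "instance_id"] = "instance_id",
    pad_with_suffixes: bool = False,
) -> object:
    """Illustrative only: same fallback chain and stringification rule as the real helper."""
    # Stringify only when the label will be concatenated with e.g. '|average' or '|1h'.
    maybe_str = str if pad_with_suffixes else (lambda x: x)
    if column_names == "instance_id" and instance_id is not None:
        return maybe_str(instance_id)
    if column_names in ("instance_id", "external_id") and external_id is not None:
        return external_id
    if id is not None:
        return maybe_str(id)
    raise ValueError("Unable to resolve a column label from the given identifiers")


print(repr(pick_column_label(None, "my-ts-xid", 123)))                   # 'my-ts-xid'
print(repr(pick_column_label(None, None, 123, pad_with_suffixes=True)))  # '123'
print(repr(pick_column_label(None, None, 123)))                          # 123
```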