diff --git a/ravendb/documents/indexes/index_creation.py b/ravendb/documents/indexes/index_creation.py
index aea06ff4..a2a13cf3 100644
--- a/ravendb/documents/indexes/index_creation.py
+++ b/ravendb/documents/indexes/index_creation.py
@@ -155,7 +155,7 @@ def __init__(self, index_name: str):
         if len(self._index_name) > 256:
             raise ValueError(f"The index name is limited to 256 characters, but was: {self._index_name}")
 
-        self.reduce: Union[None, str] = None
+        self.reduce: Optional[str] = None
 
         self.stores_strings: Dict[str, FieldStorage] = {}
         self.indexes_strings: Dict[str, FieldIndexing] = {}
@@ -164,17 +164,17 @@ def __init__(self, index_name: str):
         self.term_vectors_strings: Dict[str, FieldTermVector] = {}
         self.spatial_indexes_strings: Dict[str, SpatialOptions] = {}
 
-        self.lock_mode: Union[None, IndexLockMode] = None
-        self.priority: Union[None, IndexLockMode] = None
-        self.state: Union[None, IndexState] = None
-        self.deployment_mode: Union[None, IndexDeploymentMode] = None
+        self.lock_mode: Optional[IndexLockMode] = None
+        self.priority: Optional[IndexPriority] = None
+        self.state: Optional[IndexState] = None
+        self.deployment_mode: Optional[IndexDeploymentMode] = None
 
-        self.output_reduce_to_collection: Union[None, str] = None
-        self.pattern_for_output_reduce_to_collection_references: Union[None, str] = None
-        self.pattern_references_collection_name: Union[None, str] = None
+        self.output_reduce_to_collection: Optional[str] = None
+        self.pattern_for_output_reduce_to_collection_references: Optional[str] = None
+        self.pattern_references_collection_name: Optional[str] = None
 
-        self.additional_sources: Union[None, Dict[str, str]] = None
-        self.additional_assemblies: Union[None, Set[AdditionalAssembly]] = None
+        self.additional_sources: Optional[Dict[str, str]] = None
+        self.additional_assemblies: Optional[Set[AdditionalAssembly]] = None
 
         self.configuration: Dict[str, str] = {}
 
     @abstractmethod
diff --git a/ravendb/documents/indexes/time_series.py b/ravendb/documents/indexes/time_series.py
new file mode 100644
index 00000000..97e7dc81
--- /dev/null
+++ b/ravendb/documents/indexes/time_series.py
@@ -0,0 +1,270 @@
+from abc import ABC
+from typing import Optional, Set, Dict, Callable, Union, TypeVar, List
+
+from ravendb import IndexDefinition, IndexSourceType
+from ravendb.documents.conventions import DocumentConventions
+from ravendb.documents.indexes.definitions import (
+    IndexPriority,
+    IndexLockMode,
+    IndexDeploymentMode,
+    IndexState,
+    FieldStorage,
+    FieldIndexing,
+    FieldTermVector,
+    IndexFieldOptions,
+    IndexType,
+)
+from ravendb.documents.indexes.index_creation import AbstractIndexCreationTaskBase, AbstractIndexDefinitionBuilder
+from ravendb.documents.indexes.spatial.configuration import SpatialOptions, SpatialOptionsFactory
+from ravendb.primitives import constants
+
+
+_T_IndexDefinition = TypeVar("_T_IndexDefinition", bound=IndexDefinition)
+
+
+class TimeSeriesIndexDefinition(IndexDefinition):
+    @property
+    def source_type(self) -> IndexSourceType:
+        return IndexSourceType.TIME_SERIES
+
+
+class TimeSeriesIndexDefinitionBuilder(AbstractIndexDefinitionBuilder[TimeSeriesIndexDefinition]):
+    def __init__(self, index_name: Optional[str] = None):
+        super().__init__(index_name)
+        self.map: Optional[str] = None
+
+    def _new_index_definition(self) -> TimeSeriesIndexDefinition:
+        return TimeSeriesIndexDefinition()
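+
+    # validate_map is turned off by AbstractMultiMapTimeSeriesIndexCreationTask further down,
+    # which builds the definition first (without a single map) and assigns its own set of maps
+    # afterwards.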
+    def to_index_definition(
+        self, conventions: DocumentConventions, validate_map: bool = True
+    ) -> TimeSeriesIndexDefinition:
+        if self.map is None and validate_map:
+            raise RuntimeError(
+                f"Map is required to generate an index; "
+                f"you cannot create an index without a valid Map property (in index {self._index_name})."
+            )
+        return super().to_index_definition(conventions, validate_map)
+
+    def _to_index_definition(
+        self, index_definition: TimeSeriesIndexDefinition, conventions: DocumentConventions
+    ) -> None:
+        if self.map is None:
+            return
+
+        index_definition.maps.add(self.map)
+
+
+class AbstractGenericTimeSeriesIndexCreationTask(AbstractIndexCreationTaskBase[TimeSeriesIndexDefinition], ABC):
+    def __init__(
+        self,
+        conventions: DocumentConventions = None,
+        priority: IndexPriority = None,
+        lock_mode: IndexLockMode = None,
+        deployment_mode: IndexDeploymentMode = None,
+        state: IndexState = None,
+    ):
+        super().__init__(conventions, priority, lock_mode, deployment_mode, state)
+        self._reduce: Optional[str] = None
+
+        self.stores_strings: Dict[str, FieldStorage] = {}
+        self.indexes_strings: Dict[str, FieldIndexing] = {}
+        self.analyzers_strings: Dict[str, str] = {}
+        self.index_suggestions: Set[str] = set()
+        self.term_vectors_strings: Dict[str, FieldTermVector] = {}
+        self.spatial_options_strings: Dict[str, SpatialOptions] = {}
+
+        self._output_reduce_to_collection: Optional[str] = None
+        self._pattern_for_output_reduce_to_collection_references: Optional[str] = None
+        self._pattern_references_collection_name: Optional[str] = None
+
+    @property
+    def is_map_reduce(self) -> bool:
+        return self._reduce is not None
+
+    def _index(self, field: str, indexing: FieldIndexing) -> None:
+        self.indexes_strings[field] = indexing
+
+    def _spatial(self, field: str, indexing: Callable[[SpatialOptionsFactory], SpatialOptions]) -> None:
+        self.spatial_options_strings[field] = indexing(SpatialOptionsFactory())
+
+    def _store_all_fields(self, storage: FieldStorage) -> None:
+        self.stores_strings[constants.Documents.Indexing.Fields.ALL_FIELDS] = storage
+
+    def _store(self, field: str, storage: FieldStorage) -> None:
+        self.stores_strings[field] = storage
+
+    def _analyze(self, field: str, analyzer: str) -> None:
+        self.analyzers_strings[field] = analyzer
+
+    def _term_vector(self, field: str, term_vector: FieldTermVector) -> None:
+        self.term_vectors_strings[field] = term_vector
+
+    def _suggestion(self, field: str) -> None:
+        self.index_suggestions.add(field)
+
+
+class AbstractTimeSeriesIndexCreationTask(AbstractGenericTimeSeriesIndexCreationTask):
+    def __init__(
+        self,
+        conventions: DocumentConventions = None,
+        priority: IndexPriority = None,
+        lock_mode: IndexLockMode = None,
+        deployment_mode: IndexDeploymentMode = None,
+        state: IndexState = None,
+    ):
+        super().__init__(conventions, priority, lock_mode, deployment_mode, state)
+        self.map: Optional[str] = None
+
+    def create_index_definition(self) -> Union[IndexDefinition, _T_IndexDefinition]:
+        if self.conventions is None:
+            self.conventions = DocumentConventions()
+
+        index_definition_builder = TimeSeriesIndexDefinitionBuilder(self.index_name)
+        index_definition_builder.indexes_strings = self.indexes_strings
+        index_definition_builder.analyzers_strings = self.analyzers_strings
+        index_definition_builder.map = self.map
+        index_definition_builder.reduce = self._reduce
+        index_definition_builder.stores_strings = self.stores_strings
+        index_definition_builder.suggestions_options = self.index_suggestions
+        index_definition_builder.term_vector_strings = self.term_vectors_strings
+        index_definition_builder.spatial_indexes_strings = self.spatial_options_strings
+
index_definition_builder.output_reduce_to_collection = self._output_reduce_to_collection + index_definition_builder.pattern_for_output_reduce_to_collection_references = ( + self._pattern_for_output_reduce_to_collection_references + ) + index_definition_builder.pattern_references_collection_name = self._pattern_references_collection_name + index_definition_builder.additional_sources = self.additional_sources + index_definition_builder.additional_assemblies = self.additional_assemblies + index_definition_builder.configuration = self.configuration + index_definition_builder.lock_mode = self.lock_mode + index_definition_builder.priority = self.priority + index_definition_builder.state = self.state + index_definition_builder.deployment_mode = self.deployment_mode + + return index_definition_builder.to_index_definition(self.conventions) + + +class AbstractMultiMapTimeSeriesIndexCreationTask(AbstractGenericTimeSeriesIndexCreationTask): + def __init__( + self, + conventions: DocumentConventions = None, + priority: IndexPriority = None, + lock_mode: IndexLockMode = None, + deployment_mode: IndexDeploymentMode = None, + state: IndexState = None, + ): + super().__init__(conventions, priority, lock_mode, deployment_mode, state) + self.maps: List[str] = [] + + def _add_map(self, map: str) -> None: + if map is None: + raise ValueError("Map cannot be None.") + self.maps.append(map) + + def create_index_definition(self) -> Union[IndexDefinition, _T_IndexDefinition]: + if self.conventions is None: + self.conventions = DocumentConventions() + + index_definition_builder = TimeSeriesIndexDefinitionBuilder(self.index_name) + index_definition_builder.indexes_strings = self.indexes_strings + index_definition_builder.analyzers_strings = self.analyzers_strings + index_definition_builder.reduce = self._reduce + index_definition_builder.stores_strings = self.stores_strings + index_definition_builder.suggestions_options = self.index_suggestions + index_definition_builder.term_vector_strings = self.term_vectors_strings + index_definition_builder.spatial_indexes_strings = self.spatial_options_strings + index_definition_builder.output_reduce_to_collection = self._output_reduce_to_collection + index_definition_builder.pattern_for_output_reduce_to_collection_references = ( + self._pattern_for_output_reduce_to_collection_references + ) + index_definition_builder.pattern_references_collection_name = self._pattern_references_collection_name + index_definition_builder.additional_sources = self.additional_sources + index_definition_builder.additional_assemblies = self.additional_assemblies + index_definition_builder.configuration = self.configuration + index_definition_builder.lock_mode = self.lock_mode + index_definition_builder.priority = self.priority + index_definition_builder.state = self.state + index_definition_builder.deployment_mode = self.deployment_mode + + index_definition = index_definition_builder.to_index_definition(self.conventions, False) + index_definition.maps = set(self.maps) + + return index_definition + + +class AbstractJavaScriptTimeSeriesIndexCreationTask(AbstractIndexCreationTaskBase[TimeSeriesIndexDefinition]): + def __init__( + self, + conventions: DocumentConventions = None, + priority: IndexPriority = None, + lock_mode: IndexLockMode = None, + deployment_mode: IndexDeploymentMode = None, + state: IndexState = None, + ): + super().__init__(conventions, priority, lock_mode, deployment_mode, state) + self._definition = TimeSeriesIndexDefinition() + + @property + def maps(self) -> Set[str]: + return 
self._definition.maps + + @maps.setter + def maps(self, maps: Set[str]): + self._definition.maps = maps + + @property + def fields(self) -> Dict[str, IndexFieldOptions]: + return self._definition.fields + + @fields.setter + def fields(self, fields: Dict[str, IndexFieldOptions]): + self._definition.fields = fields + + @property + def reduce(self) -> str: + return self._definition.reduce + + @reduce.setter + def reduce(self, reduce: str): + self._definition.reduce = reduce + + def is_map_reduce(self) -> bool: + return self.reduce is not None + + @property + def output_reduce_to_collection(self) -> str: + return self._definition.output_reduce_to_collection + + @output_reduce_to_collection.setter + def output_reduce_to_collection(self, output_reduce_to_collection: str): + self._definition.output_reduce_to_collection = output_reduce_to_collection + + @property + def pattern_references_collection_name(self) -> str: + return self._definition.pattern_references_collection_name + + @pattern_references_collection_name.setter + def pattern_references_collection_name(self, pattern_references_collection_name: str): + self._definition.pattern_references_collection_name = pattern_references_collection_name + + @property + def pattern_for_output_reduce_to_collection_references(self) -> str: + return self._definition.pattern_for_output_reduce_to_collection_references + + @pattern_for_output_reduce_to_collection_references.setter + def pattern_for_output_reduce_to_collection_references(self, pattern_for_output_reduce_to_collection_references): + self._definition.pattern_for_output_reduce_to_collection_references = ( + pattern_for_output_reduce_to_collection_references + ) + + def create_index_definition(self) -> TimeSeriesIndexDefinition: + self._definition.name = self.index_name + self._definition.type = IndexType.JAVA_SCRIPT_MAP_REDUCE if self.is_map_reduce() else IndexType.JAVA_SCRIPT_MAP + self._definition.additional_sources = self.additional_sources or {} + self._definition.additional_assemblies = self.additional_assemblies or set() + self._definition.configuration = self.configuration + self._definition.lock_mode = self.lock_mode + self._definition.priority = self.priority + self._definition.state = self.state + self._definition.deployment_mode = self.deployment_mode + return self._definition diff --git a/ravendb/documents/operations/indexes.py b/ravendb/documents/operations/indexes.py index 00b2737c..4392f27c 100644 --- a/ravendb/documents/operations/indexes.py +++ b/ravendb/documents/operations/indexes.py @@ -1,7 +1,7 @@ from __future__ import annotations import enum import json -from typing import List, TYPE_CHECKING +from typing import List, TYPE_CHECKING, Optional import requests @@ -556,7 +556,7 @@ def get_raft_unique_request_id(self) -> str: class GetTermsOperation(MaintenanceOperation[List[str]]): - def __init__(self, index_name: str, field: str, from_value: str, page_size: int = None): + def __init__(self, index_name: str, field: str, from_value: Optional[str], page_size: int = None): if index_name is None: raise ValueError("Index name cannot be None") if field is None: @@ -584,8 +584,8 @@ def create_request(self, server_node) -> requests.Request: f"{server_node.url}/databases/{server_node.database}" f"/indexes/terms?name={Utils.escape(self.__index_name, False, False)}" f"&field={Utils.escape(self.__field, False, False)}" - f"&fromValue={self.__from_value if self.__from_value is not None else ''}" - f"&pageSize={self.__page_size if self.__page_size is not None else ''}", + 
f"&fromValue={self.__from_value or ''}" + f"&pageSize={self.__page_size or ''}", ) def set_response(self, response: str, from_cache: bool) -> None: diff --git a/ravendb/documents/operations/time_series.py b/ravendb/documents/operations/time_series.py index 607f1d7f..efc383d2 100644 --- a/ravendb/documents/operations/time_series.py +++ b/ravendb/documents/operations/time_series.py @@ -32,10 +32,10 @@ def __init__( if not name or name.isspace(): raise ValueError("Name cannot be None or empty") - if aggregation_time is not None and aggregation_time.compare_to(TimeValue.ZERO()) <= 0: + if aggregation_time and aggregation_time.compare_to(TimeValue.ZERO()) <= 0: raise ValueError("Aggregation time must be greater than zero") - if retention_time is not None and retention_time.compare_to(TimeValue.ZERO()) <= 0: + if retention_time is None or retention_time.compare_to(TimeValue.ZERO()) <= 0: raise ValueError("Retention time must be greater than zero") self.retention_time = retention_time @@ -43,22 +43,14 @@ def __init__( self.name = name - @classmethod - def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesPolicy: - return cls( - json_dict["Name"], # todo: Invalid deserialization - Utils.string_to_timedelta(json_dict["AggregationTime"]), - Utils.string_to_timedelta(json_dict["RetentionTime"]), - ) - def get_time_series_name(self, raw_name: str) -> str: return raw_name + TimeSeriesConfiguration.TIME_SERIES_ROLLUP_SEPARATOR + self.name def to_json(self) -> Dict[str, Any]: return { - "Name": self.name, # todo: Invalid serialization - "AggregationTime": Utils.timedelta_to_str(self.aggregation_time), - "RetentionTime": Utils.timedelta_to_str(self.retention_time), + "Name": self.name, + "AggregationTime": self.aggregation_time.to_json() if self.aggregation_time else None, + "RetentionTime": self.retention_time.to_json(), } @@ -72,15 +64,13 @@ def DEFAULT_POLICY(cls) -> RawTimeSeriesPolicy: def __init__(self, retention_time: TimeValue = TimeValue.MAX_VALUE()): if retention_time.compare_to(TimeValue.ZERO()) <= 0: raise ValueError("Retention time must be greater than zero") - - self.name = self.POLICY_STRING - self.retention_time = retention_time + super().__init__(self.POLICY_STRING, retention_time=retention_time) class TimeSeriesCollectionConfiguration: def __init__( self, - disabled: Optional[bool] = None, + disabled: Optional[bool] = False, policies: Optional[List[TimeSeriesPolicy]] = None, raw_policy: Optional[RawTimeSeriesPolicy] = RawTimeSeriesPolicy.DEFAULT_POLICY(), ): @@ -88,14 +78,6 @@ def __init__( self.policies = policies self.raw_policy = raw_policy - @classmethod - def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesCollectionConfiguration: - return cls( - json_dict["Disabled"], - [TimeSeriesPolicy.from_json(policy_json) for policy_json in json_dict["Policies"]], - RawTimeSeriesPolicy.from_json(json_dict["RawPolicy"]), - ) - def to_json(self) -> Dict[str, Any]: return { "Disabled": self.disabled, @@ -348,12 +330,12 @@ def to_json(self) -> Dict[str, Any]: def append(self, append_operation: AppendOperation) -> None: if self._appends is None: - self._appends = [] - filtered = list(filter(lambda x: x.timestamp == append_operation.timestamp, self._appends)) + self._appends = [] # todo: perf + filtered = self._appends - if len(filtered) != 0: - # element with given timestamp already exists - remove and retry add operation - self._appends.remove(filtered.pop()) + # if len(filtered) != 0: + # # element with given timestamp already exists - remove and retry add operation + # 
self._appends.remove(filtered.pop()) self._appends.append(append_operation) @@ -699,3 +681,36 @@ def create_request(self, node: ServerNode) -> requests.Request: def set_response(self, response: Optional[str], from_cache: bool) -> None: self.result = TimeSeriesStatistics.from_json(json.loads(response)) + + +class ConfigureTimeSeriesOperation(MaintenanceOperation[ConfigureTimeSeriesOperationResult]): + def __init__(self, configuration: TimeSeriesConfiguration): + if not configuration: + raise ValueError("Configuration cannot be None") + + self._configuration = configuration + + def get_command(self, conventions: "DocumentConventions") -> "RavenCommand[ConfigureTimeSeriesOperationResult]": + return self.ConfigureTimeSeriesCommand(self._configuration) + + class ConfigureTimeSeriesCommand(RavenCommand[ConfigureTimeSeriesOperationResult], RaftCommand): + def __init__(self, configuration: TimeSeriesConfiguration): + super().__init__(ConfigureTimeSeriesOperationResult) + self._configuration = configuration + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: ServerNode) -> requests.Request: + request = requests.Request("POST", f"{node.url}/databases/{node.database}/admin/timeseries/config") + request.data = self._configuration.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if not response: + self._throw_invalid_response() + + self.result = ConfigureTimeSeriesOperationResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() diff --git a/ravendb/documents/queries/time_series.py b/ravendb/documents/queries/time_series.py new file mode 100644 index 00000000..a5e48866 --- /dev/null +++ b/ravendb/documents/queries/time_series.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +import datetime +from typing import Optional, Dict, Any, List, Type, TypeVar, Generic + +from ravendb.documents.session.time_series import TimeSeriesEntry, TypedTimeSeriesEntry, TimeSeriesValuesHelper +from ravendb.tools.utils import Utils + +_T_TS_Bindable = TypeVar("_T_TS_Bindable") + + +class TimeSeriesQueryResult: + def __init__(self, count: Optional[int] = None): + self.count = count + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesQueryResult: + return cls(json_dict["Count"]) + + +class TimeSeriesRawResult(TimeSeriesQueryResult): + def __init__(self, count: Optional[int] = None, results: Optional[List[TimeSeriesEntry]] = None): + super().__init__(count) + self.results = results + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesQueryResult: + json_dict = json_dict["__timeSeriesQueryFunction"] + return cls( + json_dict["Count"], + [TimeSeriesEntry.from_json(time_series_entry_json) for time_series_entry_json in json_dict["Results"]], + ) + + def as_typed_result(self, object_type: Type[_T_TS_Bindable]) -> TypedTimeSeriesRawResult[_T_TS_Bindable]: + result = TypedTimeSeriesRawResult() + result.count = self.count + result.results = [time_series_entry.as_typed_entry(object_type) for time_series_entry in self.results] + return result + + +class TypedTimeSeriesRawResult(TimeSeriesQueryResult, Generic[_T_TS_Bindable]): + def __init__( + self, count: Optional[int] = None, results: Optional[List[TypedTimeSeriesEntry[_T_TS_Bindable]]] = None + ): + super().__init__(count) + self.results = results + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesQueryResult: + return cls( + 
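+
+
+# TimeSeriesQueryBuilder is what DocumentQuery.select_time_series() (added in this change) hands
+# to the user callback: raw() records the RQL body of the time series function, and query_text is
+# then spliced into the select projection,
+# e.g. query.select_time_series(TimeSeriesRawResult, lambda b: b.raw(ts_query_text)).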
json_dict["Count"], + [TypedTimeSeriesEntry.from_json(typed_ts_entry_json) for typed_ts_entry_json in json_dict["Results"]], + ) + + +class TimeSeriesQueryBuilder: + def __init__(self, query: str = None): + self._query = query + + @property + def query_text(self): + return self._query + + def raw(self, query_text: str) -> TimeSeriesQueryResult: + self._query = query_text + return None + + +class TimeSeriesRangeAggregation: + def __init__( + self, + count: List[int] = None, + max: List[float] = None, + min: List[float] = None, + last: List[float] = None, + first: List[float] = None, + average: List[float] = None, + sum: List[float] = None, + to_date: datetime.datetime = None, + from_date: datetime.datetime = None, + ): + self.count = count + self.max = max + self.min = min + self.last = last + self.first = first + self.average = average + self.sum = sum + self.to_date = to_date + self.from_date = from_date + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesRangeAggregation: + return cls( + json_dict["Count"] if "Count" in json_dict else None, + json_dict["Max"] if "Max" in json_dict else None, + json_dict["Min"] if "Min" in json_dict else None, + json_dict["Last"] if "Last" in json_dict else None, + json_dict["First"] if "First" in json_dict else None, + json_dict["Average"] if "Average" in json_dict else None, + json_dict["Sum"] if "Sum" in json_dict else None, + Utils.string_to_datetime(json_dict["To"]), + Utils.string_to_datetime(json_dict["From"]), + ) + + def as_typed_entry( + self, ts_bindable_object_type: Type[_T_TS_Bindable] + ) -> TypedTimeSeriesRangeAggregation[_T_TS_Bindable]: + typed_entry = TypedTimeSeriesRangeAggregation() + + typed_entry.from_date = self.from_date + typed_entry.to_date = self.to_date + typed_entry.min = ( + TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, self.min, False) if self.min else None + ) + typed_entry.max = ( + TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, self.max, False) if self.max else None + ) + typed_entry.first = ( + TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, self.first, False) if self.first else None + ) + typed_entry.last = ( + TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, self.last, False) if self.last else None + ) + typed_entry.sum = ( + TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, self.sum, False) if self.sum else None + ) + counts = [float(count) for count in self.count] + typed_entry.count = ( + TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, counts, False) if self.count else None + ) + typed_entry.average = ( + TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, self.average, False) if self.average else None + ) + + return typed_entry + + +class TypedTimeSeriesRangeAggregation(Generic[_T_TS_Bindable]): + def __init__( + self, + count: _T_TS_Bindable = None, + max: _T_TS_Bindable = None, + min: _T_TS_Bindable = None, + last: _T_TS_Bindable = None, + first: _T_TS_Bindable = None, + average: _T_TS_Bindable = None, + sum: _T_TS_Bindable = None, + to_date: datetime.datetime = None, + from_date: datetime.datetime = None, + ): + self.count = count + self.max = max + self.min = min + self.last = last + self.first = first + self.average = average + self.sum = sum + self.to_date = to_date + self.from_date = from_date + + +class TypedTimeSeriesAggregationResult(TimeSeriesQueryResult, Generic[_T_TS_Bindable]): + def __init__( + self, count: Optional[int] = None, results: List[TypedTimeSeriesRangeAggregation[_T_TS_Bindable]] = None + ): 
+ super(TypedTimeSeriesAggregationResult, self).__init__(count) + self.results = results + + +class TimeSeriesAggregationResult(TimeSeriesQueryResult): + def __init__(self, count: Optional[int] = None, results: Optional[List[TimeSeriesRangeAggregation]] = None): + super().__init__(count) + self.results = results + + def as_typed_result( + self, ts_bindable_object_type: Type[_T_TS_Bindable] + ) -> TypedTimeSeriesAggregationResult[_T_TS_Bindable]: + result = TypedTimeSeriesAggregationResult() + result.count = self.count + result.results = [x.as_typed_entry(ts_bindable_object_type) for x in self.results] + return result + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> TimeSeriesQueryResult: + json_dict = json_dict["__timeSeriesQueryFunction"] + return cls( + json_dict["Count"], [TimeSeriesRangeAggregation.from_json(result) for result in json_dict["Results"]] + ) diff --git a/ravendb/documents/session/document_session.py b/ravendb/documents/session/document_session.py index 60c0378c..7e52aac6 100644 --- a/ravendb/documents/session/document_session.py +++ b/ravendb/documents/session/document_session.py @@ -171,7 +171,7 @@ def execute_all_pending_lazy_operations(self) -> ResponseTimeInformation: requests = [] for i in range(len(self._pending_lazy_operations)): # todo: pending lazy operation create request - WIP - req = self._pending_lazy_operations[i].create_arequest() + req = self._pending_lazy_operations[i].create_request() if req is None: self._pending_lazy_operations.pop(i) i -= 1 @@ -1757,6 +1757,13 @@ def _throw_document_already_deleted_in_session(document_id: str, time_series: st ) def append_single(self, timestamp: datetime, value: float, tag: Optional[str] = None) -> None: + if isinstance(value, int): + value = float(value) + if not isinstance(value, float): + raise TypeError( + f"Value passed ('{value}') is not a '{float.__name__}'. " f"It is '{value.__class__.__name__}'." 
+ ) + self.append(timestamp, [value], tag) def append(self, timestamp: datetime, values: List[float], tag: Optional[str] = None) -> None: @@ -2267,7 +2274,7 @@ def append(self, timestamp: datetime, entry: _T_TS_Values_Bindable, tag: Optiona super().append(timestamp, values, tag) def append_entry(self, entry: TypedTimeSeriesEntry[_T_TS_Values_Bindable]) -> None: - self.append_single(entry.timestamp, entry.value, entry.tag) + self.append(entry.timestamp, entry.value, entry.tag) class SessionDocumentRollupTypedTimeSeries(SessionTimeSeriesBase, Generic[_T_TS_Values_Bindable]): @@ -2307,7 +2314,7 @@ def get( to_date: Optional[datetime] = None, start: int = 0, page_size: int = int_max, - ): + ) -> List[TypedTimeSeriesRollupEntry[_T_TS_Values_Bindable]]: if self._not_in_cache(from_date, to_date): results = self.get_time_series_and_includes(from_date, to_date, None, start, page_size) return [TypedTimeSeriesRollupEntry.from_entry(self._object_type, x) for x in results] @@ -2317,4 +2324,8 @@ def get( def append_entry(self, entry: TypedTimeSeriesRollupEntry) -> None: values = entry.get_values_from_members() - self.append(entry.timestamp, values, entry.tag) + super().append(entry.timestamp, values, entry.tag) + + def append(self, entry: TypedTimeSeriesRollupEntry[_T_TS_Values_Bindable]) -> None: # todo: investigate warning + values = entry.get_values_from_members() + super().append(entry.timestamp, values, entry.tag) diff --git a/ravendb/documents/session/operations/query.py b/ravendb/documents/session/operations/query.py index 15f70498..1afca95b 100644 --- a/ravendb/documents/session/operations/query.py +++ b/ravendb/documents/session/operations/query.py @@ -1,5 +1,6 @@ import datetime import enum +import inspect import logging from typing import Union, Optional, TypeVar, List, Type, Callable, Dict, TYPE_CHECKING @@ -199,7 +200,12 @@ def deserialize( BeforeConversionToEntityEventArgs(session, key, object_type, document) ) - result = Utils.initialize_object(document, object_type) + # By custom defined 'from_json' serializer class method + # todo: make separate interface to do from_json + if "from_json" in object_type.__dict__ and inspect.ismethod(object_type.from_json): + result = object_type.from_json(document) + else: + result = Utils.initialize_object(document, object_type) session.after_conversion_to_entity_invoke(AfterConversionToEntityEventArgs(session, key, document, result)) return result diff --git a/ravendb/documents/session/query.py b/ravendb/documents/session/query.py index e0aadf7d..c58b2151 100644 --- a/ravendb/documents/session/query.py +++ b/ravendb/documents/session/query.py @@ -19,6 +19,7 @@ TYPE_CHECKING, ) +from ravendb.documents.queries.time_series import TimeSeriesQueryBuilder from ravendb.documents.session.time_series import TimeSeriesRange from ravendb.primitives import constants from ravendb.documents.conventions import DocumentConventions @@ -93,6 +94,7 @@ _T = TypeVar("_T") _TResult = TypeVar("_TResult") _TProjection = TypeVar("_TProjection") +_T_TS_Bindable = TypeVar("_T_TS_Bindable") if TYPE_CHECKING: from ravendb.documents.store.definition import Lazy from ravendb.documents.session.document_session import DocumentSession @@ -1298,8 +1300,13 @@ def _get_source_alias_if_exists(object_type: type, query_data: QueryData, fields return possible_alias - # todo: def _create_time_series_query_data(time_series_query: Callable[[TimeSeriesQueryBuilder], None]) - # -> QueryData: + def _create_time_series_query_data(self, time_series_query: Callable[[TimeSeriesQueryBuilder], None]) -> 
QueryData: + builder = TimeSeriesQueryBuilder() + time_series_query(builder) + + fields = [constants.TimeSeries.SELECT_FIELD_NAME + "(" + builder.query_text + ")"] + projections = [constants.TimeSeries.QUERY_FUNCTION] + return QueryData(fields, projections) def _add_before_query_executed_listener(self, action: Callable[[IndexQuery], None]) -> None: self._before_query_executed_callback.append(action) @@ -1585,6 +1592,12 @@ def get_query_result(self): return self._query_operation.current_query_results.create_snapshot() + def single(self) -> _T: + result = list(self.__execute_query_operation(2)) + if len(result) != 1: + raise ValueError(f"Expected single result, got: {len(result)} ") + return result[0] + def _aggregate_by(self, facet: FacetBase) -> None: for token in self._select_tokens: if isinstance(token, FacetToken): @@ -1735,7 +1748,11 @@ def __init__( is_project_into, ) - # todo: selectTimeSeries + def select_time_series( + self, projection_class: Type[_T_TS_Bindable], time_series_query: Callable[[TimeSeriesQueryBuilder], None] + ) -> DocumentQuery[_T_TS_Bindable]: + query_data = self._create_time_series_query_data(time_series_query) + return self.select_fields_query_data(projection_class, query_data) def distinct(self) -> DocumentQuery[_T]: self._distinct() @@ -1872,7 +1889,7 @@ def not_(self) -> DocumentQuery[_T]: self.negate_next() return self - def take(self, count: int) -> DocumentQuery[_T]: + def take(self: DocumentQuery[_T], count: int) -> DocumentQuery[_T]: self._take(count) return self @@ -1888,12 +1905,6 @@ def count(self) -> int: query_result = self.get_query_result() return query_result.total_results - def single(self) -> _T: - result = list(self.take(2)) - if len(result) != 1: - raise ValueError(f"Expected signle result, got: {len(result)} ") - return result[0] - def where_lucene(self, field_name: str, where_clause: str, exact: bool = False) -> DocumentQuery[_T]: self._where_lucene(field_name, where_clause, exact) return self diff --git a/ravendb/documents/session/time_series.py b/ravendb/documents/session/time_series.py index 86320d69..bf7ad496 100644 --- a/ravendb/documents/session/time_series.py +++ b/ravendb/documents/session/time_series.py @@ -64,41 +64,48 @@ def __init__(self, object_type: Type[_T_Values], timestamp: datetime.datetime): def _create_instance(self) -> _T_Values: try: - raise NotImplementedError() # create object type instance + return Utils.try_get_new_instance(self._object_type) # create object type instance except Exception as e: raise RavenException(f"Unable to create instance of class: {self._object_type.__name__}", e) - def get_max(self) -> _T_Values: + @property + def max(self) -> _T_Values: if self._max is None: self._max = self._create_instance() return self._max - def get_min(self) -> _T_Values: + @property + def min(self) -> _T_Values: if self._min is None: self._min = self._create_instance() return self._min - def get_count(self) -> _T_Values: + @property + def count(self) -> _T_Values: if self._count is None: self._count = self._create_instance() return self._count - def get_first(self) -> _T_Values: + @property + def first(self) -> _T_Values: if self._first is None: self._first = self._create_instance() return self._first - def get_last(self) -> _T_Values: + @property + def last(self) -> _T_Values: if self._last is None: self._last = self._create_instance() return self._last - def get_sum(self) -> _T_Values: + @property + def sum(self) -> _T_Values: if self._sum is None: self._sum = self._create_instance() return self._sum - def 
get_average(self) -> _T_Values: + @property + def average(self) -> _T_Values: if self._average is not None: return self._average @@ -139,7 +146,9 @@ def _assign_rollup(self, target: List[float], source: _T_Values, offset: int) -> target[i * 6 + offset] = values[i] @classmethod - def from_entry(cls, ts_bindable_object_type: Type[_T_TSBindable], entry: TimeSeriesEntry): + def from_entry( + cls, ts_bindable_object_type: Type[_T_TSBindable], entry: TimeSeriesEntry + ) -> TypedTimeSeriesRollupEntry[_T_TSBindable]: result = TypedTimeSeriesRollupEntry(ts_bindable_object_type, entry.timestamp) result.rollup = True result.tag = entry.tag @@ -153,6 +162,8 @@ def from_entry(cls, ts_bindable_object_type: Type[_T_TSBindable], entry: TimeSer result._sum = TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, cls.extract_values(values, 4)) result._count = TimeSeriesValuesHelper.set_fields(ts_bindable_object_type, cls.extract_values(values, 5)) + return result + @staticmethod def extract_values(input: List[float], offset: int) -> List[float]: length = math.ceil((len(input) - offset) / 6.0) @@ -172,13 +183,13 @@ def __init__( timestamp: datetime.datetime = None, tag: str = None, values: List[int] = None, - rollup: bool = None, + is_rollup: bool = None, value: _T_TSBindable = None, ): self.timestamp = timestamp self.tag = tag self.values = values - self.rollup = rollup + self.is_rollup = is_rollup self.value = value @classmethod diff --git a/ravendb/documents/store/definition.py b/ravendb/documents/store/definition.py index 1ee5abd5..d434d80e 100644 --- a/ravendb/documents/store/definition.py +++ b/ravendb/documents/store/definition.py @@ -32,6 +32,7 @@ ) from ravendb.documents.session.misc import SessionOptions from ravendb.documents.subscriptions.document_subscriptions import DocumentSubscriptions +from ravendb.documents.time_series import TimeSeriesOperations from ravendb.http.request_executor import RequestExecutor from ravendb.documents.identity.hilo import MultiDatabaseHiLoGenerator from ravendb.http.topology import Topology @@ -321,6 +322,7 @@ def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] = self.__database_changes = {} self.__after_close: List[Callable[[], None]] = [] self.__before_close: List[Callable[[], None]] = [] + self.__time_series_operation: Optional[TimeSeriesOperations] = None def __enter__(self): return self @@ -550,3 +552,10 @@ def operations(self) -> OperationExecutor: self.__operation_executor = OperationExecutor(self) return self.__operation_executor + + @property + def time_series(self) -> TimeSeriesOperations: + if self.__time_series_operation is None: + self.__time_series_operation = TimeSeriesOperations(self) + + return self.__time_series_operation diff --git a/ravendb/http/request_executor.py b/ravendb/http/request_executor.py index ddce33a6..6cb15389 100644 --- a/ravendb/http/request_executor.py +++ b/ravendb/http/request_executor.py @@ -890,14 +890,14 @@ def __supply_async( return preferred_task.result().response - def __create_request(self, node: ServerNode, command: RavenCommand) -> requests.Request: + def __create_request(self, node: ServerNode, command: RavenCommand) -> Optional[requests.Request]: request = command.create_request(node) # todo: optimize that if - look for the way to make less ifs each time if request.data and not isinstance(request.data, str) and not inspect.isgenerator(request.data): request.data = json.dumps(request.data, default=self.conventions.json_default_method) # todo: 1117 - 1133 - return request if request else 
None
+        return request
 
     def should_broadcast(self, command: RavenCommand) -> bool:
         if not isinstance(command, Broadcast):
diff --git a/ravendb/primitives/time_series.py b/ravendb/primitives/time_series.py
index bf864b84..0460d31e 100644
--- a/ravendb/primitives/time_series.py
+++ b/ravendb/primitives/time_series.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 from enum import Enum
-from typing import List, Tuple
+from typing import List, Tuple, Dict, Any
 
 from ravendb.primitives.constants import int_max, int_min
 
@@ -22,6 +22,9 @@ def __init__(self, value: int, unit: TimeValueUnit):
         self.value = value
         self.unit = unit
 
+    def to_json(self) -> Dict[str, Any]:
+        return {"Value": self.value, "Unit": self.unit.value}
+
     def __str__(self):
         if self.value == int_max:
             return "MaxValue"
diff --git a/ravendb/misc.py b/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/__init__.py
similarity index 100%
rename from ravendb/misc.py
rename to ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/__init__.py
diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_java_script.py b/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_java_script.py
new file mode 100644
index 00000000..0fa89e0c
--- /dev/null
+++ b/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_java_script.py
@@ -0,0 +1,311 @@
+from datetime import datetime, timedelta
+
+from ravendb import GetTermsOperation
+from ravendb.documents.indexes.index_creation import AbstractJavaScriptIndexCreationTask
+from ravendb.documents.indexes.time_series import AbstractJavaScriptTimeSeriesIndexCreationTask
+from ravendb.infrastructure.entities import User
+from ravendb.infrastructure.orders import Company, Employee, Address
+from ravendb.tests.test_base import TestBase
+from ravendb.tools.raven_test_helper import RavenTestHelper
+
+
+class Companies_ByTimeSeriesNames(AbstractJavaScriptIndexCreationTask):
+    def __init__(self):
+        super(Companies_ByTimeSeriesNames, self).__init__()
+        self.maps = ["map('Companies', function (company) {return ({names: timeSeriesNamesFor(company)})})"]
+
+
+company_id = "companies/1"
+base_line = datetime(2023, 8, 20, 21, 30)
+
+
+class TestBasicTimeSeriesIndexesJavaScript(TestBase):
+    def setUp(self):
+        super(TestBasicTimeSeriesIndexesJavaScript, self).setUp()
+
+    def test_time_series_names_for(self):
+        now = RavenTestHelper.utc_today()
+        index = Companies_ByTimeSeriesNames()
+        index.execute(self.store)
+
+        with self.store.open_session() as session:
+            session.store(Company(), company_id)
+            session.save_changes()
+
+        self.wait_for_indexing(self.store)
+        RavenTestHelper.assert_no_index_errors(self.store)
+
+        terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names", None))
+        self.assertEqual(0, len(terms))
+
+        terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names_IsArray", None))
+        self.assertEqual(1,
len(terms)) + self.assertIn("true", terms) + + with self.store.open_session() as session: + company = session.load(company_id, Company) + session.time_series_for_entity(company, "heartRate").append_single(now, 2.5, "tag1") + session.time_series_for_entity(company, "heartRate2").append_single(now, 3.5, "tag2") + session.save_changes() + + self.wait_for_indexing(self.store) + + RavenTestHelper.assert_no_index_errors(self.store) + + terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names", None)) + self.assertEqual(2, len(terms)) + self.assertIn("heartrate", terms) + self.assertIn("heartrate2", terms) + + terms = self.store.maintenance.send(GetTermsOperation(index.index_name, "names_IsArray", None)) + self.assertEqual(1, len(terms)) + self.assertIn("true", terms) + + def test_basic_map_index_with_load(self): + now1 = datetime.utcnow() + now2 = now1 + timedelta(seconds=1) + + with self.store.open_session() as session: + employee = Employee(first_name="John") + session.store(employee, "employees/1") + company = Company() + session.store(company, "companies/1") + + session.time_series_for_entity(company, "HeartRate").append_single(now1, 7.0, employee.Id) + + company2 = Company() + session.store(company2, "companies/11") + + session.time_series_for_entity(company2, "HeartRate").append_single(now1, 11.0, employee.Id) + + session.save_changes() + + time_series_index = MyTsIndex_Load() + index_name = time_series_index.index_name + index_definition = time_series_index.create_index_definition() + + time_series_index.execute(self.store) + + self.wait_for_indexing(self.store) + + terms = self.store.maintenance.send(GetTermsOperation(index_name, "employee", None)) + self.assertEqual(1, len(terms)) + self.assertIn("john", terms) + + with self.store.open_session() as session: + employee = session.load("employees/1", Employee) + employee.first_name = "Bob" + session.save_changes() + + self.wait_for_indexing(self.store) + + terms = self.store.maintenance.send(GetTermsOperation(index_name, "employee", None)) + self.assertEqual(1, len(terms)) + self.assertIn("bob", terms) + + with self.store.open_session() as session: + session.delete("employees/1") + session.save_changes() + + self.wait_for_indexing(self.store) + + terms = self.store.maintenance.send(GetTermsOperation(index_name, "employee", None)) + self.assertEqual(0, len(terms)) + + def test_basic_map_reduce_index_with_load(self): + today = RavenTestHelper.utc_today() + + with self.store.open_session() as session: + address = Address() + address.city = "NY" + + session.store(address, "addresses/1") + + user = User(address_id=address.Id) + session.store(user, "users/1") + + for i in range(10): + session.time_series_for_entity(user, "heartRate").append_single( + today + timedelta(hours=i), 180 + i, address.Id + ) + + session.save_changes() + + time_series_index = AverageHeartRateDaily_ByDateAndCity() + index_name = time_series_index.index_name + index_definition = time_series_index.create_index_definition() + + time_series_index.execute(self.store) + + self.wait_for_indexing(self.store) + + terms = self.store.maintenance.send(GetTermsOperation(index_name, "heart_beat", None)) + self.assertEqual(1, len(terms)) + self.assertIn("184.5", terms) + + terms = self.store.maintenance.send(GetTermsOperation(index_name, "date", None)) + self.assertEqual(1, len(terms)) + + terms = self.store.maintenance.send(GetTermsOperation(index_name, "city", None)) + self.assertEqual(1, len(terms)) + self.assertIn("ny", terms) + + terms = 
self.store.maintenance.send(GetTermsOperation(index_name, "count", None)) + self.assertEqual(1, len(terms)) + self.assertEqual("10", terms[0]) + + with self.store.open_session() as session: + address = session.load("addresses/1", Address) + address.city = "LA" + session.save_changes() + + self.wait_for_indexing(self.store) + + terms = self.store.maintenance.send(GetTermsOperation(index_name, "city", None)) + self.assertEqual(1, len(terms)) + self.assertIn("la", terms) + + def test_can_map_all_time_series_from_collection(self): + now1 = datetime.utcnow() + now2 = now1 + timedelta(seconds=1) + + with self.store.open_session() as session: + company = Company() + session.store(company, "companies/1") + session.time_series_for_entity(company, "heartRate").append_single(now1, 7.0, "tag1") + session.time_series_for_entity(company, "likes").append_single(now1, 3.0, "tag2") + + session.save_changes() + + MyTsIndex_AllTimeSeries().execute(self.store) + self.wait_for_indexing(self.store) + + terms = self.store.maintenance.send(GetTermsOperation("MyTsIndex/AllTimeSeries", "heart_beat", None)) + self.assertEqual(2, len(terms)) + self.assertIn("7", terms) + self.assertIn("3", terms) + + def test_basic_multi_map_index(self): + now = RavenTestHelper.utc_today() + + time_series_index = MyMultiMapTsIndex() + time_series_index.execute(self.store) + with self.store.open_session() as session: + company = Company() + session.store(company) + + session.time_series_for_entity(company, "heartRate").append_single(now, 2.5, "tag1") + session.time_series_for_entity(company, "heartRate2").append_single(now, 3.5, "tag2") + + user = User() + session.store(user) + session.time_series_for_entity(user, "heartRate").append_single(now, 4.5, "tag3") + + session.save_changes() + + self.wait_for_indexing(self.store) + + with self.store.open_session() as session: + results = list(session.query_index_type(MyMultiMapTsIndex, MyMultiMapTsIndex.Result)) + self.assertEqual(3, len(results)) + + +class MyTsIndex_Load(AbstractJavaScriptTimeSeriesIndexCreationTask): + def __init__(self): + super().__init__() + self.maps = [ + "timeSeries.map('Companies', 'HeartRate', function (ts) {\n" + + "return ts.Entries.map(entry => ({\n" + + " heart_beat: entry.Value,\n" + + " date: new Date(entry.Timestamp.getFullYear(), entry.Timestamp.getMonth(), entry.Timestamp.getDate()),\n" + + " user: ts.DocumentId,\n" + + " employee: load(entry.Tag, 'Employees').first_name\n" + + " }));\n" + + "})" + ] + + +class MyTsIndex_AllTimeSeries(AbstractJavaScriptTimeSeriesIndexCreationTask): + def __init__(self): + super().__init__() + self.maps = [ + "timeSeries.map('Companies', function (ts) {\n" + + " return ts.Entries.map(entry => ({\n" + + " heart_beat: entry.Values[0],\n" + + " date: new Date(entry.Timestamp.getFullYear(), entry.Timestamp.getMonth(), entry.Timestamp.getDate()),\n" + + " user: ts.documentId\n" + + " }));\n" + + " })" + ] + + +class AverageHeartRateDaily_ByDateAndCity(AbstractJavaScriptTimeSeriesIndexCreationTask): + class Result: + def __init__(self, heart_beat: float = None, date: datetime = None, city: str = None, count: int = None): + self.heart_beat = heart_beat + self.date = date + self.city = city + self.count = count + + def __init__(self): + super(AverageHeartRateDaily_ByDateAndCity, self).__init__() + self.maps = [ + "timeSeries.map('Users', 'heartRate', function (ts) {\n" + + "return ts.Entries.map(entry => ({\n" + + " heart_beat: entry.Value,\n" + + " date: new Date(entry.Timestamp.getFullYear(), entry.Timestamp.getMonth(), 
entry.Timestamp.getDate()),\n"
+            + "        city: load(entry.Tag, 'Addresses').city,\n"
+            + "        count: 1\n"
+            + "    }));\n"
+            + "})"
+        ]
+
+        self.reduce = (
+            "groupBy(r => ({ date: r.date, city: r.city }))\n"
+            " .aggregate(g => ({\n"
+            "     heart_beat: g.values.reduce((total, val) => val.heart_beat + total, 0) / g.values.reduce((total, val) => val.count + total, 0),\n"
+            "     date: g.key.date,\n"
+            "     city: g.key.city,\n"
+            "     count: g.values.reduce((total, val) => val.count + total, 0)\n"
+            " }))"
+        )
+
+
+class MyMultiMapTsIndex(AbstractJavaScriptTimeSeriesIndexCreationTask):
+    class Result:
+        def __init__(self, heart_beat: float = None, date: datetime = None, user: str = None):
+            self.heart_beat = heart_beat
+            self.date = date
+            self.user = user
+
+    def __init__(self):
+        super(MyMultiMapTsIndex, self).__init__()
+        self.maps = set()
+        self.maps.add(
+            "timeSeries.map('Companies', 'HeartRate', function (ts) {\n"
+            + "return ts.Entries.map(entry => ({\n"
+            + "        heart_beat: entry.Values[0],\n"
+            + "        date: new Date(entry.Timestamp.getFullYear(), entry.Timestamp.getMonth(), entry.Timestamp.getDate()),\n"
+            + "        user: ts.DocumentId\n"
+            + "    }));\n"
+            + "})"
+        )
+
+        self.maps.add(
+            "timeSeries.map('Companies', 'HeartRate2', function (ts) {\n"
+            + "return ts.Entries.map(entry => ({\n"
+            + "        heart_beat: entry.Values[0],\n"
+            + "        date: new Date(entry.Timestamp.getFullYear(), entry.Timestamp.getMonth(), entry.Timestamp.getDate()),\n"
+            + "        user: ts.DocumentId\n"
+            + "    }));\n"
+            + "})"
+        )
+        self.maps.add(
+            "timeSeries.map('Users', 'HeartRate', function (ts) {\n"
+            + "return ts.Entries.map(entry => ({\n"
+            + "        heart_beat: entry.Values[0],\n"
+            + "        date: new Date(entry.Timestamp.getFullYear(), entry.Timestamp.getMonth(), entry.Timestamp.getDate()),\n"
+            + "        user: ts.DocumentId\n"
+            + "    }));\n"
+            + "})"
+        )
diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_mixed_syntax.py b/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_mixed_syntax.py
new file mode 100644
index 00000000..a37cdb11
--- /dev/null
+++ b/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_mixed_syntax.py
@@ -0,0 +1,39 @@
+from datetime import datetime
+
+from ravendb import PutIndexesOperation, GetTermsOperation
+from ravendb.documents.indexes.time_series import TimeSeriesIndexDefinition
+from ravendb.infrastructure.orders import Company
+from ravendb.tests.test_base import TestBase
+
+
+class TestBasicTimeSeriesIndexes_MixedSyntax(TestBase):
+    def setUp(self):
+        super(TestBasicTimeSeriesIndexes_MixedSyntax, self).setUp()
+
+    def test_basic_map_index(self):
+        now1 = datetime.utcnow()
+
+        with self.store.open_session() as session:
+            company = Company()
+            session.store(company, "companies/1")
+            session.time_series_for_entity(company, "HeartRate").append_single(now1, 7, "tag")
+            session.save_changes()
+
+        time_series_index_definition = TimeSeriesIndexDefinition()
+        time_series_index_definition.name = "MyTsIndex"
+        time_series_index_definition.maps = [
+            "from ts in timeSeries.Companies.HeartRate.Where(x => true) "
+            + "from entry in ts.Entries "
+            + "select new { "
+            + "   heartBeat = entry.Values[0], "
+            + "   date = entry.Timestamp.Date, "
+            + "   user = ts.DocumentId "
+            + "}"
+        ]
+
+        self.store.maintenance.send(PutIndexesOperation(time_series_index_definition))
+        self.wait_for_indexing(self.store)
+
+        terms = self.store.maintenance.send(GetTermsOperation("MyTsIndex",
"heartBeat", None)) + self.assertEqual(1, len(terms)) + self.assertIn("7", terms) diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_strong_syntax.py b/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_strong_syntax.py new file mode 100644 index 00000000..53cbb21d --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/client_tests/indexing_tests/time_series_tests/test_basic_time_series_indexes_strong_syntax.py @@ -0,0 +1,139 @@ +from __future__ import annotations +import datetime +from typing import Optional, Any, Dict + +from ravendb.documents.indexes.time_series import ( + AbstractTimeSeriesIndexCreationTask, + AbstractMultiMapTimeSeriesIndexCreationTask, +) +from ravendb.infrastructure.entities import User +from ravendb.infrastructure.orders import Company +from ravendb.tests.test_base import TestBase +from ravendb.tools.raven_test_helper import RavenTestHelper +from ravendb.tools.utils import Utils + + +class TestBasicTimeSeriesIndexes_StrongSyntax(TestBase): + def setUp(self): + super().setUp() + + def test_basic_map_index(self): + now1 = RavenTestHelper.utc_today() + + with self.store.open_session() as session: + company = Company() + session.store(company, "companies/1") + session.time_series_for_entity(company, "HeartRate").append_single(now1, 7, "tag") + session.save_changes() + + time_series_index = MyTsIndex() + index_definition = time_series_index.create_index_definition() + + self.assertIsNotNone(index_definition) + time_series_index.execute(self.store) + + self.wait_for_indexing(self.store) + + with self.store.open_session() as session: + results = list(session.query_index_type(MyTsIndex, MyMultiMapTsIndex.Result)) + + self.assertEqual(1, len(results)) + + result = results[0] + + self.assertEqual(now1, result.date) + self.assertIsNotNone(result.user) + self.assertGreater(result.heart_beat, 0) + + def test_basic_multi_map_index(self): + now = RavenTestHelper.utc_today() + time_series_index = MyMultiMapTsIndex() + time_series_index.execute(self.store) + + with self.store.open_session() as session: + company = Company() + session.store(company) + + session.time_series_for_entity(company, "HeartRate").append_single(now, 2.5, "tag1") + session.time_series_for_entity(company, "HeartRate2").append_single(now, 3.5, "tag2") + + user = User() + session.store(user) + session.time_series_for_entity(user, "HeartRate").append_single(now, 4.5, "tag3") + + session.save_changes() + + self.wait_for_indexing(self.store) + + with self.store.open_session() as session: + results = list(session.query_index_type(MyMultiMapTsIndex, MyMultiMapTsIndex.Result)) + self.assertEqual(3, len(results)) + + result = results[0] + + self.assertEqual(now, result.date) + self.assertIsNotNone(result.user) + self.assertGreater(result.heart_beat, 0) + + +class MyTsIndex(AbstractTimeSeriesIndexCreationTask): + def __init__(self): + super().__init__() + self.map = ( + "from ts in timeSeries.Companies.HeartRate " + "from entry in ts.Entries " + "select new { " + " heartBeat = entry.Values[0], " + " date = entry.Timestamp.Date, " + " user = ts.DocumentId " + "}" + ) + + +class MyMultiMapTsIndex(AbstractMultiMapTimeSeriesIndexCreationTask): + class Result: + def __init__( + self, + heart_beat: Optional[float] = None, + date: Optional[datetime.datetime] = None, + user: Optional[str] = None, + ): + self.heart_beat = heart_beat + self.date = date + self.user = user + + @classmethod + def 
from_json(cls, json_dict: Dict[str, Any]) -> MyMultiMapTsIndex.Result: + return cls(json_dict["heartBeat"], Utils.string_to_datetime(json_dict["date"]), json_dict["user"]) + + def __init__(self): + super(MyMultiMapTsIndex, self).__init__() + self._add_map( + "from ts in timeSeries.Companies.HeartRate " + + "from entry in ts.Entries " + + "select new { " + + " heartBeat = entry.Values[0], " + + " date = entry.Timestamp.Date, " + + " user = ts.DocumentId " + + "}" + ) + + self._add_map( + "from ts in timeSeries.Companies.HeartRate2 " + + "from entry in ts.Entries " + + "select new { " + + " heartBeat = entry.Values[0], " + + " date = entry.Timestamp.Date, " + + " user = ts.DocumentId " + + "}" + ) + + self._add_map( + "from ts in timeSeries.Users.HeartRate " + + "from entry in ts.Entries " + + "select new { " + + " heartBeat = entry.Values[0], " + + " date = entry.Timestamp.Date, " + + " user = ts.DocumentId " + + "}" + ) diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/time_series_tests/test_time_series_document_query.py b/ravendb/tests/jvm_migrated_tests/client_tests/time_series_tests/test_time_series_document_query.py new file mode 100644 index 00000000..b9fc85b2 --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/client_tests/time_series_tests/test_time_series_document_query.py @@ -0,0 +1,111 @@ +from datetime import timedelta + +from ravendb.documents.queries.time_series import TimeSeriesAggregationResult, TimeSeriesRawResult +from ravendb.infrastructure.entities import User +from ravendb.tests.test_base import TestBase +from ravendb.tools.raven_test_helper import RavenTestHelper + + +class TestTimeSeriesDocumentQuery(TestBase): + def setUp(self): + super().setUp() + + def test_can_query_time_series_using_document_query(self): + base_line = RavenTestHelper.utc_today() + + with self.store.open_session() as session: + user = User(name="Oren", age=35) + session.store(user, "users/ayende") + + tsf = session.time_series_for("users/ayende", "Heartrate") + tsf.append_single(base_line + timedelta(minutes=61), 59, "watches/fitbit") + tsf.append_single(base_line + timedelta(minutes=62), 79, "watches/apple") + tsf.append_single(base_line + timedelta(minutes=63), 69, "watches/fitbit") + + tsf.append(base_line + timedelta(days=31, minutes=61), [159], "watches/apple") + tsf.append(base_line + timedelta(days=31, minutes=62), [179], "watches/apple") + tsf.append(base_line + timedelta(days=31, minutes=63), [169], "watches/fitbit") + + session.save_changes() + + with self.store.open_session() as session: + ts_query_text = ( + "from Heartrate between $start and $end\n" + "where Tag = 'watches/fitbit'\n" + "group by '1 month'\n" + "select min(), max(), avg()" + ) + + query = ( + session.advanced.document_query(object_type=User) + .where_greater_than("age", 21) + .select_time_series(TimeSeriesAggregationResult, lambda b: b.raw(ts_query_text)) + .add_parameter("start", base_line) + .add_parameter("end", base_line + timedelta(days=92)) + ) + + result = list(query) + + self.assertEqual(1, len(result)) + self.assertEqual(3, result[0].count) + + agg = result[0].results + self.assertEqual(2, len(agg)) + + self.assertEqual(69, agg[0].max[0]) + self.assertEqual(59, agg[0].min[0]) + self.assertEqual(64, agg[0].average[0]) + + self.assertEqual(169, agg[1].max[0]) + self.assertEqual(169, agg[1].min[0]) + self.assertEqual(169, agg[1].average[0]) + + def test_can_query_time_series_raw_values_using_document_query(self): + base_line = RavenTestHelper.utc_today() + + with self.store.open_session() as session: + 
user = User(name="Oren", age=35) + session.store(user, "users/ayende") + + tsf = session.time_series_for("users/ayende", "Heartrate") + tsf.append_single(base_line + timedelta(minutes=61), 59, "watches/fitbit") + tsf.append_single(base_line + timedelta(minutes=62), 79, "watches/apple") + tsf.append_single(base_line + timedelta(minutes=63), 69, "watches/fitbit") + + tsf.append(base_line + timedelta(days=31, minutes=61), [159], "watches/apple") + tsf.append(base_line + timedelta(days=31, minutes=62), [179], "watches/apple") + tsf.append(base_line + timedelta(days=31, minutes=63), [169], "watches/fitbit") + + session.save_changes() + + with self.store.open_session() as session: + ts_query_text = "from Heartrate between $start and $end\n" "where Tag = 'watches/fitbit'" + + query = ( + session.advanced.document_query(object_type=User) + .where_greater_than("age", 21) + .select_time_series(TimeSeriesRawResult, lambda b: b.raw(ts_query_text)) + .add_parameter("start", base_line) + .add_parameter("end", base_line + timedelta(days=92)) + ) + + result = list(query) + + self.assertEqual(1, len(result)) + self.assertEqual(3, result[0].count) + + values = result[0].results + + self.assertEqual(3, len(values)) + + self.assertEqual([59], values[0].values) + self.assertEqual("watches/fitbit", values[0].tag) + self.assertEqual(base_line + timedelta(minutes=61), values[0].timestamp) + + self.assertEqual([69], values[1].values) + self.assertEqual("watches/fitbit", values[1].tag) + self.assertEqual(base_line + timedelta(minutes=63), values[1].timestamp) + + self.assertEqual([169], values[2].values) + self.assertEqual("watches/fitbit", values[2].tag) + self.assertEqual(base_line + timedelta(minutes=63, days=31), values[2].timestamp) diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/time_series_tests/test_time_series_typed_session.py b/ravendb/tests/jvm_migrated_tests/client_tests/time_series_tests/test_time_series_typed_session.py new file mode 100644 index 00000000..2cd56756 --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/client_tests/time_series_tests/test_time_series_typed_session.py @@ -0,0 +1,413 @@ +import time +import unittest +from datetime import datetime, timedelta +from typing import Dict, Tuple, Optional + +from ravendb.documents.operations.time_series import ( + TimeSeriesPolicy, + TimeSeriesCollectionConfiguration, + TimeSeriesConfiguration, + ConfigureTimeSeriesOperation, + RawTimeSeriesPolicy, +) +from ravendb.documents.queries.time_series import TimeSeriesRawResult +from ravendb.documents.session.time_series import ( + ITimeSeriesValuesBindable, + TypedTimeSeriesEntry, + TypedTimeSeriesRollupEntry, +) +from ravendb.infrastructure.entities import User +from ravendb.primitives.time_series import TimeValue +from ravendb.tests.test_base import TestBase +from ravendb.tools.raven_test_helper import RavenTestHelper + +document_id = "users/gracjan" +company_id = "companies/1-A" +order_id = "orders/1-A" +base_line = datetime(2023, 8, 20, 21, 30) +ts_name1 = "Heartrate" +ts_name2 = "Speedrate" +tag1 = "watches/fitbit" +tag2 = "watches/apple" +tag3 = "watches/sony" + + +class HeartRateMeasure(ITimeSeriesValuesBindable): + def __init__(self, value: float): + self.heart_rate = value + + def get_time_series_mapping(self) -> Dict[int, Tuple[str, Optional[str]]]: + return {0: ("heart_rate", None)} + + +class BigMeasure(ITimeSeriesValuesBindable): + def __init__(self, m1, m2, m3, m4, m5, m6): + self.measure1 = m1 + self.measure2 = m2 + self.measure3 = m3 + self.measure4 = m4 + self.measure5 = m5 
+ self.measure6 = m6 + + def get_time_series_mapping(self) -> Dict[int, Tuple[str, Optional[str]]]: + return { + 0: ("measure1", None), + 1: ("measure2", None), + 2: ("measure3", None), + 3: ("measure4", None), + 4: ("measure5", None), + 5: ("measure6", None), + } + + +class StockPrice(ITimeSeriesValuesBindable): + def __init__( + self, + open: Optional[float] = None, + close: Optional[float] = None, + high: Optional[float] = None, + low: Optional[float] = None, + volume: Optional[float] = None, + ): + self.open = open + self.close = close + self.high = high + self.low = low + self.volume = volume + + def get_time_series_mapping(self) -> Dict[int, Tuple[str, Optional[str]]]: + return { + 0: ("open", None), + 1: ("close", None), + 2: ("high", None), + 3: ("low", None), + 4: ("volume", None), + } + + +class TestTimeSeriesTypedSession(TestBase): + def setUp(self): + super(TestTimeSeriesTypedSession, self).setUp() + + def test_can_request_non_existing_time_series_range(self): + with self.store.open_session() as session: + session.store(User(), document_id) + tsf = session.time_series_for(document_id, ts_name1) + tsf.append_single(base_line, 58, tag1) + tsf.append_single(base_line + timedelta(minutes=10), 60, tag1) + session.save_changes() + + with self.store.open_session() as session: + vals = session.typed_time_series_for(HeartRateMeasure, document_id).get( + base_line - timedelta(minutes=10), base_line - timedelta(minutes=5) + ) + self.assertIsNone(vals) + + vals = session.typed_time_series_for(HeartRateMeasure, document_id).get( + base_line + timedelta(minutes=5), base_line + timedelta(minutes=9) + ) + self.assertIsNone(vals) + + def test_can_create_simple_time_series(self): + with self.store.open_session() as session: + session.store(User(), document_id) + heart_rate_measure = HeartRateMeasure(59) + ts = session.typed_time_series_for(HeartRateMeasure, document_id) + ts.append(base_line, heart_rate_measure, tag1) + session.save_changes() + + with self.store.open_session() as session: + val = session.typed_time_series_for(HeartRateMeasure, document_id).get()[0] + self.assertEqual(59, val.value.heart_rate) + self.assertEqual(tag1, val.tag) + self.assertEqual(base_line, val.timestamp) + + def test_can_create_simple_time_series_async(self): + with self.store.open_session() as session: + session.store(User(), document_id) + heart_rate_measure = HeartRateMeasure(59) + + measure = TypedTimeSeriesEntry(base_line + timedelta(minutes=1), tag1) + measure.value = heart_rate_measure + + ts = session.typed_time_series_for(HeartRateMeasure, document_id) + ts.append_entry(measure) + + session.save_changes() + + with self.store.open_session() as session: + val = session.typed_time_series_for(HeartRateMeasure, document_id).get()[0] + self.assertEqual(59, val.value.heart_rate) + self.assertEqual(tag1, val.tag) + self.assertEqual(base_line + timedelta(minutes=1), val.timestamp) + + def test_using_different_number_of_values_large_to_small(self): + with self.store.open_session() as session: + session.store(User(name="Gracjan"), document_id) + big = session.typed_time_series_for(BigMeasure, document_id) + for i in range(5): + big.append(base_line + timedelta(seconds=i * 3), BigMeasure(i, i, i, i, i, i), tag1) + + session.save_changes() + + with self.store.open_session() as session: + big = session.time_series_for(document_id, "BigMeasures") + + for i in range(5, 10): + big.append_single(base_line + timedelta(hours=12, seconds=i * 3), i, tag1) + + session.save_changes() + + with self.store.open_session() as 
session: + big = session.typed_time_series_for(BigMeasure, document_id).get() + for i in range(5): + m = big[i].value + self.assertEqual(i, m.measure1) + self.assertEqual(i, m.measure2) + self.assertEqual(i, m.measure3) + self.assertEqual(i, m.measure4) + self.assertEqual(i, m.measure5) + self.assertEqual(i, m.measure6) + + for i in range(5, 10): + m = big[i].value + self.assertEqual(i, m.measure1) + self.assertIsNone(m.measure2) + self.assertIsNone(m.measure3) + self.assertIsNone(m.measure4) + self.assertIsNone(m.measure5) + self.assertIsNone(m.measure6) + + def test_can_get_time_series_names(self): + with self.store.open_session() as session: + session.store(User(), document_id) + heart_rate_measure = HeartRateMeasure(66) + session.typed_time_series_for(HeartRateMeasure, document_id).append( + base_line, heart_rate_measure, "MyHeart" + ) + + stock_price = StockPrice(66, 55, 113.4, 52.4, 15472) + session.typed_time_series_for(StockPrice, document_id).append(base_line, stock_price) + + session.save_changes() + + with self.store.open_session() as session: + user = session.load(document_id, User) + ts_names = session.advanced.get_time_series_for(user) + self.assertEqual(2, len(ts_names)) + + # should be sorted + self.assertEqual("HeartRateMeasures", ts_names[0]) + self.assertEqual("StockPrices", ts_names[1]) + + heart_rate_measures = session.typed_time_series_for_entity(HeartRateMeasure, user).get()[0] + self.assertEqual(66, heart_rate_measures.value.heart_rate) + + stock_price_entry = session.typed_time_series_for_entity(StockPrice, user).get()[0] + self.assertEqual(66, stock_price_entry.value.open) + self.assertEqual(55, stock_price_entry.value.close) + self.assertEqual(113.4, stock_price_entry.value.high) + self.assertEqual(52.4, stock_price_entry.value.low) + self.assertEqual(15472, stock_price_entry.value.volume) + + def test_can_create_simple_time_series_2(self): + with self.store.open_session() as session: + session.store(User(), document_id) + tsf = session.time_series_for(document_id, "HeartRateMeasures") + tsf.append_single(base_line + timedelta(minutes=1), 59, tag1) + tsf.append_single(base_line + timedelta(minutes=2), 60, tag1) + tsf.append_single(base_line + timedelta(minutes=2), 61, tag1) + + session.save_changes() + + with self.store.open_session() as session: + val = session.typed_time_series_for(HeartRateMeasure, document_id).get() + self.assertEqual(2, len(val)) + + self.assertEqual(59, val[0].value.heart_rate) + self.assertEqual(61, val[1].value.heart_rate) + + @unittest.skip("Insufficient license permissions. 
Skipping on CI/CD.") + def test_can_execute_simple_rollup(self): + p1 = TimeSeriesPolicy("BySecond", TimeValue.of_seconds(1)) + p2 = TimeSeriesPolicy("By2Seconds", TimeValue.of_seconds(2)) + p3 = TimeSeriesPolicy("By4Seconds", TimeValue.of_seconds(4)) + + collection_config = TimeSeriesCollectionConfiguration() + collection_config.policies = [p1, p2, p3] + + config = TimeSeriesConfiguration() + config.collections = {"Users": collection_config} + + config.policy_check_frequency = timedelta(seconds=1) + + self.store.maintenance.send(ConfigureTimeSeriesOperation(config)) + + with self.store.open_session() as session: + session.store(User(name="Gracjan"), document_id) + for i in range(100): + session.time_series_for(document_id, ts_name1).append_single( + base_line + timedelta(milliseconds=400 * i), 29.0 * i, tag1 + ) + + session.save_changes() + + # wait for rollups to run + time.sleep(1.2) + + with self.store.open_session() as session: + ts = session.time_series_for(document_id, ts_name1).get() + ts_millis = ts[-1].timestamp - ts[0].timestamp + + ts1 = session.time_series_for(document_id, p1.get_time_series_name(ts_name1)).get() + ts1_millis = ts1[-1].timestamp - ts1[0].timestamp + + self.assertEqual(ts_millis - timedelta(milliseconds=600), ts1_millis) + + ts2 = session.time_series_for(document_id, p2.get_time_series_name(ts_name1)).get() + self.assertEqual(len(ts1) // 2, len(ts2)) + + ts3 = session.time_series_for(document_id, p3.get_time_series_name(ts_name1)).get() + self.assertEqual(len(ts1) // 4, len(ts3)) + + @unittest.skip("Insufficient license permissions. Skipping on CI/CD.") + def test_can_work_with_rollup_time_series_2(self): + raw_hours = 24 + raw = RawTimeSeriesPolicy(TimeValue.of_hours(raw_hours)) + + p1 = TimeSeriesPolicy("By6Hours", TimeValue.of_hours(6), TimeValue.of_hours(raw_hours * 4)) + p2 = TimeSeriesPolicy("By1Day", TimeValue.of_days(1), TimeValue.of_hours(raw_hours * 5)) + p3 = TimeSeriesPolicy("By30Minutes", TimeValue.of_minutes(30), TimeValue.of_hours(raw_hours * 2)) + p4 = TimeSeriesPolicy("By1Hour", TimeValue.of_hours(1), TimeValue.of_hours(raw_hours * 3)) + + time_series_collection_configuration = TimeSeriesCollectionConfiguration() + time_series_collection_configuration.raw_policy = raw + time_series_collection_configuration.policies = [p1, p2, p3, p4] + + config = TimeSeriesConfiguration() + config.collections = {"users": time_series_collection_configuration} + config.policy_check_frequency = timedelta(milliseconds=100) + + self.store.maintenance.send(ConfigureTimeSeriesOperation(config)) + self.store.time_series.register_type(User, StockPrice) + + total = TimeValue.of_days(12).value // 60 + base_line = RavenTestHelper.utc_today() - timedelta(days=12) + + with self.store.open_session() as session: + session.store(User(name="Karmel"), "users/karmel") + + ts = session.typed_time_series_for(StockPrice, "users/karmel") + for i in range(total + 1): + open = i + close = i + 100_000 + high = i + 200_000 + low = i + 300_000 + volume = i + 400_000 + ts.append( + base_line + timedelta(minutes=i), StockPrice(open, close, high, low, volume), "watches/fitbit" + ) + + session.save_changes() + + time.sleep(1.5) # wait for rollups + + with self.store.open_session() as session: + ts1 = session.time_series_rollup_for(StockPrice, "users/karmel", p1.name) + r = ts1.get()[0] + self.assertIsNotNone(r.first) + self.assertIsNotNone(r.last) + self.assertIsNotNone(r.min) + self.assertIsNotNone(r.max) + self.assertIsNotNone(r.count) + self.assertIsNotNone(r.average) + + 
@unittest.skip("Insufficient license permissions. Skipping on CI/CD.") + def test_can_work_with_rollup_time_series(self): + raw_hours = 24 + raw = RawTimeSeriesPolicy(TimeValue.of_hours(raw_hours)) + + p1 = TimeSeriesPolicy("By6Hours", TimeValue.of_hours(6), TimeValue.of_hours(raw_hours * 4)) + p2 = TimeSeriesPolicy("By1Day", TimeValue.of_days(1), TimeValue.of_hours(raw_hours * 5)) + p3 = TimeSeriesPolicy("By30Minutes", TimeValue.of_minutes(30), TimeValue.of_hours(raw_hours * 2)) + p4 = TimeSeriesPolicy("By1Hour", TimeValue.of_hours(1), TimeValue.of_hours(raw_hours * 3)) + + users_config = TimeSeriesCollectionConfiguration() + users_config.raw_policy = raw + users_config.policies = [p1, p2, p3, p4] + + config = TimeSeriesConfiguration() + config.collections = {"Users": users_config} + config.policy_check_frequency = timedelta(milliseconds=100) + + self.store.maintenance.send(ConfigureTimeSeriesOperation(config)) + self.store.time_series.register_type(User, StockPrice) + + # please notice we don't modify server time here! + + now = datetime.utcnow() + base_line = RavenTestHelper.utc_today() - timedelta(days=12) + + total = TimeValue.of_days(12).value // 60 + + with self.store.open_session() as session: + session.store(User(name="Karmel"), "users/karmel") + + ts = session.typed_time_series_for(StockPrice, "users/karmel") + for i in range(total + 1): + open = i + close = i + 100_000 + high = i + 200_000 + low = i + 300_000 + volume = i + 400_000 + ts.append( + base_line + timedelta(minutes=i), StockPrice(open, close, high, low, volume), "watches/fitbit" + ) + + session.save_changes() + + time.sleep(1.5) # wait for rollups + + with self.store.open_session() as session: + query = ( + session.advanced.raw_query( + "declare timeseries out()\n" + "{\n" + " from StockPrices\n" + " between $start and $end\n" + "}\n" + "from Users as u\n" + "select out()", + TimeSeriesRawResult, + ) + .add_parameter("start", base_line - timedelta(days=1)) + .add_parameter("end", now + timedelta(days=1)) + ) + result_raw = query.single() + result = result_raw.as_typed_result(StockPrice) + + self.assertGreater(len(result.results), 0) + + for res in result.results: + if res.is_rollup: + self.assertGreater(len(res.values), 0) + self.assertGreater(len(res.value.low), 0) + self.assertGreater(len(res.value.high), 0) + else: + self.assertEqual(5, len(res.values)) + + now = datetime.utcnow() + + with self.store.open_session() as session: + ts = session.time_series_rollup_for(StockPrice, "users/karmel", p1.name) + a = TypedTimeSeriesRollupEntry(StockPrice, datetime.utcnow()) + a.max.close = 1 + ts.append(a) + session.save_changes() + + with self.store.open_session() as session: + ts = session.time_series_rollup_for(StockPrice, "users/karmel", p1.name) + + res = ts.get(now - timedelta(milliseconds=1), now + timedelta(days=1)) + self.assertEqual(1, len(res)) + self.assertEqual(1, res[0].max.close) diff --git a/ravendb/tests/jvm_migrated_tests/issues_tests/test_RDBC_501.py b/ravendb/tests/jvm_migrated_tests/issues_tests/test_RDBC_501.py new file mode 100644 index 00000000..0fc9a85f --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/issues_tests/test_RDBC_501.py @@ -0,0 +1,87 @@ +from datetime import timedelta +from typing import Dict, Tuple, Optional + +from ravendb.documents.queries.time_series import TimeSeriesAggregationResult +from ravendb.documents.session.time_series import ITimeSeriesValuesBindable +from ravendb.tests.test_base import TestBase +from ravendb.tools.raven_test_helper import RavenTestHelper + + +class 
MarkerSymbol: + def __init__(self, Id: str = None): + self.Id = Id + + +class SymbolPrice(ITimeSeriesValuesBindable): + def __init__(self, open: float = None, close: float = None, high: float = None, low: float = None): + self.open = open + self.close = close + self.high = high + self.low = low + + def get_time_series_mapping(self) -> Dict[int, Tuple[str, Optional[str]]]: + return { + 0: ("open", None), + 1: ("close", None), + 2: ("high", None), + 3: ("low", None), + } + + +class TestRDBC501(TestBase): + def setUp(self): + super(TestRDBC501, self).setUp() + + def test_should_properly_map_typed_entries(self): + base_line = RavenTestHelper.utc_today() + with self.store.open_session() as session: + symbol = MarkerSymbol() + session.store(symbol, "markerSymbols/1-A") + + price1 = SymbolPrice(4, 7, 10, 1) + price2 = SymbolPrice(24, 27, 210, 21) + price3 = SymbolPrice(34, 37, 310, 321) + + tsf = session.typed_time_series_for_entity(SymbolPrice, symbol, "history") + tsf.append(base_line + timedelta(hours=1), price1) + tsf.append(base_line + timedelta(hours=2), price2) + tsf.append(base_line + timedelta(days=2), price3) + + session.save_changes() + + with self.store.open_session() as session: + aggregated_history_query_result = ( + session.query(object_type=MarkerSymbol) + .select_time_series( + TimeSeriesAggregationResult, + lambda b: b.raw( + "from history\n" + + " group by '1 days'\n" + + " select first(), last(), min(), max()" + ), + ) + .first() + ) + + self.assertEqual(2, len(aggregated_history_query_result.results)) + + typed = aggregated_history_query_result.as_typed_result(SymbolPrice) + + self.assertEqual(2, len(typed.results)) + + first_result = typed.results[0] + self.assertEqual(4, first_result.min.open) + self.assertEqual(7, first_result.min.close) + self.assertEqual(1, first_result.min.low) + self.assertEqual(10, first_result.min.high) + + self.assertEqual(4, first_result.first.open) + self.assertEqual(7, first_result.first.close) + self.assertEqual(1, first_result.first.low) + self.assertEqual(10, first_result.first.high) + + second_result = typed.results[1] + self.assertEqual(34, second_result.min.open) + self.assertEqual(37, second_result.min.close) + self.assertEqual(321, second_result.min.low) + self.assertEqual(310, second_result.min.high) diff --git a/ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_16060.py b/ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_16060.py index e3de619a..397de5c2 100644 --- a/ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_16060.py +++ b/ravendb/tests/jvm_migrated_tests/issues_tests/test_ravenDB_16060.py @@ -1,11 +1,23 @@ from __future__ import annotations + +import time +import unittest from datetime import datetime, timedelta from typing import Dict, Tuple, Optional +from ravendb.documents.operations.time_series import ( + RawTimeSeriesPolicy, + TimeSeriesPolicy, + TimeSeriesCollectionConfiguration, + TimeSeriesConfiguration, + ConfigureTimeSeriesOperation, +) from ravendb.documents.session.time_series import ITimeSeriesValuesBindable, TimeSeriesRangeType from ravendb.infrastructure.entities import User from ravendb.primitives.time_series import TimeValue +from ravendb.tests.jvm_migrated_tests.client_tests.time_series_tests.test_time_series_typed_session import StockPrice from ravendb.tests.test_base import TestBase +from ravendb.tools.raven_test_helper import RavenTestHelper class HeartRateMeasure(ITimeSeriesValuesBindable): @@ -398,3 +410,108 @@ def 
test_include_time_series_and_merge_with_existing_ranges_in_cache_typed(self) self.assertEqual(base_line, ranges[0].from_date) self.assertIsNone(ranges[0].to_date) + + @unittest.skip("Insufficient license permissions. Skipping on CI/CD.") + def test_can_serve_time_series_from_cache_rollup(self): + raw = RawTimeSeriesPolicy(TimeValue.of_hours(24)) + + p1 = TimeSeriesPolicy("By6Hours", TimeValue.of_hours(6), TimeValue.of_days(4)) + p2 = TimeSeriesPolicy("By1Day", TimeValue.of_days(1), TimeValue.of_days(5)) + p3 = TimeSeriesPolicy("By30Minutes", TimeValue.of_minutes(30), TimeValue.of_days(2)) + p4 = TimeSeriesPolicy("By1Hour", TimeValue.of_hours(1), TimeValue.of_days(3)) + + time_series_collection_configuration = TimeSeriesCollectionConfiguration() + time_series_collection_configuration.raw_policy = raw + time_series_collection_configuration.policies = [p1, p2, p3, p4] + + config = TimeSeriesConfiguration() + config.collections = {"users": time_series_collection_configuration} + config.policy_check_frequency = timedelta(seconds=1) + + self.store.maintenance.send(ConfigureTimeSeriesOperation(config)) + self.store.time_series.register_type(User, StockPrice) + + total = TimeValue.of_days(12).value + base_line = RavenTestHelper.utc_today() - timedelta(days=12) + + with self.store.open_session() as session: + session.store(User(name="Karmel"), "users/karmel") + + ts = session.typed_time_series_for(StockPrice, "users/karmel") + for i in range(total + 1): + open = i + close = i + 100_000 + high = i + 200_000 + low = i + 300_000 + volume = i + 400_000 + ts.append( + base_line + timedelta(minutes=i), StockPrice(open, close, high, low, volume), "watches/fitbit" + ) + + session.save_changes() + + time.sleep(1.2) + + with self.store.open_session() as session: + ts = session.time_series_rollup_for(StockPrice, "users/karmel", p1.name) + res = ts.get() + + self.assertEqual(16, len(res)) + + # should not go to server + res = ts.get(base_line, base_line + timedelta(days=365)) + self.assertEqual(16, len(res)) + self.assertEqual(1, session.advanced.number_of_requests) + + @unittest.skip("Insufficient license permissions. 
Skipping on CI/CD.") + def test_can_include_typed_time_series_rollup(self): + raw = RawTimeSeriesPolicy(TimeValue.of_hours(24)) + + p1 = TimeSeriesPolicy("By6Hours", TimeValue.of_hours(6), TimeValue.of_days(4)) + p2 = TimeSeriesPolicy("By1Day", TimeValue.of_days(1), TimeValue.of_days(5)) + p3 = TimeSeriesPolicy("By30Minutes", TimeValue.of_minutes(30), TimeValue.of_days(2)) + p4 = TimeSeriesPolicy("By1Hour", TimeValue.of_hours(1), TimeValue.of_days(3)) + + time_series_collection_configuration = TimeSeriesCollectionConfiguration() + time_series_collection_configuration.raw_policy = raw + time_series_collection_configuration.policies = [p1, p2, p3, p4] + + config = TimeSeriesConfiguration() + config.collections = {"users": time_series_collection_configuration} + config.policy_check_frequency = timedelta(seconds=1) + + self.store.maintenance.send(ConfigureTimeSeriesOperation(config)) + self.store.time_series.register_type(User, StockPrice) + + total = TimeValue.of_days(12).value + base_line = RavenTestHelper.utc_today() - timedelta(days=12) + + with self.store.open_session() as session: + session.store(User(name="Karmel"), "users/karmel") + + ts = session.typed_time_series_for(StockPrice, "users/karmel") + for i in range(total + 1): + open = i + close = i + 100_000 + high = i + 200_000 + low = i + 300_000 + volume = i + 400_000 + ts.append( + base_line + timedelta(minutes=i), StockPrice(open, close, high, low, volume), "watches/fitbit" + ) + + session.save_changes() + + time.sleep(1.2) + + with self.store.open_session() as session: + user = ( + session.query(object_type=User) + .include(lambda i: i.include_time_series(f"stockPrices@{p1.name}")) + .first() + ) + + # should not go to server + res = session.time_series_rollup_for(StockPrice, user.Id, p1.name).get() + self.assertEqual(16, len(res)) + self.assertEqual(1, session.advanced.number_of_requests) diff --git a/ravendb/tests/jvm_migrated_tests/spatial_tests/test_spatial.py b/ravendb/tests/jvm_migrated_tests/spatial_tests/test_spatial.py index b14cb07b..1ffacc2b 100644 --- a/ravendb/tests/jvm_migrated_tests/spatial_tests/test_spatial.py +++ b/ravendb/tests/jvm_migrated_tests/spatial_tests/test_spatial.py @@ -45,7 +45,7 @@ def __init__(self, Id: str = None, date: datetime.datetime = None, latitude: flo def from_json(cls, json_dict: Dict) -> MyProjection: return cls( json_dict["Id"], - datetime.datetime.fromisoformat(json_dict["date"]), + datetime.datetime.fromisoformat(json_dict["date"]) if "date" in json_dict else None, json_dict["latitude"], json_dict["longitude"], ) diff --git a/ravendb/tools/raven_test_helper.py b/ravendb/tools/raven_test_helper.py new file mode 100644 index 00000000..e84d5165 --- /dev/null +++ b/ravendb/tools/raven_test_helper.py @@ -0,0 +1,39 @@ +import os +from datetime import datetime +from typing import Optional + +from ravendb import DocumentStore, GetIndexErrorsOperation + + +class RavenTestHelper: + @staticmethod + def utc_today() -> datetime: + today = datetime.today() + return datetime(today.year, today.month, today.day, 0, 0, 0, 0) + + @staticmethod + def assert_no_index_errors(store: DocumentStore, database_name: Optional[str] = None) -> None: + errors = store.maintenance.for_database(database_name).send(GetIndexErrorsOperation()) + + sb = [] + for index_errors in errors: + if not index_errors or not index_errors.errors: + continue + + sb.append("Index Errors for '") + sb.append(index_errors.name) + sb.append("' (") + sb.append(len(index_errors.errors)) + sb.append(")") + sb.append(os.linesep) + + for 
index_error in index_errors.errors: + sb.append(f"- {index_error}") + sb.append(os.linesep) + + sb.append(os.linesep) + + if not sb: + return + + raise RuntimeError("".join(map(str, sb)))
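
For reference, a minimal sketch of the typed time-series API these tests exercise, assuming a running RavenDB server (the URL and database name below are placeholders). A value class implements ITimeSeriesValuesBindable to map slots of an entry's values array onto attributes, and typed_time_series_for reads and writes entries as that class:

from datetime import datetime
from typing import Dict, Tuple, Optional

from ravendb import DocumentStore
from ravendb.documents.session.time_series import ITimeSeriesValuesBindable
from ravendb.infrastructure.entities import User


class HeartRateMeasure(ITimeSeriesValuesBindable):
    def __init__(self, value: float):
        self.heart_rate = value

    def get_time_series_mapping(self) -> Dict[int, Tuple[str, Optional[str]]]:
        # slot 0 of every entry's values array binds to self.heart_rate
        return {0: ("heart_rate", None)}


store = DocumentStore(["http://localhost:8080"], "demo")  # placeholder URL/database
store.initialize()

with store.open_session() as session:
    session.store(User(name="Oren"), "users/1")
    # the series name defaults to the pluralized class name, "HeartRateMeasures"
    # (see test_can_get_time_series_names above)
    ts = session.typed_time_series_for(HeartRateMeasure, "users/1")
    ts.append(datetime.utcnow(), HeartRateMeasure(72.0), "watches/fitbit")
    session.save_changes()

with store.open_session() as session:
    entry = session.typed_time_series_for(HeartRateMeasure, "users/1").get()[0]
    print(entry.timestamp, entry.value.heart_rate, entry.tag)

store.close()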
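The rollup tests rely on the convention that rolled-up data is stored in a separate series named "<raw name>@<policy name>" — the name TimeSeriesPolicy.get_time_series_name composes, and the name the include string f"stockPrices@{p1.name}" above spells out by hand. A small sketch; the "@" separator is an assumption carried over from the Java client, consistent with that include string:

from ravendb.documents.operations.time_series import TimeSeriesPolicy
from ravendb.primitives.time_series import TimeValue

p1 = TimeSeriesPolicy("By6Hours", TimeValue.of_hours(6))

# rollup data for the raw "Heartrate" series lives under "Heartrate@By6Hours",
# which is the name the tests pass to session.time_series_for(...) when reading rollups
assert p1.get_time_series_name("Heartrate") == "Heartrate@By6Hours"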
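Finally, a short sketch of how the new RavenTestHelper is meant to be called from inside a TestBase test method (self.store here is the fixture-provided document store, as in the tests above):

from ravendb.tools.raven_test_helper import RavenTestHelper

# utc_today() truncates to midnight, giving tests a stable base line for appends
base_line = RavenTestHelper.utc_today()

# after indexing-heavy work, raise a readable report if any index recorded errors
RavenTestHelper.assert_no_index_errors(self.store)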