Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️(backends) remove ClickHouse JSON column (#482) #493

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ RALPH_BACKENDS__DATA__CLICKHOUSE__EVENT_TABLE_NAME=xapi_events_all
# RALPH_BACKENDS__DATA__CLICKHOUSE__USERNAME=
# RALPH_BACKENDS__DATA__CLICKHOUSE__PASSWORD=
# RALPH_BACKENDS__DATA__CLICKHOUSE__CLIENT_OPTIONS__date_time_input_format=
# RALPH_BACKENDS__DATA__CLICKHOUSE__CLIENT_OPTIONS__allow_experimental_object_type=
# RALPH_BACKENDS__DATA__CLICKHOUSE__DEFAULT_CHUNK_SIZE=500
# RALPH_BACKENDS__DATA__CLICKHOUSE__LOCALE_ENCODING=utf8
RALPH_BACKENDS__DATA__CLICKHOUSE__TEST_DATABASE=test_statements
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ have an authority field matching that of the user
- Helm: improve volumes and ingress configurations
- API: Add `RALPH_LRS_RESTRICT_BY_SCOPE` option enabling endpoint access
control by user scopes
- Backends: Replace reference to a JSON column in ClickHouse with
function calls on the String column [BC]

### Fixed

Expand Down
21 changes: 20 additions & 1 deletion UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,23 @@ CLI syntax has been changed from `fetch` & `push` to `read` & `write` affecting
```
$ sed -i 's/"fetch"/"read"/g' { my_history_file_path }
$ sed -i 's/"push"/"write"/g' { my_history_file_path }
```
```

#### Upgrade ClickHouse schema

If you are using the ClickHouse backend, schema changes have been made
to drop the existing JSON column in favor of the String version of the
same data. See [this issue](https://github.com/openfun/ralph/issues/482)
for details.

Ralph does not manage the ClickHouse schema, so if you have existing
data you will need to manually alter it as an admin user. Note: this
will rewrite the statements table, which may take a long time if you
have many rows. The command to run is:

```sql
-- If RALPH_BACKENDS__DATA__CLICKHOUSE__DATABASE is 'xapi'
-- and RALPH_BACKENDS__DATA__CLICKHOUSE__EVENT_TABLE_NAME is 'test'

ALTER TABLE xapi.test DROP COLUMN event, RENAME COLUMN event_str TO event;
```
3 changes: 1 addition & 2 deletions docs/backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ By default, the following client options are set, if you override the default
client options you must also set these:

- `"date_time_input_format": "best_effort"` allows RFC date parsing
- `"allow_experimental_object_type": 1` allows the JSON data type we use to store statements

The ClickHouse client options supported in Ralph can be found in these locations:
- [Python driver specific](https://clickhouse.com/docs/en/integrations/language-clients/python/driver-api#settings-argument)
Expand All @@ -229,4 +228,4 @@ LRS parameters required to connect are:

Optional parameters can be configured if necessary:

- `headers`: a comma-separated key=value list of LRS server headers
- `headers`: a comma-separated key=value list of LRS server headers
29 changes: 17 additions & 12 deletions src/ralph/backends/data/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import clickhouse_connect
from clickhouse_connect.driver.exceptions import ClickHouseError
from pydantic import BaseModel, Json, ValidationError, conint
from pydantic import BaseModel, Json, ValidationError

from ralph.backends.data.base import (
BaseDataBackend,
Expand All @@ -47,16 +47,14 @@ class ClickHouseClientOptions(ClientOptions):
"""Pydantic model for `clickhouse` client options."""

date_time_input_format: str = "best_effort"
allow_experimental_object_type: conint(ge=0, le=1) = 1


class InsertTuple(NamedTuple):
"""Named tuple for ClickHouse insertion."""

event_id: UUID
emission_time: datetime
event: dict
event_str: str
event: str


class ClickHouseDataBackendSettings(BaseDataBackendSettings):
Expand Down Expand Up @@ -262,7 +260,7 @@ def read(
if query.limit:
sql += f"\nLIMIT {query.limit}"

reader = self._read_raw if raw_output else lambda _: _
reader = self._read_raw if raw_output else self._read_json

logger.debug(
"Start reading the %s table of the %s database (chunk size: %d)",
Expand Down Expand Up @@ -423,7 +421,6 @@ def _to_insert_tuples(
insert_tuple = InsertTuple(
insert.event_id,
insert.emission_time,
statement,
json.dumps(statement),
)

Expand All @@ -445,12 +442,7 @@ def _bulk_import(
self.client.insert(
event_table_name,
batch,
column_names=[
"event_id",
"emission_time",
"event",
"event_str",
],
column_names=["event_id", "emission_time", "event"],
# Allow ClickHouse to buffer the insert, and wait for the
# buffer to flush. Should be configurable, but I think these are
# reasonable defaults.
Expand Down Expand Up @@ -488,6 +480,19 @@ def _parse_bytes_to_dict(
logger.error("Raised error: %s, for document %s", error, raw_document)
raise error

@staticmethod
def _read_json(document: Dict[str, Any]) -> Dict[str, Any]:
"""Read the `documents` row and yield for the event JSON."""
if "event" in document:
document["event"] = json.loads(document["event"])

return document

def _read_raw(self, document: Dict[str, Any]) -> bytes:
"""Read the `documents` Iterable and yield bytes."""
# We want to return a JSON structure of the whole row, so if the event string
# is in there we first need to serialize it so that we can deserialize the
# whole thing.
document = self._read_json(document)

return json.dumps(document).encode(self.locale_encoding)
27 changes: 19 additions & 8 deletions src/ralph/backends/lrs/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ def query_statements(self, params: RalphStatementsQuery) -> StatementQueryResult
ch_params.pop("authority", None)

if params.verb:
where.append("event.verb.id = {verb:String}")
where.append("JSONExtractString(event, 'verb', 'id') = {verb:String}")

if params.activity:
where.append("event.object.objectType = 'Activity'")
where.append("event.object.id = {activity:String}")
where.append(
"JSONExtractString(event, 'object', 'objectType') = 'Activity'"
)
where.append("JSONExtractString(event, 'object', 'id') = {activity:String}")

if params.since:
where.append("emission_time > {since:DateTime64(6)}")
Expand Down Expand Up @@ -152,33 +154,42 @@ def _add_agent_filters(
"""Add filters relative to agents to `where`."""
if not agent_params:
return

if not isinstance(agent_params, dict):
agent_params = agent_params.dict()

if agent_params.get("mbox"):
ch_params[f"{target_field}__mbox"] = agent_params.get("mbox")
where.append(f"event.{target_field}.mbox = {{{target_field}__mbox:String}}")
where.append(
f"JSONExtractString(event, '{target_field}', 'mbox') = "
f"{{{target_field}__mbox:String}}"
)
elif agent_params.get("mbox_sha1sum"):
ch_params[f"{target_field}__mbox_sha1sum"] = agent_params.get(
"mbox_sha1sum"
)
where.append(
f"event.{target_field}.mbox_sha1sum = {{{target_field}__mbox_sha1sum:String}}" # noqa: E501 # pylint: disable=line-too-long
f"JSONExtractString(event, '{target_field}', 'mbox_sha1sum') = "
f"{{{target_field}__mbox_sha1sum:String}}"
)
elif agent_params.get("openid"):
ch_params[f"{target_field}__openid"] = agent_params.get("openid")
where.append(
f"event.{target_field}.openid = {{{target_field}__openid:String}}"
f"JSONExtractString(event, '{target_field}', 'openid') = "
f"{{{target_field}__openid:String}}"
)
elif agent_params.get("account__name"):
ch_params[f"{target_field}__account__name"] = agent_params.get(
"account__name"
)
where.append(
f"event.{target_field}.account.name = {{{target_field}__account__name:String}}" # noqa: E501 # pylint: disable=line-too-long
f"JSONExtractString(event, '{target_field}', 'account', 'name') = "
f"{{{target_field}__account__name:String}}"
)
ch_params[f"{target_field}__account__home_page"] = agent_params.get(
"account__home_page"
)
where.append(
f"event.{target_field}.account.homePage = {{{target_field}__account__home_page:String}}" # noqa: E501 # pylint: disable=line-too-long
f"JSONExtractString(event, '{target_field}', 'account', 'homePage') = "
f"{{{target_field}__account__home_page:String}}"
)
65 changes: 41 additions & 24 deletions tests/backends/data/test_clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def test_backends_data_clickhouse_data_backend_instantiation_with_settings():
PASSWORD="",
CLIENT_OPTIONS={
"date_time_input_format": "test_format",
"allow_experimental_object_type": 0,
},
DEFAULT_CHUNK_SIZE=1000,
LOCALE_ENCODING="utf-16",
Expand Down Expand Up @@ -159,31 +158,37 @@ def test_backends_data_clickhouse_data_backend_read_method_with_a_custom_query(
backend.write(statements)

# Test filtering
query = ClickHouseQuery(where="event.bool = 1")
query = ClickHouseQuery(where="JSONExtractBool(event, 'bool') = 1")
results = list(backend.read(query=query, chunk_size=None))
assert len(results) == 2
assert results[0]["event"] == statements[0]
assert results[1]["event"] == statements[2]

# Test select fields
query = ClickHouseQuery(select=["event_id", "event.bool"])
query = ClickHouseQuery(
select=["event_id", "JSONExtractBool(event, 'bool') as bool"]
)
results = list(backend.read(query=query))

assert len(results) == 3
assert len(results[0]) == 2
assert results[0]["event_id"] == documents[0][0]
assert results[0]["event.bool"] == statements[0]["bool"]
assert results[0]["bool"] == statements[0]["bool"]
assert results[1]["event_id"] == documents[1][0]
assert results[1]["event.bool"] == statements[1]["bool"]
assert results[1]["bool"] == statements[1]["bool"]
assert results[2]["event_id"] == documents[2][0]
assert results[2]["event.bool"] == statements[2]["bool"]
assert results[2]["bool"] == statements[2]["bool"]

# Test both
query = ClickHouseQuery(where="event.bool = 0", select=["event_id", "event.bool"])
query = ClickHouseQuery(
where="JSONExtractBool(event, 'bool') = 0",
select=["event_id", "JSONExtractBool(event, 'bool') as bool"],
)
results = list(backend.read(query=query))
assert len(results) == 1
assert len(results[0]) == 2
assert results[0]["event_id"] == documents[1][0]
assert results[0]["event.bool"] == statements[1]["bool"]
assert results[0]["bool"] == statements[1]["bool"]

# Test sort
query = ClickHouseQuery(sort="emission_time DESCENDING")
Expand All @@ -201,7 +206,7 @@ def test_backends_data_clickhouse_data_backend_read_method_with_a_custom_query(

# Test parameters
query = ClickHouseQuery(
where="event.bool = {event_bool:Bool}",
where="JSONExtractBool(event, 'bool') = {event_bool:Bool}",
parameters={"event_bool": 0, "format": "exact"},
)
results = list(backend.read(query=query))
Expand All @@ -217,15 +222,15 @@ def test_backends_data_clickhouse_data_backend_read_method_with_failures(
backend = clickhouse_backend()

statement = {"id": str(uuid.uuid4()), "timestamp": str(datetime.utcnow())}
document = {"event": statement}
document = {"event": json.dumps(statement)}
backend.write([statement])

# JSON encoding error
def mock_read_raw(*args, **kwargs):
"""Mock the `ClickHouseDataBackend._read_raw` method."""
def mock_read_json(*args, **kwargs):
"""Mock the `ClickHouseDataBackend._read_json` method."""
raise TypeError("Error")

monkeypatch.setattr(backend, "_read_raw", mock_read_raw)
monkeypatch.setattr(backend, "_read_json", mock_read_json)

msg = f"Failed to encode document {document}: Error"

Expand All @@ -235,7 +240,7 @@ def mock_read_raw(*args, **kwargs):
BackendException,
match=msg,
):
list(backend.read(raw_output=True, ignore_errors=False))
list(backend.read(raw_output=False, ignore_errors=False))

assert (
"ralph.backends.data.clickhouse",
Expand All @@ -247,7 +252,7 @@ def mock_read_raw(*args, **kwargs):

# Ignoring errors
with caplog.at_level(logging.WARNING):
list(backend.read(raw_output=True, ignore_errors=True))
list(backend.read(raw_output=False, ignore_errors=True))

assert (
"ralph.backends.data.clickhouse",
Expand Down Expand Up @@ -474,16 +479,20 @@ def test_backends_data_clickhouse_data_backend_write_method(
result = clickhouse.query(sql).result_set
assert result[0][0] == 2

sql = f"""SELECT * FROM {CLICKHOUSE_TEST_TABLE_NAME} ORDER BY event.timestamp"""
sql = f"""
SELECT *
FROM {CLICKHOUSE_TEST_TABLE_NAME}
ORDER BY JSONExtractString(event, 'timestamp')
"""
result = list(clickhouse.query(sql).named_results())

assert result[0]["event_id"] == native_statements[0]["id"]
assert result[0]["emission_time"] == native_statements[0]["timestamp"]
assert result[0]["event"] == statements[0]
assert json.loads(result[0]["event"]) == statements[0]

assert result[1]["event_id"] == native_statements[1]["id"]
assert result[1]["emission_time"] == native_statements[1]["timestamp"]
assert result[1]["event"] == statements[1]
assert json.loads(result[1]["event"]) == statements[1]
backend.close()


Expand Down Expand Up @@ -517,16 +526,20 @@ def test_backends_data_clickhouse_data_backend_write_method_bytes(
result = clickhouse.query(sql).result_set
assert result[0][0] == 2

sql = f"""SELECT * FROM {CLICKHOUSE_TEST_TABLE_NAME} ORDER BY event.timestamp"""
sql = f"""
SELECT *
FROM {CLICKHOUSE_TEST_TABLE_NAME}
ORDER BY JSONExtractString(event, 'timestamp')
"""
result = list(clickhouse.query(sql).named_results())

assert result[0]["event_id"] == native_statements[0]["id"]
assert result[0]["emission_time"] == native_statements[0]["timestamp"]
assert result[0]["event"] == statements[0]
assert json.loads(result[0]["event"]) == statements[0]

assert result[1]["event_id"] == native_statements[1]["id"]
assert result[1]["emission_time"] == native_statements[1]["timestamp"]
assert result[1]["event"] == statements[1]
assert json.loads(result[1]["event"]) == statements[1]
backend.close()


Expand Down Expand Up @@ -633,16 +646,20 @@ def test_backends_data_clickhouse_data_backend_write_method_with_custom_chunk_si
result = clickhouse.query(sql).result_set
assert result[0][0] == 2

sql = f"""SELECT * FROM {CLICKHOUSE_TEST_TABLE_NAME} ORDER BY event.timestamp"""
sql = f"""
SELECT *
FROM {CLICKHOUSE_TEST_TABLE_NAME}
ORDER BY JSONExtractString(event, 'timestamp')
"""
result = list(clickhouse.query(sql).named_results())

assert result[0]["event_id"] == native_statements[0]["id"]
assert result[0]["emission_time"] == native_statements[0]["timestamp"]
assert result[0]["event"] == statements[0]
assert json.loads(result[0]["event"]) == statements[0]

assert result[1]["event_id"] == native_statements[1]["id"]
assert result[1]["emission_time"] == native_statements[1]["timestamp"]
assert result[1]["event"] == statements[1]
assert json.loads(result[1]["event"]) == statements[1]
backend.close()


Expand Down
Loading