Skip to content

Commit

Permalink
mnesia_to_khepri: Check record validity before calling `Mod:copy_to_k…
Browse files Browse the repository at this point in the history
…hepri/3`

[Why]
It appears that we get Mnesia write events with the table name and the
key, instead of the table name and the written record. The expected
event seems to come after.

We can't do anything with just the key and we want to skip calls to the
callback module in this case.

[How]
To distinguish the key from the expected record, we first query the
table info to get the record name and its arity. Then, we verify the
received term against the queried record definition to check if the term
is a valid record before calling `Mod:copy_to_khepri/3`.
  • Loading branch information
dumbbell committed Oct 27, 2023
1 parent 6a56ade commit 6c8902a
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 60 deletions.
171 changes: 111 additions & 60 deletions src/m2k_table_copy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
-record(?MODULE, {khepri_store :: khepri:store_id(),
migration_id :: mnesia_to_khepri:migration_id(),
tables :: [mnesia_to_khepri:mnesia_table()],
record_defs :: #{mnesia_to_khepri:mnesia_table() =>
{atom(), arity()}},
converter_mod :: mnesia_to_khepri:converter_mod() |
{mnesia_to_khepri:converter_mod(), any()},
converter_mod_priv :: any() | undefined,
Expand Down Expand Up @@ -250,9 +252,11 @@ init(#{khepri_store := StoreId,
converter_mod := Mod}) ->
erlang:process_flag(trap_exit, true),
Progress = migration_recorded_state(self(), Tables),
RecordDefs = query_table_record_definitions(Tables),
State = #?MODULE{khepri_store = StoreId,
migration_id = MigrationId,
tables = Tables,
record_defs = RecordDefs,
converter_mod = Mod,
progress = Progress},
{ok, State}.
Expand Down Expand Up @@ -317,6 +321,39 @@ terminate(_Reason, State) ->
%% Internal functions.
%% -------------------------------------------------------------------

query_table_record_definitions(Tables) ->
query_table_record_definitions(Tables, #{}).

query_table_record_definitions([Table | Rest], RecordDefs) ->
RecordName = mnesia:table_info(Table, record_name),
Arity = mnesia:table_info(Table, arity),
RecordDefs1 = RecordDefs#{Table => {RecordName, Arity}},
query_table_record_definitions(Rest, RecordDefs1);
query_table_record_definitions([], RecordDefs) ->
RecordDefs.

is_record_valid(#?MODULE{record_defs = RecordDefs}, Table, Tuple)
when is_tuple(Tuple) ->
#{Table := {RecordName, Arity}} = RecordDefs,
case is_record(Tuple, RecordName, Arity) of
true ->
true;
false ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: "
"the following term is not a valid record for "
"table \"~ts\": ~p",
[Table, Tuple],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN})
end;
is_record_valid(_State, Table, Term) ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: "
"the following term is not a valid record for table \"~ts\": ~p",
[Table, Term],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
false.

do_copy_data(#?MODULE{migration_id = MigrationId, tables = Tables} = State) ->
?LOG_INFO(
"Mnesia->Khepri data copy: "
Expand Down Expand Up @@ -435,30 +472,37 @@ handle_migration_records(
tables = Tables} = State0) ->
receive
{m2k_export, ExportPid, handle_record, Table, Record} ->
{State, Reply} = try
ActualMod = actual_mod(Mod),
case ActualMod:copy_to_khepri(
Table, Record, ModPriv) of
{ok, ModPriv1} ->
State1 = State0#?MODULE{
converter_mod_priv =
ModPriv1},
{State1, ok};
Error ->
{State0, Error}
end
catch
Class:Reason:Stacktrace ->
Exception = ?kmm_exception(
converter_mod_exception,
#{converter_mod => Mod,
tables => Tables,
class => Class,
reason => Reason,
stacktrace =>
Stacktrace}),
{State0, {error, Exception}}
end,
{State, Reply} =
case is_record_valid(State0, Table, Record) of
true ->
try
ActualMod = actual_mod(Mod),
Ret = ActualMod:copy_to_khepri(
Table, Record, ModPriv),
case Ret of
{ok, ModPriv1} ->
State1 = State0#?MODULE{
converter_mod_priv =
ModPriv1},
{State1, ok};
Error ->
{State0, Error}
end
catch
Class:Reason:Stacktrace ->
Exception = ?kmm_exception(
converter_mod_exception,
#{converter_mod => Mod,
tables => Tables,
class => Class,
reason => Reason,
stacktrace =>
Stacktrace}),
{State0, {error, Exception}}
end;
false ->
{State0, ok}
end,
ExportPid ! {self(), record_handled, Reply},
handle_migration_records(State);
{BackupPid, done, Ret} ->
Expand Down Expand Up @@ -649,42 +693,49 @@ consume_mnesia_events(
[length(Events), Tables],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
ActualMod = actual_mod(Mod),
ModPriv1 = consume_mnesia_events1(Events, ActualMod, ModPriv),
ModPriv1 = consume_mnesia_events1(Events, ActualMod, ModPriv, State),
State#?MODULE{converter_mod_priv = ModPriv1}.

consume_mnesia_events1([{put, Table, Record} | Rest], Mod, ModPriv) ->
case Mod:copy_to_khepri(Table, Record, ModPriv) of
{ok, ModPriv1} ->
Remaining = length(Rest),
if
Remaining rem 100 =:= 0 ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: ~b Mnesia events left",
[Remaining],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN});
true ->
ok
end,
consume_mnesia_events1(Rest, Mod, ModPriv1);
Error ->
throw(Error)
end;
consume_mnesia_events1([{delete, Table, Key} | Rest], Mod, ModPriv) ->
case Mod:delete_from_khepri(Table, Key, ModPriv) of
{ok, ModPriv1} ->
Remaining = length(Rest),
if
Remaining rem 100 =:= 0 ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: ~b Mnesia events left",
[Remaining],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN});
true ->
ok
end,
consume_mnesia_events1(Rest, Mod, ModPriv1);
Error ->
throw(Error)
end;
consume_mnesia_events1([], _Mod, ModPriv) ->
consume_mnesia_events1(
[{put, Table, Record} | Rest], Mod, ModPriv, State) ->
ModPriv2 = case is_record_valid(State, Table, Record) of
true ->
case Mod:copy_to_khepri(Table, Record, ModPriv) of
{ok, ModPriv1} -> ModPriv1;
Error -> throw(Error)
end;
false ->
ModPriv
end,
Remaining = length(Rest),
if
Remaining rem 100 =:= 0 ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: ~b Mnesia events left",
[Remaining],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN});
true ->
ok
end,
consume_mnesia_events1(Rest, Mod, ModPriv2, State);
consume_mnesia_events1(
[{delete, Table, Key} | Rest], Mod, ModPriv, State) ->
ModPriv2 = case Mod:delete_from_khepri(Table, Key, ModPriv) of
{ok, ModPriv1} ->
ModPriv1;
Error ->
throw(Error)
end,
Remaining = length(Rest),
if
Remaining rem 100 =:= 0 ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: ~b Mnesia events left",
[Remaining],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN});
true ->
ok
end,
consume_mnesia_events1(Rest, Mod, ModPriv2, State);
consume_mnesia_events1([], _Mod, ModPriv, _State) ->
ModPriv.
3 changes: 3 additions & 0 deletions test/kmm_gen_servers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ m2k_subscriber_test() ->
test_gen_server(Module).

test_gen_server(Module) ->
ok = mnesia:start(),
{atomic, ok} = mnesia:create_table(Module, []),

RaSystem = Module,
StoreId = RaSystem,
StoreDir = helpers:store_dir_name(RaSystem),
Expand Down

0 comments on commit 6c8902a

Please # to comment.