Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

WIP: Add copy mode (take 2) #28

Merged
merged 5 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions lib-driver/caqti_driver_mariadb.ml
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ module Connect_functor (System : Caqti_driver_sig.System_unix) = struct
let transaction_failed query err =
return (Error (Caqti_error.request_failed ~uri ~query (Mdb_msg err)))

let exec q p = call ~f:Response.exec q p

let start () =
Mdb.autocommit db false >>=
(function
Expand All @@ -493,6 +495,40 @@ module Connect_functor (System : Caqti_driver_sig.System_unix) = struct
(function
| Ok () -> return (Ok ())
| Error err -> transaction_failed "# ROLLBACK" err)

let populate ~table ~columns row_type input_stream =
let columns_tuple = "(" ^ (String.concat "," columns) ^ ")" in
let values_tuple = "(" ^ (String.concat "," (List.map (fun _ -> "?") columns)) ^ ")" in
let insert_query =
Caqti_request.exec
~oneshot:true
row_type
("INSERT INTO " ^ table ^ " " ^ columns_tuple ^ " VALUES " ^ values_tuple)
in
(* TODO: Should we prepare the statement directly somehow? *)
begin
(* Begin a transaction *)
start () >>=? fun () ->

(* Insert each element in the stream *)
System.Stream.iter_s
~f:(fun row -> exec insert_query row)
input_stream
>>= fun resp ->
begin
(* Since the input stream cannot contain errors, unpack the combined error type
* returned
*)
match resp with
| Ok () as x -> return x
| Error (`Callback e) -> return (Error e)
| Error (`Self ()) -> failwith "Input stream to populate cannot return errors"
end
>>=? fun () ->

(* Commit the transaction *)
commit ()
end
end

type conninfo = {
Expand Down
34 changes: 34 additions & 0 deletions lib-driver/caqti_driver_postgresql.ml
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,40 @@ module Connect_functor (System : Caqti_driver_sig.System_unix) = struct
let start () = exec start_req ()
let commit () = exec commit_req ()
let rollback () = exec rollback_req ()

let populate ~table ~columns row_type input_stream =
let columns_tuple = "(" ^ (String.concat "," columns) ^ ")" in
let values_tuple = "(" ^ (String.concat "," (List.map (fun _ -> "?") columns)) ^ ")" in
let insert_query =
Caqti_request.exec
~oneshot:true
row_type
("INSERT INTO " ^ table ^ " " ^ columns_tuple ^ " VALUES " ^ values_tuple)
in
(* TODO: Should we prepare the statement directly somehow? *)
begin
(* Begin a transaction *)
start () >>=? fun () ->

(* Insert each element in the stream *)
System.Stream.iter_s
~f:(fun row -> exec insert_query row)
input_stream
>>= fun resp ->
begin
(* Since the input stream cannot contain errors, unpack the combined error type
* returned
*)
match resp with
| Ok () as x -> return x
| Error (`Callback e) -> return (Error e)
| Error (`Self ()) -> failwith "Input stream to populate cannot return errors"
end
>>=? fun () ->

(* Commit the transaction *)
commit ()
end
end

let connect uri =
Expand Down
61 changes: 48 additions & 13 deletions lib-driver/caqti_driver_sqlite3.ml
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,7 @@ module Connect_functor (System : Caqti_driver_sig.System_unix) = struct

let pcache = Hashtbl.create 19

let call ~f req param =
let param_type = Caqti_request.param_type req in
let row_type = Caqti_request.row_type req in

let prepare req =
let prepare_helper query =
try
let stmt = Sqlite3.prepare db query in
Expand All @@ -395,21 +392,25 @@ module Connect_functor (System : Caqti_driver_sig.System_unix) = struct
| Some stmt -> Ok stmt)
with Sqlite3.Error msg ->
let msg = Caqti_error.Msg msg in
Error (Caqti_error.request_failed ~uri ~query msg) in
Error (Caqti_error.request_failed ~uri ~query msg)
in

let prepare () =
let templ = Caqti_request.query req driver_info in
let query = linear_query_string templ in
let os = linear_param_order templ in
Preemptive.detach prepare_helper query >|=? fun stmt ->
Ok (stmt, os, query) in
let templ = Caqti_request.query req driver_info in
let query = linear_query_string templ in
let os = linear_param_order templ in
Preemptive.detach prepare_helper query >|=? fun stmt ->
Ok (stmt, os, query)

let call ~f req param =
let param_type = Caqti_request.param_type req in
let row_type = Caqti_request.row_type req in

(match Caqti_request.query_id req with
| None -> prepare ()
| None -> prepare req
| Some id ->
(try return (Ok (Hashtbl.find pcache id)) with
| Not_found ->
prepare () >|=? fun pcache_entry ->
prepare req >|=? fun pcache_entry ->
Hashtbl.add pcache id pcache_entry;
Ok pcache_entry))
>>=? fun (stmt, os, query) ->
Expand Down Expand Up @@ -469,6 +470,40 @@ module Connect_functor (System : Caqti_driver_sig.System_unix) = struct
let start () = exec Q.start ()
let commit () = exec Q.commit ()
let rollback () = exec Q.rollback ()

let populate ~table ~columns row_type input_stream =
let columns_tuple = "(" ^ (String.concat "," columns) ^ ")" in
let values_tuple = "(" ^ (String.concat "," (List.map (fun _ -> "?") columns)) ^ ")" in
let insert_query =
Caqti_request.exec
~oneshot:true
row_type
("INSERT INTO " ^ table ^ " " ^ columns_tuple ^ " VALUES " ^ values_tuple)
in
(* TODO: Should we prepare the statement directly somehow? *)
begin
(* Begin a transaction *)
start () >>=? fun () ->

(* Insert each element in the stream *)
System.Stream.iter_s
~f:(fun row -> exec insert_query row)
input_stream
>>= fun resp ->
begin
(* Since the input stream cannot contain errors, unpack the combined error type
* returned
*)
match resp with
| Ok () as x -> return x
| Error (`Callback e) -> return (Error e)
| Error (`Self ()) -> failwith "Input stream to populate cannot return errors"
end
>>=? fun () ->

(* Commit the transaction *)
commit ()
end
end

let connect uri =
Expand Down
1 change: 1 addition & 0 deletions lib/caqti_connect.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ module Make_unix (System : Caqti_driver_sig.System_unix) = struct
r

let call ~f req param = use (fun () -> C.call ~f req param)
let populate ~table ~columns r s = use (fun () -> C.populate ~table ~columns r s)

let exec q p = call ~f:Response.exec q p
let find q p = call ~f:Response.find q p
Expand Down
10 changes: 10 additions & 0 deletions lib/caqti_connection_sig.mli
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ module type Base = sig
during the call to [f], and must not be returned or operated on by other
threads. *)

(** {2 Insertion} *)
val populate:
table: string ->
columns: string list ->
'input Caqti_type.t ->
('input, unit) stream ->
(unit, [> Caqti_error.call_or_retrieve]) result future
(** [populate table columns row_type input_stream] inputs the contents of
[input_stream] into the database in whatever manner is most efficient
as decided by the driver. *)

(** {2 Transactions} *)

Expand Down
25 changes: 18 additions & 7 deletions lib/caqti_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ module type S = sig
('state, 'err) result future

val fold_s :
f: ('a -> 'state -> ('state, 'err) result future) ->
f: ('a -> 'state -> ('state, 'errc) result future) ->
('a, 'err) t ->
'state ->
('state, 'err) result future
('state, [`Self of 'err| `Callback of 'errc]) result future

val iter_s :
f:('a -> (unit, 'err) result future) ->
f:('a -> (unit, 'errc) result future) ->
('a, 'err) t ->
(unit, 'err) result future
(unit, [`Self of 'err| `Callback of 'errc]) result future

val to_rev_list : ('a, 'err) t -> ('a list, 'err) result future

val to_list : ('a, 'err) t -> ('a list, 'err) result future

val of_list : 'a list -> ('a, 'err) t
end

module type FUTURE = sig
Expand All @@ -56,7 +58,11 @@ end
module Make(X : FUTURE) : S with type 'a future := 'a X.future = struct
open X

let (>>=?) res_future f = res_future >>= function Ok a -> f a | Error _ as r -> return r
let (>>=?) res_future f =
res_future >>= function
| Ok a -> f a
| Error r -> return (Error (`Callback r))

let (>|=?) res_future f =
res_future >>= function
| Ok a -> return @@ Ok (f a)
Expand All @@ -77,16 +83,21 @@ module Make(X : FUTURE) : S with type 'a future := 'a X.future = struct
let rec fold_s ~f t state =
t () >>= function
| Nil -> return (Ok state)
| Error err -> return (Error err : ('a, 'err) result)
| Error err -> return (Error (`Self err) : ('a, 'err) result)
| Cons (a, t') -> f a state >>=? fold_s ~f t'

let rec iter_s ~f t =
t () >>= function
| Nil -> return (Ok ())
| Error err -> return (Error err : ('a, 'err) result)
| Error err -> return (Error (`Self err) : ('a, 'err) result)
| Cons (a, t') -> f a >>=? fun () -> iter_s ~f t'

let to_rev_list t = fold ~f:List.cons t []

let to_list t = to_rev_list t >|=? List.rev

let rec of_list l =
fun () -> match l with
| [] -> return Nil
| hd::tl -> return (Cons (hd, (of_list tl)))
end
10 changes: 6 additions & 4 deletions lib/caqti_stream.mli
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ module type S = sig
('state, 'err) result future

val fold_s :
f: ('a -> 'state -> ('state, 'err) result future) ->
f: ('a -> 'state -> ('state, 'errc) result future) ->
('a, 'err) t ->
'state ->
('state, 'err) result future
('state, [`Self of 'err| `Callback of 'errc]) result future

val iter_s :
f:('a -> (unit, 'err) result future) ->
f:('a -> (unit, 'errc) result future) ->
('a, 'err) t ->
(unit, 'err) result future
(unit, [`Self of 'err| `Callback of 'errc]) result future

val to_rev_list : ('a, 'err) t -> ('a list, 'err) result future

val to_list : ('a, 'err) t -> ('a list, 'err) result future

val of_list : 'a list -> ('a, 'err) t
end

module type FUTURE = sig
Expand Down
19 changes: 19 additions & 0 deletions tests/test_sql.ml
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,29 @@ struct
assert_stream_is (Ok [(1, "one"); (2, "two")]) >>= Sys.or_fail >>= fun () ->
Db.exec Q.drop_tmp () >>= Sys.or_fail

let test_stream_both_ways (module Db : Caqti_sys.CONNECTION) =
let assert_stream_both_ways expected =
let input_stream = Caqti_sys.Stream.of_list expected in
Db.exec Q.create_tmp () >>= Sys.or_fail >>= fun () ->
Db.populate
~table:"test_sql"
~columns:["i"; "s"]
Caqti_type.(tup2 int string)
input_stream
>>= Sys.or_fail >>= fun () ->
Db.collect_list Q.select_from_tmp () >>= Sys.or_fail >>= fun actual ->
assert (actual = expected);
Db.exec Q.drop_tmp ()
in
assert_stream_both_ways [] >>= Sys.or_fail >>= fun () ->
assert_stream_both_ways [(1, "one")] >>= Sys.or_fail >>= fun () ->
assert_stream_both_ways [(1, "one"); (2, "two")] >>= Sys.or_fail

let run (module Db : Caqti_sys.CONNECTION) =
test_expr (module Db) >>= fun () ->
test_table (module Db) >>= fun () ->
test_stream (module Db) >>= fun () ->
test_stream_both_ways (module Db) >>= fun () ->
Db.disconnect ()

let run_pool pool =
Expand Down