Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

add a sync version of capnp-rpc #189

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions capnp-rpc-sync.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
opam-version: "2.0"
synopsis:
"Cap'n Proto is a capability-based RPC system with bindings for many languages"
description: """
This package provides a version of the Cap'n Proto RPC system using the Cap'n
Proto serialisation format and Unix for the IO"""
maintainer: "Thomas Leonard <talex5@gmail.com>"
authors: "Thomas Leonard <talex5@gmail.com>"
license: "Apache"
homepage: "https://github.com/mirage/capnp-rpc"
bug-reports: "https://github.com/mirage/capnp-rpc/issues"
doc: "https://mirage.github.io/capnp-rpc/"
depends: [
"ocaml" {>= "4.03.0"}
"conf-capnproto" {build}
"capnp" {>= "3.4.0"}
"capnp-rpc" {= version}
"base-unix"
"astring"
"fmt"
"logs"
"asetmap"
"containers"
"uri" {>= "1.6.0"}
"dune" {>= "1.0"}
]
build: [
["dune" "build" "-p" name "-j" jobs]
["dune" "runtest" "-p" name "-j" jobs] {with-test}
]
dev-repo: "git+https://github.com/mirage/capnp-rpc.git"
116 changes: 116 additions & 0 deletions capnp-rpc-sync/capability.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
open Capnp_core
open Utils

module Log = Capnp_rpc.Debug.Log
module StructStorage = Capnp.BytesMessage.StructStorage

type 'a t = Core_types.cap
type 'a capability_t = 'a t
type ('t, 'a, 'b) method_t = ('t, 'a, 'b) Capnp.RPC.MethodID.t

module Request = Request

let inc_ref = Core_types.inc_ref
let dec_ref = Core_types.dec_ref

let with_ref t fn =
CCFun.finally1 ~h:(fun () -> dec_ref t) fn t

let pp f x = x#pp f

let broken = Core_types.broken_cap
let when_broken = Core_types.when_broken
let when_released (x:Core_types.cap) f = x#when_released f
let problem x = x#problem

let wait_until_settled (x:'a t) : 'a Fut.t =
let sync = SyncPoint.create () in
let result = Fut.make (fun () -> SyncPoint.wait sync) in
(* watch for unblocking *)
let rec aux x =
if x#blocker = None then (
() (* done *)
) else (
x#when_more_resolved
(fun x -> Core_types.dec_ref x; aux x)
)
in
aux x;
result

let equal a b =
match a#blocker, b#blocker with
| None, None ->
let a = a#shortest in
let b = b#shortest in
begin match a#problem, b#problem with
| None, None -> Ok (a = b)
| Some a, Some b -> Ok (a = b)
| _ -> Ok false
end
| _ -> Error `Unsettled

let call (target : 't capability_t) (m : ('t, 'a, 'b) method_t)
(req : 'a Request.t) : Core_types.struct_ref =
Log.info (fun f -> f "Calling %a" Capnp.RPC.MethodID.pp m);
let msg = Request.finish m req in
let results, resolver = Local_struct_promise.make () in
target#call resolver msg;
results

let call_and_wait cap (m : ('t, 'a, 'b StructStorage.reader_t) method_t) req : _ result =
let mvar, resolve = MVar.create () in
let result = call cap m req in
let finish = lazy (Core_types.dec_ref result) in
result#when_resolved (function
| Error e ->
Lazy.force finish;
resolve (Error (`Capnp e))
| Ok resp ->
Lazy.force finish;
let payload = Msg.Response.readable resp in
let release_response_caps () = Core_types.Response_payload.release resp in
let contents = Schema.Reader.Payload.content_get payload |> Schema.Reader.of_pointer in
resolve @@ Ok (contents, release_response_caps)
);
MVar.wait mvar

let call_for_value cap m req =
match call_and_wait cap m req with
| Error _ as response -> response
| Ok (response, release_response_caps) ->
release_response_caps ();
Ok response

let call_for_value_exn cap m req =
match call_for_value cap m req with
| Ok x -> x
| Error (`Capnp e) ->
let msg = Fmt.asprintf "Error calling %t(%a): %a"
cap#pp
Capnp.RPC.MethodID.pp m
Capnp_rpc.Error.pp e in
raise (Failure msg)

let call_for_unit cap m req =
match call_for_value cap m req with
| Ok _ -> Ok ()
| Error _ as e -> e

let call_for_unit_exn cap m req = call_for_value_exn cap m req |> ignore

let call_for_caps cap m req fn =
let q = call cap m req in
match fn q with
| r -> Core_types.dec_ref q; r
| exception ex -> Core_types.dec_ref q; raise ex

type 'a resolver = Cap_proxy.resolver_cap

let promise () =
let cap = Cap_proxy.local_promise () in
(cap :> Core_types.cap), (cap :> 'a resolver)

let resolve_ok r x = r#resolve x

let resolve_exn r ex = r#resolve (Core_types.broken_cap ex)
18 changes: 18 additions & 0 deletions capnp-rpc-sync/capnp_core.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

module Capnp_content = struct
include Msg
let ref_leak_detected fn = fn()
end

module Core_types = Capnp_rpc.Core_types(Capnp_content)

module Local_struct_promise = Capnp_rpc.Local_struct_promise.Make(Core_types)
module Cap_proxy = Capnp_rpc.Cap_proxy.Make(Core_types)

module type ENDPOINT = Capnp_rpc.Message_types.ENDPOINT with
module Core_types = Core_types

class type sturdy_ref = object
method connect : (Core_types.cap, Capnp_rpc.Exception.t) result
method to_uri_with_secrets : Uri.t
end
74 changes: 74 additions & 0 deletions capnp-rpc-sync/capnp_rpc_sync.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
open Capnp_core

include Capnp.Message.BytesMessage

module Log = Capnp_rpc.Debug.Log
module RO_array = Capnp_rpc.RO_array

module Capability = Capability

module StructRef = struct
type 'a t = Core_types.struct_ref

let inc_ref = Core_types.inc_ref
let dec_ref = Core_types.dec_ref
end

module Sturdy_ref = Sturdy_ref

module Untyped = struct
let struct_field t i =
(* todo: would be better to have a separate type for this *)
object (_ : Core_types.struct_ref)
method cap path = t#cap (Xform.Field i :: path)
method when_resolved _ = failwith "Can't use when_resolved on a sub-struct"
method response = failwith "Can't use response on a sub-struct"
method update_rc = failwith "Can't use rec-counts on a sub-struct"
method sealed_dispatch _ = None
method pp f = Fmt.pf f "pointer %d in %t" i t#pp
method blocker = failwith "struct_field: blocker"
method check_invariants = ()
end

let capability_field t f = t#cap [Xform.Field f]

let local = Service.local

type abstract_method_t = Service.abstract_method_t

let abstract_method x req release =
x (StructStorage.cast_reader req) release

let get_cap a i =
Core_types.Attachments.cap (Stdint.Uint32.to_int i) (Msg.unwrap_attachments a)

let add_cap a cap =
Core_types.Attachments.add_cap (Msg.unwrap_attachments a) cap |> Stdint.Uint32.of_int

let clear_cap a i =
Core_types.Attachments.clear_cap (Msg.unwrap_attachments a) (Stdint.Uint32.to_int i)

let unknown_interface ~interface_id _req release_params =
release_params ();
Core_types.fail ~ty:`Unimplemented "Unknown interface %a" Stdint.Uint64.printer interface_id

let unknown_method ~interface_id ~method_id _req release_params =
release_params ();
Core_types.fail ~ty:`Unimplemented "Unknown method %a.%d" Stdint.Uint64.printer interface_id method_id

class type generic_service = Service.generic
end

module Service = Service

module Private = Private

module Cast = struct
let cap_of_raw x = x
let cap_to_raw x = x

let sturdy_of_raw x = x
let sturdy_to_raw x = x
end

module Persistence = Persistence
Loading