Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Speed up rethinkdb driver issue #107 #108

Closed
wants to merge 7 commits into from
5 changes: 4 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
:dependencies [[org.clojure/clojure "1.7.0"]
[org.clojure/clojurescript "1.7.48" :scope "provided"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]
[org.clojure/data.json "0.2.6"]
[cheshire "5.5.0"]
[org.clojure/tools.logging "0.3.1"]
[rethinkdb-protobuf "2.1.0"]
[manifold "0.1.1"]
[com.google.protobuf/protobuf-java "2.6.1"]
[aleph "0.4.1-beta2"]
[gloss "0.2.5"]
[clj-time "0.10.0"]]
:profiles {:dev {:resource-paths ["test-resources"]
:dependencies [[ch.qos.logback/logback-classic "1.1.3"]]}}
Expand Down
83 changes: 34 additions & 49 deletions src/rethinkdb/core.clj
Original file line number Diff line number Diff line change
@@ -1,46 +1,33 @@
(ns rethinkdb.core
(:require [rethinkdb.net :refer [send-int send-str read-init-response send-stop-query make-connection-loops close-connection-loops]]
[clojure.tools.logging :as log])
(:require [rethinkdb.net :refer [read-init-response send-stop-query make-connection-loops
wrap-duplex-stream handshake]]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
[manifold.stream :as s]
[aleph.tcp :as tcp]
[clojure.core.async :as async])
(:import [clojure.lang IDeref]
[java.io Closeable DataInputStream DataOutputStream]
[java.net Socket]))
[java.io Closeable]))

(defn send-version
"Sends protocol version to RethinkDB when establishing connection.
Hard coded to use v3."
[out]
(let [v1 1063369270
v2 1915781601
v3 1601562686
v4 1074539808]
(send-int out v3 4)))
(def versions
{:v1 1063369270
:v2 1915781601
:v3 1601562686
:v4 1074539808})

(defn send-protocol
"Sends protocol type to RethinkDB when establishing connection.
Hard coded to use JSON protocol."
[out]
(let [protobuf 656407617
json 2120839367]
(send-int out json 4)))

(defn send-auth-key
"Sends auth-key to RethinkDB when establishing connection."
[out auth-key]
(let [n (count auth-key)]
(send-int out n 4)
(send-str out auth-key)))
(def protocols
{:protobuf 656407617
:json 2120839367})

(defn close
"Closes RethinkDB database connection, stops all running queries
and waits for response before returning."
[conn]
(let [{:keys [^Socket socket ^DataOutputStream out ^DataInputStream in waiting]} @conn]
(let [{:keys [client waiting parsed-in]} @conn]
(doseq [token waiting]
(send-stop-query conn token))
(close-connection-loops conn)
(.close out)
(.close in)
(.close socket)
(s/close! @client)
(async/close! parsed-in)
:closed))

(defrecord Connection [conn]
Expand All @@ -53,6 +40,11 @@
[r writer]
(print-method (:conn r) writer))

(defn wrap-client
[client]
(d/chain (d/chain client
#(wrap-duplex-stream %))))

(defn connection [m]
(->Connection (atom m)))

Expand All @@ -62,35 +54,28 @@
is not explicitly set. Default values are used for any parameters
not provided.

(connect :host \"dbserver1.local\")
"
[& {:keys [^String host ^int port token auth-key db]
(connect :host \"dbserver1.local\")"
[& {:keys [^String host ^int port token auth-key db version protocol]
:or {host "127.0.0.1"
port 28015
token 0
auth-key ""
version :v3
protocol :json
db nil}}]
(try
(let [socket (Socket. host port)
out (DataOutputStream. (.getOutputStream socket))
in (DataInputStream. (.getInputStream socket))]
;; Initialise the connection
(send-version out)
(send-auth-key out auth-key)
(send-protocol out)
(let [init-response (read-init-response in)]
(let [client (tcp/client {:host host :port port})
init-response (handshake (version versions) auth-key (protocol protocols) @client)]
(if-not (= init-response "SUCCESS")
(throw (ex-info init-response {:host host :port port :auth-key auth-key :db db}))))
(throw (ex-info init-response {:host host :port port :auth-key auth-key :db db})))
;; Once initialised, create the connection record
(let [wrapped-client (wrap-client client)]
(connection
(merge
{:socket socket
:out out
:in in
{:client wrapped-client
:db db
:waiting #{}
:token token}
(make-connection-loops in out))))
(make-connection-loops wrapped-client)))))
(catch Exception e
(log/error e "Error connecting to RethinkDB database")
(throw (ex-info "Error connecting to RethinkDB database" {:host host :port port :auth-key auth-key :db db} e)))))
156 changes: 72 additions & 84 deletions src/rethinkdb/net.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,38 @@
(ns rethinkdb.net
(:require [clojure.data.json :as json]
(:require [cheshire.core :as cheshire]
[clojure.core.async :as async]
[clojure.tools.logging :as log]
[manifold.stream :as s]
[rethinkdb.query-builder :refer [parse-query]]
[rethinkdb.types :as types]
[rethinkdb.response :refer [parse-response]]
[rethinkdb.utils :refer [str->bytes int->bytes bytes->int pp-bytes]])
(:import [java.io Closeable InputStream OutputStream DataInputStream]))
[rethinkdb.utils :refer [str->bytes
int->bytes bytes->int
pp-bytes]]
[gloss.core :as gloss]
[gloss.io :as io])
(:import [java.io Closeable]))

(gloss/defcodec query-frame (gloss/compile-frame
(gloss/finite-frame
(gloss/prefix :int32-le)
(gloss/string :utf-8))
cheshire/generate-string
#(cheshire/parse-string % true)))

(gloss/defcodec id :int64-le)
(gloss/defcodec msg-protocol [id query-frame])

(defn wrap-duplex-stream
[s]
(let [out (s/stream)]
(s/connect
(s/map #(io/encode msg-protocol %) out)
s)

(s/splice
out
(io/decode-stream s msg-protocol))))

(declare send-continue-query send-stop-query)

Expand All @@ -21,94 +47,56 @@
clojure.lang.Seqable
(seq [this] (do
(Thread/sleep 250)
(lazy-seq (concat coll (send-continue-query conn token))))))

(defn send-int [^OutputStream out i n]
(.write out (int->bytes i n) 0 n))

(defn send-str [^OutputStream out s]
(let [n (count s)]
(.write out (str->bytes s) 0 n)))

(defn read-str [^DataInputStream in n]
(let [resp (byte-array n)]
(.readFully in resp 0 n)
(String. resp)))

(defn ^String read-init-response [^InputStream in]
(let [resp (byte-array 4096)]
(.read in resp 0 4096)
(clojure.string/replace (String. resp) #"\W*$" "")))


(defn read-response* [^InputStream in]
(let [recvd-token (byte-array 8)
length (byte-array 4)]
(.read in recvd-token 0 8)
(.read in length 0 4)
(let [recvd-token (bytes->int recvd-token 8)
length (bytes->int length 4)
json (read-str in length)]
[recvd-token json])))

(defn write-query [out [token json]]
(send-int out token 8)
(send-int out (count json) 4)
(send-str out json))

(defn make-connection-loops [in out]
(let [recv-chan (async/chan)
send-chan (async/chan)
pub (async/pub recv-chan first)
;; Receive loop
recv-loop (async/go-loop []
(when (try
(let [resp (read-response* in)]
(log/trace "Received raw response %s" resp)
(async/>! recv-chan resp))
(catch java.net.SocketException e
false))
(recur)))
;; Send loop
send-loop (async/go-loop []
(when-let [query (async/<! send-chan)]
(log/trace "Sending raw query %s")
(write-query out query)
(recur)))]
;; Return as map to merge into connection
{:pub pub
:loops [recv-loop send-loop]
:r-ch recv-chan
:ch send-chan}))

(defn close-connection-loops
[conn]
(let [{:keys [pub ch r-ch] [recv-loop send-loop] :loops} @conn]
(async/unsub-all pub)
;; Close send channel and wait for loop to complete
(async/close! ch)
(async/<!! send-loop)
;; Close recv channel
(async/close! r-ch)))

(defn send-query* [conn token query]
(let [chan (async/chan)
{:keys [pub ch]} @conn]
(lazy-seq (concat coll
(send-continue-query conn token))))))

(defn read-init-response [resp]
(-> resp
String.
(clojure.string/replace #"\W*$" "")))

(defn handshake [version auth proto client]
(let [auth-bytes (if (some? auth)
(str->bytes auth)
(int->bytes 0 4))
msg-bytes (byte-array (concat
(int->bytes version 4)
auth-bytes
(int->bytes proto 4)))]
@(s/put! client msg-bytes)
(read-init-response @(s/take! client))))

(defn make-connection-loops [client]
(let [parsed-in (async/chan (async/sliding-buffer 100))
pub (async/pub parsed-in first)
publish-loop
(async/go-loop []
(when-let [result @(s/take! @client)]
(async/>! parsed-in result)
(recur)))]
{:loops [publish-loop]
:parsed-in parsed-in
:pub pub}))

(defn send-query* [{:keys [client pub] :as conn}
token query]
(let [chan (async/chan)]
(async/sub pub token chan)
(async/>!! ch [token query])
(let [[recvd-token json] (async/<!! chan)]
(assert (= recvd-token token) "Must not receive response with different token")
(async/unsub-all pub token)
(json/read-str json :key-fn keyword))))
(s/put! @client [token query])
(let [[recvd-token json]
(async/<!! chan)]
(assert (= recvd-token token)
"Must not receive response with different token")
(async/unsub pub token chan)
json)))

(defn send-query [conn token query]
(let [{:keys [db]} @conn
query (if (and db (= 2 (count query))) ;; If there's only 1 element in query then this is a continue or stop query.
json (if (and db (= 2 (count query))) ;; If there's only 1 element in query then this is a continue or stop query.
;; TODO: Could provide other global optargs too
(concat query [{:db [(types/tt->int :DB) [db]]}])
query)
json (json/write-str query)
{type :t resp :r :as json-resp} (send-query* conn token json)
{type :t resp :r :as json-resp} (send-query* @conn token json)
resp (parse-response resp)]
(condp get type
#{1} (first resp) ;; Success Atom, Query returned a single RQL datatype
Expand Down
4 changes: 2 additions & 2 deletions src/rethinkdb/query.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@
(defn changes
"Return an infinite stream of objects representing changes to a table or a
document."
[table]
(term :CHANGES [table]))
[table & [optargs]]
(term :CHANGES [table] optargs))

;;; Writing data

Expand Down
14 changes: 8 additions & 6 deletions test/rethinkdb/connection_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,35 @@
(-> (r/db test-db)
(r/table-list)))


;; Uncomment to run test
(deftest connection-speed-test
(deftest connection-speed-test-single
(println "performance (connection per query)")
(let [conn r/connect]
(time
(doseq [n (range 100)]
(with-open [conn (r/connect)]
(r/run query conn)))))

(deftest connection-speed-test-reuse
(println "performance (reusing connection")
(time
(with-open [conn (r/connect)]
(doseq [n (range 100)]
(r/run query conn))))
(r/run query conn)))))

(deftest connection-speed-test-parallel
(println "performance (parallel, one connection)")
(with-open [conn (r/connect)]
(time
(doall
(pmap (fn [v] (r/run query conn))
(range 100)))))
(range 100))))))

(deftest connection-speed-test-pooled
(println "performance (pooled connection")
#_(with-open [conn connect]
nil)
nil))

(deftest connection-speed-test-multiple
(println "multiple connection test")
(let [conn1 (r/connect)
conn2 (r/connect)
Expand Down
Loading