Skip to content

Overhaul #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: elixir

elixir:
- 1.3
- 1.6.3

otp_release:
- 18.3
Expand Down
48 changes: 25 additions & 23 deletions lib/rethinkdb/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ defmodule RethinkDB.Connection do
defmacro __using__(_opts) do
quote location: :keep do
def start_link(opts \\ []) do
if Dict.has_key?(opts, :name) && opts[:name] != __MODULE__ do
if Keyword.has_key?(opts, :name) && opts[:name] != __MODULE__ do
# The whole point of this macro is to provide an implicit process
# name, so subverting it is considered an error.
raise ArgumentError.exception(
"Process name #{inspect opts[:name]} conflicts with implicit name #{inspect __MODULE__} provided by `use RethinkDB.Connection`"
)
end
RethinkDB.Connection.start_link(Dict.put_new(opts, :name, __MODULE__))
RethinkDB.Connection.start_link(Keyword.put_new(opts, :name, __MODULE__))
end

def run(query, opts \\ []) do
Expand Down Expand Up @@ -100,12 +100,14 @@ defmodule RethinkDB.Connection do
* `binary_format` - what format to return binary data in (default: :native). Set this to :raw if you want the raw pseudotype.
"""
def run(query, conn, opts \\ []) do
timeout = Dict.get(opts, :timeout, 5000)
conn_opts = Dict.drop(opts, [:timeout])
noreply = Dict.get(opts, :noreply, false)
conn_opts = Connection.call(conn, :conn_opts)
|> Dict.take([:db])
|> Dict.merge(conn_opts)
timeout = Keyword.get(opts, :timeout, 5000)
conn_opts = Keyword.drop(opts, [:timeout])
noreply = Keyword.get(opts, :noreply, false)
conn_opts = Connection.call(conn, :conn_opts) |>
Map.take([:db]) |>
Enum.to_list |>
Keyword.merge(conn_opts)

query = prepare_and_encode(query, conn_opts)
msg = case noreply do
true -> {:query_noreply, query}
Expand Down Expand Up @@ -172,7 +174,7 @@ defmodule RethinkDB.Connection do
@doc """
Start connection as a linked process

Accepts a `Dict` of options. Supported options:
Accepts a `Keyword` of options. Supported options:

* `:host` - hostname to use to connect to database. Defaults to `'localhost'`.
* `:port` - port on which to connect to database. Defaults to `28015`.
Expand All @@ -184,26 +186,26 @@ defmodule RethinkDB.Connection do
* `:ca_certs` - a list of file paths to cacerts.
"""
def start_link(opts \\ []) do
args = Dict.take(opts, [:host, :port, :auth_key, :db, :sync_connect, :ssl, :max_pending])
args = Keyword.take(opts, [:host, :port, :auth_key, :db, :sync_connect, :ssl, :max_pending])
Connection.start_link(__MODULE__, args, opts)
end

def init(opts) do
host = case Dict.get(opts, :host, 'localhost') do
x when is_binary(x) -> String.to_char_list x
host = case Keyword.get(opts, :host, 'localhost') do
x when is_binary(x) -> String.to_charlist x
x -> x
end
sync_connect = Dict.get(opts, :sync_connect, false)
ssl = Dict.get(opts, :ssl)
opts = Dict.put(opts, :host, host)
|> Dict.put_new(:port, 28015)
|> Dict.put_new(:auth_key, "")
|> Dict.put_new(:max_pending, 10000)
|> Dict.drop([:sync_connect])
sync_connect = Keyword.get(opts, :sync_connect, false)
ssl = Keyword.get(opts, :ssl)
opts = Keyword.put(opts, :host, host)
|> Keyword.put_new(:port, 28015)
|> Keyword.put_new(:auth_key, "")
|> Keyword.put_new(:max_pending, 10000)
|> Keyword.drop([:sync_connect])
|> Enum.into(%{})
{transport, transport_opts} = case ssl do
nil -> {%Transport.TCP{}, []}
x -> {%Transport.SSL{}, Enum.map(Dict.fetch!(x, :ca_certs), &({:cacertfile, &1})) ++ [verify: :verify_peer]}
x -> {%Transport.SSL{}, Enum.map(Keyword.fetch!(x, :ca_certs), &({:cacertfile, &1})) ++ [verify: :verify_peer]}
end
state = %{
pending: %{},
Expand All @@ -230,11 +232,11 @@ defmodule RethinkDB.Connection do
:ok ->
:ok = Transport.setopts(socket, [active: :once])
# TODO: investigate timeout vs hibernate
{:ok, Dict.put(state, :socket, socket)}
{:ok, Map.put(state, :socket, socket)}
end
{:error, :econnrefused} ->
backoff = min(Dict.get(state, :timeout, 1000), 64000)
{:backoff, backoff, Dict.put(state, :timeout, backoff*2)}
backoff = min(Map.get(state, :timeout, 1000), 64000)
{:backoff, backoff, Map.put(state, :timeout, backoff*2)}
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/rethinkdb/connection/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule RethinkDB.Connection.Request do
def make_request(query, token, from, state = %{pending: pending, socket: socket}) do
new_pending = case from do
:noreply -> pending
_ -> Dict.put_new(pending, token, from)
_ -> Map.put_new(pending, token, from)
end
bsize = :erlang.size(query)
payload = token <> << bsize :: little-size(32) >> <> query
Expand Down Expand Up @@ -41,7 +41,7 @@ defmodule RethinkDB.Connection.Request do
case leftover <> data do
<< response :: binary-size(length), leftover :: binary >> ->
Connection.reply(pending[token], {response, token})
handle_recv("", %{state | current: {:start, leftover}, pending: Dict.delete(pending, token)})
handle_recv("", %{state | current: {:start, leftover}, pending: Map.delete(pending, token)})
new_data ->
{:noreply, %{state | current: {:length, length, token, new_data}}}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/rethinkdb/prepare.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ defmodule RethinkDB.Prepare do
{Enum.into(map, %{}), state}
end
defp prepare(ref, state = {max, map}) when is_reference(ref) do
case Dict.get(map,ref) do
nil -> {max + 1, {max + 1, Dict.put_new(map, ref, max + 1)}}
case Map.get(map,ref) do
nil -> {max + 1, {max + 1, Map.put_new(map, ref, max + 1)}}
x -> {x, state}
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/rethinkdb/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ defmodule RethinkDB.Query do
@type reql_number :: integer|float|t
@type reql_array :: [term]|t
@type reql_bool :: boolean|t
@type reql_obj :: %{}|t
@type reql_obj :: map|t
@type reql_datum :: term
@type reql_func0 :: (() -> term)|t
@type reql_func1 :: (term -> term)|t
@type reql_func2 :: (term, term -> term)|t
@type reql_opts :: %{}
@type reql_opts :: map
@type reql_binary :: %RethinkDB.Pseudotypes.Binary{}|binary|t
@type reql_geo_point :: %RethinkDB.Pseudotypes.Geometry.Point{}|{reql_number,reql_number}|t
@type reql_geo_line :: %RethinkDB.Pseudotypes.Geometry.Line{}|t
Expand Down Expand Up @@ -1868,7 +1868,7 @@ defmodule RethinkDB.Query do

args = case arity do
0 -> []
_ -> Enum.map(1..arity, fn _ -> make_ref end)
_ -> Enum.map(1..arity, fn _ -> make_ref() end)
end
params = Enum.map(args, &var/1)

Expand Down
6 changes: 4 additions & 2 deletions lib/rethinkdb/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule RethinkDB.Collection do

def count(%{data: data}), do: Enumerable.count(data)
def member?(%{data: data}, el), do: Enumerable.member?(data, el)
def slice(%{data: data}), do: Enumerable.slice(data)
end
end

Expand All @@ -36,8 +37,9 @@ defmodule RethinkDB.Feed do
end)
stream.(acc, fun)
end
def count(_changes), do: raise "count/1 not supported for changes"
def member?(_changes, _values), do: raise "member/2 not supported for changes"
def count(_changes), do: raise "count/1 is not supported for changes"
def member?(_changes, _values), do: raise "member/2 is not supported for changes"
def slice(_changes), do: raise "slice/1 is not supported for changes"
end
end

Expand Down
21 changes: 12 additions & 9 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ defmodule RethinkDB.Mixfile do use Mix.Project
version: "0.4.0",
elixir: "~> 1.0",
description: "RethinkDB driver for Elixir",
package: package,
deps: deps,
test_coverage: [tool: ExCoveralls]]
package: package(),
deps: deps(),
test_coverage: [tool: ExCoveralls],
dialyzer: [
plt_add_apps: [:ssl]
]]
end

def package do
Expand Down Expand Up @@ -40,13 +43,13 @@ defmodule RethinkDB.Mixfile do use Mix.Project
# Type `mix help deps` for more examples and options
defp deps do
[
{:poison, "~> 3.0"},
{:earmark, "~> 0.1", only: :dev},
{:ex_doc, "~> 0.7", only: :dev},
{:poison, "~> 3.1"},
{:ex_doc, "~> 0.18", only: :dev},
{:flaky_connection, github: "hamiltop/flaky_connection", only: :test},
{:connection, "~> 1.0.1"},
{:excoveralls, "~> 0.3.11", only: :test},
{:dialyze, "~> 0.2.0", only: :test}
{:connection, "~> 1.0"},
{:excoveralls, "~> 0.8", only: :test},

{:dialyxir, "~> 0.4", only: :dev},
]
end
end
30 changes: 18 additions & 12 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
%{"certifi": {:hex, :certifi, "0.3.0", "389d4b126a47895fe96d65fcf8681f4d09eca1153dc2243ed6babad0aac1e763", [:rebar3], []},
"connection": {:hex, :connection, "1.0.1", "16bf178158088f29513a34a742d4311cd39f2c52425559d679ecb28a568c5c0b", [:mix], []},
%{
"certifi": {:hex, :certifi, "2.0.0", "a0c0e475107135f76b8c1d5bc7efb33cd3815cb3cf3dea7aefdd174dabead064", [:rebar3], [], "hexpm"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"},
"dialyze": {:hex, :dialyze, "0.2.0", "ecabf292e9f4bd0f7d844981f899a85c0300b30ff2dd1cdfef0c81a6496466f1", [:mix], []},
"earmark": {:hex, :earmark, "0.1.19", "ffec54f520a11b711532c23d8a52b75a74c09697062d10613fa2dbdf8a9db36e", [:mix], []},
"ex_doc": {:hex, :ex_doc, "0.10.0", "f49c237250b829df986486b38f043e6f8e19d19b41101987f7214543f75947ec", [:mix], [{:earmark, "~> 0.1.17 or ~> 0.2", [hex: :earmark, optional: true]}]},
"excoveralls": {:hex, :excoveralls, "0.3.11", "cd1abaf07db5bed9cf7891d86470247c8b3c8739d7758679071ce1920bb09dbc", [:mix], [{:exjsx, "~> 3.0", [hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}]},
"exjsx": {:hex, :exjsx, "3.2.0", "7136cc739ace295fc74c378f33699e5145bead4fdc1b4799822d0287489136fb", [:mix], [{:jsx, "~> 2.6.2", [hex: :jsx, optional: false]}]},
"earmark": {:hex, :earmark, "1.2.4", "99b637c62a4d65a20a9fb674b8cffb8baa771c04605a80c911c4418c69b75439", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.8.1", "0bbf67f22c7dbf7503981d21a5eef5db8bbc3cb86e70d3798e8c802c74fa5e27", [:mix], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
"flaky_connection": {:git, "https://github.com/hamiltop/flaky_connection.git", "e3a09e7198e1b155f35291ffad438966648a8156", []},
"hackney": {:hex, :hackney, "1.4.8", "c8c6977ed55cc5095e3929f6d94a6f732dd2e31ae42a7b9236d5574ec3f5be13", [:rebar3], [{:certifi, "0.3.0", [hex: :certifi, optional: false]}, {:idna, "1.0.3", [hex: :idna, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_hostname, "1.0.5", [hex: :ssl_verify_hostname, optional: false]}]},
"idna": {:hex, :idna, "1.0.3", "d456a8761cad91c97e9788c27002eb3b773adaf5c893275fc35ba4e3434bbd9b", [:rebar3], []},
"hackney": {:hex, :hackney, "1.11.0", "4951ee019df102492dabba66a09e305f61919a8a183a7860236c0fde586134b6", [:rebar3], [{:certifi, "2.0.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"inch_ex": {:hex, :inch_ex, "0.2.4"},
"jsx": {:hex, :jsx, "2.6.2", "213721e058da0587a4bce3cc8a00ff6684ced229c8f9223245c6ff2c88fbaa5a", [:mix, :rebar], []},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []},
"jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"ranch": {:hex, :ranch, "1.1.0", "f7ed6d97db8c2a27cca85cacbd543558001fc5a355e93a7bff1e9a9065a8545b", [:make], []},
"ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5", "2e73e068cd6393526f9fa6d399353d7c9477d6886ba005f323b592d389fb47be", [:make], []}}
"ranch": {:hex, :ranch, "1.1.0", "f7ed6d97db8c2a27cca85cacbd543558001fc5a355e93a7bff1e9a9065a8545b", [:make], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [], [], "hexpm"},
"ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5", "2e73e068cd6393526f9fa6d399353d7c9477d6886ba005f323b592d389fb47be", [:make], []},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [], [], "hexpm"},
}
24 changes: 16 additions & 8 deletions test/changes_test.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
defmodule ChangesTest do
use ExUnit.Case, async: true
use ExUnit.Case, async: false
alias RethinkDB.Feed
use RethinkDB.Connection
import RethinkDB.Query

@table_name "changes_test_table_1"

setup_all do
start_link
start_link()

db_create("test") |> run
table_create(@table_name) |> run

on_exit fn ->
start_link
table_drop(@table_name) |> run
start_link()

db_drop("test") |> run
end
:ok
end
Expand Down Expand Up @@ -43,18 +48,18 @@ defmodule ChangesTest do
q = table(@table_name) |> insert(data)
{:ok, res} = run(q)
expected = res.data["id"]
{:ok, changes} = Task.await(t)
{:ok, changes} = Task.await(t)
^expected = changes.data |> hd |> Map.get("id")

# test Enumerable
t = Task.async fn ->
changes |> Enum.take(5)
changes |> Enum.take(5)
end
1..6 |> Enum.each(fn _ ->
q = table(@table_name) |> insert(data)
run(q)
end)
data = Task.await(t)
data = Task.await(t)
5 = Enum.count(data)
end

Expand All @@ -67,7 +72,10 @@ defmodule ChangesTest do
data = %{"id" => "0"}
q = table(@table_name) |> insert(data)
{:ok, res} = run(q)
expected = res.data["id"]

# Unused?
_expected = res.data["id"]

[h|[]] = Task.await(t)
assert %{"new_val" => %{"id" => "0"}} = h
end
Expand Down
Loading