Skip to content

Support for legacy mongodb < 2.6 and colldb everywhere. #103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The bson application needs to be started before starting mongodb application
> application:start (bson).

The crypto application needs to be started if you plan to use authorization to mongodb-server 3+.

> application:start (crypto).

The mongodb application needs be started before using (to initialize an internal ets table of counters)
Expand Down Expand Up @@ -54,7 +54,7 @@ use `mc_worker_api:connect/1`.
To connect mc_worker in your supervised pool, use `mc_worker:start_link/1` instead and pass all args to it.

`safe`, along with `{safe, GetLastErrorParams}` and `unsafe`, are write-modes. Safe mode makes a *getLastError* request
after every write in the sequence. If the reply says it failed then the rest of the sequence is aborted and returns
after every write in the sequence. If the reply says it failed then the rest of the sequence is aborted and returns
`{failure, {write_failure, Reason}}`, or `{failure, not_master}` when connected to a slave. An example write
failure is attempting to insert a duplicate key that is indexed to be unique. Alternatively, unsafe mode issues every
write without a confirmation, so if a write fails you won't know about it and remaining operations will be executed.
Expand All @@ -65,7 +65,7 @@ a master/primary server). If the connected server is not a master then the first
will be aborted, and `mongo:do` will return `{failure, not_master}`. `slave_ok` means every query is allowed to read
stale data from a slave/secondary (fresh data from a master is fine too).

If you set `{register, Name}` option - mc_worker process will be registered on this Name, or you can pass function
If you set `{register, Name}` option - mc_worker process will be registered on this Name, or you can pass function
`fun(pid())`, which it runs with self pid.
If you set `{login, Login}` and `{password, Password}` options - mc_worker will try to authenticate to the database.

Expand Down Expand Up @@ -220,7 +220,7 @@ And if you want your MongoDB deployment metadata to be auto revered use unknow i

{ unknown, "hostname1:port1", "hostname2:port2"] }

Type in `mongo_api:connect` is topology type (`unknown` | `sharded`).
Type in `mongo_api:connect` is topology type (`unknown` | `sharded`).

mongoc topology **Options**

Expand Down Expand Up @@ -267,13 +267,13 @@ Use transaction poolboy-like interface for mongoc:
mc_worker:hibernate(Conf),
Res
end)

mongoc:transaction(?DBPOOL, fun(Conf) -> mongoc:count(Conf, Collection, Value, [], 1) end, [])

mongoc:transaction(?DBPOOL,
fun(Worker) -> mc_worker_api:update(Worker, Collection, Key, Command, Upsert, Multi) end)

Notice, that all write operations like `update`, `insert`, `delete` do with **mongo**, but all read operations
Notice, that all write operations like `update`, `insert`, `delete` do with **mongo**, but all read operations
do with **mongoc**.
You can set up your read preferences when reading:

Expand All @@ -282,6 +282,10 @@ You can set up your read preferences when reading:
mongoc:find_one(Conf#{read_preference => secondaryPreferred}, Collection, Key, Projector, 0)
end)

### For MongoDB < 2.6

In your config, set:
`{mongodb, [{version, 2.4}]}`

### More Documentation

Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
{bson, ".*",
{git, "git://github.com/comtihon/bson-erlang", {tag, "v0.2.2"}}},
{pbkdf2, ".*",
{git, "git@github.com:comtihon/erlang-pbkdf2.git", {tag, "2.0.0"}}},
{git, "git://github.com/comtihon/erlang-pbkdf2.git", {tag, "2.0.0"}}},
{poolboy, ".*",
{git, "git://github.com/devinus/poolboy.git", {tag, "1.5.1"}}}
]}.
Expand Down
147 changes: 126 additions & 21 deletions src/api/mc_worker_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
]).
-export([
command/2,
command/3,
sync_command/4,
ensure_index/3,
prepare/2]).
Expand Down Expand Up @@ -87,64 +88,114 @@ disconnect(Connection) ->

%% @doc Insert a document or multiple documents into a collection.
%% Returns the document or documents with an auto-generated _id if missing.
-spec insert(pid(), collection(), list() | map() | bson:document()) -> {{boolean(), map()}, list()}.
%% Note: for mongodb &lt; 2.6 getting true as a result doesn't always mean a successful insert due to mongo api limitation; Number of success is also not available for &lt; 2.6.
-spec insert(pid(), colldb(), list() | map() | bson:document()) -> {{boolean(), map()}, list()}.
insert(Connection, Coll, Doc) when is_tuple(Doc); is_map(Doc) ->
{Res, [UDoc | _]} = insert(Connection, Coll, [Doc]),
{Res, UDoc};
insert(Connection, Coll = {Db, Collection}, Docs) ->
Converted = prepare(Docs, fun assign_id/1),
case application:get_env(mongodb, version, latest) < 2.6 of
true -> legacy_insert(Connection, Coll, Converted);
_ -> {command(Connection, {<<"insert">>, Collection, <<"documents">>, Converted}, Db), Converted}
end;
insert(Connection, Coll, Docs) ->
Converted = prepare(Docs, fun assign_id/1),
{command(Connection, {<<"insert">>, Coll, <<"documents">>, Converted}), Converted}.
case application:get_env(mongodb, version, latest) < 2.6 of
true -> legacy_insert(Connection, Coll, Converted);
_ -> {command(Connection, {<<"insert">>, Coll, <<"documents">>, Converted}), Converted}
end.

-spec insert(pid(), collection(), list() | map() | bson:document(), bson:document()) -> {{boolean(), map()}, list()}.
%% @doc Insert a document or multiple documents into a collection with write concern option.
%% Note: this is not supported for mongodb &lt; 2.6 due to mongo api limitation.
-spec insert(pid(), colldb(), list() | map() | bson:document(), bson:document()) -> {{boolean(), map()}, list()}.
insert(Connection, Coll, Doc, WC) when is_tuple(Doc); is_map(Doc) ->
{Res, [UDoc | _]} = insert(Connection, Coll, [Doc], WC),
{Res, UDoc};
insert(Connection, {Db, Collection}, Docs, WC) ->
Converted = prepare(Docs, fun assign_id/1),
{command(Connection, {<<"insert">>, Collection, <<"documents">>, Converted, <<"writeConcern">>, WC}, Db), Converted};
insert(Connection, Coll, Docs, WC) ->
Converted = prepare(Docs, fun assign_id/1),
{command(Connection, {<<"insert">>, Coll, <<"documents">>, Converted, <<"writeConcern">>, WC}), Converted}.

%% @doc Replace the document matching criteria entirely with the new Document.
-spec update(pid(), collection(), selector(), map()) -> {boolean(), map()}.
%% @doc Replace the document matching criteria entirely with the new Document, with option upsert false and multi false.
%% Note: for mongodb &lt; 2.6 getting true as a result doesn't always mean a successful update due to mongo api limitation; Number of success is also not available for &lt; 2.6.
-spec update(pid(), colldb(), selector(), map()) -> {boolean(), map()}.
update(Connection, Coll, Selector, Doc) ->
update(Connection, Coll, Selector, Doc, false, false).

%% @doc Replace the document matching criteria entirely with the new Document.
-spec update(pid(), collection(), selector(), map(), boolean(), boolean()) -> {boolean(), map()}.
%% @doc Replace the document matching criteria entirely with the new Document, with upsert and multi options.
%% Note: for mongodb &lt; 2.6 getting true as a result doesn't always mean a successful update due to mongo api limitation; Number of success is also not available for &lt; 2.6.
-spec update(pid(), colldb(), selector(), map(), boolean(), boolean()) -> {boolean(), map()}.
update(Connection, Coll = {Db, Collection}, Selector, Doc, Upsert, MultiUpdate) ->
Converted = prepare(Doc, fun(D) -> D end),
case application:get_env(mongodb, version, latest) < 2.6 of
true -> legacy_update(Connection, Coll, Selector, Converted, Upsert, MultiUpdate);
_ -> command(Connection, {<<"update">>, Collection, <<"updates">>,
[#{<<"q">> => Selector, <<"u">> => Converted, <<"upsert">> => Upsert, <<"multi">> => MultiUpdate}]}, Db)
end;
update(Connection, Coll, Selector, Doc, Upsert, MultiUpdate) ->
Converted = prepare(Doc, fun(D) -> D end),
command(Connection, {<<"update">>, Coll, <<"updates">>,
[#{<<"q">> => Selector, <<"u">> => Converted, <<"upsert">> => Upsert, <<"multi">> => MultiUpdate}]}).
case application:get_env(mongodb, version, latest) < 2.6 of
true -> legacy_update(Connection, Coll, Selector, Converted, Upsert, MultiUpdate);
_ -> command(Connection, {<<"update">>, Coll, <<"updates">>,
[#{<<"q">> => Selector, <<"u">> => Converted, <<"upsert">> => Upsert, <<"multi">> => MultiUpdate}]})
end.

%% @doc Replace the document matching criteria entirely with the new Document.
-spec update(pid(), collection(), selector(), map(), boolean(), boolean(), bson:document()) -> {boolean(), map()}.
%% @doc Replace the document matching criteria entirely with the new Document, with upsert and multi options and write concern.
%% Note: this is not supported for mongodb &lt; 2.6 due to mongo api limitation.
-spec update(pid(), colldb(), selector(), map(), boolean(), boolean(), bson:document()) -> {boolean(), map()}.
update(Connection, {Db, Collection}, Selector, Doc, Upsert, MultiUpdate, WC) ->
Converted = prepare(Doc, fun(D) -> D end),
command(Connection, {<<"update">>, Collection, <<"updates">>,
[#{<<"q">> => Selector, <<"u">> => Converted, <<"upsert">> => Upsert, <<"multi">> => MultiUpdate}],
<<"writeConcern">>, WC}, Db);
update(Connection, Coll, Selector, Doc, Upsert, MultiUpdate, WC) ->
Converted = prepare(Doc, fun(D) -> D end),
command(Connection, {<<"update">>, Coll, <<"updates">>,
[#{<<"q">> => Selector, <<"u">> => Converted, <<"upsert">> => Upsert, <<"multi">> => MultiUpdate}],
<<"writeConcern">>, WC}).

%% @doc Delete selected documents
-spec delete(pid(), collection(), selector()) -> {boolean(), map()}.
%% Note: for mongodb &lt; 2.6 getting true as a result doesn't always mean a successful delete due to mongo api limitation; Number of success is also not available for &lt; 2.6.
-spec delete(pid(), colldb(), selector()) -> {boolean(), map()}.
delete(Connection, Coll, Selector) ->
delete_limit(Connection, Coll, Selector, 0).
case application:get_env(mongodb, version, latest) < 2.6 of
true -> legacy_delete(Connection, Coll, Selector);
_ -> delete_limit(Connection, Coll, Selector, 0)
end.

%% @doc Delete first selected document.
-spec delete_one(pid(), collection(), selector()) -> {boolean(), map()}.
%% Note: for mongodb &lt; 2.6 getting true as a result doesn't always mean a successful delete due to mongo api limitation; Number of success is also not available for &lt; 2.6.
-spec delete_one(pid(), colldb(), selector()) -> {boolean(), map()}.
delete_one(Connection, Coll, Selector) ->
delete_limit(Connection, Coll, Selector, 1).
case application:get_env(mongodb, version, latest) < 2.6 of
true -> legacy_delete_one(Connection, Coll, Selector);
_ -> delete_limit(Connection, Coll, Selector, 1)
end.

%% @doc Delete selected documents
-spec delete_limit(pid(), collection(), selector(), integer()) -> {boolean(), map()}.
%% Note: this is not supported for mongodb &lt; 2.6 due to mongo api limitation.
-spec delete_limit(pid(), colldb(), selector(), integer()) -> {boolean(), map()}.
delete_limit(Connection, {Db, Collection}, Selector, N) ->
command(Connection, {<<"delete">>, Collection, <<"deletes">>,
[#{<<"q">> => Selector, <<"limit">> => N}]}, Db);
delete_limit(Connection, Coll, Selector, N) ->
command(Connection, {<<"delete">>, Coll, <<"deletes">>,
[#{<<"q">> => Selector, <<"limit">> => N}]}).
[#{<<"q">> => Selector, <<"limit">> => N}]}).

%% @doc Delete selected documents
-spec delete_limit(pid(), collection(), selector(), integer(), bson:document()) -> {boolean(), map()}.
%% Note: this is not supported for mongodb &lt; 2.6 due to mongo api limitation.
-spec delete_limit(pid(), colldb(), selector(), integer(), bson:document()) -> {boolean(), map()}.
delete_limit(Connection, {Db, Collection}, Selector, N, WC) ->
command(Connection, {<<"delete">>, Collection, <<"deletes">>,
[#{<<"q">> => Selector, <<"limit">> => N}], <<"writeConcern">>, WC}, Db);
delete_limit(Connection, Coll, Selector, N, WC) ->
command(Connection, {<<"delete">>, Coll, <<"deletes">>,
[#{<<"q">> => Selector, <<"limit">> => N}], <<"writeConcern">>, WC}).


%% @doc Return first selected document, if any
-spec find_one(pid(), colldb(), selector()) -> {} | {bson:document()}.
find_one(Connection, Coll, Selector) ->
Expand Down Expand Up @@ -218,6 +269,15 @@ command(Connection, Command) ->
}),
mc_connection_man:process_reply(Doc, Command).

%% @doc Execute given MongoDB command on a database and return its result.
-spec command(pid(), bson:document(), database()) -> {boolean(), map()}. % Action
command(Connection, Command, Db) ->
Doc = mc_action_man:read_one(Connection, #'query'{
collection = {Db, <<"$cmd">>},
selector = Command
}),
mc_connection_man:process_reply(Doc, Command).

%% @doc Execute MongoDB command in this thread
-spec sync_command(port(), binary(), bson:document(), module()) -> {boolean(), map()}.
sync_command(Socket, Database, Command, SetOpts) ->
Expand All @@ -237,9 +297,9 @@ prepare(Docs, AssignFun) when is_tuple(Docs) ->
List -> List
end
end;
prepare(Doc, AssignFun) when is_map(Doc), map_size(Doc) == 1 ->
prepare(Doc, AssignFun) when is_map(Doc) ->
case maps:keys(Doc) of
[<<"$", _/binary>>] -> Doc; %command
[<<"$", _/binary>> | _] -> Doc; %command
_ -> %document
case prepare_doc(Doc, AssignFun) of
Res when is_tuple(Res) -> [Res];
Expand Down Expand Up @@ -274,4 +334,49 @@ assign_id(Doc) ->
case bson:lookup(<<"_id">>, Doc) of
{} -> bson:update(<<"_id">>, mongo_id_server:object_id(), Doc);
_Value -> Doc
end.
end.

%% @private
%% Legacy insert for mongodb &lt; 2.6
%% Note there is no response to an OP_INSERT message so getting true as a result doesn't always mean a successful insert.
-spec legacy_insert(pid(), colldb(), bson:document()) -> {{boolean(), map()}, list()}.
legacy_insert(Connection, Coll, Converted) ->
case mc_connection_man:request_worker(Connection, #insert{collection = Coll, documents = Converted}) of
ok ->
{{true, #{}}, Converted};
_ ->
{{false, #{}}, Converted}
end.

%% @private
%% Legacy update for mongodb &lt; 2.6
%% Note there is no response to an OP_UPDATE message so getting true as a result doesn't always mean a successful update.
-spec legacy_update(pid(), colldb(), selector(), map(), boolean(), boolean()) -> {boolean(), map()}.
legacy_update(Connection, Coll, Selector, Converted, Upsert, MultiUpdate) ->
case mc_connection_man:request_worker(Connection, #update{collection = Coll, selector = Selector,
updater = Converted, upsert = Upsert, multiupdate = MultiUpdate}) of
ok ->
{true, #{}};
_ ->
{false, #{}}
end.

%% @private
%% Legacy delete for mongodb &lt; 2.6
%% Note there is no response to an OP_DELETE message so getting true as a result doesn't always mean a successful delete.
-spec legacy_delete(pid(), colldb(), selector()) -> {boolean(), map()}.
legacy_delete(Connection, Coll, Selector) ->
case mc_connection_man:request_worker(Connection, #delete{collection = Coll, singleremove = false, selector = Selector}) of
ok ->
{true, #{}};
_ ->
{false, #{}}
end.
-spec legacy_delete_one(pid(), colldb(), selector()) -> {boolean(), map()}.
legacy_delete_one(Connection, Coll, Selector) ->
case mc_connection_man:request_worker(Connection, #delete{collection = Coll, singleremove = true, selector = Selector}) of
ok ->
{true, #{}};
_ ->
{false, #{}}
end.
18 changes: 10 additions & 8 deletions src/api/mongo_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@
-module(mongo_api).
-author("tihon").

-include("mongo_protocol.hrl").

%% API
-export([insert/4, find/5, update/5, delete/4, count/5, find_one/5, connect/4]).

-spec connect(atom(), list(), proplists:proplist(), proplists:proplist()) -> {ok, pid()}.
connect(Type, Hosts, TopologyOptions, WorkerOptions) ->
mongoc:connect({Type, Hosts}, TopologyOptions, WorkerOptions).

-spec insert(atom() | pid(), binary(), list() | map() | bson:document(), integer() | infinity) ->
-spec insert(atom() | pid(), colldb(), list() | map() | bson:document(), integer() | infinity) ->
{{boolean(), map()}, list()}.
insert(Topology, Collection, Document, TTL) ->
mongoc:transaction(Topology, fun(Worker) -> mc_worker_api:insert(Worker, Collection, Document) end, TTL).

-spec update(atom() | pid(), binary(), mc_worker_api:selector(), map(), map()) ->
-spec update(atom() | pid(), colldb(), mc_worker_api:selector(), map(), map()) ->
{boolean(), map()}.
update(Topology, Collection, Selector, Doc, Opts) ->
TTL = maps:get(ttl, Opts, infinity),
Expand All @@ -33,23 +35,23 @@ update(Topology, Collection, Selector, Doc, Opts) ->
mc_worker_api:update(Worker, Collection, Selector, Doc, Upsert, MultiUpdate)
end, TTL).

-spec delete(atom() | pid(), binary(), mc_worker_api:selector(), integer() | infinity) ->
-spec delete(atom() | pid(), colldb(), mc_worker_api:selector(), integer() | infinity) ->
{boolean(), map()}.
delete(Topology, Collection, Selector, TTL) ->
mongoc:transaction(Topology, fun(Worker) -> mc_worker_api:delete(Worker, Collection, Selector) end, TTL).

-spec find(atom() | pid(), binary(), mc_worker_api:selector(), mc_worker_api:projector(), integer() | infinity) ->
-spec find(atom() | pid(), colldb(), mc_worker_api:selector(), mc_worker_api:projector(), integer() | infinity) ->
mc_worker_api:cursor().
find(Topology, Collection, Selector, Projector, TTL) ->
mongoc:transaction_query(Topology,
fun(Conf) -> mongoc:find(Conf, Collection, Selector, Projector, 0, 0) end, [], TTL).

-spec find_one(atom() | pid(), binary(), mc_worker_api:selector(), mc_worker_api:projector(), integer() | infinity) ->
mc_worker_api:cursor().
-spec find_one(atom() | pid(), colldb(), mc_worker_api:selector(), mc_worker_api:projector(), integer() | infinity) ->
map().
find_one(Topology, Collection, Selector, Projector, TTL) ->
mongoc:transaction_query(Topology,
fun(Conf) -> mongoc:find_one(Conf, Collection, Selector, Projector, 0) end, [], TTL).

-spec count(atom() | pid(), binary(), mc_worker_api:selector(), map() | list(), integer() | infinity) -> integer().
-spec count(atom() | pid(), colldb(), mc_worker_api:selector(), integer(), integer() | infinity) -> integer().
count(Topology, Collection, Selector, Limit, TTL) ->
mongoc:transaction(Topology, fun(Conf) -> mongoc:count(Conf, Collection, Selector, [], Limit) end, [], TTL).
mongoc:transaction(Topology, fun(Conf) -> mongoc:count(Conf, Collection, Selector, [], Limit) end, [], TTL).
4 changes: 2 additions & 2 deletions src/api/mongoc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ count(Pool, Coll, Selector, Options, Limit) ->
{<<"count">>, mc_utils:value_to_binary(Coll), <<"query">>, Selector, <<"limit">>, Limit}, Options, undefined),
trunc(N). % Server returns count as float

-spec command(map(), bson:document(), readprefs(), undefined | colldb()) ->
-spec command(map(), bson:document(), readprefs(), undefined | database()) ->
{boolean(), bson:document()} | {error, reason()}. % Action
command(Pid, Command, Options, Db) when is_pid(Pid) ->
case mc_topology:get_pool(Pid, Options) of
Expand Down Expand Up @@ -261,4 +261,4 @@ mongos_query_transform(_, Q, _) ->
append_read_preference(Selector = #{<<"$query">> := _}, RP) ->
Selector#{<<"$readPreference">> => RP};
append_read_preference(Selector, RP) ->
#{<<"$query">> => Selector, <<"$readPreference">> => RP}.
#{<<"$query">> => Selector, <<"$readPreference">> => RP}.