diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..404387f0 --- /dev/null +++ b/Makefile @@ -0,0 +1,37 @@ +.PHONY: test + +ERL=erl +BEAMDIR=./deps/*/ebin ./ebin +REBAR=./rebar +REBAR_GEN=../../rebar +DIALYZER=dialyzer + +#update-deps +all: deps compile + +deps: + @$(REBAR) get-deps + +update-deps: + @$(REBAR) update-deps + +compile: + @$(REBAR) compile + +xref: + @$(REBAR) xref skip_deps=true + +clean: + @$(REBAR) clean + +test: + $(REBAR) compile ct skip_deps=true + +edoc: + @$(REBAR) doc + +dialyzer: compile + @$(DIALYZER) ebin deps/ossp_uuid/ebin + +setup-dialyzer: + @$(DIALYZER) --build_plt --apps kernel stdlib mnesia eunit erts crypto diff --git a/ebin/mongodb.app b/ebin/mongodb.app index 6fb20af4..2d4fca65 100644 --- a/ebin/mongodb.app +++ b/ebin/mongodb.app @@ -1,7 +1,7 @@ {application, mongodb, [{description, "Client interface to MongoDB, also known as the driver. See www.mongodb.org"}, {vsn, "0.2.1"}, - {modules, [mongodb_app, mongo, mongo_protocol, mongo_connect, mongo_query, mongo_cursor, mvar, mongodb_tests, mongo_replset, resource_pool]}, + {modules, [mongoc,mongodb_app, mongo, mongo_protocol, mongo_connect, mongo_query, mongo_cursor, mvar, mongodb_tests, mongo_replset, resource_pool]}, {registered, []}, {applications, [kernel, stdlib]}, {mod, {mongodb_app, []}} diff --git a/rebar.config b/rebar.config index 342e4d85..71f453a3 100644 --- a/rebar.config +++ b/rebar.config @@ -1,7 +1,7 @@ {deps, [ - {bson, ".*", {git, "git://github.com/TonyGen/bson-erlang", "HEAD"}} + {bson, ".*", {git, "https://github.com/TonyGen/bson-erlang.git", "HEAD"}} ]}. {lib_dirs, ["deps"]}. -{erl_opts, [debug_info, fail_on_warning]}. \ No newline at end of file +{erl_opts, [debug_info, fail_on_warning]}. diff --git a/src/mongo.erl b/src/mongo.erl index c9d74f93..71413840 100644 --- a/src/mongo.erl +++ b/src/mongo.erl @@ -1,42 +1,42 @@ %@doc Top-level client interface to MongoDB --module (mongo). +-module(mongo). --export_type ([maybe/1]). +-export_type([maybe/1]). --export_type ([host/0, connection/0]). --export ([connect/1, connect/2, disconnect/1, connect_factory/1, connect_factory/2]). --export_type ([replset/0, rs_connection/0]). --export ([rs_connect/1, rs_connect/2, rs_disconnect/1, rs_connect_factory/1, rs_connect_factory/2]). +-export_type([host/0, connection/0]). +-export([connect/1, connect/2, disconnect/1, connect_factory/1, connect_factory/2]). +-export_type([replset/0, rs_connection/0]). +-export([rs_connect/1, rs_connect/2, rs_disconnect/1, rs_connect_factory/1, rs_connect_factory/2]). --export_type ([action/1, db/0, write_mode/0, read_mode/0, failure/0]). --export ([do/5, this_db/0]). +-export_type([action/1, db/0, write_mode/0, read_mode/0, failure/0]). +-export([do/5, this_db/0]). --export_type ([collection/0, selector/0, projector/0, skip/0, batchsize/0, modifier/0]). --export ([insert/2, insert_all/2]). --export ([save/2, replace/3, repsert/3, modify/3]). --export ([delete/2, delete_one/2]). --export ([find_one/2, find_one/3, find_one/4]). --export ([find/2, find/3, find/4, find/5]). --export ([count/2, count/3]). +-export_type([collection/0, selector/0, projector/0, skip/0, batchsize/0, modifier/0]). +-export([insert/2, insert_all/2]). +-export([save/2, replace/3, repsert/3, modify/3]). +-export([delete/2, delete_one/2]). +-export([find_one/5, find_one/3, find_one/4]). +-export([find/2, find/3, find/4, find/5]). +-export([count/2, count/3]). --export_type ([cursor/0]). --export ([next/1, rest/1, close_cursor/1]). +-export_type([cursor/0]). +-export([next/1, rest/1, close_cursor/1]). --export_type ([command/0]). --export ([command/1]). +-export_type([command/0]). +-export([command/1]). --export_type ([username/0, password/0]). --export ([auth/2]). +-export_type([username/0, password/0]). +-export([auth/2]). --export_type ([permission/0]). --export ([add_user/3]). +-export_type([permission/0]). +-export([add_user/3]). --export_type ([index_spec/0, key_order/0]). --export ([create_index/2]). +-export_type([index_spec/0, key_order/0]). +-export([create_index/2]). --export ([copy_database/3, copy_database/5]). +-export([copy_database/3, copy_database/5]). --include ("mongo_protocol.hrl"). +-include("mongo_protocol.hrl"). -type reason() :: any(). @@ -47,50 +47,52 @@ % Eg. "localhost" or {"localhost", 27017} -type connection() :: mongo_connect:connection(). --spec connect (host()) -> {ok, connection()} | {error, reason()}. % IO +-spec connect(host()) -> {ok, connection()} | {error, reason()}. % IO %@doc Connect to given MongoDB server -connect (Host) -> mongo_connect:connect (Host). +connect(Host) -> mongo_connect:connect(Host). --spec connect (host(), timeout()) -> {ok, connection()} | {error, reason()}. % IO +-spec connect(host(), timeout()) -> {ok, connection()} | {error, reason()}. % IO %@doc Connect to given MongoDB server. Timeout used for initial connection and every query and safe write. -connect (Host, TimeoutMS) -> mongo_connect:connect (Host, TimeoutMS). +connect(Host, TimeoutMS) -> mongo_connect:connect(Host, TimeoutMS). --spec disconnect (connection()) -> ok. % IO +-spec disconnect(connection()) -> ok. % IO %@doc Close connection to server -disconnect (Conn) -> mongo_connect:close (Conn). +disconnect(Conn) -> mongo_connect:close(Conn). --spec connect_factory (host()) -> resource_pool:factory(connection()). +-spec connect_factory(host()) -> resource_pool:factory(connection()). %@doc Factory for use with a connection pool. See resource_pool module. -connect_factory (Host) -> connect_factory (Host, infinity). +connect_factory(Host) -> connect_factory(Host, infinity). --spec connect_factory (host(), timeout()) -> resource_pool:factory(connection()). +-spec connect_factory(host(), timeout()) -> resource_pool:factory(connection()). %@doc Factory for use with a connection pool. See resource_pool module. -connect_factory (Host, TimeoutMS) -> {Host, fun (H) -> connect (H, TimeoutMS) end, fun disconnect/1, fun mongo_connect:is_closed/1}. +connect_factory(Host, TimeoutMS) -> + {Host, fun(H) -> connect(H, TimeoutMS) end, fun disconnect/1, fun mongo_connect:is_closed/1}. % Replica Set % -type replset() :: mongo_replset:replset(). -type rs_connection() :: mongo_replset:rs_connection(). --spec rs_connect (replset()) -> rs_connection(). % IO +-spec rs_connect(replset()) -> rs_connection(). % IO %@doc Create new cache of connections to replica set members starting with seed members. No connection attempted until rs_primary or rs_secondary_ok called. -rs_connect (Replset) -> mongo_replset:connect (Replset). +rs_connect(Replset) -> mongo_replset:connect(Replset). --spec rs_connect (replset(), timeout()) -> rs_connection(). % IO +-spec rs_connect(replset(), timeout()) -> rs_connection(). % IO %@doc Create new cache of connections to replica set members starting with seed members. No connection attempted until rs_primary or rs_secondary_ok called. Timeout used for initial connection and every query and safe write. -rs_connect (Replset, TimeoutMS) -> mongo_replset:connect (Replset, TimeoutMS). +rs_connect(Replset, TimeoutMS) -> mongo_replset:connect(Replset, TimeoutMS). --spec rs_disconnect (rs_connection()) -> ok. % IO +-spec rs_disconnect(rs_connection()) -> ok. % IO %@doc Close cache of replset connections -rs_disconnect (ReplsetConn) -> mongo_replset:close (ReplsetConn). +rs_disconnect(ReplsetConn) -> mongo_replset:close(ReplsetConn). --spec rs_connect_factory (replset()) -> resource_pool:factory(rs_connection()). +-spec rs_connect_factory(replset()) -> resource_pool:factory(rs_connection()). %@doc Factory for use with a rs_connection pool. See resource_pool module. -rs_connect_factory (ReplSet) -> rs_connect_factory (ReplSet, infinity). +rs_connect_factory(ReplSet) -> rs_connect_factory(ReplSet, infinity). --spec rs_connect_factory (replset(), timeout()) -> resource_pool:factory(rs_connection()). +-spec rs_connect_factory(replset(), timeout()) -> resource_pool:factory(rs_connection()). %@doc Factory for use with a rs_connection pool. See resource_pool module. -rs_connect_factory (Replset, TimeoutMS) -> {Replset, fun (RS) -> RC = rs_connect (RS, TimeoutMS), {ok, RC} end, fun rs_disconnect/1, fun mongo_replset:is_closed/1}. +rs_connect_factory(Replset, TimeoutMS) -> {Replset, fun(RS) -> RC = rs_connect(RS, TimeoutMS), + {ok, RC} end, fun rs_disconnect/1, fun mongo_replset:is_closed/1}. % Action % @@ -98,45 +100,48 @@ rs_connect_factory (Replset, TimeoutMS) -> {Replset, fun (RS) -> RC = rs_connect % An Action does IO, reads process dict {mongo_action_context, #context{}}, and throws failure() -type failure() :: - mongo_connect:failure() | % thrown by read and safe write - mongo_query:not_master() | % thrown by read and safe write - mongo_query:unauthorized() | % thrown by read and safe write - write_failure() | % thrown by safe write - mongo_cursor:expired(). % thrown by cursor next/rest - --record (context, { - write_mode :: write_mode(), - read_mode :: read_mode(), - dbconn :: mongo_connect:dbconnection() }). - --spec do (write_mode(), read_mode(), connection() | rs_connection(), db(), action(A)) -> {ok, A} | {failure, failure()}. % IO +mongo_connect:failure() | % thrown by read and safe write +mongo_query:not_master() | % thrown by read and safe write +mongo_query:unauthorized() | % thrown by read and safe write +write_failure() | % thrown by safe write +mongo_cursor:expired(). % thrown by cursor next/rest + +-record(context, { + write_mode :: write_mode(), + read_mode :: read_mode(), + dbconn :: mongo_connect:dbconnection()}). + +-spec do(write_mode(), read_mode(), connection() | rs_connection(), db(), action(A)) -> {ok, A} | {failure, failure()}. % IO %@doc Execute mongo action under given write_mode, read_mode, connection, and db. Return action result or failure. -do (WriteMode, ReadMode, Connection, Database, Action) -> case connection_mode (ReadMode, Connection) of - {error, Reason} -> {failure, {connection_failure, Reason}}; - {ok, Conn} -> - PrevContext = get (mongo_action_context), - put (mongo_action_context, #context {write_mode = WriteMode, read_mode = ReadMode, dbconn = {Database, Conn}}), - try Action() of - Result -> {ok, Result} - catch - throw: E = {connection_failure, _, _} -> {failure, E}; - throw: E = not_master -> {failure, E}; - throw: E = unauthorized -> {failure, E}; - throw: E = {write_failure, _, _} -> {failure, E}; - throw: E = {cursor_expired, _} -> {failure, E} - after - case PrevContext of undefined -> erase (mongo_action_context); _ -> put (mongo_action_context, PrevContext) end - end end. - --spec connection_mode (read_mode(), connection() | rs_connection()) -> {ok, connection()} | {error, reason()}. % IO +do(WriteMode, ReadMode, Connection, Database, Action) -> case connection_mode(ReadMode, Connection) of + {error, Reason} -> {failure, {connection_failure, Reason}}; + {ok, Conn} -> + PrevContext = get(mongo_action_context), + put(mongo_action_context, #context{write_mode = WriteMode, read_mode = ReadMode, dbconn = {Database, Conn}}), + try Action() of + Result -> {ok, Result} + catch + throw: E = {connection_failure, _, _} -> + {failure, E}; + throw: E = not_master -> {failure, E}; + throw: E = unauthorized -> {failure, E}; + throw: E = {write_failure, _, _} -> {failure, E}; + throw: E = {cursor_expired, _} -> {failure, E} + after + case PrevContext of undefined -> + erase(mongo_action_context); _ -> + put(mongo_action_context, PrevContext) end + end end. + +-spec connection_mode(read_mode(), connection() | rs_connection()) -> {ok, connection()} | {error, reason()}. % IO %@doc For rs_connection return appropriate primary or secondary connection -connection_mode (_, Conn = {connection, _, _, _}) -> {ok, Conn}; -connection_mode (master, RsConn = {rs_connection, _, _, _}) -> mongo_replset:primary (RsConn); -connection_mode (slave_ok, RsConn = {rs_connection, _, _, _}) -> mongo_replset:secondary_ok (RsConn). +connection_mode(_, Conn = {connection, _, _, _}) -> {ok, Conn}; +connection_mode(master, RsConn = {rs_connection, _, _, _}) -> mongo_replset:primary(RsConn); +connection_mode(slave_ok, RsConn = {rs_connection, _, _, _}) -> mongo_replset:secondary_ok(RsConn). --spec this_db () -> db(). % Action +-spec this_db() -> db(). % Action %@doc Current db in context that we are querying -this_db () -> {Db, _} = (get (mongo_action_context)) #context.dbconn, Db. +this_db() -> {Db, _} = (get(mongo_action_context))#context.dbconn, Db. % Write % @@ -149,69 +154,71 @@ this_db () -> {Db, _} = (get (mongo_action_context)) #context.dbconn, Db. -type write_failure() :: {write_failure, error_code(), bson:utf8()}. -type error_code() :: integer(). --spec write (mongo_query:write()) -> ok. % Action +-spec write(mongo_query:write()) -> ok. % Action %@doc Do unsafe unacknowledged fast write or safe acknowledged slower write depending on our context. When safe, throw write_failure if acknowledgment (getlasterror) reports error. -write (Write) -> - Context = get (mongo_action_context), - case Context #context.write_mode of - unsafe -> mongo_query:write (Context #context.dbconn, Write); - SafeMode -> - Params = case SafeMode of safe -> {}; {safe, Param} -> Param end, - Ack = mongo_query:write (Context #context.dbconn, Write, Params), - case bson:lookup (err, Ack) of - {} -> ok; {null} -> ok; - {String} -> case bson:at (code, Ack) of - 10058 -> throw (not_master); - Code -> throw ({write_failure, Code, String}) end end end. - --spec insert (collection(), bson:document()) -> bson:value(). % Action + + +write(Write) -> + Context = get(mongo_action_context), + case Context#context.write_mode of + unsafe -> mongo_query:write(Context#context.dbconn, Write); + SafeMode -> + Params = case SafeMode of safe -> {}; {safe, Param} -> Param end, + Ack = mongo_query:write(Context#context.dbconn, Write, Params), + case bson:lookup(err, Ack) of + {} -> ok; {null} -> ok; + {String} -> case bson:at(code, Ack) of + 10058 -> throw(not_master); + Code -> throw({write_failure, Code, String}) end end end. + +-spec insert(collection(), bson:document()) -> bson:value(). % Action %@doc Insert document into collection. Return its '_id' value, which is auto-generated if missing. -insert (Coll, Doc) -> [Value] = insert_all (Coll, [Doc]), Value. +insert(Coll, Doc) -> [Value] = insert_all(Coll, [Doc]), Value. --spec insert_all (collection(), [bson:document()]) -> [bson:value()]. % Action +-spec insert_all(collection(), [bson:document()]) -> [bson:value()]. % Action %@doc Insert documents into collection. Return their '_id' values, which are auto-generated if missing. -insert_all (Coll, Docs) -> - Docs1 = lists:map (fun assign_id/1, Docs), - write (#insert {collection = Coll, documents = Docs1}), - lists:map (fun (Doc) -> bson:at ('_id', Doc) end, Docs1). +insert_all(Coll, Docs) -> + Docs1 = lists:map(fun assign_id/1, Docs), + write(#insert{collection = Coll, documents = Docs1}), + lists:map(fun(Doc) -> bson:at('_id', Doc) end, Docs1). --spec assign_id (bson:document()) -> bson:document(). % IO +-spec assign_id(bson:document()) -> bson:document(). % IO %@doc If doc has no '_id' field then generate a fresh object id for it -assign_id (Doc) -> case bson:lookup ('_id', Doc) of - {_Value} -> Doc; - {} -> bson:append ({'_id', mongodb_app:gen_objectid()}, Doc) end. +assign_id(Doc) -> case bson:lookup('_id', Doc) of + {_Value} -> Doc; + {} -> bson:append({'_id', mongodb_app:gen_objectid()}, Doc) end. --spec save (collection(), bson:document()) -> ok. % Action +-spec save(collection(), bson:document()) -> ok. % Action %@doc If document has no '_id' field then insert it, otherwise update it and insert only if missing. -save (Coll, Doc) -> case bson:lookup ('_id', Doc) of - {} -> insert (Coll, Doc), ok; - {Id} -> repsert (Coll, {'_id', Id}, Doc) end. +save(Coll, Doc) -> case bson:lookup('_id', Doc) of + {} -> insert(Coll, Doc), ok; + {Id} -> repsert(Coll, {'_id', Id}, Doc) end. --spec replace (collection(), selector(), bson:document()) -> ok. % Action +-spec replace(collection(), selector(), bson:document()) -> ok. % Action %@doc Replace first document selected with given document. -replace (Coll, Selector, Doc) -> update (false, false, Coll, Selector, Doc). +replace(Coll, Selector, Doc) -> update(false, false, Coll, Selector, Doc). --spec repsert (collection(), selector(), bson:document()) -> ok. % Action +-spec repsert(collection(), selector(), bson:document()) -> ok. % Action %@doc Replace first document selected with given document, or insert it if selection is empty. -repsert (Coll, Selector, Doc) -> update (true, false, Coll, Selector, Doc). +repsert(Coll, Selector, Doc) -> update(true, false, Coll, Selector, Doc). --spec modify (collection(), selector(), modifier()) -> ok. % Action +-spec modify(collection(), selector(), modifier()) -> ok. % Action %@doc Update all documents selected using modifier -modify (Coll, Selector, Mod) -> update (false, true, Coll, Selector, Mod). +modify(Coll, Selector, Mod) -> update(false, true, Coll, Selector, Mod). --spec update (boolean(), boolean(), collection(), selector(), bson:document()) -> ok. % Action -update (Upsert, MultiUpdate, Coll, Sel, Doc) -> - write (#update {collection = Coll, upsert = Upsert, multiupdate = MultiUpdate, selector = Sel, updater = Doc}). +-spec update(boolean(), boolean(), collection(), selector(), bson:document()) -> ok. % Action +update(Upsert, MultiUpdate, Coll, Sel, Doc) -> + write(#update{collection = Coll, upsert = Upsert, multiupdate = MultiUpdate, selector = Sel, updater = Doc}). --spec delete (collection(), selector()) -> ok. % Action +-spec delete(collection(), selector()) -> ok. % Action %@doc Delete selected documents -delete (Coll, Selector) -> - write (#delete {collection = Coll, singleremove = false, selector = Selector}). +delete(Coll, Selector) -> + write(#delete{collection = Coll, singleremove = false, selector = Selector}). --spec delete_one (collection(), selector()) -> ok. % Action +-spec delete_one(collection(), selector()) -> ok. % Action %@doc Delete first selected document. -delete_one (Coll, Selector) -> - write (#delete {collection = Coll, singleremove = true, selector = Selector}). +delete_one(Coll, Selector) -> + write(#delete{collection = Coll, singleremove = true, selector = Selector}). % Read % @@ -219,87 +226,88 @@ delete_one (Coll, Selector) -> % Every query inside an action() will use this mode. % master = Server must be master/primary so reads are consistent (read latest writes). % slave_ok = Server may be slave/secondary so reads may not be consistent (may read stale data). Slaves will eventually get the latest writes, so technically this is called eventually-consistent. - -slave_ok (#context {read_mode = slave_ok}) -> true; -slave_ok (#context {read_mode = master}) -> false. +%@modified add undefined 2015/12/6 +slave_ok(undefined) -> false; +slave_ok(#context{read_mode = slave_ok}) -> true; +slave_ok(#context{read_mode = master}) -> false. -type maybe(A) :: {A} | {}. --spec find_one (collection(), selector()) -> maybe (bson:document()). % Action +-spec find_one(mongo_connect:dbconnection(), collection(), selector()) -> maybe (bson:document()). % Action %@doc Return first selected document, if any -find_one (Coll, Selector) -> find_one (Coll, Selector, []). +find_one(DbConn, Coll, Selector) -> find_one(DbConn, Coll, Selector, []). --spec find_one (collection(), selector(), projector()) -> maybe (bson:document()). % Action +-spec find_one(mongo_connect:dbconnection(), collection(), selector(), projector()) -> maybe (bson:document()). % Action %@doc Return projection of first selected document, if any. Empty projection [] means full projection. -find_one (Coll, Selector, Projector) -> find_one (Coll, Selector, Projector, 0). +find_one(DbConn, Coll, Selector, Projector) -> find_one(DbConn, Coll, Selector, Projector, 0). --spec find_one (collection(), selector(), projector(), skip()) -> maybe (bson:document()). % Action +-spec find_one(mongo_connect:dbconnection(), collection(), selector(), projector(), skip()) -> maybe (bson:document()). % Action %@doc Return projection of Nth selected document, if any. Empty projection [] means full projection. -find_one (Coll, Selector, Projector, Skip) -> - Context = get (mongo_action_context), - Query = #'query' { - collection = Coll, selector = Selector, projector = Projector, - skip = Skip, slaveok = slave_ok (Context) }, - mongo_query:find_one (Context #context.dbconn, Query). - --spec find (collection(), selector()) -> cursor(). % Action +find_one(DbConn, Coll, Selector, Projector, Skip) -> + Context = get(mongo_action_context), + Query = #'query'{ + collection = Coll, selector = Selector, projector = Projector, + skip = Skip, slaveok = slave_ok(Context)}, + mongo_query:find_one(DbConn, Query). + +-spec find(collection(), selector()) -> cursor(). % Action %@doc Return selected documents. -find (Coll, Selector) -> find (Coll, Selector, []). +find(Coll, Selector) -> find(Coll, Selector, []). --spec find (collection(), selector(), projector()) -> cursor(). % Action +-spec find(collection(), selector(), projector()) -> cursor(). % Action %@doc Return projection of selected documents. Empty projection [] means full projection. -find (Coll, Selector, Projector) -> find (Coll, Selector, Projector, 0). +find(Coll, Selector, Projector) -> find(Coll, Selector, Projector, 0). --spec find (collection(), selector(), projector(), skip()) -> cursor(). % Action +-spec find(collection(), selector(), projector(), skip()) -> cursor(). % Action %@doc Return projection of selected documents starting from Nth document. Empty projection means full projection. -find (Coll, Selector, Projector, Skip) -> find (Coll, Selector, Projector, Skip, 0). +find(Coll, Selector, Projector, Skip) -> find(Coll, Selector, Projector, Skip, 0). --spec find (collection(), selector(), projector(), skip(), batchsize()) -> cursor(). % Action +-spec find(collection(), selector(), projector(), skip(), batchsize()) -> cursor(). % Action %@doc Return projection of selected documents starting from Nth document in batches of batchsize. 0 batchsize means default batch size. Negative batch size means one batch only. Empty projection means full projection. -find (Coll, Selector, Projector, Skip, BatchSize) -> - Context = get (mongo_action_context), - Query = #'query' { - collection = Coll, selector = Selector, projector = Projector, - skip = Skip, batchsize = BatchSize, slaveok = slave_ok (Context) }, - mongo_query:find (Context #context.dbconn, Query). +find(Coll, Selector, Projector, Skip, BatchSize) -> + Context = get(mongo_action_context), + Query = #'query'{ + collection = Coll, selector = Selector, projector = Projector, + skip = Skip, batchsize = BatchSize, slaveok = slave_ok(Context)}, + mongo_query:find(Context#context.dbconn, Query). -type cursor() :: mongo_cursor:cursor(). --spec next (cursor()) -> maybe (bson:document()). % IO throws mongo_connect:failure() & mongo_cursor:expired() (this is a subtype of Action) +-spec next(cursor()) -> maybe (bson:document()). % IO throws mongo_connect:failure() & mongo_cursor:expired() (this is a subtype of Action) %@doc Return next document in query result cursor, if any. -next (Cursor) -> mongo_cursor:next (Cursor). +next(Cursor) -> mongo_cursor:next(Cursor). --spec rest (cursor()) -> [bson:document()]. % IO throws mongo_connect:failure() & mongo_cursor:expired() (this is a subtype of Action) +-spec rest(cursor()) -> [bson:document()]. % IO throws mongo_connect:failure() & mongo_cursor:expired() (this is a subtype of Action) %@doc Return remaining documents in query result cursor. -rest (Cursor) -> mongo_cursor:rest (Cursor). +rest(Cursor) -> mongo_cursor:rest(Cursor). --spec close_cursor (cursor()) -> ok. % IO (IO is a subtype of Action) +-spec close_cursor(cursor()) -> ok. % IO (IO is a subtype of Action) %@doc Close cursor -close_cursor (Cursor) -> mongo_cursor:close (Cursor). +close_cursor(Cursor) -> mongo_cursor:close(Cursor). --spec count (collection(), selector()) -> integer(). % Action +-spec count(collection(), selector()) -> integer(). % Action %@doc Count selected documents -count (Coll, Selector) -> count (Coll, Selector, 0). +count(Coll, Selector) -> count(Coll, Selector, 0). --spec count (collection(), selector(), integer()) -> integer(). % Action +-spec count(collection(), selector(), integer()) -> integer(). % Action %@doc Count selected documents up to given max number; 0 means no max. Ie. stops counting when max is reached to save processing time. -count (Coll, Selector, Limit) -> - CollStr = atom_to_binary (Coll, utf8), - Command = if - Limit =< 0 -> {count, CollStr, 'query', Selector}; - true -> {count, CollStr, 'query', Selector, limit, Limit} end, - Doc = command (Command), - trunc (bson:at (n, Doc)). % Server returns count as float +count(Coll, Selector, Limit) -> + CollStr = atom_to_binary(Coll, utf8), + Command = if + Limit =< 0 -> {count, CollStr, 'query', Selector}; + true -> {count, CollStr, 'query', Selector, limit, Limit} end, + Doc = command(Command), + trunc(bson:at(n, Doc)). % Server returns count as float % Command % -type command() :: mongo_query:command(). --spec command (command()) -> bson:document(). % Action +-spec command(command()) -> bson:document(). % Action %@doc Execute given MongoDB command and return its result. -command (Command) -> - Context = get (mongo_action_context), - mongo_query:command (Context #context.dbconn, Command, slave_ok (Context)). +command(Command) -> + Context = get(mongo_action_context), + mongo_query:command(Context#context.dbconn, Command, slave_ok(Context)). % Authentication % @@ -307,32 +315,35 @@ command (Command) -> -type password() :: bson:utf8(). -type nonce() :: bson:utf8(). --spec auth (username(), password()) -> boolean(). % Action +-spec auth(username(), password()) -> boolean(). % Action %@doc Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe. -auth (Username, Password) -> - Nonce = bson:at (nonce, command ({getnonce, 1})), - try command ({authenticate, 1, user, Username, nonce, Nonce, key, pw_key (Nonce, Username, Password)}) - of _ -> true - catch error:{bad_command, _} -> false end. +auth(Username, Password) -> + Nonce = bson:at(nonce, command({getnonce, 1})), + try command({authenticate, 1, user, Username, nonce, Nonce, key, pw_key(Nonce, Username, Password)}) + of _ -> true + catch error:{bad_command, _} -> false end. --spec pw_key (nonce(), username(), password()) -> bson:utf8(). -pw_key (Nonce, Username, Password) -> bson:utf8 (binary_to_hexstr (crypto:md5 ([Nonce, Username, pw_hash (Username, Password)]))). +-spec pw_key(nonce(), username(), password()) -> bson:utf8(). +pw_key(Nonce, Username, Password) -> + bson:utf8(binary_to_hexstr(crypto:md5([Nonce, Username, pw_hash(Username, Password)]))). --spec pw_hash (username(), password()) -> bson:utf8(). -pw_hash (Username, Password) -> bson:utf8 (binary_to_hexstr (crypto:md5 ([Username, <<":mongo:">>, Password]))). +-spec pw_hash(username(), password()) -> bson:utf8(). +pw_hash(Username, Password) -> bson:utf8(binary_to_hexstr(crypto:md5([Username, <<":mongo:">>, Password]))). --spec binary_to_hexstr (binary()) -> string(). -binary_to_hexstr (Bin) -> - lists:flatten ([io_lib:format ("~2.16.0b", [X]) || X <- binary_to_list (Bin)]). +-spec binary_to_hexstr(binary()) -> string(). +binary_to_hexstr(Bin) -> + lists:flatten([io_lib:format("~2.16.0b", [X]) || X <- binary_to_list(Bin)]). -type permission() :: read_write | read_only. --spec add_user (permission(), username(), password()) -> ok. % Action +-spec add_user(permission(), username(), password()) -> ok. % Action %@doc Add user with given access rights (permission) -add_user (Permission, Username, Password) -> - User = case find_one (system.users, {user, Username}) of {} -> {user, Username}; {Doc} -> Doc end, - Rec = {readOnly, case Permission of read_only -> true; read_write -> false end, pwd, pw_hash (Username, Password)}, - save (system.users, bson:merge (Rec, User)). +%%can't be used before put +add_user(Permission, Username, Password) -> + DbConn = (get(mongo_action_context))#context.dbconn, + User = case find_one(DbConn, 'system.users', {user, Username}) of {} -> {user, Username}; {Doc} -> Doc end, + Rec = {readOnly, case Permission of read_only -> true; read_write -> false end, pwd, pw_hash(Username, Password)}, + save('system.users', bson:merge(Rec, User)). % Index % @@ -349,41 +360,44 @@ add_user (Permission, Username, Password) -> -type key_order() :: bson:document(). % Fields to index on and whether ascending (1) or descending (-1) or Geo (<<"2d">>). Eg. {x,1, y,-1} or {loc, <<"2d">>} --spec create_index (collection(), index_spec() | key_order()) -> ok. % Action +-spec create_index(collection(), index_spec() | key_order()) -> ok. % Action %@doc Create index on collection according to given spec. Allow user to just supply key -create_index (Coll, IndexSpec) -> - Db = this_db (), - Index = bson:append ({ns, mongo_protocol:dbcoll (Db, Coll)}, fillout_indexspec (IndexSpec)), - insert ('system.indexes', Index). +create_index(Coll, IndexSpec) -> + Db = this_db(), + Index = bson:append({ns, mongo_protocol:dbcoll(Db, Coll)}, fillout_indexspec(IndexSpec)), + insert('system.indexes', Index). --spec fillout_indexspec (index_spec() | key_order()) -> index_spec(). +-spec fillout_indexspec(index_spec() | key_order()) -> index_spec(). % Fill in missing optonal fields with defaults. Allow user to just supply key_order -fillout_indexspec (IndexSpec) -> case bson:lookup (key, IndexSpec) of - {Key} when is_tuple (Key) -> bson:merge (IndexSpec, {key, Key, name, gen_index_name (Key), unique, false, dropDups, false}); - {_} -> {key, IndexSpec, name, gen_index_name (IndexSpec), unique, false, dropDups, false}; % 'key' happens to be a user field - {} -> {key, IndexSpec, name, gen_index_name (IndexSpec), unique, false, dropDups, false} end. - --spec gen_index_name (key_order()) -> bson:utf8(). -gen_index_name (KeyOrder) -> - AsName = fun (Label, Order, Name) -> << - Name /binary, $_, - (atom_to_binary (Label, utf8)) /binary, $_, - (if - is_integer (Order) -> bson:utf8 (integer_to_list (Order)); - is_atom (Order) -> atom_to_binary (Order, utf8); - is_binary (Order) -> Order; - true -> <<>> end) /binary >> end, - bson:doc_foldl (AsName, <<"i">>, KeyOrder). +fillout_indexspec(IndexSpec) -> case bson:lookup(key, IndexSpec) of + {Key} when is_tuple(Key) -> + bson:merge(IndexSpec, {key, Key, name, gen_index_name(Key), unique, false, dropDups, false}); + {_} -> + {key, IndexSpec, name, gen_index_name(IndexSpec), unique, false, dropDups, false}; % 'key' happens to be a user field + {} -> + {key, IndexSpec, name, gen_index_name(IndexSpec), unique, false, dropDups, false} end. + +-spec gen_index_name(key_order()) -> bson:utf8(). +gen_index_name(KeyOrder) -> + AsName = fun(Label, Order, Name) -> << + Name/binary, $_, + (atom_to_binary(Label, utf8))/binary, $_, + (if + is_integer(Order) -> bson:utf8(integer_to_list(Order)); + is_atom(Order) -> atom_to_binary(Order, utf8); + is_binary(Order) -> Order; + true -> <<>> end)/binary>> end, + bson:doc_foldl(AsName, <<"i">>, KeyOrder). % Admin --spec copy_database (db(), host(), db()) -> bson:document(). % Action +-spec copy_database(db(), host(), db()) -> bson:document(). % Action % Copy database from given host to the server I am connected to. Must be connected to 'admin' database. -copy_database (FromDb, FromHost, ToDb) -> - command ({copydb, 1, fromhost, mongo_connect:show_host (FromHost), fromdb, atom_to_binary (FromDb, utf8), todb, atom_to_binary (ToDb, utf8)}). +copy_database(FromDb, FromHost, ToDb) -> + command({copydb, 1, fromhost, mongo_connect:show_host(FromHost), fromdb, atom_to_binary(FromDb, utf8), todb, atom_to_binary(ToDb, utf8)}). --spec copy_database (db(), host(), db(), username(), password()) -> bson:document(). % Action +-spec copy_database(db(), host(), db(), username(), password()) -> bson:document(). % Action % Copy database from given host, authenticating with given username and password, to the server I am connected to. Must be connected to 'admin' database. -copy_database (FromDb, FromHost, ToDb, Username, Password) -> - Nonce = bson:at (nonce, command ({copydbgetnonce, 1, fromhost, mongo_connect:show_host (FromHost)})), - command ({copydb, 1, fromhost, mongo_connect:show_host (FromHost), fromdb, atom_to_binary (FromDb, utf8), todb, atom_to_binary (ToDb, utf8), username, Username, nonce, Nonce, key, pw_key (Nonce, Username, Password)}). +copy_database(FromDb, FromHost, ToDb, Username, Password) -> + Nonce = bson:at(nonce, command({copydbgetnonce, 1, fromhost, mongo_connect:show_host(FromHost)})), + command({copydb, 1, fromhost, mongo_connect:show_host(FromHost), fromdb, atom_to_binary(FromDb, utf8), todb, atom_to_binary(ToDb, utf8), username, Username, nonce, Nonce, key, pw_key(Nonce, Username, Password)}). diff --git a/src/mongo_connect.erl b/src/mongo_connect.erl index a2bafc99..e9c6ae8f 100644 --- a/src/mongo_connect.erl +++ b/src/mongo_connect.erl @@ -1,39 +1,39 @@ %@doc Thread-safe TCP connection to a MongoDB server with synchronous call and asynchronous send interface. --module (mongo_connect). +-module(mongo_connect). --export_type ([host/0, connection/0, dbconnection/0, failure/0]). +-export_type([host/0, connection/0, dbconnection/0, failure/0]). --export ([host_port/1, read_host/1, show_host/1]). --export ([connect/1, connect/2, conn_host/1, close/1, is_closed/1]). +-export([host_port/1, read_host/1, show_host/1]). +-export([connect/1, connect/2, conn_host/1, close/1, is_closed/1]). --export ([call/3, send/2]). % for mongo_query and mongo_cursor +-export([call/3, send/2]). % for mongo_query and mongo_cursor -include_lib ("bson/include/bson_binary.hrl"). -type host() :: {inet:hostname(), 0..65535} | inet:hostname(). % Hostname and port. Port defaults to 27017 when missing --spec host_port (host()) -> host(). +-spec host_port(host()) -> host(). %@doc Port explicitly filled in with defaut if missing -host_port ({Hostname, Port}) -> {hostname_string (Hostname), Port}; -host_port (Hostname) -> {hostname_string (Hostname), 27017}. +host_port({Hostname, Port}) -> {hostname_string(Hostname), Port}; +host_port(Hostname) -> {hostname_string(Hostname), 27017}. --spec hostname_string (inet:hostname()) -> string(). +-spec hostname_string(inet:hostname()) -> string(). %@doc Convert possible hostname atom to string -hostname_string (Name) when is_atom (Name) -> atom_to_list (Name); -hostname_string (Name) -> Name. +hostname_string(Name) when is_atom(Name) -> atom_to_list(Name); +hostname_string(Name) -> Name. --spec show_host (host()) -> bson:utf8(). +-spec show_host(host()) -> bson:utf8(). %@doc UString representation of host, ie. "Hostname:Port" -show_host (Host) -> - {Hostname, Port} = host_port (Host), - bson:utf8 (Hostname ++ ":" ++ integer_to_list (Port)). +show_host(Host) -> + {Hostname, Port} = host_port(Host), + bson:utf8(Hostname ++ ":" ++ integer_to_list(Port)). --spec read_host (bson:utf8()) -> host(). +-spec read_host(bson:utf8()) -> host(). %@doc Interpret ustring as host, ie. "Hostname:Port" -> {Hostname, Port} -read_host (UString) -> case string:tokens (bson:str (UString), ":") of - [Hostname] -> host_port (Hostname); - [Hostname, Port] -> {Hostname, list_to_integer (Port)} end. +read_host(UString) -> case string:tokens(bson:str(UString), ":") of + [Hostname] -> host_port(Hostname); + [Hostname, Port] -> {Hostname, list_to_integer(Port)} end. -type reason() :: any(). @@ -42,77 +42,78 @@ read_host (UString) -> case string:tokens (bson:str (UString), ":") of % Passive raw binary socket. % Type not opaque to mongo:connection_mode/2 --spec connect (host()) -> {ok, connection()} | {error, reason()}. % IO +-spec connect(host()) -> {ok, connection()} | {error, reason()}. % IO %@doc Create connection to given MongoDB server or return reason for connection failure. -connect (Host) -> connect (Host, infinity). +connect(Host) -> connect(Host, infinity). --spec connect (host(), timeout()) -> {ok, connection()} | {error, reason()}. % IO +-spec connect(host(), timeout()) -> {ok, connection()} | {error, reason()}. % IO %@doc Create connection to given MongoDB server or return reason for connection failure. Timeout is used for initial connection and every call. -connect (Host, TimeoutMS) -> try mvar:create (fun () -> tcp_connect (host_port (Host), TimeoutMS) end, fun gen_tcp:close/1) - of VSocket -> {ok, {connection, host_port (Host), VSocket, TimeoutMS}} - catch Reason -> {error, Reason} end. +connect(Host, TimeoutMS) -> try mvar:create(fun() -> tcp_connect(host_port(Host), TimeoutMS) end, fun gen_tcp:close/1) + of VSocket -> {ok, {connection, host_port(Host), VSocket, TimeoutMS}} + catch Reason -> {error, Reason} end. --spec conn_host (connection()) -> host(). +-spec conn_host(connection()) -> host(). %@doc Host this is connected to -conn_host ({connection, Host, _VSocket, _}) -> Host. +conn_host({connection, Host, _VSocket, _}) -> Host. --spec close (connection()) -> ok. % IO +-spec close(connection()) -> ok. % IO %@doc Close connection. -close ({connection, _Host, VSocket, _}) -> mvar:terminate (VSocket). +close({connection, _Host, VSocket, _}) -> mvar:terminate(VSocket). --spec is_closed (connection()) -> boolean(). % IO +-spec is_closed(connection()) -> boolean(). % IO %@doc Has connection been closed? -is_closed ({connection, _, VSocket, _}) -> mvar:is_terminated (VSocket). +is_closed({connection, _, VSocket, _}) -> mvar:is_terminated(VSocket). -type dbconnection() :: {mongo_protocol:db(), connection()}. -type failure() :: {connection_failure, connection(), reason()}. --spec call (dbconnection(), [mongo_protocol:notice()], mongo_protocol:request()) -> mongo_protocol:reply(). % IO throws failure() +-spec call(dbconnection(), [mongo_protocol:notice()], mongo_protocol:request()) -> mongo_protocol:reply(). % IO throws failure() %@doc Synchronous send and reply. Notices are sent right before request in single block. Exclusive access to connection during entire call. -call ({Db, Conn = {connection, _Host, VSocket, TimeoutMS}}, Notices, Request) -> - {MessagesBin, RequestId} = messages_binary (Db, Notices ++ [Request]), - Call = fun (Socket) -> - tcp_send (Socket, MessagesBin), - <> = tcp_recv (Socket, 4, TimeoutMS), - tcp_recv (Socket, N-4, TimeoutMS) end, - try mvar:with (VSocket, Call) of - ReplyBin -> - {RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin), - Reply % ^ ResponseTo must match RequestId - catch - throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason}); - exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end. - --spec send (dbconnection(), [mongo_protocol:notice()]) -> ok. % IO throws failure() +call({Db, Conn = {connection, _Host, VSocket, TimeoutMS}}, Notices, Request) -> + {MessagesBin, RequestId} = messages_binary(Db, Notices ++ [Request]), + Call = fun(Socket) -> + tcp_send(Socket, MessagesBin), + <> = tcp_recv(Socket, 4, TimeoutMS), + tcp_recv(Socket, N - 4, TimeoutMS) end, + try mvar:with(VSocket, Call) of + ReplyBin -> + {RequestId, Reply, <<>>} = mongo_protocol:get_reply(ReplyBin), + Reply % ^ ResponseTo must match RequestId + catch + throw: Reason -> close(Conn), throw({connection_failure, Conn, Reason}); + exit: {noproc, _} -> throw({connection_failure, Conn, closed}) end. + +-spec send(dbconnection(), [mongo_protocol:notice()]) -> ok. % IO throws failure() %@doc Asynchronous send (no reply). Don't know if send succeeded. Exclusive access to the connection during send. -send ({Db, Conn = {connection, _Host, VSocket, _}}, Notices) -> - {NoticesBin, _} = messages_binary (Db, Notices), - Send = fun (Socket) -> tcp_send (Socket, NoticesBin) end, - try mvar:with (VSocket, Send) - catch - throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason}); - exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end. - --spec messages_binary (mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}. +send({Db, Conn = {connection, _Host, VSocket, _}}, Notices) -> + {NoticesBin, _} = messages_binary(Db, Notices), + Send = fun(Socket) -> tcp_send(Socket, NoticesBin) end, + try mvar:with(VSocket, Send) + catch + throw: Reason -> close(Conn), throw({connection_failure, Conn, Reason}); + exit: {noproc, _} -> throw({connection_failure, Conn, closed}) end. + +-spec messages_binary(mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}. %@doc Binary representation of messages -messages_binary (Db, Messages) -> - Build = fun (Message, {Bin, _}) -> - RequestId = mongodb_app:next_requestid(), - MBin = mongo_protocol:put_message (Db, Message, RequestId), - {<>, RequestId} end, - lists:foldl (Build, {<<>>, 0}, Messages). +messages_binary(Db, Messages) -> + Build = fun(Message, {Bin, _}) -> + RequestId = mongodb_app:next_requestid(), + MBin = mongo_protocol:put_message(Db, Message, RequestId), + {<>, RequestId} end, + lists:foldl(Build, {<<>>, 0}, Messages). % Util % -tcp_connect ({Hostname, Port}, TimeoutMS) -> case gen_tcp:connect (Hostname, Port, [binary, {active, false}, {packet, 0}], TimeoutMS) of - {ok, Socket} -> Socket; - {error, Reason} -> throw (Reason) end. +tcp_connect({Hostname, Port}, TimeoutMS) -> + case gen_tcp:connect(Hostname, Port, [binary, {active, false}, {packet, 0}], TimeoutMS) of + {ok, Socket} -> Socket; + {error, Reason} -> throw(Reason) end. -tcp_send (Socket, Binary) -> case gen_tcp:send (Socket, Binary) of - ok -> ok; - {error, Reason} -> throw (Reason) end. +tcp_send(Socket, Binary) -> case gen_tcp:send(Socket, Binary) of + ok -> ok; + {error, Reason} -> throw(Reason) end. -tcp_recv (Socket, N, TimeoutMS) -> case gen_tcp:recv (Socket, N, TimeoutMS) of - {ok, Binary} -> Binary; - {error, Reason} -> throw (Reason) end. +tcp_recv(Socket, N, TimeoutMS) -> case gen_tcp:recv(Socket, N, TimeoutMS) of + {ok, Binary} -> Binary; + {error, Reason} -> throw(Reason) end. diff --git a/src/mongo_cursor.erl b/src/mongo_cursor.erl index 78f746c0..daa44c18 100644 --- a/src/mongo_cursor.erl +++ b/src/mongo_cursor.erl @@ -1,14 +1,14 @@ %@doc A cursor references pending query results on a server. % TODO: terminate cursor after idle for 10 minutes. --module (mongo_cursor). +-module(mongo_cursor). --export_type ([maybe/1]). --export_type ([cursor/0, expired/0]). +-export_type([maybe/1]). +-export_type([cursor/0, expired/0]). --export ([next/1, rest/1, close/1, is_closed/1]). % API --export ([cursor/4]). % for mongo_query +-export([next/1, rest/1, close/1, is_closed/1]). % API +-export([cursor/4]). % for mongo_query --include ("mongo_protocol.hrl"). +-include("mongo_protocol.hrl"). -type maybe(A) :: {A} | {}. @@ -21,56 +21,57 @@ -type env() :: {mongo_connect:dbconnection(), collection(), batchsize()}. -type batch() :: {cursorid(), [bson:document()]}. --spec cursor (mongo_connect:dbconnection(), collection(), batchsize(), {cursorid(), [bson:document()]}) -> cursor(). % IO +-spec cursor(mongo_connect:dbconnection(), collection(), batchsize(), {cursorid(), [bson:document()]}) -> cursor(). % IO %@doc Create new cursor from result batch -cursor (DbConn, Collection, BatchSize, Batch) -> - mvar:new ({{DbConn, Collection, BatchSize}, Batch}, fun finalize/1). +cursor(DbConn, Collection, BatchSize, Batch) -> + mvar:new({{DbConn, Collection, BatchSize}, Batch}, fun finalize/1). --spec close (cursor()) -> ok. % IO +-spec close(cursor()) -> ok. % IO %@doc Close cursor -close (Cursor) -> mvar:terminate (Cursor). +close(Cursor) -> mvar:terminate(Cursor). --spec is_closed (cursor()) -> boolean(). % IO +-spec is_closed(cursor()) -> boolean(). % IO %@doc Is cursor closed -is_closed (Cursor) -> mvar:is_terminated (Cursor). +is_closed(Cursor) -> mvar:is_terminated(Cursor). --spec rest (cursor()) -> [bson:document()]. % IO throws expired() & mongo_connect:failure() +-spec rest(cursor()) -> [bson:document()]. % IO throws expired() & mongo_connect:failure() %@doc Return remaining documents in query result -rest (Cursor) -> case next (Cursor) of - {} -> []; - {Doc} -> [Doc | rest (Cursor)] end. +rest(Cursor) -> case next(Cursor) of + {} -> []; + {Doc} -> [Doc | rest(Cursor)] end. --spec next (cursor()) -> maybe (bson:document()). % IO throws expired() & mongo_connect:failure() +-spec next(cursor()) -> maybe (bson:document()). % IO throws expired() & mongo_connect:failure() %@doc Return next document in query result or nothing if finished. -next (Cursor) -> - Next = fun ({Env, Batch}) -> - {Batch1, MDoc} = xnext (Env, Batch), - {{Env, Batch1}, MDoc} end, - try mvar:modify (Cursor, Next) - of {} -> close (Cursor), {}; {Doc} -> {Doc} - catch expired -> close (Cursor), throw ({cursor_expired, Cursor}) end. +next(Cursor) -> + Next = fun({Env, Batch}) -> + {Batch1, MDoc} = xnext(Env, Batch), + {{Env, Batch1}, MDoc} end, + try mvar:modify(Cursor, Next) + of {} -> close(Cursor), {}; {Doc} -> {Doc} + catch expired -> close(Cursor), throw({cursor_expired, Cursor}) end. --spec xnext (env(), batch()) -> {batch(), maybe (bson:document())}. % IO throws expired & mongo_connect:failure() +-spec xnext(env(), batch()) -> {batch(), maybe (bson:document())}. % IO throws expired & mongo_connect:failure() %@doc Get next document in cursor, fetching next batch from server if necessary -xnext (Env = {DbConn, Coll, BatchSize}, {CursorId, Docs}) -> case Docs of - [Doc | Docs1] -> {{CursorId, Docs1}, {Doc}}; - [] -> case CursorId of - 0 -> {{0, []}, {}}; - _ -> - Getmore = #getmore {collection = Coll, batchsize = BatchSize, cursorid = CursorId}, - Reply = mongo_connect:call (DbConn, [], Getmore), - xnext (Env, batch_reply (Reply)) end end. +xnext(Env = {DbConn, Coll, BatchSize}, {CursorId, Docs}) -> case Docs of + [Doc | Docs1] -> {{CursorId, Docs1}, {Doc}}; + [] -> case CursorId of + 0 -> {{0, []}, {}}; + _ -> + Getmore = #getmore{collection = Coll, batchsize = BatchSize, cursorid = CursorId}, + Reply = mongo_connect:call(DbConn, [], Getmore), + xnext(Env, batch_reply(Reply)) end end. --spec batch_reply (mongo_protocol:reply()) -> batch(). % IO throws expired +-spec batch_reply(mongo_protocol:reply()) -> batch(). % IO throws expired %@doc Extract next batch of results from reply. Throw expired if cursor not found on server. -batch_reply (#reply { - cursornotfound = CursorNotFound, queryerror = false, awaitcapable = _, - cursorid = CursorId, startingfrom = _, documents = Docs }) -> if - CursorNotFound -> throw (expired); - true -> {CursorId, Docs} end. +batch_reply(#reply{ + cursornotfound = CursorNotFound, queryerror = false, awaitcapable = _, + cursorid = CursorId, startingfrom = _, documents = Docs}) -> if + CursorNotFound -> throw(expired); + true -> {CursorId, Docs} end. --spec finalize (state()) -> ok. % IO. Result ignored +-spec finalize(state()) -> ok. % IO. Result ignored %@doc Kill cursor on server if not already -finalize ({{DbConn, _, _}, {CursorId, _}}) -> case CursorId of - 0 -> ok; - _ -> mongo_connect:send (DbConn, [#killcursor {cursorids = [CursorId]}]) end. +finalize({{DbConn, _, _}, {CursorId, _}}) -> case CursorId of + 0 -> ok; + _ -> + mongo_connect:send(DbConn, [#killcursor{cursorids = [CursorId]}]) end. diff --git a/src/mongo_protocol.erl b/src/mongo_protocol.erl index 23c28905..2c65ff71 100644 --- a/src/mongo_protocol.erl +++ b/src/mongo_protocol.erl @@ -1,14 +1,14 @@ %@doc MongoDB wire protocol -module(mongo_protocol). --export_type ([db/0]). --export_type ([notice/0, request/0, reply/0]). --export_type ([message/0]). --export_type ([requestid/0]). +-export_type([db/0]). +-export_type([notice/0, request/0, reply/0]). +-export_type([message/0]). +-export_type([requestid/0]). --export ([bit/1, bool/1, dbcoll/2, put_message/3, get_reply/1]). +-export([bit/1, bool/1, dbcoll/2, put_message/3, get_reply/1]). --include ("mongo_protocol.hrl"). +-include("mongo_protocol.hrl"). -include_lib ("bson/include/bson_binary.hrl"). -import (bson_binary, [put_cstring/1, put_document/1, get_document/1]). @@ -23,91 +23,93 @@ -type requestid() :: integer(). % message id --define (put_header (Opcode), ?put_int32 (RequestId), ?put_int32 (0), ?put_int32 (Opcode)). +-define(put_header(Opcode), ?put_int32(RequestId), ?put_int32(0), ?put_int32(Opcode)). % RequestId expected to be in scope at call site --define (get_header (Opcode, ResponseTo), ?get_int32 (_RequestId), ?get_int32 (ResponseTo), ?get_int32 (Opcode)). +-define(get_header(Opcode, ResponseTo), ?get_int32(_RequestId), ?get_int32(ResponseTo), ?get_int32(Opcode)). --define (ReplyOpcode, 1). --define (UpdateOpcode, 2001). --define (InsertOpcode, 2002). --define (QueryOpcode, 2004). --define (GetmoreOpcode, 2005). --define (DeleteOpcode, 2006). --define (KillcursorOpcode, 2007). +-define(ReplyOpcode, 1). +-define(UpdateOpcode, 2001). +-define(InsertOpcode, 2002). +-define(QueryOpcode, 2004). +-define(GetmoreOpcode, 2005). +-define(DeleteOpcode, 2006). +-define(KillcursorOpcode, 2007). --spec bit (boolean()) -> 0 | 1. -bit (false) -> 0; -bit (true) -> 1. +-spec bit(boolean()) -> 0 | 1. +bit(false) -> 0; +bit(true) -> 1. --spec bool (0 | 1) -> boolean(). -bool (0) -> false; -bool (1) -> true. +-spec bool(0 | 1) -> boolean(). +bool(0) -> false; +bool(1) -> true. --spec dbcoll (db(), collection()) -> bson:utf8(). +-spec dbcoll(db(), collection()) -> bson:utf8(). %@doc Concat db and collection name with period (.) in between -dbcoll (Db, Coll) -> <<(atom_to_binary (Db, utf8)) /binary, $., (atom_to_binary (Coll, utf8)) /binary>>. +dbcoll(Db, Coll) -> <<(atom_to_binary(Db, utf8))/binary, $., (atom_to_binary(Coll, utf8))/binary>>. -type message() :: notice() | request(). --spec put_message (db(), message(), requestid()) -> binary(). -put_message (Db, Message, RequestId) -> case Message of - #insert {collection = Coll, documents = Docs} -> << - ?put_header (?InsertOpcode), - ?put_int32 (0), - (put_cstring (dbcoll (Db, Coll))) /binary, - << <<(put_document (Doc)) /binary>> || Doc <- Docs>> /binary >>; - #update {collection = Coll, upsert = U, multiupdate = M, selector = Sel, updater = Up} -> << - ?put_header (?UpdateOpcode), - ?put_int32 (0), - (put_cstring (dbcoll (Db, Coll))) /binary, - ?put_bits32 (0,0,0,0,0,0, bit(M), bit(U)), - (put_document (Sel)) /binary, - (put_document (Up)) /binary >>; - #delete {collection = Coll, singleremove = R, selector = Sel} -> << - ?put_header (?DeleteOpcode), - ?put_int32 (0), - (put_cstring (dbcoll (Db, Coll))) /binary, - ?put_bits32 (0,0,0,0,0,0,0, bit(R)), - (put_document (Sel)) /binary >>; - #killcursor {cursorids = Cids} -> << - ?put_header (?KillcursorOpcode), - ?put_int32 (0), - ?put_int32 (length (Cids)), - << <> || Cid <- Cids>> /binary >>; - #'query' {tailablecursor = TC, slaveok = SOK, nocursortimeout = NCT, awaitdata = AD, - collection = Coll, skip = Skip, batchsize = Batch, selector = Sel, projector = Proj} -> << - ?put_header (?QueryOpcode), - ?put_bits32 (0, 0, bit(AD), bit(NCT), 0, bit(SOK), bit(TC), 0), - (put_cstring (dbcoll (Db, Coll))) /binary, - ?put_int32 (Skip), - ?put_int32 (Batch), - (put_document (Sel)) /binary, - (case Proj of [] -> <<>>; _ -> put_document (Proj) end) /binary >>; - #getmore {collection = Coll, batchsize = Batch, cursorid = Cid} -> << - ?put_header (?GetmoreOpcode), - ?put_int32 (0), - (put_cstring (dbcoll (Db, Coll))) /binary, - ?put_int32 (Batch), - ?put_int64 (Cid) >> - end. - --spec get_reply (binary()) -> {requestid(), reply(), binary()}. -get_reply (<< - ?get_header (?ReplyOpcode, ResponseTo), - ?get_bits32 (_,_,_,_, AwaitCapable, _, QueryError, CursorNotFound), - ?get_int64 (CursorId), - ?get_int32 (StartingFrom), - ?get_int32 (NumDocs), - Bin /binary >>) -> - {Docs, BinRest} = get_docs (NumDocs, Bin), - Reply = #reply { - cursornotfound = bool (CursorNotFound), queryerror = bool (QueryError), awaitcapable = bool (AwaitCapable), - cursorid = CursorId, startingfrom = StartingFrom, documents = Docs }, - {ResponseTo, Reply, BinRest}. - -get_docs (0, Bin) -> {[], Bin}; -get_docs (NumDocs, Bin) when NumDocs > 0 -> - {Doc, Bin1} = get_document (Bin), - {Docs, Bin2} = get_docs (NumDocs - 1, Bin1), - {[Doc | Docs], Bin2}. +-spec put_message(db(), message(), requestid()) -> binary(). +put_message(Db, Message, RequestId) -> case Message of + #insert{collection = Coll, documents = Docs} -> << + ?put_header(?InsertOpcode), + ?put_int32(0), + (put_cstring(dbcoll(Db, Coll)))/binary, + <<<<(put_document(Doc))/binary>> || Doc <- Docs>>/binary>>; + #update{collection = Coll, upsert = U, multiupdate = M, selector = Sel, updater = Up} -> + << + ?put_header(?UpdateOpcode), + ?put_int32(0), + (put_cstring(dbcoll(Db, Coll)))/binary, + ?put_bits32(0, 0, 0, 0, 0, 0, bit(M), bit(U)), + (put_document(Sel))/binary, + (put_document(Up))/binary>>; + #delete{collection = Coll, singleremove = R, selector = Sel} -> << + ?put_header(?DeleteOpcode), + ?put_int32(0), + (put_cstring(dbcoll(Db, Coll)))/binary, + ?put_bits32(0, 0, 0, 0, 0, 0, 0, bit(R)), + (put_document(Sel))/binary>>; + #killcursor{cursorids = Cids} -> << + ?put_header(?KillcursorOpcode), + ?put_int32(0), + ?put_int32(length(Cids)), + <<<> || Cid <- Cids>>/binary>>; + #'query'{tailablecursor = TC, slaveok = SOK, nocursortimeout = NCT, awaitdata = AD, + collection = Coll, skip = Skip, batchsize = Batch, selector = Sel, projector = Proj} -> + << + ?put_header(?QueryOpcode), + ?put_bits32(0, 0, bit(AD), bit(NCT), 0, bit(SOK), bit(TC), 0), + (put_cstring(dbcoll(Db, Coll)))/binary, + ?put_int32(Skip), + ?put_int32(Batch), + (put_document(Sel))/binary, + (case Proj of [] -> <<>>; _ -> put_document(Proj) end)/binary>>; + #getmore{collection = Coll, batchsize = Batch, cursorid = Cid} -> << + ?put_header(?GetmoreOpcode), + ?put_int32(0), + (put_cstring(dbcoll(Db, Coll)))/binary, + ?put_int32(Batch), + ?put_int64(Cid)>> + end. + +-spec get_reply(binary()) -> {requestid(), reply(), binary()}. +get_reply(<< + ?get_header(?ReplyOpcode, ResponseTo), + ?get_bits32(_, _, _, _, AwaitCapable, _, QueryError, CursorNotFound), + ?get_int64(CursorId), + ?get_int32(StartingFrom), + ?get_int32(NumDocs), + Bin/binary>>) -> + {Docs, BinRest} = get_docs(NumDocs, Bin), + Reply = #reply{ + cursornotfound = bool(CursorNotFound), queryerror = bool(QueryError), awaitcapable = bool(AwaitCapable), + cursorid = CursorId, startingfrom = StartingFrom, documents = Docs}, + {ResponseTo, Reply, BinRest}. + +get_docs(0, Bin) -> {[], Bin}; +get_docs(NumDocs, Bin) when NumDocs > 0 -> + {Doc, Bin1} = get_document(Bin), + {Docs, Bin2} = get_docs(NumDocs - 1, Bin1), + {[Doc | Docs], Bin2}. diff --git a/src/mongo_query.erl b/src/mongo_query.erl index 9a5d4828..4f5d156b 100644 --- a/src/mongo_query.erl +++ b/src/mongo_query.erl @@ -1,13 +1,13 @@ %@doc Write, find, and command operations --module (mongo_query). +-module(mongo_query). --export_type ([write/0, 'query'/0, command/0]). --export_type ([getlasterror_request/0, getlasterror_reply/0]). --export_type ([not_master/0, unauthorized/0]). +-export_type([write/0, 'query'/0, command/0]). +-export_type([getlasterror_request/0, getlasterror_reply/0]). +-export_type([not_master/0, unauthorized/0]). --export ([write/3, write/2, find_one/2, find/2, command/3]). +-export([write/3, write/2, find_one/2, find/2, command/3]). --include ("mongo_protocol.hrl"). +-include("mongo_protocol.hrl"). % QIO means does IO and may throw mongo_connect:failure(), not_master(), unauthorized(). @@ -23,57 +23,58 @@ -type getlasterror_reply() :: bson:document(). % Reply to getlasterror request. See http://www.mongodb.org/display/DOCS/getLastError+Command. --spec write (mongo_connect:dbconnection(), write(), getlasterror_request()) -> getlasterror_reply(). % QIO +-spec write(mongo_connect:dbconnection(), write(), getlasterror_request()) -> getlasterror_reply(). % QIO %@doc Send write and getlasterror request to mongodb over connection and wait for and return getlasterror reply. Bad getlasterror params are ignored. % Caller is responsible for checking for error in reply; if 'err' field is null then success, otherwise it holds error message string. -write (DbConn, Write, GetlasterrorParams) -> - Query = #'query' {batchsize = -1, collection = '$cmd', - selector = bson:append ({getlasterror, 1}, GetlasterrorParams)}, - Reply = mongo_connect:call (DbConn, [Write], Query), - {0, [Doc | _]} = query_reply (Reply), - Doc. - --spec write (mongo_connect:dbconnection(), write()) -> ok. % IO +write(DbConn, Write, GetlasterrorParams) -> + Query = #'query'{batchsize = -1, collection = '$cmd', + selector = bson:append({getlasterror, 1}, GetlasterrorParams)}, + Reply = mongo_connect:call(DbConn, [Write], Query), + {0, [Doc | _]} = query_reply(Reply), + Doc. + +-spec write(mongo_connect:dbconnection(), write()) -> ok. % IO %@doc Send write to mongodb over connection asynchronously. Does not wait for reply hence may silently fail. Doesn't even throw connection failure if connection is closed. -write (DbConn, Write) -> - mongo_connect:send (DbConn, [Write]). +write(DbConn, Write) -> + mongo_connect:send(DbConn, [Write]). -type command() :: bson:document(). --spec command (mongo_connect:dbconnection(), command(), boolean()) -> bson:document(). % QIO +-spec command(mongo_connect:dbconnection(), command(), boolean()) -> bson:document(). % QIO %@doc Send command to mongodb over connection and wait for reply and return it. Boolean arg indicates slave-ok or not. 'bad_command' error if bad command. -command (DbConn, Command, SlaveOk) -> - Query = #'query' {collection = '$cmd', selector = Command, slaveok = SlaveOk}, - {Doc} = find_one (DbConn, Query), - Ok = bson:at (ok, Doc), - if Ok == true orelse Ok == 1 -> Doc; true -> erlang:error ({bad_command, Doc}) end. % bad_command parsed by mongo:auth +command(DbConn, Command, SlaveOk) -> + Query = #'query'{collection = '$cmd', selector = Command, slaveok = SlaveOk}, + {Doc} = find_one(DbConn, Query), + Ok = bson:at(ok, Doc), + if Ok == true orelse Ok == 1 -> Doc; true -> + erlang:error({bad_command, Doc}) end. % bad_command parsed by mongo:auth -type 'query'() :: #'query'{}. -type maybe(A) :: {A} | {}. --spec find_one (mongo_connect:dbconnection(), 'query'()) -> maybe (bson:document()). % QIO +-spec find_one(mongo_connect:dbconnection(), 'query'()) -> maybe (bson:document()). % QIO %@doc Send read request to mongodb over connection and wait for reply. Return first result or nothing if empty. -find_one (DbConn, Query) -> - Query1 = Query #'query' {batchsize = -1}, - Reply = mongo_connect:call (DbConn, [], Query1), - {0, Docs} = query_reply (Reply), % batchsize negative so cursor should be closed (0) - case Docs of [] -> {}; [Doc | _] -> {Doc} end. +find_one(DbConn, Query) -> + Query1 = Query#'query'{batchsize = -1}, + Reply = mongo_connect:call(DbConn, [], Query1), + {0, Docs} = query_reply(Reply), % batchsize negative so cursor should be closed (0) + case Docs of [] -> {}; [Doc | _] -> {Doc} end. --spec find (mongo_connect:dbconnection(), 'query'()) -> mongo_cursor:cursor(). % QIO +-spec find(mongo_connect:dbconnection(), 'query'()) -> mongo_cursor:cursor(). % QIO %@doc Send read request to mongodb over connection and wait for reply of first batch. Return a cursor holding this batch and able to fetch next batch on demand. -find (DbConn, Query) -> - Reply = mongo_connect:call (DbConn, [], Query), - mongo_cursor:cursor (DbConn, Query #'query'.collection, Query #'query'.batchsize, query_reply (Reply)). +find(DbConn, Query) -> + Reply = mongo_connect:call(DbConn, [], Query), + mongo_cursor:cursor(DbConn, Query#'query'.collection, Query#'query'.batchsize, query_reply(Reply)). --spec query_reply (mongo_protocol:reply()) -> {cursorid(), [bson:document()]}. % QIO +-spec query_reply(mongo_protocol:reply()) -> {cursorid(), [bson:document()]}. % QIO %@doc Extract cursorid and results from reply. 'bad_query' error if query error. -query_reply (#reply { - cursornotfound = false, queryerror = QueryError, awaitcapable = _, - cursorid = Cid, startingfrom = _, documents = Docs }) -> - case QueryError of - false -> {Cid, Docs}; - true -> case bson:at (code, hd (Docs)) of - 13435 -> throw (not_master); - 10057 -> throw (unauthorized); - _ -> erlang:error ({bad_query, hd (Docs)}) end end. +query_reply(#reply{ + cursornotfound = false, queryerror = QueryError, awaitcapable = _, + cursorid = Cid, startingfrom = _, documents = Docs}) -> + case QueryError of + false -> {Cid, Docs}; + true -> case bson:at(code, hd(Docs)) of + 13435 -> throw(not_master); + 10057 -> throw(unauthorized); + _ -> erlang:error({bad_query, hd(Docs)}) end end. diff --git a/src/mongo_replset.erl b/src/mongo_replset.erl index 94a67db0..0cf40903 100644 --- a/src/mongo_replset.erl +++ b/src/mongo_replset.erl @@ -1,8 +1,8 @@ %@doc Get connection to appropriate server in a replica set --module (mongo_replset). +-module(mongo_replset). --export_type ([replset/0, rs_connection/0]). --export ([connect/1, connect/2, primary/1, secondary_ok/1, close/1, is_closed/1]). % API +-export_type([replset/0, rs_connection/0]). +-export([connect/1, connect/2, primary/1, secondary_ok/1, close/1, is_closed/1]). % API -type maybe(A) :: {A} | {}. -type err_or(A) :: {ok, A} | {error, reason()}. @@ -13,18 +13,18 @@ % find (Pred, []) -> {}; % find (Pred, [A | Tail]) -> case Pred (A) of true -> {A}; false -> find (Pred, Tail) end. --spec until_success ([A], fun ((A) -> B)) -> B. % EIO, fun EIO +-spec until_success([A], fun ((A) -> B)) -> B. % EIO, fun EIO %@doc Apply fun on each element until one doesn't fail. Fail if all fail or list is empty -until_success ([], _Fun) -> throw ([]); -until_success ([A | Tail], Fun) -> try Fun (A) - catch Reason -> try until_success (Tail, Fun) - catch Reasons -> throw ([Reason | Reasons]) end end. +until_success([], _Fun) -> throw([]); +until_success([A | Tail], Fun) -> try Fun(A) + catch Reason -> try until_success(Tail, Fun) + catch Reasons -> throw([Reason | Reasons]) end end. --spec rotate (integer(), [A]) -> [A]. +-spec rotate(integer(), [A]) -> [A]. %@doc Move first N element of list to back of list -rotate (N, List) -> - {Front, Back} = lists:split (N, List), - Back ++ Front. +rotate(N, List) -> + {Front, Back} = lists:split(N, List), + Back ++ Front. -type host() :: mongo_connect:host(). -type connection() :: mongo_connect:connection(). @@ -33,15 +33,15 @@ rotate (N, List) -> % Identify replset. Hosts is just seed list, not necessarily all hosts in replica set -type rs_name() :: bson:utf8(). --spec connect (replset()) -> rs_connection(). % IO +-spec connect(replset()) -> rs_connection(). % IO %@doc Create new cache of connections to replica set members starting with seed members. No connection attempted until primary or secondary_ok called. -connect (ReplSet) -> connect (ReplSet, infinity). +connect(ReplSet) -> connect(ReplSet, infinity). --spec connect (replset(), timeout()) -> rs_connection(). % IO +-spec connect(replset(), timeout()) -> rs_connection(). % IO %@doc Create new cache of connections to replica set members starting with seed members. No connection attempted until primary or secondary_ok called. Timeout used for initial connection and every call. -connect ({ReplName, Hosts}, TimeoutMS) -> - Dict = dict:from_list (lists:map (fun (Host) -> {mongo_connect:host_port (Host), {}} end, Hosts)), - {rs_connection, ReplName, mvar:new (Dict), TimeoutMS}. +connect({ReplName, Hosts}, TimeoutMS) -> + Dict = dict:from_list(lists:map(fun(Host) -> {mongo_connect:host_port(Host), {}} end, Hosts)), + {rs_connection, ReplName, mvar:new(Dict), TimeoutMS}. -opaque rs_connection() :: {rs_connection, rs_name(), mvar:mvar(connections()), timeout()}. % Maintains set of connections to some if not all of the replica set members. Opaque except to mongo:connect_mode @@ -49,109 +49,110 @@ connect ({ReplName, Hosts}, TimeoutMS) -> -type connections() :: dict:dictionary (host(), maybe(connection())). % All hosts listed in last member_info fetched are keys in dict. Value is {} if no attempt to connect to that host yet --spec primary (rs_connection()) -> err_or(connection()). % IO +-spec primary(rs_connection()) -> err_or(connection()). % IO %@doc Return connection to current primary in replica set -primary (ReplConn) -> try - MemberInfo = fetch_member_info (ReplConn), - primary_conn (2, ReplConn, MemberInfo) - of Conn -> {ok, Conn} - catch Reason -> {error, Reason} end. +primary(ReplConn) -> try + MemberInfo = fetch_member_info(ReplConn), + primary_conn(2, ReplConn, MemberInfo) + of Conn -> {ok, Conn} + catch Reason -> {error, Reason} end. --spec secondary_ok (rs_connection()) -> err_or(connection()). % IO +-spec secondary_ok(rs_connection()) -> err_or(connection()). % IO %@doc Return connection to a current secondary in replica set or primary if none -secondary_ok (ReplConn) -> try - {_Conn, Info} = fetch_member_info (ReplConn), - Hosts = lists:map (fun mongo_connect:read_host/1, bson:at (hosts, Info)), - R = random:uniform (length (Hosts)) - 1, - secondary_ok_conn (ReplConn, rotate (R, Hosts)) - of Conn -> {ok, Conn} - catch Reason -> {error, Reason} end. - --spec close (rs_connection()) -> ok. % IO +secondary_ok(ReplConn) -> try + {_Conn, Info} = fetch_member_info(ReplConn), + Hosts = lists:map(fun mongo_connect:read_host/1, bson:at(hosts, Info)), + R = random:uniform(length(Hosts)) - 1, + secondary_ok_conn(ReplConn, rotate(R, Hosts)) + of Conn -> {ok, Conn} + catch Reason -> {error, Reason} end. + +-spec close(rs_connection()) -> ok. % IO %@doc Close replset connection -close ({rs_connection, _, VConns, _}) -> - CloseConn = fun (_, MCon, _) -> case MCon of {Con} -> mongo_connect:close (Con); {} -> ok end end, - mvar:with (VConns, fun (Dict) -> dict:fold (CloseConn, ok, Dict) end), - mvar:terminate (VConns). +close({rs_connection, _, VConns, _}) -> + CloseConn = fun(_, MCon, _) -> case MCon of {Con} -> mongo_connect:close(Con); {} -> ok end end, + mvar:with(VConns, fun(Dict) -> dict:fold(CloseConn, ok, Dict) end), + mvar:terminate(VConns). --spec is_closed (rs_connection()) -> boolean(). % IO +-spec is_closed(rs_connection()) -> boolean(). % IO %@doc Has replset connection been closed? -is_closed ({rs_connection, _, VConns, _}) -> mvar:is_terminated (VConns). +is_closed({rs_connection, _, VConns, _}) -> mvar:is_terminated(VConns). % EIO = IO that may throw error of any type -type member_info() :: {connection(), bson:document()}. % Result of isMaster query on a server connnection. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false --spec primary_conn (integer(), rs_connection(), member_info()) -> connection(). % EIO +-spec primary_conn(integer(), rs_connection(), member_info()) -> connection(). % EIO %@doc Return connection to primary designated in member_info. Only chase primary pointer N times. -primary_conn (0, _ReplConn, MemberInfo) -> throw ({false_primary, MemberInfo}); -primary_conn (Tries, ReplConn, {Conn, Info}) -> case bson:at (ismaster, Info) of - true -> Conn; - false -> case bson:lookup (primary, Info) of - {HostString} -> - MemberInfo = connect_member (ReplConn, mongo_connect:read_host (HostString)), - primary_conn (Tries - 1, ReplConn, MemberInfo); - {} -> throw ({no_primary, {Conn, Info}}) end end. - --spec secondary_ok_conn (rs_connection(), [host()]) -> connection(). % EIO +primary_conn(0, _ReplConn, MemberInfo) -> throw({false_primary, MemberInfo}); +primary_conn(Tries, ReplConn, {Conn, Info}) -> case bson:at(ismaster, Info) of + true -> Conn; + false -> case bson:lookup(primary, Info) of + {HostString} -> + MemberInfo = connect_member(ReplConn, mongo_connect:read_host(HostString)), + primary_conn(Tries - 1, ReplConn, MemberInfo); + {} -> throw({no_primary, {Conn, Info}}) end end. + +-spec secondary_ok_conn(rs_connection(), [host()]) -> connection(). % EIO %@doc Return connection to a live secondaries in replica set, or primary if none -secondary_ok_conn (ReplConn, Hosts) -> try - until_success (Hosts, fun (Host) -> - {Conn, Info} = connect_member (ReplConn, Host), - case bson:at (secondary, Info) of true -> Conn; false -> throw (not_secondary) end end) - catch _ -> primary_conn (2, ReplConn, fetch_member_info (ReplConn)) end. - --spec fetch_member_info (rs_connection()) -> member_info(). % EIO +secondary_ok_conn(ReplConn, Hosts) -> try + until_success(Hosts, fun(Host) -> + {Conn, Info} = connect_member(ReplConn, Host), + case bson:at(secondary, Info) of true -> Conn; false -> + throw(not_secondary) end end) + catch _ -> primary_conn(2, ReplConn, fetch_member_info(ReplConn)) end. + +-spec fetch_member_info(rs_connection()) -> member_info(). % EIO %@doc Retrieve isMaster info from a current known member in replica set. Update known list of members from fetched info. -fetch_member_info (ReplConn = {rs_connection, _ReplName, VConns, _}) -> - OldHosts_ = dict:fetch_keys (mvar:read (VConns)), - {Conn, Info} = until_success (OldHosts_, fun (Host) -> connect_member (ReplConn, Host) end), - OldHosts = sets:from_list (OldHosts_), - NewHosts = sets:from_list (lists:map (fun mongo_connect:read_host/1, bson:at (hosts, Info))), - RemovedHosts = sets:subtract (OldHosts, NewHosts), - AddedHosts = sets:subtract (NewHosts, OldHosts), - mvar:modify_ (VConns, fun (Dict) -> - Dict1 = sets:fold (fun remove_host/2, Dict, RemovedHosts), - Dict2 = sets:fold (fun add_host/2, Dict1, AddedHosts), - Dict2 end), - case sets:is_element (mongo_connect:conn_host (Conn), RemovedHosts) of - false -> {Conn, Info}; - true -> % Conn connected to member but under wrong name (eg. localhost instead of 127.0.0.1) so it was closed and removed because it did not match a host in isMaster info. Reconnect using correct name. - Hosts = dict:fetch_keys (mvar:read (VConns)), - until_success (Hosts, fun (Host) -> connect_member (ReplConn, Host) end) end. - -add_host (Host, Dict) -> dict:store (Host, {}, Dict). - -remove_host (Host, Dict) -> - MConn = dict:fetch (Host, Dict), - Dict1 = dict:erase (Host, Dict), - case MConn of {Conn} -> mongo_connect:close (Conn); {} -> ok end, - Dict1. - --spec connect_member (rs_connection(), host()) -> member_info(). % EIO +fetch_member_info(ReplConn = {rs_connection, _ReplName, VConns, _}) -> + OldHosts_ = dict:fetch_keys(mvar:read(VConns)), + {Conn, Info} = until_success(OldHosts_, fun(Host) -> connect_member(ReplConn, Host) end), + OldHosts = sets:from_list(OldHosts_), + NewHosts = sets:from_list(lists:map(fun mongo_connect:read_host/1, bson:at(hosts, Info))), + RemovedHosts = sets:subtract(OldHosts, NewHosts), + AddedHosts = sets:subtract(NewHosts, OldHosts), + mvar:modify_(VConns, fun(Dict) -> + Dict1 = sets:fold(fun remove_host/2, Dict, RemovedHosts), + Dict2 = sets:fold(fun add_host/2, Dict1, AddedHosts), + Dict2 end), + case sets:is_element(mongo_connect:conn_host(Conn), RemovedHosts) of + false -> {Conn, Info}; + true -> % Conn connected to member but under wrong name (eg. localhost instead of 127.0.0.1) so it was closed and removed because it did not match a host in isMaster info. Reconnect using correct name. + Hosts = dict:fetch_keys(mvar:read(VConns)), + until_success(Hosts, fun(Host) -> connect_member(ReplConn, Host) end) end. + +add_host(Host, Dict) -> dict:store(Host, {}, Dict). + +remove_host(Host, Dict) -> + MConn = dict:fetch(Host, Dict), + Dict1 = dict:erase(Host, Dict), + case MConn of {Conn} -> mongo_connect:close(Conn); {} -> ok end, + Dict1. + +-spec connect_member(rs_connection(), host()) -> member_info(). % EIO %@doc Connect to host and verify membership. Cache connection in rs_connection -connect_member ({rs_connection, ReplName, VConns, TimeoutMS}, Host) -> - Conn = get_connection (VConns, Host, TimeoutMS), - Info = try get_member_info (Conn) catch _ -> - mongo_connect:close (Conn), - Conn1 = get_connection (VConns, Host, TimeoutMS), - get_member_info (Conn1) end, - case bson:at (setName, Info) of - ReplName -> {Conn, Info}; - _ -> - mongo_connect:close (Conn), - throw ({not_member, ReplName, Host, Info}) end. - -get_connection (VConns, Host, TimeoutMS) -> mvar:modify (VConns, fun (Dict) -> - case dict:find (Host, Dict) of - {ok, {Conn}} -> case mongo_connect:is_closed (Conn) of - false -> {Dict, Conn}; - true -> new_connection (Dict, Host, TimeoutMS) end; - _ -> new_connection (Dict, Host, TimeoutMS) end end). - -new_connection (Dict, Host, TimeoutMS) -> case mongo_connect:connect (Host, TimeoutMS) of - {ok, Conn} -> {dict:store (Host, {Conn}, Dict), Conn}; - {error, Reason} -> throw ({cant_connect, Reason}) end. - -get_member_info (Conn) -> mongo_query:command ({admin, Conn}, {isMaster, 1}, true). +connect_member({rs_connection, ReplName, VConns, TimeoutMS}, Host) -> + Conn = get_connection(VConns, Host, TimeoutMS), + Info = try get_member_info(Conn) catch _ -> + mongo_connect:close(Conn), + Conn1 = get_connection(VConns, Host, TimeoutMS), + get_member_info(Conn1) end, + case bson:at(setName, Info) of + ReplName -> {Conn, Info}; + _ -> + mongo_connect:close(Conn), + throw({not_member, ReplName, Host, Info}) end. + +get_connection(VConns, Host, TimeoutMS) -> mvar:modify(VConns, fun(Dict) -> + case dict:find(Host, Dict) of + {ok, {Conn}} -> case mongo_connect:is_closed(Conn) of + false -> {Dict, Conn}; + true -> new_connection(Dict, Host, TimeoutMS) end; + _ -> new_connection(Dict, Host, TimeoutMS) end end). + +new_connection(Dict, Host, TimeoutMS) -> case mongo_connect:connect(Host, TimeoutMS) of + {ok, Conn} -> {dict:store(Host, {Conn}, Dict), Conn}; + {error, Reason} -> throw({cant_connect, Reason}) end. + +get_member_info(Conn) -> mongo_query:command({admin, Conn}, {isMaster, 1}, true). diff --git a/src/mongoc.erl b/src/mongoc.erl new file mode 100644 index 00000000..cf59c439 --- /dev/null +++ b/src/mongoc.erl @@ -0,0 +1,169 @@ +%%%------------------------------------------------------------------- +%%% @author Jeff +%%% @copyright (C) 2015, +%%% @doc +%%% +%%% @end +%%% Created : 06. 十二月 2015 1:44 +%%%------------------------------------------------------------------- +-module(mongoc). +-author("zhujiafeng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). +-compile(export_all). +-record(state, {}). +-export([connect/4, rs_connect/3, find_one/4]). +%%%=================================================================== +%%% API +%%%=================================================================== +connect(Pid, Host, Port, Opts) -> + gen_server:call(Pid, {connect, Host, Port, Opts}). +rs_connect(Pid, ReplSet, Opts) -> + gen_server:call(Pid, {rs_connect, ReplSet, Opts}). +find_one(Pid, DbConn, Coll, Selector) -> + gen_server:call(Pid, {find_one, DbConn, Coll, Selector}). +primary(Pid, RSConn) -> + gen_server:call(Pid, {primary, RSConn}). +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @end +%%-------------------------------------------------------------------- +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @end +%%-------------------------------------------------------------------- +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({connect, Host, Port, _Opts}, _From, State) -> + {ok, Conn} = mongo:connect(Host, Port), + {reply, {ok, Conn}, State}; +handle_call({rs_connect, ReplSet, _Opts}, _From, State) -> + RSConn = mongo:rs_connect(ReplSet), + {reply, {ok, RSConn}, State}; +handle_call({find_one, DbConn, Coll, Selector}, _From, State) -> + Doc = mongo:find_one(DbConn, Coll, Selector), + {reply, {ok, Doc}, State}; +handle_call({primary, RSConn}, _From, State) -> + case mongo_replset:primary(RSConn) of + {ok, Conn} -> + {reply, {ok, Conn}, State}; + {error, Reason} -> + {reply, {error, Reason}, State} + end; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @end +%%-------------------------------------------------------------------- +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/mongodb_app.erl b/src/mongodb_app.erl index 1c0966f9..1aef1d1b 100644 --- a/src/mongodb_app.erl +++ b/src/mongodb_app.erl @@ -1,49 +1,49 @@ %@doc Init some internal global variables used by mongodb app --module (mongodb_app). +-module(mongodb_app). --behaviour (application). --export ([start/2, stop/1]). +-behaviour(application). +-export([start/2, stop/1]). --behaviour (supervisor). --export ([init/1]). +-behaviour(supervisor). +-export([init/1]). --export ([gen_objectid/0, next_requestid/0]). % API +-export([gen_objectid/0, next_requestid/0]). % API %% Behaviour callbacks -start (_, []) -> supervisor:start_link ({local, ?MODULE}, ?MODULE, []). +start(_, []) -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -stop (_) -> ok. +stop(_) -> ok. %% Supervisor callbacks %@doc Create global vars which will be owned by this supervisor (and die with it) -init ([]) -> - ets:new (?MODULE, [named_table, public]), - ets:insert (?MODULE, [ - {oid_counter, 0}, - {oid_machineprocid, oid_machineprocid()}, - {requestid_counter, 0} ]), - {ok, {{one_for_one,3,10}, []}}. +init([]) -> + ets:new(?MODULE, [named_table, public]), + ets:insert(?MODULE, [ + {oid_counter, 0}, + {oid_machineprocid, oid_machineprocid()}, + {requestid_counter, 0}]), + {ok, {{one_for_one, 3, 10}, []}}. %% API functions --spec next_requestid () -> mongo_protocol:requestid(). % IO +-spec next_requestid() -> mongo_protocol:requestid(). % IO %@doc Fresh request id -next_requestid() -> ets:update_counter (?MODULE, requestid_counter, 1). +next_requestid() -> ets:update_counter(?MODULE, requestid_counter, 1). --spec gen_objectid () -> bson:objectid(). % IO +-spec gen_objectid() -> bson:objectid(). % IO %@doc Fresh object id gen_objectid() -> - Now = bson:unixtime_to_secs (bson:timenow()), - MPid = ets:lookup_element (?MODULE, oid_machineprocid, 2), - N = ets:update_counter (?MODULE, oid_counter, 1), - bson:objectid (Now, MPid, N). + Now = bson:unixtime_to_secs(bson:timenow()), + MPid = ets:lookup_element(?MODULE, oid_machineprocid, 2), + N = ets:update_counter(?MODULE, oid_counter, 1), + bson:objectid(Now, MPid, N). --spec oid_machineprocid () -> <<_:40>>. % IO +-spec oid_machineprocid() -> <<_:40>>. % IO %@doc Fetch hostname and os pid and compress into a 5 byte id oid_machineprocid() -> - OSPid = list_to_integer (os:getpid()), - {ok, Hostname} = inet:gethostname(), - <> = erlang:md5 (Hostname), - <>. + OSPid = list_to_integer(os:getpid()), + {ok, Hostname} = inet:gethostname(), + <> = erlang:md5(Hostname), + <>. diff --git a/src/mongodb_tests.erl b/src/mongodb_tests.erl index a96d6acc..30119537 100644 --- a/src/mongodb_tests.erl +++ b/src/mongodb_tests.erl @@ -4,162 +4,162 @@ -module(mongodb_tests). -include_lib("eunit/include/eunit.hrl"). --include ("mongo_protocol.hrl"). +-include("mongo_protocol.hrl"). --export ([test/0, test_rs/0]). +-export([test/0, test_rs/0]). -test() -> eunit:test ({setup, - fun () -> application:start (mongodb), - io:format (user, "~n** Make sure mongod is running on 127.0.0.1:27017 **~n~n", []) end, - fun (_) -> application:stop (mongodb) end, - [fun var_test/0, - fun var_finalize_test/0, - fun app_test/0, - fun connect_test/0, - fun mongo_test/0, - fun resource_pool_test/0 - ]}). +test() -> eunit:test({setup, + fun() -> application:start(mongodb), + io:format(user, "~n** Make sure mongod is running on 127.0.0.1:27017 **~n~n", []) end, + fun(_) -> application:stop(mongodb) end, + [fun var_test/0, + fun var_finalize_test/0, + fun app_test/0, + fun connect_test/0, + fun mongo_test/0, + fun resource_pool_test/0 + ]}). -test_rs() -> eunit:test ({setup, - fun () -> application:start (mongodb), - io:format (user, "~n** Make sure replica set is running on 127.0.0.1:27017 & 27018 **~n~n", []) end, - fun (_) -> application:stop (mongodb) end, - [fun replset_test/0, - fun mongo_rs_test/0 - ]}). +test_rs() -> eunit:test({setup, + fun() -> application:start(mongodb), + io:format(user, "~n** Make sure replica set is running on 127.0.0.1:27017 & 27018 **~n~n", []) end, + fun(_) -> application:stop(mongodb) end, + [fun replset_test/0, + fun mongo_rs_test/0 + ]}). var_test() -> - Var = mvar:new (0), - 0 = mvar:write (Var, 1), - 1 = mvar:read (Var), - foo = mvar:modify (Var, fun (N) -> {N+1, foo} end), - 2 = mvar:read (Var), - foo = (catch mvar:with (Var, fun (_) -> throw (foo) end)), - mvar:terminate (Var), - {exit, {noproc, _}} = try mvar:read (Var) catch C:E -> {C, E} end, - mvar:terminate (Var). % repeat termination is no-op (not failure) + Var = mvar:new(0), + 0 = mvar:write(Var, 1), + 1 = mvar:read(Var), + foo = mvar:modify(Var, fun(N) -> {N + 1, foo} end), + 2 = mvar:read(Var), + foo = (catch mvar:with(Var, fun(_) -> throw(foo) end)), + mvar:terminate(Var), + {exit, {noproc, _}} = try mvar:read(Var) catch C:E -> {C, E} end, + mvar:terminate(Var). % repeat termination is no-op (not failure) var_finalize_test() -> - Var0 = mvar:new ({}), - Var = mvar:new (0, fun (N) -> mvar:write (Var0, N) end), - {} = mvar:read (Var0), - 0 = mvar:read (Var), - mvar:terminate (Var), - 0 = mvar:read (Var0), - mvar:terminate (Var0). + Var0 = mvar:new({}), + Var = mvar:new(0, fun(N) -> mvar:write(Var0, N) end), + {} = mvar:read(Var0), + 0 = mvar:read(Var), + mvar:terminate(Var), + 0 = mvar:read(Var0), + mvar:terminate(Var0). % This test must be run first right after application start (assumes counter table contain initial values) app_test() -> - 1 = mongodb_app:next_requestid(), - UnixSecs = bson:unixtime_to_secs (bson:timenow()), - {<>} = bson:objectid (UnixSecs, <<1,2,3,4,5>>, 0), - {<>} = mongodb_app:gen_objectid(). % high two timestamp bytes should match + 1 = mongodb_app:next_requestid(), + UnixSecs = bson:unixtime_to_secs(bson:timenow()), + {<>} = bson:objectid(UnixSecs, <<1, 2, 3, 4, 5>>, 0), + {<>} = mongodb_app:gen_objectid(). % high two timestamp bytes should match % Mongod server must be running on 127.0.0.1:27017 connect_test() -> - {error, _} = mongo_connect:connect ({"127.0.0.1", 26555}), - {ok, Conn} = mongo_connect:connect ({"127.0.0.1", 27017}), - DbConn = {test, Conn}, - Res = mongo_query:write (DbConn, #delete {collection = foo, selector = {}}, {}), - {null} = bson:lookup (err, Res), - Doc0 = {'_id', 0, text, <<"hello">>}, - Doc1 = {'_id', 1, text, <<"world">>}, - Res1 = mongo_query:write (DbConn, #insert {collection = foo, documents = [Doc0, Doc1]}, {}), - {null} = bson:lookup (err, Res1), - ok = mongo_query:write (DbConn, #update {collection = foo, selector = {'_id', 1}, updater = {'$set', {text, <<"world!!">>}}}), - Doc1X = bson:update (text, <<"world!!">>, Doc1), - Cursor = mongo_query:find (DbConn, #'query' {collection = foo, selector = {}}), - [Doc0, Doc1X] = mongo_cursor:rest (Cursor), - true = mongo_cursor:is_closed (Cursor), - #reply {cursornotfound = true} = mongo_connect:call (DbConn, [], #getmore {collection = foo, cursorid = 2938725639}), - mongo_connect:close (Conn), - true = mongo_connect:is_closed (Conn). + {error, _} = mongo_connect:connect({"127.0.0.1", 26555}), + {ok, Conn} = mongo_connect:connect({"127.0.0.1", 27017}), + DbConn = {test, Conn}, + Res = mongo_query:write(DbConn, #delete{collection = foo, selector = {}}, {}), + {null} = bson:lookup(err, Res), + Doc0 = {'_id', 0, text, <<"hello">>}, + Doc1 = {'_id', 1, text, <<"world">>}, + Res1 = mongo_query:write(DbConn, #insert{collection = foo, documents = [Doc0, Doc1]}, {}), + {null} = bson:lookup(err, Res1), + ok = mongo_query:write(DbConn, #update{collection = foo, selector = {'_id', 1}, updater = {'$set', {text, <<"world!!">>}}}), + Doc1X = bson:update(text, <<"world!!">>, Doc1), + Cursor = mongo_query:find(DbConn, #'query'{collection = foo, selector = {}}), + [Doc0, Doc1X] = mongo_cursor:rest(Cursor), + true = mongo_cursor:is_closed(Cursor), + #reply{cursornotfound = true} = mongo_connect:call(DbConn, [], #getmore{collection = foo, cursorid = 2938725639}), + mongo_connect:close(Conn), + true = mongo_connect:is_closed(Conn). % Mongod server must be running on 127.0.0.1:27017 mongo_test() -> - {ok, Conn} = mongo:connect ("127.0.0.1"), - mongo:do (safe, master, Conn, baseball, fun () -> - mongo:delete (team, {}), - Teams0 = [ - {name, <<"Yankees">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"American">>}, - {name, <<"Mets">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"National">>}, - {name, <<"Phillies">>, home, {city, <<"Philadelphia">>, state, <<"PA">>}, league, <<"National">>}, - {name, <<"Red Sox">>, home, {city, <<"Boston">>, state, <<"MA">>}, league, <<"American">>} ], - Ids = mongo:insert_all (team, Teams0), - 4 = mongo:count (team, {}), - Teams = lists:zipwith (fun (Id, Team) -> bson:append ({'_id', Id}, Team) end, Ids, Teams0), - Teams = mongo:rest (mongo:find (team, {})), - NationalTeams = lists:filter (fun (Team) -> bson:lookup (league, Team) == {<<"National">>} end, Teams), - NationalTeams = mongo:rest (mongo:find (team, {league, <<"National">>})), - TeamNames = lists:map (fun (Team) -> {name, bson:at (name, Team)} end, Teams), - TeamNames = mongo:rest (mongo:find (team, {}, {'_id', 0, name, 1})), - BostonTeam = lists:last (Teams), - {BostonTeam} = mongo:find_one (team, {home, {city, <<"Boston">>, state, <<"MA">>}}), - mongo:delete_one (team, {}), - 3 = mongo:count (team, {}) - end), - mongo:disconnect (Conn). + {ok, Conn} = mongo:connect("127.0.0.1"), + mongo:do(safe, master, Conn, baseball, fun() -> + mongo:delete(team, {}), + Teams0 = [ + {name, <<"Yankees">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"American">>}, + {name, <<"Mets">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"National">>}, + {name, <<"Phillies">>, home, {city, <<"Philadelphia">>, state, <<"PA">>}, league, <<"National">>}, + {name, <<"Red Sox">>, home, {city, <<"Boston">>, state, <<"MA">>}, league, <<"American">>}], + Ids = mongo:insert_all(team, Teams0), + 4 = mongo:count(team, {}), + Teams = lists:zipwith(fun(Id, Team) -> bson:append({'_id', Id}, Team) end, Ids, Teams0), + Teams = mongo:rest(mongo:find(team, {})), + NationalTeams = lists:filter(fun(Team) -> bson:lookup(league, Team) == {<<"National">>} end, Teams), + NationalTeams = mongo:rest(mongo:find(team, {league, <<"National">>})), + TeamNames = lists:map(fun(Team) -> {name, bson:at(name, Team)} end, Teams), + TeamNames = mongo:rest(mongo:find(team, {}, {'_id', 0, name, 1})), + BostonTeam = lists:last(Teams), + {BostonTeam} = mongo:find_one(team, {home, {city, <<"Boston">>, state, <<"MA">>}}), + mongo:delete_one(team, {}), + 3 = mongo:count(team, {}) + end), + mongo:disconnect(Conn). % Mongod server must be running on 127.0.0.1:27017 resource_pool_test() -> - Pool = resource_pool:new (mongo:connect_factory ({"127.0.0.1", 27017}), 2), - Do = fun (Conn) -> mongo:do (safe, master, Conn, admin, fun () -> mongo:command ({listDatabases, 1}) end) end, - lists:foreach (fun (_) -> - {ok, Conn} = resource_pool:get (Pool), - {ok, Doc} = Do (Conn), - {_} = bson:lookup (databases, Doc) end, - lists:seq (1,8)), - resource_pool:close (Pool), - true = resource_pool:is_closed (Pool). + Pool = resource_pool:new(mongo:connect_factory({"127.0.0.1", 27017}), 2), + Do = fun(Conn) -> mongo:do(safe, master, Conn, admin, fun() -> mongo:command({listDatabases, 1}) end) end, + lists:foreach(fun(_) -> + {ok, Conn} = resource_pool:get(Pool), + {ok, Doc} = Do(Conn), + {_} = bson:lookup(databases, Doc) end, + lists:seq(1, 8)), + resource_pool:close(Pool), + true = resource_pool:is_closed(Pool). % Replica set named "rs1" must be running on localhost:27017 & 27018 replset_test() -> % TODO: change from connect_test - RS0 = mongo_replset:connect ({<<"rs0">>,[localhost]}), - {error, [{not_member, _, _, _} | _]} = mongo_replset:primary (RS0), - mongo_replset:close (RS0), - RS1 = mongo_replset:connect ({<<"rs1">>,[localhost]}), - {ok, Conn} = mongo_replset:primary (RS1), - DbConn = {test, Conn}, - Res = mongo_query:write (DbConn, #delete {collection = foo, selector = {}}, {}), - {null} = bson:lookup (err, Res), - Doc0 = {'_id', 0, text, <<"hello">>}, - Doc1 = {'_id', 1, text, <<"world">>}, - Res1 = mongo_query:write (DbConn, #insert {collection = foo, documents = [Doc0, Doc1]}, {}), - {null} = bson:lookup (err, Res1), - ok = mongo_query:write (DbConn, #update {collection = foo, selector = {'_id', 1}, updater = {'$set', {text, <<"world!!">>}}}), - Doc1X = bson:update (text, <<"world!!">>, Doc1), - Cursor = mongo_query:find (DbConn, #'query' {collection = foo, selector = {}}), - [Doc0, Doc1X] = mongo_cursor:rest (Cursor), - {ok, Conn2} = mongo_replset:secondary_ok (RS1), - DbConn2 = {test, Conn2}, - Cursor2 = mongo_query:find (DbConn2, #'query' {collection = foo, selector = {}, slaveok = true}), - [Doc0, Doc1X] = mongo_cursor:rest (Cursor2), - mongo_replset:close (RS1), - true = mongo_replset:is_closed (RS1). + RS0 = mongo_replset:connect({<<"rs0">>, [localhost]}), + {error, [{not_member, _, _, _} | _]} = mongo_replset:primary(RS0), + mongo_replset:close(RS0), + RS1 = mongo_replset:connect({<<"rs1">>, [localhost]}), + {ok, Conn} = mongo_replset:primary(RS1), + DbConn = {test, Conn}, + Res = mongo_query:write(DbConn, #delete{collection = foo, selector = {}}, {}), + {null} = bson:lookup(err, Res), + Doc0 = {'_id', 0, text, <<"hello">>}, + Doc1 = {'_id', 1, text, <<"world">>}, + Res1 = mongo_query:write(DbConn, #insert{collection = foo, documents = [Doc0, Doc1]}, {}), + {null} = bson:lookup(err, Res1), + ok = mongo_query:write(DbConn, #update{collection = foo, selector = {'_id', 1}, updater = {'$set', {text, <<"world!!">>}}}), + Doc1X = bson:update(text, <<"world!!">>, Doc1), + Cursor = mongo_query:find(DbConn, #'query'{collection = foo, selector = {}}), + [Doc0, Doc1X] = mongo_cursor:rest(Cursor), + {ok, Conn2} = mongo_replset:secondary_ok(RS1), + DbConn2 = {test, Conn2}, + Cursor2 = mongo_query:find(DbConn2, #'query'{collection = foo, selector = {}, slaveok = true}), + [Doc0, Doc1X] = mongo_cursor:rest(Cursor2), + mongo_replset:close(RS1), + true = mongo_replset:is_closed(RS1). % Replica set named "rs1" must be running on localhost:27017 & 27018 mongo_rs_test() -> - RsConn = mongo:rs_connect ({<<"rs1">>,["127.0.0.1"]}), - {ok, {Teams1, Ids1}} = mongo:do (safe, master, RsConn, baseball, fun () -> - mongo:delete (team, {}), - Teams0 = [ - {name, <<"Yankees">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"American">>}, - {name, <<"Mets">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"National">>}, - {name, <<"Phillies">>, home, {city, <<"Philadelphia">>, state, <<"PA">>}, league, <<"National">>}, - {name, <<"Red Sox">>, home, {city, <<"Boston">>, state, <<"MA">>}, league, <<"American">>} ], - Ids0 = mongo:insert_all (team, Teams0), - {Teams0, Ids0} - end), - timer:sleep (200), - mongo:do (safe, slave_ok, RsConn, baseball, fun () -> - 4 = mongo:count (team, {}), - Teams = lists:zipwith (fun (Id, Team) -> bson:append ({'_id', Id}, Team) end, Ids1, Teams1), - Teams = mongo:rest (mongo:find (team, {})), - NationalTeams = lists:filter (fun (Team) -> bson:lookup (league, Team) == {<<"National">>} end, Teams), - NationalTeams = mongo:rest (mongo:find (team, {league, <<"National">>})), - TeamNames = lists:map (fun (Team) -> {name, bson:at (name, Team)} end, Teams), - TeamNames = mongo:rest (mongo:find (team, {}, {'_id', 0, name, 1})), - BostonTeam = lists:last (Teams), - {BostonTeam} = mongo:find_one (team, {home, {city, <<"Boston">>, state, <<"MA">>}}) - end), - mongo:rs_disconnect (RsConn). + RsConn = mongo:rs_connect({<<"rs1">>, ["127.0.0.1"]}), + {ok, {Teams1, Ids1}} = mongo:do(safe, master, RsConn, baseball, fun() -> + mongo:delete(team, {}), + Teams0 = [ + {name, <<"Yankees">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"American">>}, + {name, <<"Mets">>, home, {city, <<"New York">>, state, <<"NY">>}, league, <<"National">>}, + {name, <<"Phillies">>, home, {city, <<"Philadelphia">>, state, <<"PA">>}, league, <<"National">>}, + {name, <<"Red Sox">>, home, {city, <<"Boston">>, state, <<"MA">>}, league, <<"American">>}], + Ids0 = mongo:insert_all(team, Teams0), + {Teams0, Ids0} + end), + timer:sleep(200), + mongo:do(safe, slave_ok, RsConn, baseball, fun() -> + 4 = mongo:count(team, {}), + Teams = lists:zipwith(fun(Id, Team) -> bson:append({'_id', Id}, Team) end, Ids1, Teams1), + Teams = mongo:rest(mongo:find(team, {})), + NationalTeams = lists:filter(fun(Team) -> bson:lookup(league, Team) == {<<"National">>} end, Teams), + NationalTeams = mongo:rest(mongo:find(team, {league, <<"National">>})), + TeamNames = lists:map(fun(Team) -> {name, bson:at(name, Team)} end, Teams), + TeamNames = mongo:rest(mongo:find(team, {}, {'_id', 0, name, 1})), + BostonTeam = lists:last(Teams), + {BostonTeam} = mongo:find_one(team, {home, {city, <<"Boston">>, state, <<"MA">>}}) + end), + mongo:rs_disconnect(RsConn). diff --git a/src/mvar.erl b/src/mvar.erl index 5b42f02e..9d88f332 100644 --- a/src/mvar.erl +++ b/src/mvar.erl @@ -1,13 +1,13 @@ %@doc A mvar is a process that holds a value (its content) and provides exclusive access to it. When a mvar terminates it executes it given finalize procedure, which is needed for content that needs to clean up when terminating. When a mvar is created it executes its supplied initialize procedure, which creates the initial content from within the mvar process so if the initial content is another linked dependent process (such as a socket) it will terminate when the mvar terminates without the need for a finalizer. A mvar itself dependently links to its parent process (the process that created it) and thus terminates when its parent process terminates. --module (mvar). +-module(mvar). --export_type ([mvar/1]). --export ([create/2, new/2, new/1]). --export ([modify/2, modify_/2, with/2, read/1, write/2]). --export ([terminate/1, is_terminated/1]). +-export_type([mvar/1]). +-export([create/2, new/2, new/1]). +-export([modify/2, modify_/2, with/2, read/1, write/2]). +-export([terminate/1, is_terminated/1]). --behaviour (gen_server). --export ([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -type mvar(_) :: pid() | atom(). % Unregistered or registered process holding a value of given paramterized type @@ -17,85 +17,86 @@ -type finalizer(A) :: fun ((A) -> ok). % IO % Closes supplied value. Any exception will not be caught causing an exit signal to be sent to parent (creating) process. --spec create (initializer(A), finalizer(A)) -> mvar(A). % IO throws X +-spec create(initializer(A), finalizer(A)) -> mvar(A). % IO throws X %@doc Create new mvar with initial content created from initializer (run within new mvar process so it owns it). Any throw in initializer will be caught and re-thrown in the calling process. Other exceptions will not be caught causing an exit signal to be sent to calling process. When the mvar terminates then given finalizer will be executed against current content. Any exception raised in finalizer (when terminating) will be sent as an exit signal to the parent (calling) process. -create (Initialize, Finalize) -> - Ref = make_ref(), - case gen_server:start_link (?MODULE, {self(), Ref, Initialize, Finalize}, []) of - {ok, Pid} -> Pid; - ignore -> receive {mvar_init_throw, Ref, Thrown} -> throw (Thrown) end end. +create(Initialize, Finalize) -> + Ref = make_ref(), + case gen_server:start_link(?MODULE, {self(), Ref, Initialize, Finalize}, []) of + {ok, Pid} -> Pid; + ignore -> receive {mvar_init_throw, Ref, Thrown} -> throw(Thrown) end end. --spec new (A, finalizer(A)) -> mvar(A). % IO +-spec new(A, finalizer(A)) -> mvar(A). % IO %@doc Same as create/2 except initial value given directly -new (Value, Finalize) -> create (fun () -> Value end, Finalize). +new(Value, Finalize) -> create(fun() -> Value end, Finalize). --spec new (A) -> mvar(A). % IO +-spec new(A) -> mvar(A). % IO %@doc Same as new/2 except no finalizer -new (Value) -> new (Value, fun (_) -> ok end). +new(Value) -> new(Value, fun(_) -> ok end). --type modifier(A,B) :: fun ((A) -> {A, B}). % IO throws X +-type modifier(A, B) :: fun ((A) -> {A, B}). % IO throws X --spec modify (mvar(A), modifier(A,B)) -> B. % IO throws X +-spec modify(mvar(A), modifier(A, B)) -> B. % IO throws X %@doc Atomically modify content and return associated result. Any throw is caught and re-thrown in caller. Errors are not caught and will terminate var and send exit signal to parent. -modify (Var, Modify) -> case gen_server:call (Var, {modify, Modify}, infinity) of - {ok, B} -> B; - {throw, Thrown} -> throw (Thrown) end. +modify(Var, Modify) -> case gen_server:call(Var, {modify, Modify}, infinity) of + {ok, B} -> B; + {throw, Thrown} -> throw(Thrown) end. --spec modify_ (mvar(A), fun ((A) -> A)) -> ok. % IO throws X +-spec modify_(mvar(A), fun ((A) -> A)) -> ok. % IO throws X %@doc Same as modify but don't return anything -modify_ (Var, Modify) -> modify (Var, fun (A) -> {Modify (A), ok} end). +modify_(Var, Modify) -> modify(Var, fun(A) -> {Modify(A), ok} end). --spec with (mvar(A), fun ((A) -> B)) -> B. % IO throws X, fun IO throws X +-spec with(mvar(A), fun ((A) -> B)) -> B. % IO throws X, fun IO throws X %@doc Execute Procedure with exclusive access to content but don't modify it. -with (Var, Act) -> modify (Var, fun (A) -> {A, Act (A)} end). +with(Var, Act) -> modify(Var, fun(A) -> {A, Act(A)} end). --spec read (mvar(A)) -> A. % IO +-spec read(mvar(A)) -> A. % IO %@doc Return content -read (Var) -> with (Var, fun (A) -> A end). +read(Var) -> with(Var, fun(A) -> A end). --spec write (mvar(A), A) -> A. % IO +-spec write(mvar(A), A) -> A. % IO %@doc Change content and return previous content -write (Var, Value) -> modify (Var, fun (A) -> {Value, A} end). +write(Var, Value) -> modify(Var, fun(A) -> {Value, A} end). --spec terminate (mvar(_)) -> ok. % IO +-spec terminate(mvar(_)) -> ok. % IO %@doc Terminate mvar. Its finalizer will be executed. Future accesses to this mvar will fail, although repeated termination is fine. -terminate (Var) -> catch gen_server:call (Var, stop, infinity), ok. +terminate(Var) -> catch gen_server:call(Var, stop, infinity), ok. --spec is_terminated (mvar(_)) -> boolean(). % IO +-spec is_terminated(mvar(_)) -> boolean(). % IO %@doc Has mvar been terminated? -is_terminated (Var) -> not is_process_alive (Var). +is_terminated(Var) -> not is_process_alive(Var). % gen_server callbacks % -type state(A) :: {A, finalizer(A)}. --spec init ({pid(), reference(), initializer(A), finalizer(A)}) -> {ok, state(A)} | ignore. % IO +-spec init({pid(), reference(), initializer(A), finalizer(A)}) -> {ok, state(A)} | ignore. % IO % Create initial value using initializer and return it in state with finalizer. Catch throws in initializer and report it to caller via direct send and `ignore` result. `create` will pick this up an re-throw it in caller. An error in initializer will cause process to abort and exit signal being sent to caller. -init ({Caller, ThrowRef, Initialize, Finalize}) -> try Initialize() - of A -> {ok, {A, Finalize}} - catch Thrown -> Caller ! {mvar_init_throw, ThrowRef, Thrown}, ignore end. +init({Caller, ThrowRef, Initialize, Finalize}) -> try Initialize() + of A -> {ok, {A, Finalize}} + catch Thrown -> Caller ! {mvar_init_throw, ThrowRef, Thrown}, + ignore end. -spec handle_call - ({modify, modifier(A,B)}, {pid(), tag()}, state(A)) -> {reply, {ok, B} | {throw, any()}, state(A)}; - (stop, {pid(), tag()}, state(A)) -> {stop, normal, ok, state(A)}. % IO + ({modify, modifier(A, B)}, {pid(), tag()}, state(A)) -> {reply, {ok, B} | {throw, any()}, state(A)}; + (stop, {pid(), tag()}, state(A)) -> {stop, normal, ok, state(A)}. % IO % Modify content and return associated value. Catch any throws and return them to `modify` to be re-thrown. Errors will abort this mvar process and send exit signal to linked owner. -handle_call ({modify, Modify}, _From, {A, X}) -> try Modify (A) - of {A1, B} -> {reply, {ok, B}, {A1, X}} - catch Thrown -> {reply, {throw, Thrown}, {A, X}} end; +handle_call({modify, Modify}, _From, {A, X}) -> try Modify(A) + of {A1, B} -> {reply, {ok, B}, {A1, X}} + catch Thrown -> {reply, {throw, Thrown}, {A, X}} end; % Terminate mvar -handle_call (stop, _From, State) -> {stop, normal, ok, State}. +handle_call(stop, _From, State) -> {stop, normal, ok, State}. --spec terminate (reason(), state(_)) -> any(). % IO. Result ignored +-spec terminate(reason(), state(_)) -> any(). % IO. Result ignored % Execute finalizer upon termination -terminate (_Reason, {A, Finalize}) -> Finalize (A). +terminate(_Reason, {A, Finalize}) -> Finalize(A). -type tag() :: any(). % Unique tag -type reason() :: any(). -handle_cast (_Request, State) -> {noreply, State}. +handle_cast(_Request, State) -> {noreply, State}. -handle_info (Message, State) -> - io:format ("received: ~p~n", [Message]), - {noreply, State}. +handle_info(Message, State) -> + io:format("received: ~p~n", [Message]), + {noreply, State}. -code_change (_OldVsn, State, _Extra) -> {ok, State}. +code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/resource_pool.erl b/src/resource_pool.erl index 79029360..a4d9c812 100644 --- a/src/resource_pool.erl +++ b/src/resource_pool.erl @@ -1,16 +1,16 @@ %@doc A set of N resources handed out randomly, and recreated on expiration --module (resource_pool). +-module(resource_pool). --export_type ([factory/1, create/1, expire/1, is_expired/1]). --export_type ([pool/1]). --export ([new/2, get/1, close/1, is_closed/1]). +-export_type([factory/1, create/1, expire/1, is_expired/1]). +-export_type([pool/1]). +-export([new/2, get/1, close/1, is_closed/1]). -type maybe(A) :: {A} | {}. -type err_or(A) :: {ok, A} | {error, any()}. --spec trans_error (fun (() -> err_or(A))) -> A. % IO throws any() +-spec trans_error(fun (() -> err_or(A))) -> A. % IO throws any() %@doc Convert error return to throw -trans_error (Act) -> case Act() of {ok, A} -> A; {error, Reason} -> throw (Reason) end. +trans_error(Act) -> case Act() of {ok, A} -> A; {error, Reason} -> throw(Reason) end. -type factory(A) :: {any(), create(A), expire(A), is_expired(A)}. % Object for creating, destroying, and checking resources of type A. @@ -22,30 +22,30 @@ trans_error (Act) -> case Act() of {ok, A} -> A; {error, Reason} -> throw (Reaso -opaque pool(A) :: {factory(A), mvar:mvar (array:array (maybe(A)))}. % Pool of N resources of type A, created on demand, recreated on expiration, and handed out randomly --spec new (factory(A), integer()) -> pool(A). +-spec new(factory(A), integer()) -> pool(A). %@doc Create empty pool that will create and destroy resources using given factory and allow up to N resources at once -new (Factory, MaxSize) -> {Factory, mvar:new (array:new (MaxSize, [{fixed, false}, {default, {}}]))}. +new(Factory, MaxSize) -> {Factory, mvar:new(array:new(MaxSize, [{fixed, false}, {default, {}}]))}. --spec get (pool(A)) -> err_or(A). % IO +-spec get(pool(A)) -> err_or(A). % IO %@doc Return a random resource from the pool, creating one if necessary. Error if failed to create -get ({{Input,Create,_,IsExpired}, VResources}) -> - New = fun (Array, I) -> Res = trans_error (fun () -> Create (Input) end), {array:set (I, {Res}, Array), Res} end, - Check = fun (Array, I, Res) -> case IsExpired (Res) of true -> New (Array, I); false -> {Array, Res} end end, - try mvar:modify (VResources, fun (Array) -> - R = random:uniform (array:size (Array)) - 1, - case array:get (R, Array) of - {Res} -> Check (Array, R, Res); - {} -> New (Array, R) end end) - of Resource -> {ok, Resource} - catch Reason -> {error, Reason} end. - --spec close (pool(_)) -> ok. % IO +get({{Input, Create, _, IsExpired}, VResources}) -> + New = fun(Array, I) -> Res = trans_error(fun() -> Create(Input) end), {array:set(I, {Res}, Array), Res} end, + Check = fun(Array, I, Res) -> case IsExpired(Res) of true -> New(Array, I); false -> {Array, Res} end end, + try mvar:modify(VResources, fun(Array) -> + R = random:uniform(array:size(Array)) - 1, + case array:get(R, Array) of + {Res} -> Check(Array, R, Res); + {} -> New(Array, R) end end) + of Resource -> {ok, Resource} + catch Reason -> {error, Reason} end. + +-spec close(pool(_)) -> ok. % IO %@doc Close pool and all its resources -close ({{_,_,Expire,_}, VResources}) -> - mvar:with (VResources, fun (Array) -> - array:map (fun (_I, MRes) -> case MRes of {Res} -> Expire (Res); {} -> ok end end, Array) end), - mvar:terminate (VResources). +close({{_, _, Expire, _}, VResources}) -> + mvar:with(VResources, fun(Array) -> + array:map(fun(_I, MRes) -> case MRes of {Res} -> Expire(Res); {} -> ok end end, Array) end), + mvar:terminate(VResources). --spec is_closed (pool(_)) -> boolean(). % IO +-spec is_closed(pool(_)) -> boolean(). % IO %@doc Has pool been closed? -is_closed ({_, VResources}) -> mvar:is_terminated (VResources). +is_closed({_, VResources}) -> mvar:is_terminated(VResources). diff --git a/test/m_test.erl b/test/m_test.erl new file mode 100644 index 00000000..a5542eca --- /dev/null +++ b/test/m_test.erl @@ -0,0 +1,28 @@ +#! /usr/bin/env escript +%%! -smp enable -mnesia debug verbose -Wall -pz ../deps/bson/ebin ../ebin + +-module(m_test). + +-export([start/0]). + +start() -> + ok=application:start(mongodb), + {ok,Pid}=mongoc:start_link(), + ReplSet={<<"rs0">>, [{localhost, 27018}, {localhost, 27019},{localhost,27020}]}, + {ok,RelCon}=mongoc:rs_connect(Pid,ReplSet,[]), + %Sec=mongo_replset:secondary_ok(RelCon), + {ok,Prim}=mongo_replset:primary(RelCon), + %io:format("first get~p~n",[erlang:get(mongo_action_context)]), + %%{ok,A}=mongo:do(unsafe,master,RelCon,mongo,fun() -> ok end), + %mongo:save_config(unsafe,master,RelCon,mongo), + Sec=0, + %io:format("atfer do get~p~n",[erlang:get(mongo_action_context)]), + io:format("Rs:~p~nPrimary:~p~nSecondary:~p~n",[RelCon,Prim,Sec]), + %Res0=mongo:insert ({mongodb,Prim},foo, {x,1, y,2}), + %io:format("insert: Res~p~n",[Res0]), + Res=mongo:find_one ({test,Prim},foo, {x,1}), + io:format("find: Cursor~p~n",[Res]), + mongo:rs_disconnect(RelCon). + +main(_Args) -> + start().