diff --git a/README.md b/README.md index 4deb66394..e3be950ae 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,7 @@ of like poetry dev.dependencies stanza. main.txt and dev.txt are kind of like po versions of dependencies to use. main.txt and dev.txt are combined in the docker compose build process to create the final requirements.txt file and import the dependencies into the Docker image. + ## Local Testing Tests can be found in `tests` and are run with the following commands: diff --git a/nmdc_runtime/api/core/idgen.py b/nmdc_runtime/api/core/idgen.py index 4ec691d57..2a59de0f2 100644 --- a/nmdc_runtime/api/core/idgen.py +++ b/nmdc_runtime/api/core/idgen.py @@ -73,9 +73,9 @@ def generate_ids( shoulder: str = "fk4", ) -> List[str]: collection = mdb.get_collection(collection_name(naa, shoulder)) - existing_count = collection.count_documents({}) + initial_count = collection.count_documents({}) n_chars = next( - (n for n, t in SPING_SIZE_THRESHOLDS if (number + existing_count) < t), + (n for n, t in SPING_SIZE_THRESHOLDS if (number + initial_count) < t), 12, ) collected = [] diff --git a/nmdc_runtime/api/db/mongo.py b/nmdc_runtime/api/db/mongo.py index 4a625c747..023cd23d6 100644 --- a/nmdc_runtime/api/db/mongo.py +++ b/nmdc_runtime/api/db/mongo.py @@ -10,7 +10,6 @@ from jsonschema import Draft7Validator from nmdc_schema.nmdc import Database as NMDCDatabase from pymongo.errors import AutoReconnect, OperationFailure -from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from refscan.lib.Finder import Finder from refscan.scanner import scan_outgoing_references from tenacity import wait_random_exponential, retry, retry_if_exception_type @@ -27,15 +26,16 @@ nmdc_database_collection_names, get_allowed_references, ) -from pymongo import MongoClient +from pymongo import AsyncMongoClient, MongoClient from pymongo.database import Database as MongoDatabase +from pymongo.asynchronous.database import AsyncDatabase @retry( 
retry=retry_if_exception_type(AutoReconnect), wait=wait_random_exponential(multiplier=0.5, max=60), ) -def check_mongo_ok_autoreconnect(mdb: MongoDatabase): +def check_mongo_ok_autoreconnect(mdb: MongoDatabase) -> bool: mdb["_runtime.healthcheck"].insert_one({"_id": "ok"}) mdb["_runtime.healthcheck"].delete_one({"_id": "ok"}) return True @@ -55,6 +55,19 @@ def get_mongo_client() -> MongoClient: ) +@lru_cache +def get_async_mongo_client() -> AsyncMongoClient: + r""" + Returns an `AsyncMongoClient` instance you can use to access the MongoDB server specified via environment variables. + """ + return AsyncMongoClient( + host=os.getenv("MONGO_HOST"), + username=os.getenv("MONGO_USERNAME"), + password=os.getenv("MONGO_PASSWORD"), + directConnection=True, + ) + + @lru_cache def get_mongo_db() -> MongoDatabase: r""" @@ -62,7 +75,9 @@ def get_mongo_db() -> MongoDatabase: Reference: https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.database.Database """ _client = get_mongo_client() - mdb = _client[os.getenv("MONGO_DBNAME")] + database_name = os.getenv("MONGO_DBNAME") + assert database_name is not None, "MONGO_DBNAME is None" + mdb = _client[database_name] check_mongo_ok_autoreconnect(mdb) return mdb @@ -74,20 +89,20 @@ def get_session_bound_mongo_db(session=None) -> MongoDatabase: Reference: https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.database.Database """ _client = get_mongo_client() - mdb = _client[os.getenv("MONGO_DBNAME")] + database_name = os.getenv("MONGO_DBNAME") + assert database_name is not None, "MONGO_DBNAME is None" + mdb = _client[database_name] check_mongo_ok_autoreconnect(mdb) return SessionBoundDatabase(mdb, session) if session is not None else mdb @lru_cache -def get_async_mongo_db() -> AsyncIOMotorDatabase: - _client = AsyncIOMotorClient( - host=os.getenv("MONGO_HOST"), - username=os.getenv("MONGO_USERNAME"), - password=os.getenv("MONGO_PASSWORD"), - directConnection=True, - ) - return 
_client[os.getenv("MONGO_DBNAME")] +def get_async_mongo_db() -> AsyncDatabase: + _client = get_async_mongo_client() + database_name = os.getenv("MONGO_DBNAME") + assert database_name is not None, "MONGO_DBNAME is None" + mdb = _client[database_name] + return mdb def get_nonempty_nmdc_schema_collection_names(mdb: MongoDatabase) -> Set[str]: diff --git a/nmdc_runtime/api/endpoints/jobs.py b/nmdc_runtime/api/endpoints/jobs.py index 917edbd7f..1848c983f 100644 --- a/nmdc_runtime/api/endpoints/jobs.py +++ b/nmdc_runtime/api/endpoints/jobs.py @@ -1,6 +1,7 @@ import json from typing import Optional, Annotated +from pymongo.asynchronous.database import AsyncDatabase from pymongo.database import Database from fastapi import APIRouter, Depends, Query, HTTPException, Path from pymongo.errors import ConnectionFailure, OperationFailure @@ -9,7 +10,7 @@ from nmdc_runtime.api.core.util import ( raise404_if_none, ) -from nmdc_runtime.api.db.mongo import get_mongo_db +from nmdc_runtime.api.db.mongo import get_async_mongo_db, get_mongo_db from nmdc_runtime.api.endpoints.util import list_resources, _claim_job from nmdc_runtime.api.models.job import Job, JobClaim from nmdc_runtime.api.models.operation import Operation, MetadataT @@ -26,9 +27,9 @@ @router.get( "/jobs", response_model=ListResponse[Job], response_model_exclude_unset=True ) -def list_jobs( +async def list_jobs( req: Annotated[ListRequest, Query()], - mdb: Database = Depends(get_mongo_db), + adb: AsyncDatabase = Depends(get_async_mongo_db), maybe_site: Optional[Site] = Depends(maybe_get_current_client_site), ): """List pre-configured workflow jobs. 
@@ -39,7 +40,7 @@ def list_jobs( """ if isinstance(maybe_site, Site) and req.filter is None: req.filter = json.dumps({"claims.site_id": {"$ne": maybe_site.id}}) - return list_resources(req, mdb, "jobs") + return await list_resources(req, adb, "jobs") @router.get("/jobs/{job_id}", response_model=Job, response_model_exclude_unset=True) diff --git a/nmdc_runtime/api/endpoints/nmdcschema.py b/nmdc_runtime/api/endpoints/nmdcschema.py index 957ba0bae..a2886fdde 100644 --- a/nmdc_runtime/api/endpoints/nmdcschema.py +++ b/nmdc_runtime/api/endpoints/nmdcschema.py @@ -3,6 +3,7 @@ from typing import List, Dict, Annotated import pymongo +from pymongo.asynchronous.database import AsyncDatabase from fastapi import APIRouter, Depends, HTTPException, Path, Query from pydantic import AfterValidator from refscan.lib.helpers import ( @@ -24,6 +25,7 @@ from nmdc_runtime.api.core.metadata import map_id_to_collection, get_collection_for_id from nmdc_runtime.api.core.util import raise404_if_none from nmdc_runtime.api.db.mongo import ( + get_async_mongo_db, get_mongo_db, ) from nmdc_runtime.api.endpoints.util import ( @@ -485,7 +487,7 @@ def get_collection_names(): response_model=ListResponse[Doc], response_model_exclude_unset=True, ) -def list_from_collection( +async def list_from_collection( collection_name: Annotated[ str, Path( @@ -495,7 +497,7 @@ def list_from_collection( ), ], req: Annotated[ListRequest, Query()], - mdb: MongoDatabase = Depends(get_mongo_db), + adb: AsyncDatabase = Depends(get_async_mongo_db), ): r""" Retrieves resources that match the specified filter criteria and reside in the specified collection. 
@@ -521,7 +523,7 @@ def list_from_collection( # raise HTTP_400_BAD_REQUEST on invalid collection_name ensure_collection_name_is_known_to_schema(collection_name) - rv = list_resources(req, mdb, collection_name) + rv = await list_resources(req, adb, collection_name) rv["resources"] = [strip_oid(d) for d in rv["resources"]] return rv diff --git a/nmdc_runtime/api/endpoints/objects.py b/nmdc_runtime/api/endpoints/objects.py index e4dac6401..d5a9fbe61 100644 --- a/nmdc_runtime/api/endpoints/objects.py +++ b/nmdc_runtime/api/endpoints/objects.py @@ -4,6 +4,7 @@ from fastapi import APIRouter, status, Depends, HTTPException, Query from gridfs import GridFS from pymongo import ReturnDocument +from pymongo.asynchronous.database import AsyncDatabase from pymongo.database import Database as MongoDatabase import requests from starlette.responses import RedirectResponse @@ -11,7 +12,7 @@ from nmdc_runtime.api.core.idgen import decode_id, generate_one_id, local_part from nmdc_runtime.api.core.util import raise404_if_none, API_SITE_ID -from nmdc_runtime.api.db.mongo import get_mongo_db +from nmdc_runtime.api.db.mongo import get_async_mongo_db, get_mongo_db from nmdc_runtime.api.db.s3 import S3_ID_NS, presigned_url_to_get, get_s3_client from nmdc_runtime.api.endpoints.util import ( list_resources, @@ -90,11 +91,11 @@ def create_object( @router.get("/objects", response_model=ListResponse[DrsObject]) -def list_objects( +async def list_objects( req: Annotated[ListRequest, Query()], - mdb: MongoDatabase = Depends(get_mongo_db), + adb: AsyncDatabase = Depends(get_async_mongo_db), ): - return list_resources(req, mdb, "objects") + return await list_resources(req, adb, "objects") @router.get( diff --git a/nmdc_runtime/api/endpoints/operations.py b/nmdc_runtime/api/endpoints/operations.py index d046e6dd9..e106483bb 100644 --- a/nmdc_runtime/api/endpoints/operations.py +++ b/nmdc_runtime/api/endpoints/operations.py @@ -1,11 +1,12 @@ from typing import Annotated import pymongo +from 
pymongo.asynchronous.database import AsyncDatabase from fastapi import APIRouter, Depends, status, HTTPException, Query from toolz import get_in, merge, assoc from nmdc_runtime.api.core.util import raise404_if_none, pick -from nmdc_runtime.api.db.mongo import get_mongo_db +from nmdc_runtime.api.db.mongo import get_async_mongo_db, get_mongo_db from nmdc_runtime.api.endpoints.util import list_resources from nmdc_runtime.api.models.operation import ( ListOperationsResponse, @@ -21,11 +22,11 @@ @router.get("/operations", response_model=ListOperationsResponse[ResultT, MetadataT]) -def list_operations( +async def list_operations( req: Annotated[ListRequest, Query()], - mdb: pymongo.database.Database = Depends(get_mongo_db), + adb: AsyncDatabase = Depends(get_async_mongo_db), ): - return list_resources(req, mdb, "operations") + return await list_resources(req, adb, "operations") @router.get("/operations/{op_id}", response_model=Operation[ResultT, MetadataT]) diff --git a/nmdc_runtime/api/endpoints/search.py b/nmdc_runtime/api/endpoints/search.py index b48c411ed..c04f0f42b 100644 --- a/nmdc_runtime/api/endpoints/search.py +++ b/nmdc_runtime/api/endpoints/search.py @@ -1,9 +1,9 @@ import json from fastapi import APIRouter, Depends -from pymongo.database import Database as MongoDatabase +from pymongo.asynchronous.database import AsyncDatabase -from nmdc_runtime.api.db.mongo import get_mongo_db +from nmdc_runtime.api.db.mongo import get_async_mongo_db from nmdc_runtime.api.endpoints.nmdcschema import strip_oid from nmdc_runtime.api.endpoints.util import list_resources from nmdc_runtime.api.models.nmdc_schema import ( @@ -21,9 +21,9 @@ response_model=ListResponse[DataObject], response_model_exclude_unset=True, ) -def data_objects( +async def data_objects( req: DataObjectListRequest = Depends(), - mdb: MongoDatabase = Depends(get_mongo_db), + adb: AsyncDatabase = Depends(get_async_mongo_db), ): filter_ = list_request_filter_to_mongo_filter(req.model_dump(exclude_unset=True)) 
max_page_size = filter_.pop("max_page_size", None) @@ -33,6 +33,6 @@ def data_objects( max_page_size=max_page_size, page_token=page_token, ) - rv = list_resources(req, mdb, "data_objects") + rv = await list_resources(req, adb, "data_objects") rv["resources"] = [strip_oid(d) for d in rv["resources"]] return rv diff --git a/nmdc_runtime/api/endpoints/sites.py b/nmdc_runtime/api/endpoints/sites.py index 46bd20b57..210dbc843 100644 --- a/nmdc_runtime/api/endpoints/sites.py +++ b/nmdc_runtime/api/endpoints/sites.py @@ -1,6 +1,7 @@ from typing import List, Annotated import botocore +from pymongo.asynchronous.database import AsyncDatabase import pymongo.database from fastapi import APIRouter, Depends, status, HTTPException, Path, Query from starlette.status import HTTP_403_FORBIDDEN @@ -17,7 +18,7 @@ generate_secret, API_SITE_ID, ) -from nmdc_runtime.api.db.mongo import get_mongo_db +from nmdc_runtime.api.db.mongo import get_async_mongo_db, get_mongo_db from nmdc_runtime.api.db.s3 import ( get_s3_client, presigned_url_to_put, @@ -72,11 +73,11 @@ def create_site( @router.get( "/sites", response_model=ListResponse[Site], response_model_exclude_unset=True ) -def list_sites( +async def list_sites( req: Annotated[ListRequest, Query()], - mdb: pymongo.database.Database = Depends(get_mongo_db), + adb: AsyncDatabase = Depends(get_async_mongo_db), ): - return list_resources(req, mdb, "sites") + return await list_resources(req, adb, "sites") @router.get("/sites/{site_id}", response_model=Site, response_model_exclude_unset=True) diff --git a/nmdc_runtime/api/endpoints/util.py b/nmdc_runtime/api/endpoints/util.py index 471aa0e0f..36b4594bd 100644 --- a/nmdc_runtime/api/endpoints/util.py +++ b/nmdc_runtime/api/endpoints/util.py @@ -6,7 +6,7 @@ from json import JSONDecodeError from pathlib import Path from time import time_ns -from typing import List, Optional, Set, Tuple +from typing import List, Optional, Set, Tuple, Union from zoneinfo import ZoneInfo from bson import json_util @@ 
-44,6 +44,8 @@ ResultT, ) from nmdc_runtime.util import drs_metadata_for +from pymongo.asynchronous.cursor import AsyncCursor +from pymongo.asynchronous.database import AsyncDatabase from pymongo.collection import Collection as MongoCollection from pymongo.database import Database as MongoDatabase from pymongo.errors import DuplicateKeyError @@ -73,7 +75,7 @@ def check_filter(filter_: str): return filter_ -def list_resources(req: ListRequest, mdb: MongoDatabase, collection_name: str): +async def list_resources(req: ListRequest, adb: AsyncDatabase, collection_name: str): r""" Returns a dictionary containing the requested MongoDB documents, maybe alongside pagination information. @@ -83,8 +85,19 @@ def list_resources(req: ListRequest, mdb: MongoDatabase, collection_name: str): when the collection involved contains many documents. """ + # Get information about the indexes defined on the collection. + index_information = await adb[collection_name].index_information() + + # Check whether any of the indexes has the name, `id_1`. + # + # Note: I think this is an attempt to determine whether there + # is an index on the `id` field, specifically, and that it + # was written under the assumption that the name of the index + # would be the name MongoDB would have given it by default + # (as opposed to a custom name someone might have given it). + # id_field = "id" - if "id_1" not in mdb[collection_name].index_information(): + if "id_1" not in index_information: logging.warning( f"list_resources: no index set on 'id' for collection {collection_name}" ) @@ -99,13 +112,24 @@ def list_resources(req: ListRequest, mdb: MongoDatabase, collection_name: str): else None ) if req.page_token: - doc = mdb.page_tokens.find_one({"_id": req.page_token, "ns": collection_name}) + token_filter = {"_id": req.page_token, "ns": collection_name} + + # Get the page token document. 
+ doc = await adb.page_tokens.find_one(token_filter) if doc is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Bad page_token" ) last_id = doc["last_id"] - mdb.page_tokens.delete_one({"_id": req.page_token}) + + # Delete the page token document. + # + # Note: I don't know why the filter differs from the one used above + # to find the page token document. + # + token_filter = {"_id": req.page_token} + await adb.page_tokens.delete_one(token_filter) + else: last_id = None if last_id is not None: @@ -114,31 +138,47 @@ def list_resources(req: ListRequest, mdb: MongoDatabase, collection_name: str): else: filter_ = merge(filter_, {id_field: {"$gt": last_id}}) - # If limit is 0, the response will include all results (bypassing pagination altogether). - if (limit == 0) or (mdb[collection_name].count_documents(filter=filter_) <= limit): - rv = { - "resources": list( - mdb[collection_name].find(filter=filter_, projection=projection) - ) - } + # If the limit is either (a) 0 or (b) greater than the number of documents in the result set, + # the response will include all results (bypassing pagination altogether). + will_paginate = True + if limit == 0: + will_paginate = False + elif isinstance(limit, int): + num_docs_in_result = await adb[collection_name].count_documents(filter=filter_) + if limit > num_docs_in_result: + will_paginate = False + + if not will_paginate: + # Note: When using `AsyncDatabase`, the `find` method is synchronous, but returns an `AsyncCursor`. 
+ resources_async_cursor: AsyncCursor = adb[collection_name].find( + filter=filter_, projection=projection + ) + resources = await resources_async_cursor.to_list() + rv = {"resources": resources} return rv else: - resources = list( - mdb[collection_name].find( - filter=filter_, - projection=projection, - limit=limit, - sort=[(id_field, 1)], - allow_disk_use=True, - ) + find_args = dict( + filter=filter_, + projection=projection, + limit=limit, + sort=[(id_field, 1)], + allow_disk_use=True, ) + resources_async_cursor: AsyncCursor = adb[collection_name].find(**find_args) + resources = await resources_async_cursor.to_list() last_id = resources[-1][id_field] - token = generate_one_id(mdb, "page_tokens") + + # Note: Here, we need to get a synchronous `MongoDatabase`, since the `generate_one_id` + # helper function doesn't accept an `AsyncDatabase` yet. As a result, this will + # be a blocking operation. + mdb_synchronous: MongoDatabase = get_mongo_db() + token = generate_one_id(mdb_synchronous, "page_tokens") + # TODO unify with `/queries:run` query continuation model # => {_id: cursor/token, query: , last_id: <>, last_modified: <>} - mdb.page_tokens.insert_one( - {"_id": token, "ns": collection_name, "last_id": last_id} - ) + token_descriptor = {"_id": token, "ns": collection_name, "last_id": last_id} + await adb.page_tokens.insert_one(token_descriptor) + return {"resources": resources, "next_page_token": token} diff --git a/nmdc_runtime/core/db/Database.py b/nmdc_runtime/core/db/Database.py index 111b644a5..8b2b8f99d 100644 --- a/nmdc_runtime/core/db/Database.py +++ b/nmdc_runtime/core/db/Database.py @@ -1,3 +1,7 @@ +r""" +TODO: Delete this module if it is obsolete. 
+""" + from contextlib import contextmanager from motor import motor_asyncio diff --git a/requirements/dev.txt b/requirements/dev.txt index fcef7b524..4d94b1bb9 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -23,6 +23,10 @@ certifi==2025.4.26 # via # -c requirements/main.txt # requests +cffi==1.17.1 + # via + # -c requirements/main.txt + # cryptography charset-normalizer==3.4.2 # via # -c requirements/main.txt @@ -36,6 +40,10 @@ coverage==7.8.0 # via # -r requirements/dev.in # pytest-cov +cryptography==44.0.3 + # via + # -c requirements/main.txt + # secretstorage docutils==0.21.2 # via readme-renderer exceptiongroup==1.3.0 @@ -66,6 +74,10 @@ jaraco-context==6.0.1 # via keyring jaraco-functools==4.1.0 # via keyring +jeepney==0.9.0 + # via + # keyring + # secretstorage keyring==25.6.0 # via twine markdown-it-py==3.0.0 @@ -110,6 +122,10 @@ pluggy==1.5.0 # pytest pycodestyle==2.13.0 # via flake8 +pycparser==2.22 + # via + # -c requirements/main.txt + # cffi pyflakes==3.3.2 # via flake8 pygments==2.19.1 @@ -164,6 +180,8 @@ rich==13.9.4 # via # -c requirements/main.txt # twine +secretstorage==3.3.3 + # via keyring tomli==2.2.1 # via # -c requirements/main.txt diff --git a/requirements/main.in b/requirements/main.in index ddf0943d7..9c7522804 100644 --- a/requirements/main.in +++ b/requirements/main.in @@ -41,13 +41,17 @@ lxml mkdocs-jupyter mkdocs-material mkdocs-mermaid2-plugin +# TODO: Remove `motor` as a dependency, once it is no longer in use. +# It is currently imported in: `nmdc_runtime/core/db/Database.py` motor ontology-loader==0.2.2 nmdc-schema==11.10.0 openpyxl pandas passlib[bcrypt] -pymongo +# Note: In `pymongo` version 4.13.0, "the asynchronous API is now stable and no longer in beta." +# Reference: https://pymongo.readthedocs.io/en/4.13.0/changelog.html +pymongo>=4.13.0 pydantic[email]>=1.10.0 # Note: We use `pyinstrument` for performance profiling. 
# Docs https://pyinstrument.readthedocs.io/en/latest/guide.html#profile-a-web-request-in-fastapi diff --git a/requirements/main.txt b/requirements/main.txt index 8ab35a041..9082cbe96 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -27,8 +27,6 @@ anyio==4.9.0 # watchfiles appdirs==1.4.4 # via oaklib -appnope==0.1.4 - # via ipykernel argon2-cffi==23.1.0 # via jupyter-server argon2-cffi-bindings==21.2.0 @@ -259,6 +257,8 @@ graphql-relay==3.2.0 # via graphene graphviz==0.20.3 # via linkml +greenlet==3.2.4 + # via sqlalchemy grpcio==1.71.0 # via # dagster @@ -701,7 +701,7 @@ pymdown-extensions==10.15 # via # mkdocs-material # mkdocs-mermaid2-plugin -pymongo==4.12.1 +pymongo==4.14.0 # via # -r requirements/main.in # motor