4 changes: 4 additions & 0 deletions .env.example
@@ -84,3 +84,7 @@ IS_SCALAR_ENABLED=true
# Feature flag that can be used to enable/disable performance profiling,
# which can be activated via the `?profile=true` URL query parameter.
IS_PROFILING_ENABLED=true

# Set the threshold level for logging (via python's `logging` module)
# Reference: https://docs.python.org/3/library/logging.html#levels
LOG_LEVEL=DEBUG
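
For reference, a minimal sketch (assuming the Runtime reads this variable via `os.environ`; the exact wiring is not shown in this diff) of honoring LOG_LEVEL with Python's `logging` module:

    import logging
    import os

    # `logging.basicConfig` accepts level names such as "DEBUG" or "INFO" as strings.
    logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
    logging.getLogger(__name__).debug("Emitted only when the threshold is DEBUG.")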
50 changes: 21 additions & 29 deletions nmdc_runtime/api/analytics.py
@@ -4,55 +4,47 @@
"""

from datetime import datetime
import threading
from time import time
from typing import Dict, List

from pymongo.errors import OperationFailure, BulkWriteError
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import ASGIApp
from toolz import merge

from nmdc_runtime.api.db.mongo import get_mongo_db
from nmdc_runtime.mongo_util import get_runtime_mdb

# This is a queue of the "request descriptors" that we will eventually insert into the database.
_requests = []
_last_posted = datetime.now()
_last_post_attempt = datetime.now()


def _post_requests(collection: str, requests_data: List[Dict], source: str):
async def _post_requests(collection: str, requests_data: List[Dict], source: str):
"""Inserts the specified request descriptors into the specified MongoDB collection."""
mdb = get_mongo_db()
mdb[collection].insert_many([merge(d, {"source": source}) for d in requests_data])
mdb = await get_runtime_mdb()
await mdb.raw[collection].insert_many(
[merge(d, {"source": source}) for d in requests_data]
)


def log_request(collection: str, request_data: Dict, source: str = "FastAPI"):
async def log_request(collection: str, request_data: Dict, source: str = "FastAPI"):
"""Flushes the queue of request descriptors to the database if enough time has passed since the previous time."""
global _requests, _last_posted
global _requests, _last_post_attempt
_requests.append(request_data)
now = datetime.now()
# flush queue every minute at most
if (now - _last_posted).total_seconds() > 60.0:
# Note: This use of threading is an attempt to avoid blocking the current thread
# while performing the insertion(s).
#
# TODO: Is there a race condition here? If multiple requests arrive at approximately
# the same time, is it possible that each one causes a different thread to be
# started, each with a different (and possibly overlapping) set of requests to
# insert?
#
# TODO: If the insertion fails, will the requests be lost?
#
# Note: The author of this function said it may have been a "standard" solution copied
# from some documentation. Indeed, the comment at the top of this module contains
# a link to code on which it was based.
#
threading.Thread(
target=_post_requests, args=(collection, _requests, source)
).start()
_requests = [] # empties the queue
_last_posted = now
if (now - _last_post_attempt).total_seconds() > 60.0:
# Note: Handlers share a single event loop, so these list operations are never pre-empted
# mid-statement; however, a request arriving while the `await` below is in flight could
# observe a stale `_last_post_attempt` and re-post overlapping descriptors.
# Note: The approach here was based on one linked to at the top of this module.
try:
await _post_requests(collection, _requests, source)
_requests = [] # empties the queue
_last_post_attempt = now
except (BulkWriteError, OperationFailure):
# If insertion fails, retain `_requests` in the queue and retry on the next attempt.
_last_post_attempt = now


class Analytics(BaseHTTPMiddleware):
@@ -86,5 +78,5 @@ async def dispatch(
"created_at": datetime.now().isoformat(),
}

log_request(self.collection, request_data, "FastAPI")
await log_request(self.collection, request_data, "FastAPI")
return response
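
A hedged usage sketch of wiring this middleware into an app (the `collection` constructor parameter is an assumption inferred from `self.collection` above, and the collection name is hypothetical):

    from fastapi import FastAPI

    from nmdc_runtime.api.analytics import Analytics

    app = FastAPI()
    # Starlette forwards keyword arguments to the middleware's __init__.
    app.add_middleware(Analytics, collection="analytics_requests")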
139 changes: 93 additions & 46 deletions nmdc_runtime/api/core/idgen.py
@@ -1,8 +1,15 @@
import re
from datetime import datetime, timezone
from typing import List
from typing import List, Any

import base32_lib as base32
from pymongo.database import Database as MongoDatabase

from nmdc_runtime.minter.domain.model import id_typecode_pattern, id_shoulder_pattern
from nmdc_runtime.mongo_util import (
AsyncMongoDatabase,
RuntimeAsyncMongoDatabase,
get_runtime_mdb,
)


def generate_id(length=10, split_every=4, checksum=True) -> str:
@@ -70,23 +77,44 @@ def encode_id(number: int, split_every=4, min_length=10, checksum=True) -> int:
SPING_SIZE_THRESHOLDS = [(n, (2 ** (5 * n)) // 2) for n in [2, 4, 6, 8, 10]]


def collection_name(naa, shoulder):
r"""
def collection_name(ns: str, shoulder: str = "0sys0", naa: str = "nmdc_runtime") -> str:
"""
Returns a string designed to be used as a MongoDB collection name.

TODO: Document the function parameters, including expanding the "naa" acronym.
Helper function for `generate_ids`. See its docstring for details on parameters.
"""
return f"ids_{naa}_{shoulder}"
return f"_runtime.ids.{naa}.{ns}.{shoulder}"


def collection_name_from_id(id_: str) -> str:
"""A string corresponding to the expected host MongoDB collection name for an ID generated by this module."""
naa, basename = id_.split(":", maxsplit=1)
ns, shoulder, blade = basename.split("-", maxsplit=2)
return collection_name(ns=ns, shoulder=shoulder, naa=naa)


def check_valid_ns(ns: str) -> None:
if not re.match(id_typecode_pattern, ns):
raise ValueError(
f"An ID namespace (aka typecode) must match the pattern: '{id_typecode_pattern}'."
)

def generate_ids(
mdb: MongoDatabase,

def check_valid_shoulder(shoulder: str) -> None:
if not re.match(id_shoulder_pattern, shoulder):
raise ValueError(
f"An ID shoulder must match the pattern: '{id_shoulder_pattern}'."
)


async def generate_ids(
owner: str,
populator: str,
number: int,
ns: str = "",
naa: str = "nmdc",
shoulder: str = "fk4",
ns: str,
naa: str = "nmdc_runtime",
shoulder: str = "4fk4",
mdb: RuntimeAsyncMongoDatabase = None,
) -> List[str]:
r"""
Generate the specified number of identifiers, storing them in a MongoDB collection
@@ -100,27 +128,33 @@ def generate_ids(
Indicates "who generated this ID." Callers will oftentimes set
this to the name of a Runtime "site" (as in, a "site client" site,
not a "Dagster" site).
:param ns: Namespace (see Minter docs); e.g. "changesheets"
:param ns: Namespace; e.g. "csheet" for changesheets
:param naa: Name-Assigning Authority (see Minter docs); e.g. "nmdc_runtime"
:param shoulder: String that will go in the "how" field (see Minter docs); e.g. "0sys0"

This function was written the way it was in an attempt to mirror the ARK spec:
https://www.ietf.org/archive/id/draft-kunze-ark-41.html (found via: https://arks.org/specs/)

Deviations from the ARK spec include:
1. The inclusion of a typecode.
1. The inclusion of a namespace (aka typecode).
The inclusion of a typecode came out of discussions with team members,
who wanted identifiers to include some non-opaque substring that could be used
to determine what type of resource a given identifier refers to.
2. Making hyphens mandatory.
2. Starting shoulders with a number. This makes it straightforward to parse an ID's namespace/typecode and
shoulder in the absence of hyphens. However, see the next point.
3. Making hyphens mandatory.
We decided to make the hyphens mandatory, whereas the spec says they are optional.
> "Hyphens are considered to be insignificant and are always ignored in ARKs."
> Reference: https://www.ietf.org/archive/id/draft-kunze-ark-41.html#name-character-repertoires
In our case, we require that users include an identifier's hyphens whenever
they are using that identifier.
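For example, a hypothetical identifier "nmdc_runtime:csheet-0sys0-8n2hq" parses as
naa="nmdc_runtime", ns (typecode)="csheet", shoulder="0sys0", and blade="8n2hq"
(the blade shown here is illustrative, not a real checksummed value).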
"""
collection = mdb.get_collection(collection_name(naa, shoulder))
estimated_document_count = collection.estimated_document_count()
check_valid_ns(ns)
mdb = mdb or await get_runtime_mdb()
collection = mdb.raw.get_collection(
collection_name(ns=ns, shoulder=shoulder, naa=naa)
)
estimated_document_count = await collection.estimated_document_count()
n_chars = next(
(
n
@@ -130,20 +164,27 @@ def generate_ids(
12,
)
collected = []

while True:
eids = set()
id_names = set()
n_to_generate = number - len(collected)
while len(eids) < n_to_generate:
eids.add(generate_id(length=(n_chars + 2), split_every=0, checksum=True))
eids = list(eids)
deids = [decode_id(eid) for eid in eids]
taken = {d["_id"] for d in collection.find({"_id": {"$in": deids}}, {"_id": 1})}
not_taken = [
(eid, eid_decoded)
for eid, eid_decoded in zip(eids, deids)
if eid_decoded not in taken
]
while len(id_names) < n_to_generate:
blade = generate_id(length=(n_chars + 2), split_every=0, checksum=True)
id_name = f"{naa}:{ns}-{shoulder}-{blade}"
id_names.add(id_name)
id_names = list(id_names)
taken = {
d["_id"]
async for d in collection.find({"_id": {"$in": id_names}}, {"_id": 1})
}
not_taken = [n for n in id_names if n not in taken]

# class Identifier(Entity):
# name: str
# typecode: Entity
# shoulder: Entity
# status: Status
# bindings: Optional[dict] = None

if not_taken:
# All attribute names beginning with "__a" are reserved...
# https://github.com/jkunze/n2t-eggnog/blob/0f0f4c490e6dece507dba710d3557e29b8f6627e/egg#L1882
@@ -153,45 +194,51 @@ def generate_ids(
docs = [
{
"@context": "https://n2t.net/e/n2t_apidoc.html#identifier-metadata",
"_id": eid_decoded,
"_id": id_name,
"who": populator,
"what": (f"{ns}/{eid}" if ns else "(:tba) Work in progress"),
"what": (f"{ns} member" if ns else "(:tba) Work in progress"),
"when": datetime.now(timezone.utc).isoformat(timespec="seconds"),
"how": shoulder,
"where": f"{naa}:{shoulder}{eid}",
"where": { # a `bson.dbref.DBRef`
"$ref": collection.name,
"$id": id_name,
},
"__as": "reserved", # status, public|reserved|unavailable
"__ao": owner, # owner
"__ac": datetime.now(timezone.utc).isoformat(
timespec="seconds"
), # created
}
for eid, eid_decoded in not_taken
for id_name in not_taken
]
collection.insert_many(docs)
await collection.insert_many(docs)
collected.extend(docs)
if len(collected) == number:
break
return [d["where"] for d in collected]
return [d["_id"] for d in collected]


def generate_one_id(
mdb: MongoDatabase,
ns: str = "",
shoulder: str = "sys0", # "sys0" represents the Runtime
async def generate_one_id(
ns: str,
shoulder: str = "0sys0", # "0sys0" represents the Runtime
mdb: RuntimeAsyncMongoDatabase = None,
) -> str:
"""Generate unique Crockford Base32-encoded ID for mdb repository.

Can associate ID with namespace ns to facilitate ID deletion/recycling.

"""
return generate_ids(
mdb,
owner="_system", # "_system" represents the Runtime
populator="_system", # "_system" represents the Runtime
number=1,
ns=ns,
naa="nmdc",
shoulder=shoulder,
mdb = mdb or await get_runtime_mdb()
return (
await generate_ids(
owner="_system", # "_system" represents the Runtime
populator="_system", # "_system" represents the Runtime
number=1,
ns=ns,
naa="nmdc_runtime",
shoulder=shoulder,
mdb=mdb,
)
)[0]
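
A minimal usage sketch (assuming a configured MongoDB connection is available via `get_runtime_mdb`; the namespace value and printed ID are illustrative):

    import asyncio

    async def main():
        # `ns` must match `id_typecode_pattern`; "csheet" is assumed valid here.
        new_id = await generate_one_id(ns="csheet")
        print(new_id)  # e.g. "nmdc_runtime:csheet-0sys0-<blade>"

    asyncio.run(main())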

