Binary file modified dist/neonutilities-1.1.1-py3-none-any.whl
Binary file not shown.
Binary file modified dist/neonutilities-1.1.1.tar.gz
Binary file not shown.
8 changes: 6 additions & 2 deletions src/neonutilities/__init__.py
@@ -8,5 +8,9 @@
from .tabular_download import zips_by_product
from .get_issue_log import get_issue_log
from .read_table_neon import read_table_neon
from .unzip_and_stack import stack_by_table
from .unzip_and_stack import load_by_product
from .unzip_and_stack import (
stack_by_table,
load_by_product,
dataset_query,
)
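This change exposes the new dataset_query() entry point at the package level alongside the existing stacking functions. A minimal import sketch (assuming the package is installed as neonutilities):

from neonutilities import stack_by_table, load_by_product, dataset_query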

224 changes: 218 additions & 6 deletions src/neonutilities/unzip_and_stack.py
@@ -17,6 +17,7 @@
from .get_issue_log import get_issue_log
from .citation import get_citation
from .helper_mods.api_helpers import readme_url
from .helper_mods.api_helpers import get_api
from .read_table_neon import get_variables, cast_table_neon
from . import __resources__
import logging
@@ -741,7 +742,12 @@ def format_readme(readmetab, tables):
return rd


def stack_data_files_parallel(folder, package, dpid, progress=True, cloud_mode=False):
def stack_data_files_parallel(folder,
package,
dpid,
progress=True,
cloud_mode=False,
datasetq=False):
"""

Join data files in an unzipped NEON data package by table type
@@ -753,6 +759,7 @@
dpid: Data product ID of product to stack.
progress: Should a progress bar be displayed?
cloud_mode: Use cloud mode to transfer files cloud-to-cloud? If used, stack_by_table() expects a list of file urls as input. Defaults to False.
datasetq: Return an arrow dataset for a single table? Defaults to False.

Return
--------
@@ -1081,8 +1088,13 @@ def stack_data_files_parallel(folder, package, dpid, progress=True, cloud_mode=F
f"Failed to stack table {j}. Check input data and variables file."
)
continue

# for dataset query, done here
if datasetq:
return dat
else:
pdat = dattab.to_pandas()

pdat = dattab.to_pandas()
if stringset:
try:
pdat = cast_table_neon(pdat, tablepkgvar)
@@ -1174,7 +1186,7 @@ def stack_data_files_parallel(folder, package, dpid, progress=True, cloud_mode=F

# remove filename column
pdat = pdat.drop(columns=["__filename"])

# add table to list
if j == "science_review_flags" or j == "sensor_positions":
stacklist[f"{j}_{dpnum}"] = pdat
@@ -1220,7 +1232,11 @@ def stack_data_files_parallel(folder, package, dpid, progress=True, cloud_mode=F


def stack_by_table(
filepath, savepath=None, save_unzipped_files=False, progress=True, cloud_mode=False
filepath, savepath=None,
save_unzipped_files=False,
progress=True,
cloud_mode=False,
datasetq=False,
):
"""

@@ -1248,6 +1264,11 @@ def stack_by_table(
expects a list of file urls as input. Defaults to False; in general this
option should be used via load_by_product(), in which stack_by_table() is a
helper function.

datasetq: bool, optional
Should the function return an arrow dataset for a single table, instead
of a set of stacked tables? Defaults to False.


Return
-------------------
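To make the new flag concrete, here is a sketch of the intended call pattern; in practice this path is exercised by dataset_query() further down in this diff, which builds the cloud-mode file list via zips_by_product(..., cloud_mode=True):

# flist: the file-url listing returned by zips_by_product() in cloud mode (hypothetical variable)
ds = stack_by_table(filepath=flist, savepath="envt",
                    cloud_mode=True, save_unzipped_files=False,
                    progress=False, datasetq=True)
# ds is the arrow dataset for the single requested table, not a dict of stacked pandas tables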
@@ -1363,6 +1384,7 @@ def stack_by_table(
dpid=dpid,
progress=progress,
cloud_mode=True,
datasetq=datasetq,
)

else:
@@ -1379,7 +1401,8 @@ def stack_by_table(

# Stack the files
stackedlist = stack_data_files_parallel(
folder=stackpath, package=package, dpid=dpid, progress=progress
folder=stackpath, package=package, dpid=dpid,
progress=progress, cloud_mode=False, datasetq=False,
)

# delete input files
@@ -1405,6 +1428,10 @@ def stack_by_table(
if stackedlist is None:
logging.info("ERROR! Stacking failed due to previous errors.")
return None

# if returning an arrow dataset, done here
if datasetq:
return stackedlist

# sort the dictionary of tables
mk = list(stackedlist.keys())
@@ -1509,7 +1536,8 @@ def load_by_product(
progress: bool, optional
Should the function display progress bars as it runs? Defaults to True.

token: User-specific API token from data.neonscience.org user account. See
token: str, optional
User-specific API token from data.neonscience.org user account. See
https://data.neonscience.org/data-api/rate-limiting/ for details about
API rate limits and user tokens. If omitted, download uses the public rate limit.

@@ -1595,3 +1623,187 @@ def load_by_product(
)

return outlist


def dataset_query(
dpid,
site="all",
startdate=None,
enddate=None,
package="basic",
release="current",
tabl=None,
hor=None,
ver=None,
include_provisional=False,
token=None,
):
"""
This function uses the query endpoint of the NEON API to find the full
list of files for a given data product, release, site(s), and date range,
then turns them into an arrow dataset.

Parameters
----------------
dpid: str
Data product identifier in the form DP#.#####.###

site: str
Either the string 'all', or one or more 4-letter NEON site codes. Defaults to 'all'.

startdate: str, optional
Earliest date of data to include in dataset, in the form YYYY-MM

enddate: str, optional
Latest date of data to include in dataset, in the form YYYY-MM

package: str, optional
Download package to access, either basic or expanded. Defaults to 'basic'.

release: str, optional
Data release to access. Defaults to the most recent release.

tabl: str
The name of a single data table to access.

hor: str, optional
The horizontal index of data to access. Only applicable to sensor (IS) data.

ver: str, optional
The vertical index of data to access. Only applicable to sensor (IS) data.

include_provisional: bool, optional
Should Provisional data be returned in the download? Defaults to False. See
https://www.neonscience.org/data-samples/data-management/data-revisions-releases
for details on the difference between provisional and released data.

token: str, optional
User-specific API token from data.neonscience.org user account. See
https://data.neonscience.org/data-api/rate-limiting/ for details about
API rate limits and user tokens. If omitted, download uses the public rate limit.

Return
---------------
An arrow dataset for the data requested.

Example
---------------
Create a dataset of mammal trap data from HARV (Harvard Forest) in 2018

>>> mamds = dataset_query(dpid="DP1.10072.001", site="HARV",
tabl="mam_pertrapnight", package="expanded",
startdate="2018-01", enddate="2018-12",
token=None)

Created on August 12 2025

@author: Claire Lunch
"""

# check that needed inputs are present and data product is tabular
# consider turning opening checks in zips_by_product() into a function

# query the /products endpoint for the product requested
if release == "current" or release == "PROVISIONAL":
prodreq = get_api(
api_url="https://data.neonscience.org/api/v0/products/" + dpid, token=token
)
else:
prodreq = get_api(
api_url="https://data.neonscience.org/api/v0/products/"
+ dpid
+ "?release="
+ release,
token=token,
)
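# e.g. with dpid="DP1.10072.001" and release="RELEASE-2024" (hypothetical values),
# the request built above is:
# https://data.neonscience.org/api/v0/products/DP1.10072.001?release=RELEASE-2024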

if prodreq is None:
if release == "LATEST":
logging.info(
f"No data found for NEON data product {dpid}. LATEST data requested; check that token is valid for LATEST access."
)
return
else:
logging.info(
f"No data found for NEON data product {dpid}."
)
return

avail = prodreq.json()
pubtype = avail["data"]["productPublicationFormatType"]

if pubtype=="AOP Data Product Type":
raise ValueError(
f"{dpid} is a remote sensing data product. Remote sensing data can't be queried using this function."
)

if tabl is None:
raise ValueError(
"Table name (tabl=) is a required input to this function."
)

if pubtype in ["TIS Data Product Type","AIS Data Product Type"]:
if dpid in ["DP4.00200.001",
"DP1.00007.001","DP1.00010.001","DP1.00034.001","DP1.00035.001",
"DP1.00036.001","DP1.00037.001","DP1.00099.001","DP1.00100.001",
"DP2.00008.001","DP2.00009.001","DP2.00024.001","DP3.00008.001",
"DP3.00009.001","DP3.00010.001","DP4.00002.001","DP4.00007.001",
"DP4.00067.001","DP4.00137.001","DP4.00201.001","DP1.00030.001"]:
raise ValueError(
f"{dpid} is an eddy covariance data product and can't be queried using this function."
)
else:
if bool(re.search(pattern="^ais[_]", string=tabl)):
tabl = tabl
else:
if site=="all" or not isinstance(site, str):
raise ValueError(
f"{dpid} is a sensor data product and can only be queried at a single site using this function. If you need data at multiple sites, run multiple queries or use loadByProduct()."
)
if hor is None or ver is None:
raise ValueError(
f"{dpid} is a sensor data product. hor and ver indices must be provided to use this query function."
)
if not isinstance(hor, str) or not isinstance(ver, str) or len(hor)!=3 or len(ver)!=3:
raise ValueError(
"hor and ver must be 3-digit codes in string format, and must each be a single code. Only one sensor location can be queried."
)
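# e.g. hor="000", ver="010" (hypothetical indices) would pass these checks;
# each must be a single 3-character string identifying one sensor location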

# get list of files
flist = zips_by_product(
dpid=dpid,
site=site,
startdate=startdate,
enddate=enddate,
package=package,
release=release,
tabl=tabl,
check_size=False,
include_provisional=include_provisional,
cloud_mode=True,
token=token,
savepath=None,
)

# if IS data, subset by hor and ver
# keep the variables file for schema creation
if ver is not None:
hvr = re.compile("[.]00[0-9]{1}[.]" + hor + "[.]" + ver + "[.][0-9]{2}[A-Z0-9]{1}[.]|variables")
d0 = [r for r in flist[0] if hvr.search(r)]
d1 = {r: flist[1][r] for r in flist[1] if hvr.search(r)}

flist[0] = d0
flist[1] = d1
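# e.g. with hor="000" and ver="010" (hypothetical), hvr keeps files whose names contain
# a segment like ".001.000.010.030." (revision.horizontal.vertical.temporal indices),
# plus any file with "variables" in its name, retained for schema creation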

# send to stacking function to turn into dataset
# stacking stops right before the dataset-to-table step
outds = stack_by_table(
filepath=flist,
savepath="envt",
cloud_mode=True,
save_unzipped_files=False,
progress=False,
datasetq=True,
)

return outds
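Since the return value is an arrow dataset rather than a dictionary of stacked tables, downstream use follows the pyarrow dataset API. A minimal sketch under that assumption, reusing the docstring example above (the schema is built from the product's variables file):

from neonutilities import dataset_query

# build the dataset described in the docstring example
mamds = dataset_query(dpid="DP1.10072.001", site="HARV",
                      tabl="mam_pertrapnight", package="expanded",
                      startdate="2018-01", enddate="2018-12")

print(mamds.schema)                    # inspect columns without reading any data
mamdf = mamds.to_table().to_pandas()   # materialize the table and convert to pandas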