Skip to content

Hea 126/draft ingestion pipeline for seasonal activity dagster #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 5.1.1 on 2024-10-31 10:29

from django.db import migrations, models


class Migration(migrations.Migration):
    # Redefines the choices of LivelihoodZoneBaselineCorrection.worksheet_name;
    # the list below includes the "Seas Cal" worksheet.

    dependencies = [("baseline", "0016_alter_livelihoodstrategy_additional_identifier_and_more")]

    operations = [
        migrations.AlterField(
            model_name="livelihoodzonebaselinecorrection",
            name="worksheet_name",
            field=models.CharField(
                max_length=20,
                verbose_name="Worksheet name",
                # Each choice is a (stored value, display label) pair.
                choices=[
                    ("WB", "WB"),
                    ("Data", "Data"),
                    ("Data2", "Data2"),
                    ("Data3", "Data3"),
                    ("Timeline", "Timeline"),
                    ("Seas Cal", "Seas Cal"),
                ],
            ),
        ),
    ]
47 changes: 47 additions & 0 deletions apps/baseline/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class WorksheetName(models.TextChoices):
DATA2 = "Data2", _("Data2")
DATA3 = "Data3", _("Data3")
TIMELINE = "Timeline", _("Timeline")
SEAS_CAL = "Seas Cal", _("Seas Cal")

livelihood_zone_baseline = models.ForeignKey(
LivelihoodZoneBaseline,
Expand Down Expand Up @@ -1807,6 +1808,29 @@ class Meta:
verbose_name_plural = _("Other Purchases")


class SeasonalActivityManager(common_models.IdentifierManager):
    """
    Manager that looks up a seasonal activity from the components of its natural key.
    """

    def get_by_natural_key(
        self,
        code: str,
        reference_year_end_date: str,
        seasonal_activity_type: str,
        product: str = "",
        additional_identifier: str = "",
    ):
        """
        Return the single instance matching the natural key components.

        Args:
            code: code of the Livelihood Zone of the instance's baseline.
            reference_year_end_date: reference year end date of the baseline.
            seasonal_activity_type: code of the seasonal activity type.
            product: CPC code of the product; an empty value matches only
                instances that have no product.
            additional_identifier: additional identifier, compared
                case-insensitively; an empty value matches only instances
                whose additional identifier is empty or NULL.

        Any exception raised by ``self.get()`` (no match, or more than one
        match) is propagated unchanged.
        """
        criteria = {
            "livelihood_zone_baseline__livelihood_zone__code": code,
            "livelihood_zone_baseline__reference_year_end_date": reference_year_end_date,
            "seasonal_activity_type__code": seasonal_activity_type,
        }
        if product:
            criteria["product__cpc"] = product
        else:
            criteria["product__isnull"] = True

        if additional_identifier:
            criteria["additional_identifier__iexact"] = additional_identifier
            return self.get(**criteria)

        # An empty identifier is part of the key too. Without this constraint,
        # looking up the activity that has no identifier would also match any
        # sibling activity (same baseline, type and product) that does have
        # one, making the lookup ambiguous.
        without_identifier = models.Q(additional_identifier="") | models.Q(additional_identifier__isnull=True)
        return self.get(without_identifier, **criteria)


class SeasonalActivity(common_models.Model):
"""
An activity or event undertaken/experienced by households in a Livelihood Zone at specific periods during the year.
Expand Down Expand Up @@ -1860,6 +1884,17 @@ class SeasonalActivity(common_models.Model):
help_text=_("Additional text identifying the seasonal activity"),
)

objects = SeasonalActivityManager()

def natural_key(self):
    """
    Return the tuple that identifies this seasonal activity.

    The components are: the livelihood zone id and ISO-formatted reference
    year end date of the baseline, the seasonal activity type code, the
    product CPC code and the additional identifier. A missing product or
    additional identifier is represented by an empty string.
    """
    baseline = self.livelihood_zone_baseline
    zone_id = baseline.livelihood_zone_id
    reference_year_end = baseline.reference_year_end_date.isoformat()
    activity_type_code = self.seasonal_activity_type.code
    product_cpc = self.product.cpc if self.product else ""
    identifier = self.additional_identifier or ""
    return (zone_id, reference_year_end, activity_type_code, product_cpc, identifier)

class Meta:
verbose_name = _("Seasonal Activity")
verbose_name_plural = _("Seasonal Activities")
Expand Down Expand Up @@ -1916,6 +1951,18 @@ class SeasonalActivityOccurrence(common_models.Model):
validators=[MaxValueValidator(365), MinValueValidator(1)], verbose_name=_("End Day")
)

def natural_key(self):
    """
    Return the tuple that identifies this occurrence.

    The components are: the livelihood zone id and ISO-formatted reference
    year end date of the baseline; the type code, product CPC code and
    additional identifier of the seasonal activity; the full name of the
    community; and the start and end values as strings. A missing product,
    additional identifier or community is represented by an empty string.
    """
    baseline = self.livelihood_zone_baseline
    activity = self.seasonal_activity
    key = [
        baseline.livelihood_zone_id,
        baseline.reference_year_end_date.isoformat(),
        activity.seasonal_activity_type.code,
    ]
    key.append(activity.product.cpc if activity.product else "")
    key.append(activity.additional_identifier or "")
    key.append(self.community.full_name if self.community else "")
    key.extend([str(self.start), str(self.end)])
    return tuple(key)

def start_month(self):
    """Return the result of ``get_month_from_day_number`` for this occurrence's ``start`` value."""
    start_day_number = self.start
    return get_month_from_day_number(start_day_number)

Expand Down
34 changes: 34 additions & 0 deletions apps/baseline/serializers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime, timedelta

from rest_framework import serializers
from rest_framework_gis.serializers import GeoFeatureModelSerializer

Expand Down Expand Up @@ -806,11 +808,14 @@ class Meta:
"product_common_name",
"product_description",
"additional_identifier",
"seasonal_activity_label",
# End SeasonalActivity
"community",
"community_name",
"start",
"end",
"start_date",
"end_date",
]

livelihood_zone_name = serializers.CharField(
Expand Down Expand Up @@ -852,10 +857,39 @@ def get_livelihood_zone_baseline_label(self, obj):
source="seasonal_activity.seasonal_activity_type.activity_category", read_only=True
)
activity_category_label = serializers.SerializerMethodField()
start_date = serializers.SerializerMethodField()
end_date = serializers.SerializerMethodField()
seasonal_activity_label = serializers.SerializerMethodField()

def get_activity_category_label(self, obj):
    """Return the display value of the activity category of the occurrence's seasonal activity type."""
    activity_type = obj.seasonal_activity.seasonal_activity_type
    return activity_type.get_activity_category_display()

def get_start_date(self, obj):
    """
    Convert the occurrence's ``start`` day number into a ``YYYY-MM-DD`` string.

    Day 1 maps to 1 January of the year in which the method is called, so
    the month and day returned for a given day number can differ between
    leap years and other years. Returns ``None`` when ``start`` is ``None``.
    """
    day_number = obj.start
    if day_number is None:
        return None
    first_of_january = datetime(datetime.now().year, 1, 1)
    offset_from_new_year = timedelta(days=day_number - 1)
    return (first_of_january + offset_from_new_year).strftime("%Y-%m-%d")

def get_end_date(self, obj):
    """
    Convert the occurrence's ``end`` day number into a ``YYYY-MM-DD`` string.

    Day 1 maps to 1 January of the year in which the method is called, so
    the month and day returned for a given day number can differ between
    leap years and other years. Returns ``None`` when ``end`` is ``None``.
    """
    day_number = obj.end
    if day_number is None:
        return None
    first_of_january = datetime(datetime.now().year, 1, 1)
    offset_from_new_year = timedelta(days=day_number - 1)
    return (first_of_january + offset_from_new_year).strftime("%Y-%m-%d")

def get_seasonal_activity_label(self, obj):
    """
    Build a label from the seasonal activity's additional identifier and product.

    Each available part is capitalized with ``str.capitalize()``; when both
    are present they are joined as ``"<identifier>:<product common name>"``.
    Returns ``None`` when the activity has neither.
    """
    activity = obj.seasonal_activity
    identifier = activity.additional_identifier
    product = activity.product

    label_parts = []
    if identifier:
        label_parts.append(identifier.capitalize())
    if product:
        label_parts.append(product.common_name.capitalize())

    if not label_parts:
        return None
    return ":".join(label_parts)


class CommunityCropProductionSerializer(serializers.ModelSerializer):
class Meta:
Expand Down
16 changes: 16 additions & 0 deletions apps/baseline/tests/test_viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4412,6 +4412,9 @@ def test_get_record(self):
"community_name",
"start",
"end",
"seasonal_activity_label",
"start_date",
"end_date",
)
self.assertCountEqual(
response.json().keys(),
Expand Down Expand Up @@ -4489,6 +4492,19 @@ def test_html(self):
df = pd.read_html(content)[0].fillna("")
self.assertEqual(len(df), self.num_records + 1)

def test_show_zone_level_only_filter(self):
    """Check the show_zone_level_only filter with both True and False."""
    # Add two occurrences that are not attached to any community.
    for _ in range(2):
        SeasonalActivityOccurrenceFactory(community=None)

    # show_zone_level_only=True: only the occurrences without a community.
    zone_level_response = self.client.get(self.url, {"show_zone_level_only": True})
    self.assertEqual(zone_level_response.status_code, 200)
    zone_level_results = json.loads(zone_level_response.content)
    self.assertEqual(len(zone_level_results), 2)

    # show_zone_level_only=False: only the occurrences that have a community.
    # NOTE(review): expecting self.num_records assumes every record created by
    # the test case's setup has a community - confirm against the setup code.
    community_response = self.client.get(self.url, {"show_zone_level_only": False})
    self.assertEqual(community_response.status_code, 200)
    community_results = json.loads(community_response.content)
    self.assertEqual(len(community_results), self.num_records)


class CommunityCropProductionViewSetTestCase(APITestCase):
@classmethod
Expand Down
5 changes: 5 additions & 0 deletions apps/baseline/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,10 @@ class SeasonalActivityViewSet(BaseModelViewSet):


class SeasonalActivityOccurrenceFilterSet(filters.FilterSet):
show_zone_level_only = filters.BooleanFilter(
field_name="community", lookup_expr="isnull", label="Show zone level calendar only"
)

class Meta:
model = SeasonalActivityOccurrence
fields = [
Expand All @@ -1288,6 +1292,7 @@ class Meta:
"community",
"start",
"end",
"show_zone_level_only",
]


Expand Down
29 changes: 29 additions & 0 deletions apps/metadata/migrations/0010_alter_activitylabel_activity_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 5.1.1 on 2024-11-07 08:32

from django.db import migrations, models


class Migration(migrations.Migration):
    # Redefines the choices of ActivityLabel.activity_type; the list below
    # includes the "Seas Cal" value.

    dependencies = [("metadata", "0009_livelihoodcategory_color")]

    operations = [
        migrations.AlterField(
            model_name="activitylabel",
            name="activity_type",
            field=models.CharField(
                max_length=20,
                default="LivelihoodActivity",
                verbose_name="Activity Type",
                help_text="The type of Livelihood Activity, either a general Livelihood Activity, or an Other Cash Income activity from the 'Data2' worksheet, or a Wild Foods, Fishing or Hunting activity from the 'Data3' worksheet.",
                # Each choice is a (stored value, display label) pair.
                choices=[
                    ("LivelihoodActivity", "Livelihood Activity"),
                    ("OtherCashIncome", "Other Cash Income"),
                    ("WildFoods", "Wild Foods"),
                    ("Seas Cal", "Seas Cal"),
                ],
            ),
        ),
    ]
1 change: 1 addition & 0 deletions apps/metadata/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ class LivelihoodActivityType(models.TextChoices):
LIVELIHOOD_ACTIVITY = "LivelihoodActivity", _("Livelihood Activity") # Labels from the 'Data' worksheet
OTHER_CASH_INCOME = "OtherCashIncome", _("Other Cash Income") # Labels from the 'Data2' worksheet
WILD_FOODS = "WildFoods", _("Wild Foods") # Labels from the 'Data3' worksheet
SEAS_CAL = "Seas Cal", _("Seas Cal") # Header labels from the 'SeasCal' worksheet

activity_label = common_models.NameField(max_length=200, verbose_name=_("Activity Label"))
activity_type = models.CharField(
Expand Down
18 changes: 18 additions & 0 deletions pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
other_cash_income_label_dataframe,
summary_other_cash_income_labels_dataframe,
)
from .assets.seasonal_calendar import (
all_seasonal_calendar_labels_dataframe,
consolidated_seas_cal_fixtures,
imported_seas_cals,
seasonal_calendar_dataframe,
seasonal_calendar_instances,
seasonal_calendar_label_dataframe,
validated_seas_cal_instances,
)
from .assets.wealth_characteristic import (
all_wealth_characteristic_labels_dataframe,
summary_wealth_characteristic_labels_dataframe,
Expand All @@ -48,6 +57,7 @@
from .jobs.fixtures import (
extract_dataframes,
import_baseline_from_fixture,
import_seas_cal_from_fixture,
update_external_assets,
upload_baselines,
)
Expand Down Expand Up @@ -97,6 +107,13 @@
uploaded_baselines,
imported_communities,
imported_baselines,
seasonal_calendar_dataframe,
seasonal_calendar_label_dataframe,
all_seasonal_calendar_labels_dataframe,
seasonal_calendar_instances,
validated_seas_cal_instances,
consolidated_seas_cal_fixtures,
imported_seas_cals,
],
jobs=[
update_metadata,
Expand All @@ -105,6 +122,7 @@
extract_dataframes,
import_baseline_from_fixture,
load_all_geographies,
import_seas_cal_from_fixture,
],
resources={
"io_manager": PickleIOManager(base_path=EnvVar("DAGSTER_ASSET_BASE_PATH")), # Used by default
Expand Down
8 changes: 6 additions & 2 deletions pipelines/assets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
"range",
"interval",
"intervales", # 2023 Mali BSSs
"Results",
"Synthèse",
]


Expand Down Expand Up @@ -366,8 +368,10 @@ def get_bss_dataframe(
end_row = df.index[-1]

# Find the language based on the value in cell A3
lang = LANGS[df.loc[3, "A"].strip().lower()]

if bss_sheet != "Seas Cal":
lang = LANGS[df.loc[3, "A"].strip().lower()]
else:
lang = ""
# Filter to just the Wealth Group header rows and the Livelihood Activities
df = pd.concat([df.loc[header_rows, :end_col], df.loc[start_row:end_row, :end_col]])

Expand Down
36 changes: 27 additions & 9 deletions pipelines/assets/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ def validated_instances(
"""
partition_key = context.asset_partition_key_for_output()
# Create a dict of all the models, and a dataframe of their instances
validate_instances(consolidated_instances, partition_key)

metadata = {f"num_{key.lower()}": len(value) for key, value in consolidated_instances.items()}
metadata["total_instances"] = sum(len(value) for value in consolidated_instances.values())
metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(consolidated_instances, indent=4)}\n```")
return Output(
consolidated_instances,
metadata=metadata,
)


def validate_instances(consolidated_instances, partition_key):
errors = []
dfs = {}
for model_name, instances in consolidated_instances.items():
Expand Down Expand Up @@ -196,7 +208,21 @@ def validated_instances(
],
axis="columns",
)

elif model_name == "SeasonalActivity":
df["key"] = df[
["livelihood_zone_baseline", "seasonal_activity_type", "product", "additional_identifier"]
].apply(
lambda x: (
x.iloc[0]
+ [x.iloc[1], x.iloc[2] if pd.notna(x.iloc[2]) else "", x.iloc[3] if pd.notna(x.iloc[3]) else ""]
),
axis="columns",
)
elif model_name == "SeasonalActivityOccurrence":
df["key"] = df[["livelihood_zone_baseline", "seasonal_activity", "community", "start", "end"]].apply(
lambda x: (x.iloc[0] + x.iloc[1] + [x.iloc[2][-1] if x.iloc[2] else ""] + [x.iloc[3], x.iloc[4]]),
axis="columns",
)
# Apply some model-level defaults
if "created" in valid_field_names and "created" not in df:
df["created"] = pd.Timestamp.now(datetime.timezone.utc)
Expand Down Expand Up @@ -266,14 +292,6 @@ def validated_instances(
errors = "\n".join(errors)
raise RuntimeError("Missing or inconsistent metadata in BSS %s:\n%s" % (partition_key, errors))

metadata = {f"num_{key.lower()}": len(value) for key, value in consolidated_instances.items()}
metadata["total_instances"] = sum(len(value) for value in consolidated_instances.values())
metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(consolidated_instances, indent=4)}\n```")
return Output(
consolidated_instances,
metadata=metadata,
)


@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager")
def consolidated_fixtures(
Expand Down
Loading
Loading