diff --git a/apps/baseline/migrations/0017_alter_livelihoodzonebaselinecorrection_worksheet_name.py b/apps/baseline/migrations/0017_alter_livelihoodzonebaselinecorrection_worksheet_name.py new file mode 100644 index 00000000..35766075 --- /dev/null +++ b/apps/baseline/migrations/0017_alter_livelihoodzonebaselinecorrection_worksheet_name.py @@ -0,0 +1,29 @@ +# Generated by Django 5.1.1 on 2024-10-31 10:29 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("baseline", "0016_alter_livelihoodstrategy_additional_identifier_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="livelihoodzonebaselinecorrection", + name="worksheet_name", + field=models.CharField( + choices=[ + ("WB", "WB"), + ("Data", "Data"), + ("Data2", "Data2"), + ("Data3", "Data3"), + ("Timeline", "Timeline"), + ("Seas Cal", "Seas Cal"), + ], + max_length=20, + verbose_name="Worksheet name", + ), + ), + ] diff --git a/apps/baseline/models.py b/apps/baseline/models.py index 9dabfd3b..948337eb 100644 --- a/apps/baseline/models.py +++ b/apps/baseline/models.py @@ -282,6 +282,7 @@ class WorksheetName(models.TextChoices): DATA2 = "Data2", _("Data2") DATA3 = "Data3", _("Data3") TIMELINE = "Timeline", _("Timeline") + SEAS_CAL = "Seas Cal", _("Seas Cal") livelihood_zone_baseline = models.ForeignKey( LivelihoodZoneBaseline, @@ -1807,6 +1808,29 @@ class Meta: verbose_name_plural = _("Other Purchases") +class SeasonalActivityManager(common_models.IdentifierManager): + def get_by_natural_key( + self, + code: str, + reference_year_end_date: str, + seasonal_activity_type: str, + product: str = "", + additional_identifier: str = "", + ): + criteria = { + "livelihood_zone_baseline__livelihood_zone__code": code, + "livelihood_zone_baseline__reference_year_end_date": reference_year_end_date, + "seasonal_activity_type__code": seasonal_activity_type, + } + if product: + criteria["product__cpc"] = product + else: + criteria["product__isnull"] 
= True + if additional_identifier: + criteria["additional_identifier__iexact"] = additional_identifier + return self.get(**criteria) + + class SeasonalActivity(common_models.Model): """ An activity or event undertaken/experienced by households in a Livelihood Zone at specific periods during the year. @@ -1860,6 +1884,17 @@ class SeasonalActivity(common_models.Model): help_text=_("Additional text identifying the seasonal activity"), ) + objects = SeasonalActivityManager() + + def natural_key(self): + return ( + self.livelihood_zone_baseline.livelihood_zone_id, + self.livelihood_zone_baseline.reference_year_end_date.isoformat(), + self.seasonal_activity_type.code, + self.product.cpc if self.product else "", + self.additional_identifier if self.additional_identifier else "", + ) + class Meta: verbose_name = _("Seasonal Activity") verbose_name_plural = _("Seasonal Activities") @@ -1916,6 +1951,18 @@ class SeasonalActivityOccurrence(common_models.Model): validators=[MaxValueValidator(365), MinValueValidator(1)], verbose_name=_("End Day") ) + def natural_key(self): + return ( + self.livelihood_zone_baseline.livelihood_zone_id, + self.livelihood_zone_baseline.reference_year_end_date.isoformat(), + self.seasonal_activity.seasonal_activity_type.code, + self.seasonal_activity.product.cpc if self.seasonal_activity.product else "", + self.seasonal_activity.additional_identifier if self.seasonal_activity.additional_identifier else "", + self.community.full_name if self.community else "", + str(self.start), + str(self.end), + ) + def start_month(self): return get_month_from_day_number(self.start) diff --git a/apps/baseline/serializers.py b/apps/baseline/serializers.py index 83298a0e..013257d6 100644 --- a/apps/baseline/serializers.py +++ b/apps/baseline/serializers.py @@ -1,3 +1,5 @@ +from datetime import datetime, timedelta + from rest_framework import serializers from rest_framework_gis.serializers import GeoFeatureModelSerializer @@ -806,11 +808,14 @@ class Meta: 
"product_common_name", "product_description", "additional_identifier", + "seasonal_activity_label", # End SeasonalActivity "community", "community_name", "start", "end", + "start_date", + "end_date", ] livelihood_zone_name = serializers.CharField( @@ -852,10 +857,39 @@ def get_livelihood_zone_baseline_label(self, obj): source="seasonal_activity.seasonal_activity_type.activity_category", read_only=True ) activity_category_label = serializers.SerializerMethodField() + start_date = serializers.SerializerMethodField() + end_date = serializers.SerializerMethodField() + seasonal_activity_label = serializers.SerializerMethodField() def get_activity_category_label(self, obj): return obj.seasonal_activity.seasonal_activity_type.get_activity_category_display() + def get_start_date(self, obj): + """Compute start_date based on the start day of the year.""" + if obj.start is None: + return None + start_date = datetime(datetime.now().year, 1, 1) + timedelta(days=obj.start - 1) + return start_date.strftime("%Y-%m-%d") + + def get_end_date(self, obj): + """Compute end_date based on the end day of the year.""" + if obj.end is None: + return None + end_date = datetime(datetime.now().year, 1, 1) + timedelta(days=obj.end - 1) + return end_date.strftime("%Y-%m-%d") + + def get_seasonal_activity_label(self, obj): + """Generate activity_label based on additional_identifier and product.""" + additional_identifier = obj.seasonal_activity.additional_identifier + product = obj.seasonal_activity.product + + if additional_identifier and product: + return f"{additional_identifier.capitalize()}:{product.common_name.capitalize()}" + if additional_identifier: + return additional_identifier.capitalize() + if product: + return product.common_name.capitalize() + class CommunityCropProductionSerializer(serializers.ModelSerializer): class Meta: diff --git a/apps/baseline/tests/test_viewsets.py b/apps/baseline/tests/test_viewsets.py index c39ea0c1..473c09da 100644 --- 
a/apps/baseline/tests/test_viewsets.py +++ b/apps/baseline/tests/test_viewsets.py @@ -4412,6 +4412,9 @@ def test_get_record(self): "community_name", "start", "end", + "seasonal_activity_label", + "start_date", + "end_date", ) self.assertCountEqual( response.json().keys(), @@ -4489,6 +4492,19 @@ def test_html(self): df = pd.read_html(content)[0].fillna("") self.assertEqual(len(df), self.num_records + 1) + def test_show_zone_level_only_filter(self): + """Test filtering where show_zone_level_only=True (community is None)""" + SeasonalActivityOccurrenceFactory(community=None) + SeasonalActivityOccurrenceFactory(community=None) + response = self.client.get(self.url, {"show_zone_level_only": True}) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(json.loads(response.content)), 2) + + """Test filtering where show_zone_level_only=False (occurrence is for community)""" + response = self.client.get(self.url, {"show_zone_level_only": False}) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(json.loads(response.content)), self.num_records) + class CommunityCropProductionViewSetTestCase(APITestCase): @classmethod diff --git a/apps/baseline/viewsets.py b/apps/baseline/viewsets.py index 31ef1c9d..5e07dfb4 100644 --- a/apps/baseline/viewsets.py +++ b/apps/baseline/viewsets.py @@ -1280,6 +1280,10 @@ class SeasonalActivityViewSet(BaseModelViewSet): class SeasonalActivityOccurrenceFilterSet(filters.FilterSet): + show_zone_level_only = filters.BooleanFilter( + field_name="community", lookup_expr="isnull", label="Show zone level calendar only" + ) + class Meta: model = SeasonalActivityOccurrence fields = [ @@ -1288,6 +1292,7 @@ class Meta: "community", "start", "end", + "show_zone_level_only", ] diff --git a/apps/metadata/migrations/0010_alter_activitylabel_activity_type.py b/apps/metadata/migrations/0010_alter_activitylabel_activity_type.py new file mode 100644 index 00000000..807c7fbf --- /dev/null +++ 
b/apps/metadata/migrations/0010_alter_activitylabel_activity_type.py @@ -0,0 +1,29 @@ +# Generated by Django 5.1.1 on 2024-11-07 08:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("metadata", "0009_livelihoodcategory_color"), + ] + + operations = [ + migrations.AlterField( + model_name="activitylabel", + name="activity_type", + field=models.CharField( + choices=[ + ("LivelihoodActivity", "Livelihood Activity"), + ("OtherCashIncome", "Other Cash Income"), + ("WildFoods", "Wild Foods"), + ("Seas Cal", "Seas Cal"), + ], + default="LivelihoodActivity", + help_text="The type of Livelihood Activity, either a general Livelihood Activity, or an Other Cash Income activity from the 'Data2' worksheet, or a Wild Foods, Fishing or Hunting activity from the 'Data3' worksheet.", + max_length=20, + verbose_name="Activity Type", + ), + ), + ] diff --git a/apps/metadata/models.py b/apps/metadata/models.py index 854b5716..9d4ac73f 100644 --- a/apps/metadata/models.py +++ b/apps/metadata/models.py @@ -366,6 +366,7 @@ class LivelihoodActivityType(models.TextChoices): LIVELIHOOD_ACTIVITY = "LivelihoodActivity", _("Livelihood Activity") # Labels from the 'Data' worksheet OTHER_CASH_INCOME = "OtherCashIncome", _("Other Cash Income") # Labels from the 'Data2' worksheet WILD_FOODS = "WildFoods", _("Wild Foods") # Labels from the 'Data3' worksheet + SEAS_CAL = "Seas Cal", _("Seas Cal") # Header labels from the 'SeasCal' worksheet activity_label = common_models.NameField(max_length=200, verbose_name=_("Activity Label")) activity_type = models.CharField( diff --git a/pipelines/__init__.py b/pipelines/__init__.py index ed1a7f02..2ad76965 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -31,6 +31,15 @@ other_cash_income_label_dataframe, summary_other_cash_income_labels_dataframe, ) +from .assets.seasonal_calendar import ( + all_seasonal_calendar_labels_dataframe, + consolidated_seas_cal_fixtures, + 
imported_seas_cals, + seasonal_calendar_dataframe, + seasonal_calendar_instances, + seasonal_calendar_label_dataframe, + validated_seas_cal_instances, +) from .assets.wealth_characteristic import ( all_wealth_characteristic_labels_dataframe, summary_wealth_characteristic_labels_dataframe, @@ -48,6 +57,7 @@ from .jobs.fixtures import ( extract_dataframes, import_baseline_from_fixture, + import_seas_cal_from_fixture, update_external_assets, upload_baselines, ) @@ -97,6 +107,13 @@ uploaded_baselines, imported_communities, imported_baselines, + seasonal_calendar_dataframe, + seasonal_calendar_label_dataframe, + all_seasonal_calendar_labels_dataframe, + seasonal_calendar_instances, + validated_seas_cal_instances, + consolidated_seas_cal_fixtures, + imported_seas_cals, ], jobs=[ update_metadata, @@ -105,6 +122,7 @@ extract_dataframes, import_baseline_from_fixture, load_all_geographies, + import_seas_cal_from_fixture, ], resources={ "io_manager": PickleIOManager(base_path=EnvVar("DAGSTER_ASSET_BASE_PATH")), # Used by default diff --git a/pipelines/assets/base.py b/pipelines/assets/base.py index bee10a77..f0dd04b8 100644 --- a/pipelines/assets/base.py +++ b/pipelines/assets/base.py @@ -63,6 +63,8 @@ "range", "interval", "intervales", # 2023 Mali BSSs + "Results", + "Synthèse", ] @@ -366,8 +368,10 @@ def get_bss_dataframe( end_row = df.index[-1] # Find the language based on the value in cell A3 - lang = LANGS[df.loc[3, "A"].strip().lower()] - + if bss_sheet != "Seas Cal": + lang = LANGS[df.loc[3, "A"].strip().lower()] + else: + lang = "" # Filter to just the Wealth Group header rows and the Livelihood Activities df = pd.concat([df.loc[header_rows, :end_col], df.loc[start_row:end_row, :end_col]]) diff --git a/pipelines/assets/fixtures.py b/pipelines/assets/fixtures.py index 0f2c2851..5f4f48f3 100644 --- a/pipelines/assets/fixtures.py +++ b/pipelines/assets/fixtures.py @@ -146,6 +146,18 @@ def validated_instances( """ partition_key = context.asset_partition_key_for_output() # 
Create a dict of all the models, and a dataframe of their instances + validate_instances(consolidated_instances, partition_key) + + metadata = {f"num_{key.lower()}": len(value) for key, value in consolidated_instances.items()} + metadata["total_instances"] = sum(len(value) for value in consolidated_instances.values()) + metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(consolidated_instances, indent=4)}\n```") + return Output( + consolidated_instances, + metadata=metadata, + ) + + +def validate_instances(consolidated_instances, partition_key): errors = [] dfs = {} for model_name, instances in consolidated_instances.items(): @@ -196,7 +208,21 @@ def validated_instances( ], axis="columns", ) - + elif model_name == "SeasonalActivity": + df["key"] = df[ + ["livelihood_zone_baseline", "seasonal_activity_type", "product", "additional_identifier"] + ].apply( + lambda x: ( + x.iloc[0] + + [x.iloc[1], x.iloc[2] if pd.notna(x.iloc[2]) else "", x.iloc[3] if pd.notna(x.iloc[3]) else ""] + ), + axis="columns", + ) + elif model_name == "SeasonalActivityOccurrence": + df["key"] = df[["livelihood_zone_baseline", "seasonal_activity", "community", "start", "end"]].apply( + lambda x: (x.iloc[0] + x.iloc[1] + [x.iloc[2][-1] if x.iloc[2] else ""] + [x.iloc[3], x.iloc[4]]), + axis="columns", + ) # Apply some model-level defaults if "created" in valid_field_names and "created" not in df: df["created"] = pd.Timestamp.now(datetime.timezone.utc) @@ -266,14 +292,6 @@ def validated_instances( errors = "\n".join(errors) raise RuntimeError("Missing or inconsistent metadata in BSS %s:\n%s" % (partition_key, errors)) - metadata = {f"num_{key.lower()}": len(value) for key, value in consolidated_instances.items()} - metadata["total_instances"] = sum(len(value) for value in consolidated_instances.values()) - metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(consolidated_instances, indent=4)}\n```") - return Output( - consolidated_instances, - metadata=metadata, - ) - 
@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") def consolidated_fixtures( diff --git a/pipelines/assets/seasonal_calendar.py b/pipelines/assets/seasonal_calendar.py new file mode 100644 index 00000000..15cb8294 --- /dev/null +++ b/pipelines/assets/seasonal_calendar.py @@ -0,0 +1,474 @@ +""" +Dagster assets related to Seasonal Calender, read from the 'Seas Cal' worksheet in a BSS. + +An example of relevant rows from the worksheet: + + |SEASONAL CALENDAR |Fill 1s in the relevant months | + |-----------------|--------|------|----|------|------|------|------|------|-------|-------|-------|-------|----------|-------|-------|-------|-------|-------|-------|-------|-------|-------|-------|-------| + | | | | | | | | | | | | | | | | | | | | | | | | | | + |village --> |Njobvu | | | | | | | | | | | |Kalikokha | | | | | | | | | | | | + |month --> |4 |5 |6 |7 |8 |9 |10 |11 |12 |1 |2 |3 |4 |5 |6 |7 |8 |9 |10 |11 |12 |1 |2 |3 | + |Seasons | | | | | | | | | | | | | | | | | | | | | | | | | + |rainy | | | | | | | |1 |1 |1 |1 | | | | | | | | |1 |1 |1 |1 |1 | + |winter | | | | | | | | | | | | | |1 |1 |1 | | | | | | | | | + |hot | | | | | | | | | | | | | | | | | |1 |1 |1 |1 |1 | | | + |Maize rainfed | | | | | | | | | | | | | | | | | | | | | | | | | + |land preparation | | | |1 |1 |1 |1 | | | | | | | | |1 |1 |1 |1 | | | | | | + |planting | | | | | | | |1 |1 |1 | | | | | | | | | |1 |1 | | | | + |weeding | | | | | | | | | |1 |1 |1 | | | | | | | | |1 |1 |1 | | + |green consumption|1 | | | | | | | | | | |1 | | | | | | | | | | |1 |1 | + |harvesting | | |1 |1 | | | | | | | | |1 |1 | | | | | | | | | | | + |threshing | | | |1 |1 | | | | | | | |1 |1 |1 |1 | | | | | | | | | + |Tobacco | | | | | | | | | | | | | | | | | | | | | | | | | + |land preparation | | | |1 |1 |1 | | | | | | |1 |1 |1 |1 | | | | | | | | | + |planting | | | | | |1 |1 |1 | | | | | | | | |1 |1 | | | | | | | + |weeding | | | | | | | | |1 | | | | | | | | | |1 |1 | | | | | + |green 
consumption| | | | | | | | | | | | | | | | | | | | | | | | | + |harvesting | | | | | | | | | |1 |1 |1 |1 | | | | | | | | | |1 |1 | + |threshing | | | | | | | | | | | | | | | | | | | | | | | | | + + +""" # NOQA: E501 + +import functools +import json +import math +import os + +import django +import numpy as np +import pandas as pd +from dagster import AssetExecutionContext, MetadataValue, Output, asset + +from baseline.models import Community, LivelihoodZoneBaseline # NOQA: E402 +from metadata.models import ActivityLabel # NOQA: E402 +from metadata.models import LabelStatus # NOQA: E402 + +from .. import imported_baselines +from ..configs import BSSMetadataConfig +from ..partitions import bss_instances_partitions_def +from .base import ( + get_all_bss_labels_dataframe, + get_bss_dataframe, + get_bss_label_dataframe, +) +from .fixtures import get_fixture_from_instances, validate_instances + +# set the default Django settings module +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings.production") + +# Configure Django with our custom settings before importing any Django classes +django.setup() + +# Indexes of header rows in the seasonal_calendar dataframe +HEADER_ROWS = [ + 3, +] +SEAS_CAL = "Seas Cal" +# Cut-off percentage to determine the threshold of occurrences +CUT_OFF = 0.5 + + +@asset(partitions_def=bss_instances_partitions_def) +def seasonal_calendar_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.DataFrame]: + """ + DataFrame of seasonal calendar from a BSS + """ + return get_bss_dataframe( + config, + corrected_files, + "Seas Cal", + start_strings=["Village :", "village -->"], + header_rows=HEADER_ROWS, + ) + + +@asset(partitions_def=bss_instances_partitions_def) +def seasonal_calendar_label_dataframe( + context: AssetExecutionContext, + config: BSSMetadataConfig, + seasonal_calendar_dataframe, +) -> Output[pd.DataFrame]: + """ + Dataframe of seasonal calendar (Seas Cal) Label References + """ + return get_bss_label_dataframe( + 
context, config, seasonal_calendar_dataframe, "seasonal_calendar_dataframe", len(HEADER_ROWS)
+    )
+
+
+@asset(io_manager_key="dataframe_csv_io_manager")
+def all_seasonal_calendar_labels_dataframe(
+    config: BSSMetadataConfig, seasonal_calendar_label_dataframe: dict[str, pd.DataFrame]
+) -> Output[pd.DataFrame]:
+    """
+    Combined dataframe of the seasonal calendar labels in use across all BSSs wherever the sheet is available.
+    """
+    return get_all_bss_labels_dataframe(config, seasonal_calendar_label_dataframe)
+
+
+@functools.cache
+def get_seas_cal_label() -> dict[str, dict]:
+    """Map of lower-cased, completed 'Seas Cal' activity labels to their ActivityLabel metadata."""
+    label_map = {
+        instance["activity_label"].lower(): instance
+        for instance in ActivityLabel.objects.filter(
+            status=LabelStatus.COMPLETE, activity_type=SEAS_CAL, is_start=True
+        ).values(
+            "activity_label",
+            "is_start",
+            "product",
+            "season",
+            "additional_identifier",
+            "notes",
+        )
+    }
+    return label_map
+
+
+def get_seas_cal_instances_from_dataframe(
+    context: AssetExecutionContext,
+    df: pd.DataFrame,
+    livelihood_zone_baseline: LivelihoodZoneBaseline,
+    partition_key: str,
+) -> Output[dict]:
+    # Read the df, from its first column after doing the formatting and try creating the
+    # SeasonalActivity and SeasonalActivityOccurrence
+    # Remove the bottom part that usually contains empty or 0 entry with 'Other' or 'autre'
+    last_valid_index = df[~df.iloc[:, 0].str.lower().isin(["other", "autre"])].index[-1]
+    df_original = df.iloc[: last_valid_index + 1]
+    df_original.reset_index(drop=True, inplace=True)
+    # drop first row
+    df_original = df_original.drop(0).reset_index(drop=True)
+    # Rename initial values for clarity
+    df_original.iloc[0, 0] = "community"
+    df_original.iloc[1, 0] = "month"
+
+    df_original.replace("", np.nan, inplace=True)
+    df_original.iloc[0, 1:] = df_original.iloc[0, 1:].ffill(axis=0)
+
+    # Drop season rows
+    df_original = df_original.drop(index=range(2, 6))
+    df_original.reset_index(drop=True, inplace=True)
+
+    df = df_original
+
+    # Get the label mapping
dictionary + label_mapping = get_seas_cal_label() + + # Extract the first two rows for community and month information + community_row = df.iloc[0, 1:] + month_row = df.iloc[1, 1:] + + # Initialize results and current header + results = [] + current_header = None + has_children = False + + # Iterate over rows, starting from row 3 (index 2) + for index, row in df.iloc[2:].iterrows(): + label_value = row.iloc[0].lower().strip() + # Check if the row is a header based on label_mapping + if label_value in label_mapping: + # Add standalone entry if previous header had no children + if current_header and not has_children: + mapping_data = label_mapping[current_header] + results.append( + { + "seasonal_activity_label": current_header, + "product": mapping_data["product"], + "additional_identifier": mapping_data["additional_identifier"], + "seasonal_activity_type": None, + "community": None, + "month": None, + "occurrence": None, + } + ) + # Update header and reset tracking for children + current_header = label_value + has_children = False + elif current_header: + # Process each child row for presence values + if row.iloc[2:].notna().any(): + has_children = True + for col_index, presence_value in row.iloc[2:].items(): + if not pd.isnull(presence_value): + presence_value = int(presence_value) + if not pd.isnull(presence_value) and presence_value >= 1: + presence_value = int(presence_value) # Convert to int if not NaN + mapping_data = label_mapping[current_header] + + result_row = { + "seasonal_activity_label": current_header, + "product": mapping_data["product"], + "additional_identifier": mapping_data["additional_identifier"].strip(), + "seasonal_activity_type": label_value, + "community": community_row[col_index], + "month": month_row[col_index], + "occurrence": presence_value, + } + results.append(result_row) + + # Add final standalone entry if last header has no children + if current_header and not has_children: + mapping_data = label_mapping[current_header] + 
results.append(
+                {
+                    "seasonal_activity_label": current_header,
+                    "product": mapping_data["product"],
+                    "additional_identifier": mapping_data["additional_identifier"].strip(),
+                    "seasonal_activity_type": None,
+                    "community": None,
+                    "month": None,
+                    "occurrence": None,
+                }
+            )
+    df_results = pd.DataFrame(results)
+    # drop if community wasn't provided for some rows
+    df_results.dropna(subset=["community"], inplace=True)
+
+    df_seasonal_activity = df_results.copy()
+    df_seasonal_activity.fillna("")
+    df_seasonal_activity.drop_duplicates(
+        subset=["product", "additional_identifier", "seasonal_activity_type"], inplace=True
+    )
+    # add livelihood_zone_baseline's natural key to the dataframe
+    df_seasonal_activity["livelihood_zone_baseline"] = [
+        [livelihood_zone_baseline.livelihood_zone_id, livelihood_zone_baseline.reference_year_end_date.isoformat()]
+    ] * len(df_seasonal_activity)
+    # we don't have a specific season to tie to
+    df_seasonal_activity["season"] = [[] for _ in range(len(df_seasonal_activity))]
+    seasonal_activities = df_seasonal_activity[
+        ["livelihood_zone_baseline", "seasonal_activity_type", "product", "additional_identifier", "season"]
+    ].to_dict(orient="records")
+
+    # populate the community full name from the corresponding livelihood_zone_baseline
+    community_list = Community.objects.filter(livelihood_zone_baseline=livelihood_zone_baseline)
+    community_dict = {
+        community.name.lower(): {
+            "full_name": community.full_name,
+            "aliases": [alias.strip().lower() for alias in community.aliases] if community.aliases else [],
+        }
+        for community in community_list
+    }
+    df_zonal_level = df_results[df_results["community"].isin(["Synthèse", "Results"])]
+    df_community_level = df_results[~df_results["community"].isin(["Synthèse", "Results"])]
+
+    def lookup_community_full_name(name):
+        name = name.strip().lower()
+        if name in community_dict:
+            return community_dict[name]["full_name"]
+        # If not found, search through aliases
+        for community_info in
community_dict.values(): + if name in community_info["aliases"]: + return community_info["full_name"] + + # If no match is found, return a default value or keep as-is + raise ValueError( + "%s contains unmatched Community name values in worksheet %s:\n%s\n\nExpected names are:\n %s" + % ( + partition_key, + "Seas Cal", + name, + "\n ".join( + Community.objects.filter(livelihood_zone_baseline=livelihood_zone_baseline).values_list( + "name", flat=True + ) + ), + ) + ) + + # Populate 'full_name' to the community DataFrame + df_community_level["community"] = df_community_level["community"].apply(lookup_community_full_name) + # For the zone level, we shall use an aggregate occurrence as cut-off, i.e. if the total reported is greater than + # the CUT_OFF or more we shall consider its occurrence for the zone + + no_of_communities = len(df_community_level["community"].unique()) + threshold = math.ceil(CUT_OFF * no_of_communities) + df_zonal_level = df_zonal_level[df_zonal_level["occurrence"] >= threshold] + # we can merge the two now + df = pd.concat([df_community_level, df_zonal_level], ignore_index=True) + + # Extract the start and end dates considering a continuous block of months if present + # Define month boundaries in a dictionary (simplified with 365 days in a year) + month_days = { + 1: (1, 31), + 2: (32, 59), + 3: (60, 90), + 4: (91, 120), + 5: (121, 151), + 6: (152, 181), + 7: (182, 212), + 8: (213, 243), + 9: (244, 273), + 10: (274, 304), + 11: (305, 334), + 12: (335, 365), + } + + # Function to build start and end dates for contiguous blocks + # Fill missing values with a placeholder + df["product"] = df["product"].fillna("") + df["additional_identifier"] = df["additional_identifier"].fillna("") + + def build_start_end_dates(group): + occurrences = group["month"].sort_values().tolist() + blocks = [] + start_month = occurrences[0] + prev_month = start_month + + for i in range(1, len(occurrences)): + current_month = occurrences[i] + if current_month == 
prev_month + 1 or (prev_month == 12 and current_month == 1): + prev_month = current_month + else: + blocks.append((start_month, prev_month)) + start_month = current_month + prev_month = current_month + + blocks.append((start_month, prev_month)) + + date_ranges = [] + for start, end in blocks: + start_day = month_days[start][0] + end_day = month_days[end][1] + date_ranges.append({"start": start_day, "end": end_day}) + + return pd.DataFrame(date_ranges) + + # Group by all relevant columns and apply the function + result = ( + df.groupby(["product", "additional_identifier", "seasonal_activity_type", "community"], dropna=False) + .apply(build_start_end_dates) + .reset_index(level=[0, 1, 2, 3], drop=False) + ) + + # Replace the placeholder back with NaN for a clean final output + result["product"].replace("", pd.NA, inplace=True) + result["additional_identifier"].replace("", pd.NA, inplace=True) + + # replace the 'Results' community placeholder with NaN for the community representing zonal value + result.loc[result["community"].isin(["Synthèse", "Results"]), "community"] = np.nan + result = result.fillna("") + + def create_seasonal_activity_column(row): + """ + Combine seasonal activity natural keys to create the column. 
+ """ + columns = ["seasonal_activity_type", "product", "additional_identifier"] + non_null_values = [lz for lz in row["livelihood_zone_baseline"]] + non_null_values += [str(row[col]) for col in columns if pd.notna(row[col])] + row["seasonal_activity"] = non_null_values + return row + + def update_community(row): + identifers = [lz for lz in row["livelihood_zone_baseline"]] + if not row["community"]: + row["community"] = None + return row + identifers.append(row["community"]) + row["community"] = identifers + return row + + # Assign the livelihood_zone_baseline for each row + result["livelihood_zone_baseline"] = [ + [livelihood_zone_baseline.livelihood_zone_id, livelihood_zone_baseline.reference_year_end_date.isoformat()] + ] * len(result) + + # Apply the function and update each row in result + result = result.apply(create_seasonal_activity_column, axis=1) + result = result.apply(update_community, axis=1) + + seasonal_activity_occurences = result[ + ["livelihood_zone_baseline", "seasonal_activity", "community", "start", "end"] + ].to_dict(orient="records") + result = { + "SeasonalActivity": seasonal_activities, + "SeasonalActivityOccurrence": seasonal_activity_occurences, + } + metadata = { + "num_seasonal_activities": len(seasonal_activities), + "num_seasonal_activity_occurences": len(seasonal_activity_occurences), + "preview": MetadataValue.md(f"```json\n{json.dumps(result, indent=4)}\n```"), + } + return Output( + result, + metadata=metadata, + ) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def seasonal_calendar_instances( + context: AssetExecutionContext, + config: BSSMetadataConfig, + seasonal_calendar_dataframe, +) -> Output[dict]: + """ """ + partition_key = context.asset_partition_key_for_output() + livelihood_zone_baseline = LivelihoodZoneBaseline.objects.get_by_natural_key(*partition_key.split("~")[1:]) + + instances = get_seas_cal_instances_from_dataframe( + context, seasonal_calendar_dataframe, 
livelihood_zone_baseline, partition_key
+    )
+    if isinstance(instances, Output):
+        instances = instances.value
+
+    metadata = {f"num_{key.lower()}": len(value) for key, value in instances.items()}
+    metadata["total_instances"] = sum(len(value) for value in instances.values())
+    metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(instances, indent=4)}\n```")
+
+    return Output(
+        instances,
+        metadata=metadata,
+    )
+
+
+@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager")
+def validated_seas_cal_instances(
+    context: AssetExecutionContext,
+    seasonal_calendar_instances,
+) -> Output[dict]:
+    """
+    Validated seas_cal instances from a BSS, ready to be loaded via a Django fixture.
+    """
+    partition_key = context.asset_partition_key_for_output()
+    # validate the instances using the common method
+    validate_instances(seasonal_calendar_instances, partition_key)
+
+    metadata = {f"num_{key.lower()}": len(value) for key, value in seasonal_calendar_instances.items()}
+    metadata["total_instances"] = sum(len(value) for value in seasonal_calendar_instances.values())
+    metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(seasonal_calendar_instances, indent=4)}\n```")
+    return Output(
+        seasonal_calendar_instances,
+        metadata=metadata,
+    )
+
+
+@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager")
+def consolidated_seas_cal_fixtures(
+    context: AssetExecutionContext,
+    config: BSSMetadataConfig,
+    validated_seas_cal_instances,
+) -> Output[list[dict]]:
+    """
+    Consolidate the season calendar fixtures to make them ready for importing
+    """
+    metadata, fixture = get_fixture_from_instances(validated_seas_cal_instances)
+    metadata["preview"] = MetadataValue.md(f"```json\n{json.dumps(fixture, indent=4)}\n```")
+    return Output(
+        fixture,
+        metadata=metadata,
+    )
+
+
+@asset(partitions_def=bss_instances_partitions_def)
+def imported_seas_cals(
+    context: AssetExecutionContext,
consolidated_seas_cal_fixtures, +) -> Output[None]: + """ + Attempt to import the season calendar given the consolidated seas_cal fixtures using the + imported_baselines common method + """ + + return imported_baselines(None, consolidated_seas_cal_fixtures) diff --git a/pipelines/jobs/fixtures.py b/pipelines/jobs/fixtures.py index 4dbac2fb..44331786 100644 --- a/pipelines/jobs/fixtures.py +++ b/pipelines/jobs/fixtures.py @@ -27,6 +27,12 @@ other_cash_income_label_dataframe, summary_other_cash_income_labels_dataframe, ) +from ..assets.seasonal_calendar import ( + consolidated_seas_cal_fixtures, + imported_seas_cals, + seasonal_calendar_instances, + validated_seas_cal_instances, +) from ..assets.wealth_characteristic import ( all_wealth_characteristic_labels_dataframe, summary_wealth_characteristic_labels_dataframe, @@ -58,6 +64,16 @@ partitions_def=bss_instances_partitions_def, ) +import_seas_cal_from_fixture = define_asset_job( + name="import_seas_cal_from_fixture", + selection=( + seasonal_calendar_instances, + validated_seas_cal_instances, + consolidated_seas_cal_fixtures, + imported_seas_cals, + ), + partitions_def=bss_instances_partitions_def, +) update_external_assets = define_asset_job( name="update_external_assets",