diff --git a/apps/baseline/models.py b/apps/baseline/models.py index 14739732..8cb112cc 100644 --- a/apps/baseline/models.py +++ b/apps/baseline/models.py @@ -2104,6 +2104,24 @@ class Meta: verbose_name_plural = _("Market Prices") +class SeasonalProductionPerformanceManager(common_models.IdentifierManager): + def get_by_natural_key( + self, + code: str, + reference_year_end_date: str, + community_full_name: str, + season_name: str, + performance_year_end_date: str, + ): + return self.get( + community__livelihood_zone_baseline__livelihood_zone__code=code, + community__livelihood_zone_baseline__reference_year_end_date=reference_year_end_date, + community__full_name=community_full_name, + season__name_en=season_name, + performance_year_end_date=performance_year_end_date, + ) + + class SeasonalProductionPerformance(common_models.Model): """ Relative production performance experienced in a specific season / year. @@ -2141,12 +2159,35 @@ class Performance(models.IntegerChoices): verbose_name=_("Seasonal Performance"), help_text=_("Rating of the seasonal production performance from Very Poor (1) to Very Good (5)"), ) + objects = SeasonalProductionPerformanceManager() + + def natural_key(self): + return ( + self.community.livelihood_zone_baseline.livelihood_zone.code, + self.community.livelihood_zone_baseline.reference_year_end_date.isoformat(), + self.community.full_name, + self.season.name_en, + self.performance_year_end_date.isoformat(), + ) class Meta: verbose_name = _("Seasonal Production Performance") verbose_name_plural = _("Seasonal Production Performance") +class HazardManager(common_models.IdentifierManager): + def get_by_natural_key( + self, code: str, reference_year_end_date: str, community_full_name: str, chronic_or_periodic: str, ranking: str + ): + return self.get( + community__livelihood_zone_baseline__livelihood_zone__code=code, + community__livelihood_zone_baseline__reference_year_end_date=reference_year_end_date, + 
community__full_name=community_full_name, + chronic_or_periodic=chronic_or_periodic, + ranking=int(ranking), + ) + + class Hazard(common_models.Model): """ A shock such as drought, flood, conflict or market disruption which is likely @@ -2185,11 +2226,21 @@ class HazardRanking(models.IntegerChoices): description = common_models.DescriptionField( max_length=255, verbose_name=_("Description of Event(s) and/or Response(s)") ) + objects = HazardManager() class Meta: verbose_name = _("Hazard") verbose_name_plural = _("Hazards") + def natural_key(self): + return ( + self.community.livelihood_zone_baseline.livelihood_zone.code, + self.community.livelihood_zone_baseline.reference_year_end_date.isoformat(), + self.community.full_name, + self.chronic_or_periodic, + str(self.ranking), + ) + class Event(common_models.Model): """ diff --git a/apps/metadata/lookups.py b/apps/metadata/lookups.py index aca8075c..8611c768 100644 --- a/apps/metadata/lookups.py +++ b/apps/metadata/lookups.py @@ -7,6 +7,7 @@ from common.lookups import Lookup from .models import ( + HazardCategory, LivelihoodCategory, ReferenceData, Season, @@ -66,3 +67,7 @@ class SeasonNameLookup(SeasonLookup): *translation_fields("description"), "aliases", ] + + +class HazardCategoryLookup(ReferenceDataLookup): + model = HazardCategory diff --git a/pipelines/__init__.py b/pipelines/__init__.py index 946bca6c..88bc2a77 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -31,6 +31,18 @@ other_cash_income_label_dataframe, summary_other_cash_income_labels_dataframe, ) +from .assets.seasonal_production_performance import ( + all_hazard_labels_dataframe, + all_seasonal_production_performance_labels_dataframe, + hazard_instances, + hazard_labels_dataframe, + hazards_dataframe, + seasonal_production_performance_dataframe, + seasonal_production_performance_instances, + seasonal_production_performance_label_dataframe, + summary_hazard_labels_dataframe, + summary_seasonal_production_performance_labels_dataframe, 
+) from .assets.wealth_characteristic import ( all_wealth_characteristic_labels_dataframe, summary_wealth_characteristic_labels_dataframe, @@ -96,6 +108,16 @@ consolidated_fixtures, uploaded_baselines, imported_baselines, + seasonal_production_performance_dataframe, + hazards_dataframe, + seasonal_production_performance_label_dataframe, + all_seasonal_production_performance_labels_dataframe, + summary_seasonal_production_performance_labels_dataframe, + hazard_labels_dataframe, + all_hazard_labels_dataframe, + summary_hazard_labels_dataframe, + seasonal_production_performance_instances, + hazard_instances, ], jobs=[update_metadata, update_external_assets, upload_baselines, extract_dataframes, import_baseline_from_fixture], resources={ diff --git a/pipelines/assets/base.py b/pipelines/assets/base.py index aae292a9..e8de83cf 100644 --- a/pipelines/assets/base.py +++ b/pipelines/assets/base.py @@ -47,6 +47,10 @@ "groupe de richesse": "fr", "groupe socio-economique": "fr", "group socio-economique": "fr", + "year of harvest": "en", + "production year": "en", + "year": "en", + "année": "fr", } # List of labels that indicate the start of the summary columns from row 3 in the Data, Data, and Data3 worksheets @@ -59,6 +63,8 @@ "range", "interval", "intervales", # 2023 Mali BSSs + "Average", # For timeline sheet + "MOYENNE", ] @@ -309,7 +315,7 @@ def validate_previous_value(cell, expected_prev_value, prev_value): def get_bss_dataframe( config: BSSMetadataConfig, - filepath_or_buffer, + filepath_or_buffer: object, bss_sheet: str, start_strings: list[str], end_strings: Optional[list[str]] = None, @@ -327,10 +333,7 @@ def get_bss_dataframe( # The requested worksheet does not exist in the file return Output( pd.DataFrame(), - metadata={ - "worksheet": bss_sheet, - "row_count": "Worksheet not present in file", - }, + metadata={"worksheet": bss_sheet, "row_count": "Worksheet not present in file", "datapoint_count": 0}, ) # Use a 1-based index to match the Excel Row Number @@ 
-338,15 +341,16 @@ def get_bss_dataframe( # Set the column names to match Excel df.columns = [get_column_letter(col + 1) for col in df.columns] - # Find the last column before the summary column, which is in row 3 - end_col = get_index(SUMMARY_LABELS, df.loc[3], offset=-1) - if not end_col: + # Find the last column before the summary column, which is in row 3 for the WB, data sheets and 4 for timeline sheet + row = 4 if bss_sheet == "Timeline" else 3 + end_col = get_index(SUMMARY_LABELS, df.loc[row], offset=-1) + if end_col == df.columns[-1]: # Need the last row because get_index has offset=-1 raise ValueError(f'No cell containing any of the summary strings: {", ".join(SUMMARY_LABELS)}') if not num_summary_cols: # If the number of summary columns wasn't specified, then assume that # there is one summary column for each wealth category, from row 3. - num_summary_cols = df.loc[3, "B":end_col].dropna().nunique() + num_summary_cols = df.loc[row, "B":end_col].dropna().nunique() end_col = df.columns[df.columns.get_loc(end_col) + num_summary_cols] # Find the row index of the start of the Livelihood Activities or Wealth Group Characteristic Values @@ -370,8 +374,8 @@ def get_bss_dataframe( # for rows that rely on copying down the label in column A from a previous row. 
end_row = df.index[-1] - # Find the language based on the value in cell A3 - lang = LANGS[df.loc[3, "A"].strip().lower()] + # Find the language based on the value in cell A3, or A4 + lang = LANGS[df.loc[row, "A"].strip().lower()] # Filter to just the Wealth Group header rows and the Livelihood Activities df = pd.concat([df.loc[header_rows, :end_col], df.loc[start_row:end_row, :end_col]]) diff --git a/pipelines/assets/fixtures.py b/pipelines/assets/fixtures.py index 9b02ea7f..365cb4fd 100644 --- a/pipelines/assets/fixtures.py +++ b/pipelines/assets/fixtures.py @@ -99,6 +99,8 @@ def consolidated_instances( livelihood_activity_instances, other_cash_income_instances, wild_foods_instances, + seasonal_production_performance_instances, + hazard_instances, ) -> Output[dict]: """ Consolidated record instances from a BSS, ready to be validated. @@ -109,7 +111,10 @@ def consolidated_instances( # WealthGroup instances, which are needed as a foreign key from LivelihoodActivity, etc. **wealth_characteristic_instances, **livelihood_activity_instances, + **seasonal_production_performance_instances, + **hazard_instances, } + # Add the wild foods and other cash income instances, if they are present for model_name, instances in {**other_cash_income_instances, **wild_foods_instances}.items(): if instances: diff --git a/pipelines/assets/seasonal_production_performance.py b/pipelines/assets/seasonal_production_performance.py new file mode 100644 index 00000000..4a3805f9 --- /dev/null +++ b/pipelines/assets/seasonal_production_performance.py @@ -0,0 +1,420 @@ +import json +import os + +import django +import pandas as pd +from dagster import AssetExecutionContext, MetadataValue, Output, asset + +from ..configs import BSSMetadataConfig +from ..partitions import bss_files_partitions_def, bss_instances_partitions_def +from .base import ( +
get_all_bss_labels_dataframe, + get_bss_dataframe, + get_bss_label_dataframe, + get_summary_bss_label_dataframe, +) + +# set the default Django settings module +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings.production") + +# Configure Django with our custom settings before importing any Django classes +django.setup() + +from baseline.models import Hazard # NOQA: E402 +from metadata.lookups import HazardCategoryLookup, SeasonNameLookup # NOQA: E402 + +# Indexes of header rows in the Timeline dataframe (year, community) +HEADER_ROWS = [ + 4, +] +YEAR_COLUMNS = ["Year of harvest", "year", "Year", "Année", "Production year", "Production Year"] + +HAZARD_TYPE_COLUMNS = { + Hazard.ChronicOrPeriodic.CHRONIC: [ + "Chronic hazards:", + "Chronic hazards (every year):", + "Alás croniques", + "Alás chroniques", + "Aléas croniques", + "Aléas/ risques - ranking 1 à 8", + "Aleás croniques", + "Aléas chroniques", + ], + Hazard.ChronicOrPeriodic.PERIODIC: ["Periodic hazards (not every year):", "Aléas periodiques"], +} + + +@asset(partitions_def=bss_files_partitions_def) +def seasonal_production_performance_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.DataFrame]: + """ + DataFrame asset for seasonal production performance from a BSS in 'Timeline' sheet + """ + return get_bss_dataframe( + config, + corrected_files, + "Timeline", + start_strings=YEAR_COLUMNS, + end_strings=[ + "Chronic hazards:", + "Chronic hazards (every year):", + "Alás croniques", + "Alás chroniques", + "Aléas croniques", + "Aléas/ risques - ranking 1 à 8", + "Aleás croniques", + "Aléas chroniques", + ], + header_rows=HEADER_ROWS, + num_summary_cols=1, + ) + + +@asset(partitions_def=bss_files_partitions_def) +def seasonal_production_performance_label_dataframe( + context: AssetExecutionContext, + config: BSSMetadataConfig, + seasonal_production_performance_dataframe: pd.DataFrame, +) -> Output[pd.DataFrame]: + """ + Dataframe of Seasonal Production Perf (Timeline) Label References + """ + return get_bss_label_dataframe( + context, + config, + seasonal_production_performance_dataframe, +
"seasonal_production_performance_dataframe", + len(HEADER_ROWS), + ) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def all_seasonal_production_performance_labels_dataframe( + config: BSSMetadataConfig, seasonal_production_performance_label_dataframe: dict[str, pd.DataFrame] +) -> Output[pd.DataFrame]: + """ + Combined dataframe of the seasonal production performance labels in use across all BSSs. + """ + return get_all_bss_labels_dataframe(config, seasonal_production_performance_label_dataframe) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def summary_seasonal_production_performance_labels_dataframe( + config: BSSMetadataConfig, all_seasonal_production_performance_labels_dataframe: pd.DataFrame +) -> Output[pd.DataFrame]: + """ + Summary of the seasonal production performance labels in use across all BSSs. + """ + return get_summary_bss_label_dataframe(config, all_seasonal_production_performance_labels_dataframe) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def seasonal_production_performance_instances( + context: AssetExecutionContext, + config: BSSMetadataConfig, + baseline_instances, + seasonal_production_performance_dataframe, +) -> Output[dict]: + """ + Seasonal Production Performance instances extracted from the Timeline sheet of the BSS. 
+ """ + partition_key = context.asset_partition_key_for_output() + result = { + "SeasonalProductionPerformance": [], + } + metadata = { + "num_seasonal_production_performances": 0, + } + if seasonal_production_performance_dataframe.empty: + # We don't have a Timeline sheet with seasonal production performance for this BSS + # so prepare an empty dataframe with 0 counts + metadata["message"] = ( + f"Empty seasonal production encountered for partition {partition_key}, due to missing timeline sheet" + ) + return Output( + result, + metadata=metadata, + ) + df = seasonal_production_performance_dataframe + baseline_df = baseline_instances + reference_year_end_date = baseline_df["LivelihoodZoneBaseline"][0]["reference_year_end_date"] + reference_year_start_date = baseline_df["LivelihoodZoneBaseline"][0]["reference_year_start_date"] + + df = df.reset_index(drop=True) + # The last column is the average, let us drop it + df = df.drop(df.columns[-1], axis=1) + # Make the column names the first row + df.columns = df.iloc[0] + df = df.drop([0, 1]) + df = df.reset_index(drop=True) + + def get_year_column_name(df): + for column_name in YEAR_COLUMNS: + if column_name in df.columns: + return column_name + raise ValueError( + "No candidate columns found for the year from %s, current columns %s" % (YEAR_COLUMNS, df.columns) + ) + + # Get the name of the first matching year column + year_column_name = get_year_column_name(df) + df = df.rename(columns={year_column_name: "year"}) + df = df.melt(id_vars="year", var_name="community", value_name="seasonal_performance") + + columns = ["year", "community", "seasonal_performance"] + # Drop incomplete rows before the cast: astype(str) would turn a blank year into the string "nan" + df = df.dropna(subset=columns) + df["year"] = df["year"].astype(str) + # Some BSSs may contain the Timeline sheet but the entries for seasonal performance could just be empty + if df.empty: + metadata["message"] = f"Empty seasonal production encountered for partition {partition_key}" + return
Output( + result, + metadata=metadata, + ) + df = df[df[columns].apply(lambda x: x.str.strip()).all(axis=1)] + + # Function to extract start and end dates + def extract_dates(year): + if pd.notna(year): + if len(year) == 4: # Check if it's a single year value + start_date = f"{year}-{reference_year_start_date.split('-')[1]}-01" + end_date = f"{int(year) + 1}-{reference_year_end_date.split('-')[1]}-01" + else: + if "/" in year: + start_year, end_year = map(str, year.split("/")) + elif "-" in year: + start_year, end_year = map(str, year.split("-")) + elif ":" in year: + start_year, end_year = map(str, year.split(":")) + elif " " in year: + start_year, end_year = map(str, year.split()) + else: + raise ValueError(f"Invalid year format {str(year)}") + # check if the year format provided is with 2 or 4 digits + if end_year and len(end_year) == 2: + end_year = start_year[:2] + end_year + + start_date = f"{int(start_year)}-{reference_year_start_date.split('-')[1]}-01" + end_date = f"{int(end_year)}-{reference_year_end_date.split('-')[1]}-01" + else: + raise ValueError("Year is not recognized") + return start_date, end_date + + # Apply the function to create new 'Start Date' and 'End Date' columns + df[["performance_year_start_date", "performance_year_end_date"]] = df["year"].apply( + lambda x: pd.Series(extract_dates(x)) + ) + # Add season for a season Lookup, a default season is Harvest + # We may need to update this for some BSS that may have season + seasonnamelookup = SeasonNameLookup() + + def get_season(): + return [seasonnamelookup.get("Harvest", country_id=baseline_df["LivelihoodZone"][0]["country_id"])] + + df["season"] = "Harvest" + df["season"] = df["season"].apply(lambda c: get_season()) + + # Create function for looking for the community + def get_community(community): + for community_dict in baseline_df["Community"]: + if community_dict["name"] == community: + return [ + community_dict["livelihood_zone_baseline"][0], +
baseline_df["LivelihoodZoneBaseline"][0]["reference_year_end_date"], + community_dict["full_name"], + ] + return None + # raise ValueError( + # "Unable to get community %s, perhaps that is wrongly spelled:" % (community) + # ) + + df["community"] = df["community"].apply(lambda c: get_community(c)) + df = df.dropna(subset="community") + seasonal_performances = df.to_dict(orient="records") + result = { + "SeasonalProductionPerformance": seasonal_performances, + } + metadata = { + "num_seasonal_production_performances": len(seasonal_performances), + "preview": MetadataValue.md(f"```json\n{json.dumps(result, indent=4)}\n```"), + } + + return Output( + result, + metadata=metadata, + ) + + +@asset(partitions_def=bss_files_partitions_def) +def hazards_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.DataFrame]: + """ + DataFrame asset for Hazards from a BSS in 'Timeline' sheet + """ + return get_bss_dataframe( + config, + corrected_files, + "Timeline", + start_strings=[ + "Chronic hazards:", + "Chronic hazards (every year):", + "Alás croniques", + "Alás chroniques", + "Aléas croniques", + "Aléas/ risques - ranking 1 à 8", + "Aleás croniques", + "Aléas chroniques", + ], + header_rows=HEADER_ROWS, + num_summary_cols=1, + ) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def hazard_instances( + context: AssetExecutionContext, + config: BSSMetadataConfig, + baseline_instances, + hazards_dataframe, +) -> Output[dict]: + """ + Hazard instances extracted from the Timeline sheet of the BSS. 
+ """ + partition_key = context.asset_partition_key_for_output() + result = { + "Hazard": [], + } + metadata = { + "num_hazards": 0, + } + if hazards_dataframe.empty: + # We don't have a Timeline sheet with Hazard for this BSS so prepare an empty dataframe with 0 counts + metadata["message"] = f"Empty Hazard encountered for partition {partition_key}, due to missing timeline sheet" + return Output( + result, + metadata=metadata, + ) + df = hazards_dataframe + baseline_df = baseline_instances + df = df.reset_index(drop=True) + # The last column is the average, let us drop it + df = df.drop(df.columns[-1], axis=1) + # Make the column names the first row + df.columns = df.iloc[0] + df = df.drop(0) + + def get_year_column_name(df): + for column_name in YEAR_COLUMNS: + if column_name in df.columns: + return column_name + raise ValueError( + "No candidate columns found for the year from %s, current columns %s" % (YEAR_COLUMNS, df.columns) + ) + + # Get the name of the first matching year column + year_column_name = get_year_column_name(df) + df = df.rename(columns={year_column_name: "ranking"}) + + # Determine if chronic vs periodic + df = df.melt(id_vars="ranking", var_name="community", value_name="hazard_category") + + df["chronic_or_periodic"] = Hazard.ChronicOrPeriodic.CHRONIC + periodic_encountered = False + for index, row in df.iterrows(): + if row["ranking"] in HAZARD_TYPE_COLUMNS[Hazard.ChronicOrPeriodic.PERIODIC]: + periodic_encountered = True + if row["ranking"] in HAZARD_TYPE_COLUMNS[Hazard.ChronicOrPeriodic.CHRONIC]: + periodic_encountered = False + if periodic_encountered: + df.loc[index, "chronic_or_periodic"] = Hazard.ChronicOrPeriodic.PERIODIC + + # Drop empty or invalid ranking and hazard values + df = df[df["ranking"].apply(lambda x: isinstance(x, int) and x != "")] + df = df[ + df["hazard_category"].apply( + lambda x: isinstance(x, str) and x.strip() != "" and not pd.isna(x) and not pd.isnull(x) + ) + ] + # Some BSSs
may contain the Timeline sheet but the entries for Hazards could just be empty + if df.empty: + metadata["message"] = f"Empty Hazards encountered for partition {partition_key}" + return Output( + result, + metadata=metadata, + ) + df["hazard_category"] = df["hazard_category"].str.lower() + df = HazardCategoryLookup().do_lookup(df, "hazard_category", "hazard_category") + + def get_community(community): + for community_dict in baseline_df["Community"]: + if community_dict["name"] == community: + return [ + community_dict["livelihood_zone_baseline"][0], + baseline_df["LivelihoodZoneBaseline"][0]["reference_year_end_date"], + community_dict["full_name"], + ] + return None + # raise ValueError( + # "Unable to get community %s, perhaps that is wrongly spelled:" % (community) + # ) + + df["community"] = df["community"].apply(lambda c: get_community(c)) + df = df.dropna(subset="community") + hazards = df.to_dict(orient="records") + result = { + "Hazard": hazards, + } + metadata = { + "num_hazards": len(hazards), + "preview": MetadataValue.md(f"```json\n{json.dumps(result, indent=4)}\n```"), + } + + return Output( + result, + metadata=metadata, + ) + + +@asset(partitions_def=bss_files_partitions_def) +def hazard_labels_dataframe( + context: AssetExecutionContext, + config: BSSMetadataConfig, + hazards_dataframe: pd.DataFrame, +) -> Output[pd.DataFrame]: + """ + Dataframe of Hazards (Timeline) Label References + """ + return get_bss_label_dataframe(context, config, hazards_dataframe, "hazards_dataframe", len(HEADER_ROWS)) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def all_hazard_labels_dataframe( + config: BSSMetadataConfig, hazard_labels_dataframe: dict[str, pd.DataFrame] +) -> Output[pd.DataFrame]: + """ + Combined dataframe of the Hazards labels in use across all BSSs. 
+ """ + return get_all_bss_labels_dataframe(config, hazard_labels_dataframe) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def summary_hazard_labels_dataframe( + config: BSSMetadataConfig, all_hazard_labels_dataframe: pd.DataFrame +) -> Output[pd.DataFrame]: + """ + Summary of the Hazards labels in use across all BSSs. + """ + return get_summary_bss_label_dataframe(config, all_hazard_labels_dataframe) + + +@asset(partitions_def=bss_files_partitions_def) +def events_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.DataFrame]: + """ + DataFrame asset for events from a BSS in 'Timeline' sheet + """ + return get_bss_dataframe( + config, + corrected_files, + "Timeline", + start_strings=["Periodic hazards (not every year):", "Aléas periodiques"], + header_rows=HEADER_ROWS, + ) diff --git a/pipelines/jobs/fixtures.py b/pipelines/jobs/fixtures.py index d252e8f6..7248524c 100644 --- a/pipelines/jobs/fixtures.py +++ b/pipelines/jobs/fixtures.py @@ -32,6 +32,10 @@ other_cash_income_label_dataframe, summary_other_cash_income_labels_dataframe, ) +from ..assets.seasonal_production_performance import ( + hazard_instances, + seasonal_production_performance_instances, +) from ..assets.wealth_characteristic import ( all_wealth_characteristic_labels_dataframe, summary_wealth_characteristic_labels_dataframe, @@ -59,6 +63,8 @@ validated_instances, consolidated_fixtures, imported_baselines, + seasonal_production_performance_instances, + hazard_instances, ), partitions_def=bss_instances_partitions_def, ) diff --git a/pipelines/sensors.py b/pipelines/sensors.py index 62c8dc97..bb8f4f97 100644 --- a/pipelines/sensors.py +++ b/pipelines/sensors.py @@ -5,6 +5,10 @@ from .assets.livelihood_activity import livelihood_activity_instances from .assets.other_cash_income import other_cash_income_instances +from .assets.seasonal_production_performance import ( + hazards_dataframe, + seasonal_production_performance_dataframe, +) from .assets.wealth_characteristic import 
wealth_characteristic_instances from .assets.wild_foods import wild_foods_instances from .partitions import bss_instances_partitions_def @@ -24,6 +28,8 @@ other_cash_income_instances.key, wild_foods_instances.key, wealth_characteristic_instances.key, + seasonal_production_performance_dataframe.key, + hazards_dataframe.key, ), minimum_interval_seconds=600, )