diff --git a/apps/baseline/importers.py b/apps/baseline/importers.py new file mode 100644 index 00000000..a042c4ed --- /dev/null +++ b/apps/baseline/importers.py @@ -0,0 +1,368 @@ +import logging + +from django.contrib.contenttypes.models import ContentType + +from baseline.models import ( + Community, + LivelihoodActivity, + LivelihoodStrategy, + LivelihoodZoneBaseline, +) +from ingestion.decorators import register +from ingestion.importers import Importer +from ingestion.models import SpreadsheetLocation + +logger = logging.getLogger(__name__) + + +@register +class LivelihoodZoneBaselineImporter(Importer): + # Management command load_from_bss calls this importer's ingest() for a pre-saved LivelihoodZoneBaseline instance. + + class Meta: + model = LivelihoodZoneBaseline + dependent_model_fields = [ + "livelihood_strategies", + "communities", + ] + + +@register +class LivelihoodStrategyImporter(Importer): + class Meta: + model = LivelihoodStrategy + fields = [ + "product", + "strategy_type", + "season", + "unit_of_measure", + "currency", + "additional_identifier", + ] + parent_model_fields = [ + "livelihood_zone_baseline", + ] + dependent_model_fields = [ + "livelihoodactivity", + ] + + # def _save_instances(self, successful_mappings, parent_instances, import_run): + # """ + # Uncomment to generate a fake parent instance, so that child importers can run. + # """ + # parent_instances[self.Meta.model] = [LivelihoodStrategyFactory()] + # return parent_instances + + def ingest_product( + self, + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ): + # Scan down column A of the three Data sheets looking for a product alias. 
+ for sheet_name in ("Data", "Data2", "Data3"): + row_count, column_count = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name).shape + for row in range(7, min(row_count, 3000)): + new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell( + parent_instances=parent_instances, + field_def=field_def, + find_field=False, + sheet_name=sheet_name, + column=0, # col A + row=row, + bss_value_extractors=bss_value_extractors, + successful_mappings=successful_mappings, + failed_mappings=failed_mappings, + import_run=import_run, + ) + return successful_mappings, failed_mappings, parent_instances + + def ingest_strategy_type( + self, + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ): + # The products must already have been mapped, so we know how many LSes we have and which rows they're on. + # This finds the strategy_types (~12), then generates a strategy_type SpreadsheetLocation per LS (~90). + + # 1. Identify SpreadsheetLocation of each strategy_type found in the BSS (approx 12): + strategy_type_spreadsheet_locations = [] + for sheet_name in ("Data", "Data2", "Data3"): + row_count, column_count = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name).shape + for row in range(10, min(row_count, 3000 + 1)): + new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell( + parent_instances=parent_instances, + field_def=field_def, + find_field=False, + sheet_name=sheet_name, + column=0, # all in column A + row=row, + bss_value_extractors=bss_value_extractors, + successful_mappings=successful_mappings, + failed_mappings=failed_mappings, + import_run=import_run, + ) + if new_spreadsheet_location: + strategy_type_spreadsheet_locations.append(new_spreadsheet_location) + + # 2. 
Generate a strategy_type SpreadsheetLocation per LivelihoodStrategy in the BSS (approx 90): + # The first row of each LivelihoodStrategy has the product in col A, so we use product mappings to iterate LS. + sl_per_livelihood_strategy = [] + for instance_number, product_sl in enumerate(successful_mappings["product"]): + strategy_type_sl = self.get_strategy_type_for_instance(instance_number, successful_mappings) + sl_per_livelihood_strategy.append(strategy_type_sl) + + # 3. Clean up working data: + # Grab the PKs of the SLs not attached to any LS instance for deletion later + sls_to_delete = [o.pk for o in strategy_type_spreadsheet_locations] + + # Generate a new SpreadsheetLocation per LivelihoodStrategy instance + for instance_number, sl in enumerate(sl_per_livelihood_strategy): + sl.pk = None + sl.id = None + sl.instance_number = instance_number + sl.save() + + # Delete the strategy_type SpreadsheetLocations not attached to any LivelihoodStrategy instance + SpreadsheetLocation.objects.filter(pk__in=sls_to_delete).delete() + + return successful_mappings, failed_mappings, parent_instances + + @staticmethod + def get_strategy_type_for_instance(instance_number, successful_mappings): + # The strategy type for a given LivelihoodStrategy instance is the one closest above it in the BSS: + product = successful_mappings["product"][instance_number] + strategy_types = successful_mappings["strategy_type"] + st_index = 0 + while st_index < len(strategy_types) and ( + product.sheet_name != strategy_types[st_index].sheet_name or product.row < strategy_types[st_index].row + ): + st_index += 1 + return strategy_types[st_index] + + +@register +class CommunityImporter(Importer): + class Meta: + model = Community + fields = [ + "name", + "full_name", + "code", + "interview_number", + ] + dependent_model_fields = [ + # TODO: "wealthgroup" importer + ] + + def ingest_name( + self, + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + 
import_run, + ): + # The community/village names are on row 4, repeated for each wealth category (on row 2). + # So scan across the first wealth category accumulating village names. + sheet_name = "Data" + sheet = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name) + row = 4 + column = 1 + first_wc = sheet.iloc[2, column] + while first_wc == sheet.iloc[2, column]: + new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell( + parent_instances=parent_instances, + field_def=field_def, + find_field=False, + sheet_name=sheet_name, + column=column, + row=row, + bss_value_extractors=bss_value_extractors, + successful_mappings=successful_mappings, + failed_mappings=failed_mappings, + import_run=import_run, + ) + column += 1 + return successful_mappings, failed_mappings, parent_instances + + def ingest_full_name( + self, + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ): + # 1. Scan across Data sheet row 3 loading district names + sheet_name = "Data" + row = 3 + for name_loc in successful_mappings["name"]: + new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell( + parent_instances=parent_instances, + field_def=field_def, + find_field=False, + sheet_name=sheet_name, + column=name_loc.column, + row=row, + bss_value_extractors=bss_value_extractors, + successful_mappings=successful_mappings, + failed_mappings=failed_mappings, + import_run=import_run, + ) + # 2. Prefix the village names + for i, full_name_loc in enumerate(successful_mappings[field_def.name]): + village_loc = successful_mappings["name"][i] + full_name_loc.mapped_value = ", ".join((village_loc.mapped_value, full_name_loc.mapped_value)) + return successful_mappings, failed_mappings, parent_instances + + def ingest_code( + self, + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ): + # 1. 
Populate in the same way as the name field + successful_mappings, failed_mappings, parent_instances = self.ingest_name( + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ) + # 2. Convert to lower case + for loc in successful_mappings[field_def.name]: + loc.mapped_value = loc.mapped_value.lower() + return successful_mappings, failed_mappings, parent_instances + + +@register +class LivelihoodActivityImporter(Importer): + class Meta: + model = LivelihoodActivity + fields = [ + "scenario", + "wealth_group", + "quantity_produced", + "quantity_purchased", + "quantity_sold", + "quantity_other_uses", + "quantity_consumed", + "price", + "income", + "expenditure", + "kcals_consumed", + "percentage_kcals", + "household_labor_provider", + "strategy_type", + ] + parent_model_fields = [ + "livelihood_strategy", + "livelihood_zone_baseline", + ] + dependent_model_fields = [ + # TODO: related importers (by inheriting from LivelihoodActivityImporter) + # "milkproduction", + # "butterproduction", + # "meatproduction", + # "livestocksale", + # "cropproduction", + # "foodpurchase", + # "paymentinkind", + # "reliefgiftother", + # "fishing", + # "wildfoodgathering", + # "othercashincome", + # "otherpurchase", + ] + + def ingest_quantity_produced( + self, + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ): + # The product is specified on the first row of each LS. 
+ # Use them to iterate over each LS's rows, looking for quantity_produced field name aliases + ls_product_locs = SpreadsheetLocation.objects.filter( + content_type=ContentType.objects.get_for_model(LivelihoodStrategy), + app_label="baseline", + model="LivelihoodStrategy", + field="product", + find_field=False, + import_run=import_run, + ).order_by("instance_number") + num_lses = len(ls_product_locs) + + # Wealth categories provide the LivelihoodActivity columns + ls_wealth_category_locs = SpreadsheetLocation.objects.filter( + content_type=ContentType.objects.get_for_model(LivelihoodStrategy), + app_label="baseline", + model="LivelihoodStrategy", + field="wealth_category", + find_field=False, + import_run=import_run, + ).order_by("instance_number") + + for strategy_i, strategy_loc in enumerate(ls_product_locs): + sheet = parent_instances[LivelihoodZoneBaseline][0].load_sheet(strategy_loc.sheet_name) + row_count, column_count = sheet.shape + row = strategy_loc.row + + # If there's a subsequent LS on the same sheet, scan col A until that row, otherwise scan to bottom + if strategy_i + 1 < num_lses and ls_product_locs[strategy_i + 1].sheet_name == strategy_loc.sheet_name: + last_row = ls_product_locs[strategy_i + 1].row - 1 + else: + last_row = min(row_count, 3000) + + # locate the field in col A + while row < last_row: + new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell( + parent_instances=parent_instances, + field_def=field_def, + find_field=True, + sheet_name=strategy_loc.sheet_name, + column=0, + row=row, + bss_value_extractors=bss_value_extractors, + successful_mappings=successful_mappings, + failed_mappings=failed_mappings, + import_run=import_run, + ) + # When we locate a quantity_produced field alias in col A, stop looking and load the values + if new_spreadsheet_location: + break + row += 1 + + # get the value on row `row` for each LA. 
+ # There is 1 WealthGroup per WealthCategory per Community, plus 1 WG per WealthCategory with no Community + for wg_i, wg_loc in enumerate(ls_wealth_category_locs): + new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell( + parent_instances=parent_instances, + field_def=field_def, + find_field=False, + sheet_name=strategy_loc.sheet_name, + column=wg_loc.column, + row=row, + bss_value_extractors=bss_value_extractors, + successful_mappings=successful_mappings, + failed_mappings=failed_mappings, + import_run=import_run, + ) + return successful_mappings, failed_mappings, parent_instances diff --git a/apps/baseline/models.py b/apps/baseline/models.py index 7fee0845..eb3d29b5 100644 --- a/apps/baseline/models.py +++ b/apps/baseline/models.py @@ -3,7 +3,9 @@ """ import numbers +from collections import defaultdict +import pandas as pd from django.conf import settings from django.contrib.gis.db import models from django.core.exceptions import ValidationError @@ -247,6 +249,14 @@ class Meta: ) ] + def load_sheet(self, sheet_name): + # TODO: Apply overrides. 
+ if not hasattr(self, "cache"): + self.cache = defaultdict(dict) + if sheet_name not in self.cache: + self.cache[sheet_name] = pd.read_excel(self.bss.path, sheet_name=sheet_name, header=None) + return self.cache[sheet_name] + class LivelihoodZoneBaselineCorrection(common_models.Model): """ diff --git a/apps/ingestion/__init__.py b/apps/ingestion/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/ingestion/admin.py b/apps/ingestion/admin.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/ingestion/apps.py b/apps/ingestion/apps.py new file mode 100644 index 00000000..bd7533fb --- /dev/null +++ b/apps/ingestion/apps.py @@ -0,0 +1,8 @@ +from django.apps import AppConfig +from django.utils.translation import gettext_lazy as _ + + +class IngestionConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "ingestion" + verbose_name = _("ingestion") diff --git a/apps/ingestion/decorators.py b/apps/ingestion/decorators.py new file mode 100644 index 00000000..344eebe2 --- /dev/null +++ b/apps/ingestion/decorators.py @@ -0,0 +1,34 @@ +import logging +from inspect import isclass + +from ingestion.exceptions import ImportException +from ingestion.importers import Importer + +logger = logging.getLogger(__name__) + + +def register(importer_class=None): + def report_invalid_importer(klass): + raise ImportException( + f"Attempted to @register an importer class which is not a sub-class of Importer. Class {klass.__name__}." + ) + + def attach_importer_to_model(importer_class): + if isclass(importer_class) and issubclass(importer_class, Importer): + importer_class.Meta.model.importer = importer_class + print( + f"Attached importer class to model. 
{importer_class.__name__} to {importer_class.Meta.model.__name__}"
+            )
+            return importer_class
+        else:
+            report_invalid_importer(importer_class)
+
+    # Usage @register or register(Importer) (ie, without brackets or without @)
+    if importer_class is not None:
+        return attach_importer_to_model(importer_class)
+
+    # Usage @register() (ie, with brackets and @)
+    def inner(importer_class_inner):
+        # Fix: register the class actually passed to inner(). In this branch the
+        # enclosing importer_class is None, so the previous code always failed
+        # for the @register() (with brackets) usage.
+        return attach_importer_to_model(importer_class_inner)
+
+    return inner
diff --git a/apps/ingestion/exceptions.py b/apps/ingestion/exceptions.py
new file mode 100644
index 00000000..8a51a30c
--- /dev/null
+++ b/apps/ingestion/exceptions.py
@@ -0,0 +1,10 @@
+class ImportException(Exception):
+    pass
+
+
+class LookupException(ImportException):
+    pass
+
+
+class ReferenceDataException(ImportException):
+    pass
diff --git a/apps/ingestion/importers.py b/apps/ingestion/importers.py
new file mode 100644
index 00000000..f49db90d
--- /dev/null
+++ b/apps/ingestion/importers.py
@@ -0,0 +1,387 @@
+import importlib
+import logging
+from abc import ABC
+from collections import defaultdict
+
+import pandas as pd
+from django.db import models
+from django.forms import model_to_dict
+
+from baseline.models import LivelihoodZoneBaseline
+from ingestion.exceptions import ImportException
+from ingestion.mappers import get_fully_qualified_field_name
+from ingestion.models import (
+    BssValueExtractor,
+    ImportLog,
+    ImportRun,
+    ScanLog,
+    SpreadsheetLocation,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class Importer(ABC):
+    class Meta:
+        model = None
+        fields = []
+        parent_model_fields = []
+        dependent_model_fields = []
+
+    def __init__(self, mapper_factory):
+        self.mapper_factory = mapper_factory
+        if not hasattr(self.Meta, "fields"):
+            self.Meta.fields = []
+        if not hasattr(self.Meta, "parent_model_fields"):
+            self.Meta.parent_model_fields = []
+        if not hasattr(self.Meta, "dependent_model_fields"):
+            self.Meta.dependent_model_fields = []
+
+    def ingest(self, field_def, parent_instances,
import_run=None): + if not isinstance(self, self.Meta.model.importer): + raise ImportException(f"Importer has not been registered. Add @register to {type(self).__name__}.") + + if not import_run: + import_run = ImportRun(livelihood_zone_baseline=parent_instances[LivelihoodZoneBaseline][0]) + import_run.save() + + # a dict {field_name: [spreadsheet_location_1, spreadsheet_location_2, ]} with successfully mapped mapped_values # NOQA: E501 + successful_mappings = defaultdict(list) + # a dict {sheet: {column: {row: {field_name: [{successful_mappings: , parent_instances: , field_def: , sheet_name: , column: , row: , source_value: ,}, ]}}} # NOQA: E501 + failed_mappings = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list)))) + # a dict {field_name: [bss_value_extractor_1, bss_value_extractor_2, ]} for all fields in Meta.model + bss_value_extractors = self._get_bss_value_extractors() + + # Calls ingest_{field} for each field to populate successful_mappings[field_name] with a list of SpreadsheetLocations. # NOQA: E501 + # Populates SpreadsheetLocation.mapped_value with the code/id/value to save on the Meta.model instance. + successful_mappings, failed_mappings, parent_instances = self.ingest_fields( + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ) + self.log( + logging.INFO, + f"Mapping report for table {self.Meta.model.__name__}:\n" + f"Successful mappings\n{successful_mappings}\nFailed mappings\n{failed_mappings}", + import_run, + ) + + # successful_mappings now contains a list of SpreadsheetLocation instances per field, with source, parsed and mapped values saved. # NOQA: E501 + # {field_name: [spreadsheet_location_for_field_on_instance_1, spreadsheet_location_for_field_on_instance_2, ... ]} # NOQA: E501 + + # Iterate over the successful_mappings lists, saving a Meta.model instance for each, pulling from parent_instances as nec. 
# NOQA: E501 + parent_instances = self._save_instances(successful_mappings, parent_instances, import_run) + + # Call dependent (child) model importers, passing in parents so far + parent_instances = self._load_dependent_models(parent_instances, import_run) + + # Delete failed mapping logs after a successful import + ImportLog.objects.filter( + log_level=logging.INFO, + import_run=import_run, + message__startswith="Value parsed but lookup failed", + ).delete() + + # Return instances to the parent importer + return parent_instances + + def ingest_fields( + self, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors, + import_run, + ): + for field_name in self.Meta.fields: + field_def = self.Meta.model._meta.get_field(field_name) + ingest_method = getattr(self, f"ingest_{field_name}", None) + if callable(ingest_method): + successful_mappings, failed_mappings, parent_instances = ingest_method( + field_def, + successful_mappings, + failed_mappings, + parent_instances, + bss_value_extractors[field_name], + import_run, + ) + else: + self.log( + logging.ERROR, + f"Method not found to ingest field {field_def.model.__name__}.{field_def.name}. 
" + f"Implement ingest_{field_def.name} on {type(self).__name__}.", + import_run, + ) + return successful_mappings, failed_mappings, parent_instances + + def attempt_load_from_cell( + self, + parent_instances, + field_def, + find_field, + sheet_name, + column, + row, + bss_value_extractors, + successful_mappings, + failed_mappings, + import_run, + ): + alias = matching_re = mapped_value = bss_value_extractor = None + sheet = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name) + + # See if the current cell matches any of the regexes in this field's BssValueExtractors + source_value = sheet.iloc[row, column] + if pd.isna(source_value): + source_value = "" + extractors = [e for e in bss_value_extractors if e.find_field == find_field] + found = self.mapper_factory(field_def, find_field).map(extractors, source_value) + + if found: + bss_value_extractor, matching_re, alias, mapped_value = found + sl = SpreadsheetLocation( + bss_value_extractor=bss_value_extractor, + import_run=import_run, + app_label=field_def.model._meta.app_label, + model=field_def.model.__name__, + field=field_def.name, + find_field=find_field, + sheet_name=sheet_name, + column=column, + row=row, + regex=bss_value_extractor.regex if bss_value_extractor else "", + matching_re=matching_re, + source_value=source_value, + alias=alias, + mapped_value=mapped_value, + instance_number=len(successful_mappings[field_def.name]), + # destination_instance will be set when instance saved by save_instances + ) + sl.save() + successful_mappings[field_def.name].append(sl) + + self.log( + logging.INFO, + f"Lookup Success. " + f"Field {get_fully_qualified_field_name(field_def)}.\n" + f" Sheet {sheet_name} column {column} row {row} BSS\n" + f" source value {source_value} mapped value {mapped_value} " + f" matching re {matching_re} alias {alias}.", + import_run, + ) + else: + # Diagnosis will be aided by seeing what has been scanned and not mapped. 
+ # It will be very common to have missing aliases, but log, even though most will be scanning a wrong cell. + failed_mappings[sheet_name][column][row][field_def.name].append( + { + "successful_mappings": successful_mappings, + "parent_instances": parent_instances, + "field_def": field_def, + "sheet_name": sheet_name, + "column": column, + "row": row, + "source_value": source_value, + } + ) + # These logs are deleted on import success. + self.log( + logging.INFO, + f"Value parsed but lookup failed, for " + f"field {get_fully_qualified_field_name(field_def)}.\n" + f" Sheet {sheet_name} column {column} row {row} BSS\n" + f" lzb {parent_instances[LivelihoodZoneBaseline][0]}\n" + f" bss {parent_instances[LivelihoodZoneBaseline][0].bss.path}\n" + f" source value {source_value}\n" + f" aliases {', '.join((str(s) for s in self.mapper_factory(field_def).all_aliases()))}\n" + f" this field's successful mappings so far {successful_mappings[field_def.name]}.", + import_run, + ) + sl = None # No SpreadsheetLocation to return as mapping not successful + + ScanLog( + bss_value_extractor=bss_value_extractor, + import_run=import_run, + sheet_name=sheet_name, + column=column, + row=row, + app_label=field_def.model._meta.app_label, + model=field_def.model.__name__, + field=field_def.name, + find_field=find_field, + source_value=source_value, + alias=alias, + regex=getattr(bss_value_extractor, "regex", None), + matching_re=matching_re, + mapped_value=mapped_value, + ).save() + + return sl, successful_mappings, failed_mappings + + def _save_instances(self, successful_mappings, parent_instances, import_run): + """ + Instantiates and saves the self.Meta.model instances from the parents and field mappings. + Consumes this model's successful_mappings. + Returns parent_instances for future importers. 
+ """ + if self.Meta.model not in parent_instances: + parent_instances[self.Meta.model] = [] + + # all successful_mappings lists have a SpreadsheeLocation per instance + if not successful_mappings: + self.log(logging.WARNING, f"No successful mappings for {self}.", import_run) + return parent_instances + + num_instances = len(successful_mappings[self.Meta.fields[0]]) + + for key, value in successful_mappings.items(): + if len(value) != num_instances: + self.log( + logging.WARNING, + f"Instance count mismatch. " + f"{num_instances} {self.Meta.fields[0]} mappings but {len(value)} {key} mappings for {self}.", + import_run, + ) + return parent_instances + + for instance_number in range(num_instances): + + instance = self.Meta.model() + instance, parent_instances = self._set_foreign_keys_to_parents(instance, parent_instances) + instance_ss_locs = {field: successful_mappings[field][instance_number] for field in self.Meta.fields} + instance = self._set_instance_values(instance, instance_ss_locs, instance_number) + + # model.report_warnings logs any warnings of suspicious data, eg, calc or missing value + if callable(getattr(instance, "report_warnings", None)): + instance.report_warnings(instance_number, instance_ss_locs.values()) + else: + self.log(logging.INFO, f"No report_warnings method found on {type(instance).__name__}.", import_run) + + try: + instance.save() + # Store join to the saved instance on the instance's SpreadsheetLocations + for ss_loc in instance_ss_locs.values(): + ss_loc.destination_instance = instance # generic foreign key + ss_loc.save() + except Exception: + self.log(logging.WARNING, f"Error occurred saving {instance} instance.", import_run, exc_info=True) + parent_instances[self.Meta.model].append(instance) + return parent_instances + + def _load_dependent_models(self, parent_instances, import_run): + """ + Calls the importers of the dependent models. 
+ """ + for field in self.Meta.dependent_model_fields: + field_def = self.Meta.model._meta.get_field(field) + + # Check this is an incoming relation field: + if not isinstance(field_def, models.fields.reverse_related.ManyToOneRel): + self.log( + logging.ERROR, + f"{type(self).__name__}.Meta.dependent_model_fields must be a list of " + f"reverse_related.ManyToOneRel fields (foreign keys on other models to this one).", + import_run, + ) + # Check that the related model has an importer registered: + if not issubclass(getattr(field_def.related_model, "importer", None), Importer): + # Perhaps the importer.py hasn't been imported (which runs the @register decorator): + try: + importlib.import_module(f"{field_def.related_model._meta.app_label}.importers") + except ImportError: + self.log( + logging.ERROR, + f"Module not found: {field_def.related_model._meta.app_label}.importers", + import_run, + ) + if not issubclass(getattr(field_def.related_model, "importer", None), Importer): + self.log( + logging.ERROR, + f"Importer not found for dependent model. {field_def.related_model.__name__}Importer class " + f"needs declaring and registering.", + import_run, + ) + # Instantiate importer here. All cache and state are discarded after this import run. 
+            # NOTE(review): if the error checks above fired, `importer` may still be
+            # missing here — consider skipping this field after logging (outside this hunk).
+            importer = field_def.related_model.importer(self.mapper_factory)
+            parent_instances = importer.ingest(field_def, parent_instances, import_run)
+        return parent_instances
+
+    def _set_foreign_keys_to_parents(self, instance, parent_instances):
+        for parent_field in self.Meta.parent_model_fields:
+            field_def = self.Meta.model._meta.get_field(parent_field)
+            setattr(instance, parent_field, parent_instances[field_def.related_model][0])
+        return instance, parent_instances
+
+    def _set_instance_values(self, instance, instance_values, instance_number):
+        # set values parsed and mapped from bss, recorded while scanning as SpreadsheetLocation instances
+        for field in self.Meta.fields:
+            ss_loc = instance_values[field]
+            setattr(instance, field, ss_loc.mapped_value)
+        return instance
+
+    def _get_bss_value_extractors(self):
+        queryset = BssValueExtractor.objects.filter(
+            app_label=self.Meta.model._meta.app_label,
+            model=self.Meta.model.__name__,
+        )
+        bss_value_extractors = defaultdict(list)
+        for bss_value_extractor in queryset:
+            bss_value_extractors[bss_value_extractor.field].append(bss_value_extractor)
+        return bss_value_extractors
+
+    @staticmethod
+    def log(
+        log_level,
+        message,
+        import_run,
+        instance=None,
+        sheet_name=None,
+        column=None,
+        row=None,
+        app_label=None,
+        model=None,
+        field=None,
+        source_value=None,
+        matching_re=None,
+        mapped_value=None,
+        bss_value_extractor=None,
+        regex=None,
+        aliases=None,
+        successful_mappings=None,
+        failed_mappings=None,
+        exc_info=None,
+    ):
+        logger.log(log_level, message, exc_info=exc_info)
+
+        # Prepare model instances for saving to JSONField.
+        # Fix: the old loop assigned every conversion to `instance`, so a
+        # bss_value_extractor model was never converted (and could clobber instance).
+        if isinstance(instance, models.Model):
+            instance = model_to_dict(instance)
+        if isinstance(bss_value_extractor, models.Model):
+            bss_value_extractor = model_to_dict(bss_value_extractor)
+        successful_mappings_dicts = defaultdict(list)
+        if successful_mappings:
+            for field_name, spreadsheet_locations in successful_mappings.items():
+                for spreadsheet_location in spreadsheet_locations:
+                    # Fix: append to the per-field list; plain assignment kept only
+                    # the last SpreadsheetLocation and defeated the defaultdict(list).
+                    successful_mappings_dicts[field_name].append(model_to_dict(spreadsheet_location))
+
+        # TODO: Add traceback field
+        ImportLog(
+            import_run=import_run,
+            log_level=log_level,
+            message=message,
+            instance=instance,
+            sheet_name=sheet_name,
+            column=column,
+            row=row,
+            app_label=app_label,
+            model=model,
+            field=field,
+            source_value=source_value,
+            matching_re=matching_re,
+            mapped_value=mapped_value,
+            bss_value_extractor=bss_value_extractor,
+            regex=regex,
+            aliases=aliases,
+            successful_mappings=successful_mappings_dicts,
+            failed_mappings=failed_mappings,
+        ).save()
diff --git a/apps/ingestion/management/__init__.py b/apps/ingestion/management/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/apps/ingestion/management/commands/__init__.py b/apps/ingestion/management/commands/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/apps/ingestion/management/commands/load_from_bss.py b/apps/ingestion/management/commands/load_from_bss.py
new file mode 100644
index 00000000..4b74857b
--- /dev/null
+++ b/apps/ingestion/management/commands/load_from_bss.py
@@ -0,0 +1,84 @@
+import datetime
+import logging
+import sys
+from pathlib import Path
+
+from django.conf import settings
+from django.core.management import call_command
+from django.core.management.base import BaseCommand
+
+from baseline.importers import LivelihoodZoneBaselineImporter
+from baseline.models import LivelihoodStrategy, LivelihoodZoneBaseline
+from ingestion.mappers import MapperFactory
+
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+    # Fix: subclass BaseCommand — TemplateCommand is the startapp/startproject
+    # template machinery and has an incompatible handle() contract.
+    help = "Loads a BSS into the database."
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "pk",
+            help=(
+                "Optional: Livelihood Zone Baseline ID to import. If omitted, loads the most recently "
+                "modified Livelihood Zone Baseline."
+            ),
+            nargs="*",
+            default="",
+        )
+        # TODO: --verbose?
+    def handle(self, pk, **options):
+        # This will be called by a 'Refresh from BSS' button on the LivelihoodZoneBaseline admin detail screen.
+        # parent_instances will contain the root, saved LivelihoodZoneBaseline parent with spreadsheet uploaded.
+        # Fix: read the parsed `pk` argument (nargs="*" yields a list) instead of
+        # sys.argv[0], which is the program name and never a digit, so the
+        # explicit-pk path was unreachable.
+        if pk:
+            root_lzb = LivelihoodZoneBaseline.objects.get(pk=int(pk[0]))
+        else:
+            root_lzb = LivelihoodZoneBaseline.objects.order_by("-modified").first()
+        parent_instances = {
+            LivelihoodZoneBaseline: [root_lzb],
+        }
+        field_def = LivelihoodStrategy._meta.get_field("livelihood_zone_baseline")
+
+        # Instantiate importer and mapper factory here.
+        # All cache and state are discarded after this import run.
+        importer = LivelihoodZoneBaselineImporter(MapperFactory())
+
+        try:
+            importer.ingest(field_def, parent_instances)
+            self.dump_run_db(root_lzb.pk)
+        except Exception as e:
+            logger.warning(f"Error occurred ingesting {root_lzb} id {root_lzb.pk}.", exc_info=e)
+            self.dump_run_db(root_lzb.pk)
+            raise  # so runner can detect success or failure
+
+    @staticmethod
+    def dump_run_db(lzb_pk):
+        """
+        Gives full traceability for a particular run.
+        Also removes the risk of accidentally flushing aliases entered direct to the db while working on a BSS.
+ """ + Path("import_run_data").mkdir(parents=True, exist_ok=True) + call_command( + "dumpdata", + *settings.PROJECT_APPS, + # The below tables contain aliases, but decided to dump everything as it is only a few hundred K gzipped: + # "common.ClassifiedProduct", + # "common.UnitOfMeasure", + # "ingestion.BssValueExtractor", + # "ingestion.SpreadsheetLocation", + # "ingestion.ChoiceAlias", + # "ingestion.FieldNameAlias", + # "metadata.LivelihoodCategory", + # "metadata.WealthCharacteristic", + # "metadata.SeasonalActivityType", + # "metadata.WealthGroupCategory", + # "metadata.HazardCategory", + # "metadata.Market", + # "metadata.Season", + indent=4, + output=f"import_run_data/refdata_{datetime.datetime.now().isoformat()}_lzb_{lzb_pk}.json.gz", + ) diff --git a/apps/ingestion/mappers.py b/apps/ingestion/mappers.py new file mode 100644 index 00000000..488d56d0 --- /dev/null +++ b/apps/ingestion/mappers.py @@ -0,0 +1,178 @@ +import abc +import logging +from typing import Iterable + +from django.conf import settings +from django.db import models +from django.utils import translation +from django.utils.encoding import force_str + +from ingestion.models import ChoiceAlias, FieldNameAlias + +logger = logging.getLogger(__name__) + + +def get_fully_qualified_field_name(field_def): + return f"{field_def.model._meta.app_label}.{field_def.model.__name__}.{field_def.name}" + + +class Mapper(abc.ABC): + def __init__(self, field_def): + self.field_def = field_def + # self.cache is a dict {alias: mapped_value}. It is cached per import. 
+        self.cache = {}
+        self.populate_lookup_cache()
+
+    def map(self, bss_value_extractors, source_value):
+        # See if the current cell matches any of the regexes in this field's BssValueExtractors.
+        # Returns a 4-tuple (extractor, matching_re, alias, mapped_value) on success, or bare None on failure.
+        sought_value = self.prepare_value(source_value)
+        if not bss_value_extractors:
+            for alias, value in self.cache.items():
+                if sought_value == self.prepare_value(alias):
+                    return None, "", alias, value  # a regex empty string is equivalent to no mapping
+        for bss_value_extractor in bss_value_extractors:
+            for alias, value in self.cache.items():
+                matching_re = bss_value_extractor.match(sought_value, self.prepare_value(alias))
+                if matching_re:
+                    logger.info(
+                        f"Successfully matched source value {source_value} "
+                        f"regex {matching_re} extractor {bss_value_extractor}."
+                    )
+                    return bss_value_extractor, matching_re, alias, value
+        return None
+
+    def add_alias_to_cache(self, parsed_value, mapped_value, add_variants=False):
+        lookup_value = self.prepare_value(parsed_value)
+        if lookup_value in self.cache:
+            logger.error(
+                f"Duplicate lookup value. {get_fully_qualified_field_name(self.field_def)} "
+                f"source value {lookup_value}."
+            )
+        self.cache[lookup_value] = mapped_value
+        if add_variants and isinstance(parsed_value, str):
+            for variant in (self.camel_case_to_spaced, self.snake_case_to_spaced):
+                space_separated = self.prepare_value(variant(parsed_value))
+                if space_separated and space_separated not in self.cache and space_separated != lookup_value:
+                    self.add_alias_to_cache(space_separated, mapped_value)
+
+    def all_aliases(self):
+        return (f"{alias}=>{mapped}" for alias, mapped in self.cache.items())
+
+    @staticmethod
+    def camel_case_to_spaced(code):
+        # If we have a code in camel case, eg, "MeatProduction", treat "meat production" as an alias.
+        # Returns None if the string does not look like camel case.
+
+        # if it is a string of mixed case and no spaces
+        if " " not in code and any(x.isupper() for x in code) and any(x.islower() for x in code):
+            return "".join(" " + c.lower() if c.isupper() else c for c in code).lstrip()
+
+    @staticmethod
+    def snake_case_to_spaced(code):
+        # If we have a code in snake case, eg, "meat_production", treat "meat production" as an alias.
+        # Returns None if the string does not look like snake case or does not produce an alternative alias.
+        if " " not in code and "_" in code:
+            return code.replace("_", " ")
+
+    @staticmethod
+    def prepare_value(parsed_value):
+        translated = force_str(parsed_value, strings_only=True)
+        return Mapper.prepare_string(translated) if isinstance(translated, str) else parsed_value
+
+    @staticmethod
+    def prepare_string(parsed_value):
+        return parsed_value.strip().rstrip(":").lower()
+
+    def populate_lookup_cache(self):
+        pass
+
+
+class ForeignKeyMapper(Mapper):
+    """
+    Use self.mapper_factory(field_def) so that aliases are cached for the import.
+
+    Override populate_lookup_cache if fields other than aliases, name (all languages) and code should also be mapped.
+
+    Nb. All model instances are garbage collected and just their key is stored in the cache.
+    """
+
+    def populate_lookup_cache(self):
+        queryset = self.field_def.related_model.objects.all()
+        for instance in queryset:
+            if isinstance(getattr(instance, "aliases", None), Iterable):
+                for alias in instance.aliases:
+                    self.add_alias_to_cache(alias, instance.pk)
+            if isinstance(getattr(instance, "code", None), str) and instance.code:
+                self.add_alias_to_cache(instance.code, instance.pk, add_variants=True)
+            # name_en, name_fr, etc.
+            for language_code, language_name in settings.LANGUAGES:
+                lookup = getattr(instance, f"name_{language_code}", None)
+                if isinstance(lookup, str) and lookup.strip():
+                    self.add_alias_to_cache(lookup, instance.pk)  # register the translated name (was instance.code)
+
+
+class ChoiceMapper(Mapper):
+    """Use self.mapper_factory(field_def) so that aliases are cached for the import."""
+
+    def populate_lookup_cache(self):
+        for language_code, language_name in settings.LANGUAGES:
+            translation.activate(language_code)
+            for value, label in self.field_def.choices:
+                if label.strip():  # if translation returns nothing, don't use it
+                    self.add_alias_to_cache(label, value)
+                    if self.prepare_value(label) != self.prepare_value(value):  # recognize choice code
+                        self.add_alias_to_cache(value, value, add_variants=True)
+        translation.activate(settings.LANGUAGE_CODE)
+
+        queryset = ChoiceAlias.objects.filter(
+            app_label=self.field_def.model._meta.app_label,
+            model=self.field_def.model.__name__,
+            field=self.field_def.name,
+        )
+        for instance in queryset:
+            self.add_alias_to_cache(instance.alias, instance.code)
+
+
+class FieldNameMapper(Mapper):
+    """
+    Use self.mapper_factory(field_def, find_field=True) so that aliases are cached for the import.
+    Used when searching for field name aliases,
+    eg, looking for a milking_animals label for baseline.MilkProduction in Data column A.
+    """
+
+    def populate_lookup_cache(self):
+        queryset = FieldNameAlias.objects.filter(
+            app_label=self.field_def.model._meta.app_label,
+            model=self.field_def.model.__name__,
+            field=self.field_def.name,
+        )
+        for instance in queryset:
+            self.add_alias_to_cache(instance.alias, ".".join((instance.app_label, instance.model, instance.field)))
+
+
+class SimpleValueMapper(Mapper):
+    def map(self, bss_value_extractors, source_value):
+        return None, "", source_value, source_value
+
+
+class MapperFactory:
+    """
+    Permits caching of lookups for duration of import, rather than instantiating on each importer or passing them.
+ """ + + def __init__(self): + self.cache = {} + + def __call__(self, field_def, find_field=False): + key = (get_fully_qualified_field_name(field_def), find_field) + if key not in self.cache: + if find_field: + # FieldNameMapper is used for searching for field name aliases, + # eg, looking for a milking_animals label for baseline.MilkProduction in Data column A. + self.cache[key] = FieldNameMapper(field_def) + elif isinstance(field_def, models.ForeignKey): + self.cache[key] = ForeignKeyMapper(field_def) + elif isinstance(getattr(field_def, "choices", None), Iterable): + self.cache[key] = ChoiceMapper(field_def) + else: + self.cache[key] = SimpleValueMapper(field_def) + return self.cache[key] diff --git a/apps/ingestion/migrations/0001_initial.py b/apps/ingestion/migrations/0001_initial.py new file mode 100644 index 00000000..f4668f02 --- /dev/null +++ b/apps/ingestion/migrations/0001_initial.py @@ -0,0 +1,361 @@ +import django.utils.timezone +import model_utils.fields +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ("baseline", "0006_livelihoodzone_alternate_code"), + ("contenttypes", "0002_remove_content_type_name"), + ] + + operations = [ + migrations.CreateModel( + name="BssValueExtractor", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, editable=False, verbose_name="created" + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, editable=False, verbose_name="modified" + ), + ), + ("app_label", models.CharField()), + ("model", models.CharField()), + ("field", models.CharField()), + ( + "find_field", + models.BooleanField( + default=False, + help_text="True if the scan was looking for a field name alias, eg, 'Quantity Produced' for LivelihoodActivity.quantity_produced, as 
opposed to a value to be stored in that field.", + ), + ), + ( + "regex", + models.CharField( + blank=True, + help_text='Put the text ALIAS in the regex where the alias should be injected. All occurrences will be replaced. To match any chars, `(`, maybe spaces, alias, maybe spaces, `)`, any chars, use: ".+\\(\\s*ALIAS\\s*\\).*". It will return None and not log anything if no value is extracted. It will concatenate multiple matches if found and log a warning.If no regex is needed, then no BssValueExtractor is needed, and the full cell contents will be matched.(Identical behaviour to regex being an empty string.)', + ), + ), + ], + options={ + "verbose_name": "BSS Value Extractor", + "verbose_name_plural": "BSS Value Extractors", + }, + ), + migrations.CreateModel( + name="ChoiceAlias", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, editable=False, verbose_name="created" + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, editable=False, verbose_name="modified" + ), + ), + ("app_label", models.CharField()), + ("model", models.CharField()), + ("field", models.CharField()), + ("code", models.CharField()), + ("alias", models.CharField(blank=True)), + ], + options={ + "verbose_name": "Choice Alias", + "verbose_name_plural": "Choice Aliases", + }, + ), + migrations.CreateModel( + name="FieldNameAlias", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, editable=False, verbose_name="created" + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, editable=False, verbose_name="modified" + ), + ), + ("app_label", models.CharField()), + ("model", 
models.CharField()), + ("field", models.CharField()), + ("alias", models.CharField(blank=True)), + ], + options={ + "verbose_name": "Field Name Alias", + "verbose_name_plural": "Field Name Aliases", + }, + ), + migrations.CreateModel( + name="SpreadsheetLocation", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, editable=False, verbose_name="created" + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, editable=False, verbose_name="modified" + ), + ), + ("app_label", models.CharField()), + ("model", models.CharField()), + ("field", models.CharField()), + ( + "find_field", + models.BooleanField( + default=False, + help_text="True if the scan was looking for a field name alias, eg, 'Quantity Produced' for LivelihoodActivity.quantity_produced, as opposed to a value to be stored in that field.", + ), + ), + ("sheet_name", models.CharField()), + ("column", models.PositiveIntegerField()), + ("row", models.PositiveIntegerField()), + ("regex", models.CharField(blank=True)), + ( + "alias", + models.CharField( + blank=True, help_text="Alias that successfully matched the source value.", null=True + ), + ), + ("source_value", models.CharField(blank=True)), + ("matching_re", models.CharField(blank=True)), + ( + "mapped_value", + models.JSONField( + blank=True, + help_text="A string, integer, decimal, choice key, string id or integer id.", + null=True, + ), + ), + ("instance_number", models.PositiveIntegerField(blank=True, null=True)), + ("object_id", models.CharField(blank=True, null=True)), + ( + "bss_value_extractor", + models.ForeignKey( + blank=True, + help_text="The extractor used to parse the cell value. 
Null if no extractor was used and the whole value was used.", + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + to="ingestion.bssvalueextractor", + ), + ), + ( + "content_type", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="contenttypes.contenttype", + ), + ), + ( + "livelihood_zone_baseline", + models.ForeignKey( + on_delete=django.db.models.deletion.DO_NOTHING, to="baseline.livelihoodzonebaseline" + ), + ), + ], + options={ + "verbose_name": "Spreadsheet Location", + "verbose_name_plural": "Spreadsheet Locations", + }, + ), + migrations.CreateModel( + name="ScanLog", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, editable=False, verbose_name="created" + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, editable=False, verbose_name="modified" + ), + ), + ("sheet_name", models.CharField()), + ("column", models.PositiveIntegerField()), + ("row", models.PositiveIntegerField()), + ("app_label", models.CharField()), + ("model", models.CharField()), + ("field", models.CharField()), + ( + "find_field", + models.BooleanField( + default=False, + help_text="True if the scan was looking for a field name alias, eg, 'Quantity Produced' for LivelihoodActivity.quantity_produced, as opposed to a value to be stored in that field.", + ), + ), + ("source_value", models.CharField(blank=True)), + ( + "alias", + models.CharField( + blank=True, help_text="Alias that successfully matched the source value.", null=True + ), + ), + ( + "regex", + models.CharField( + blank=True, + help_text="The Regex that successfully matched the source value. 
Duplicated here in case regex is updated on BssVlueExtractor.", + null=True, + ), + ), + ( + "matching_re", + models.CharField( + blank=True, + help_text="The regex with the alias injected that successfully matched the source value.", + null=True, + ), + ), + ( + "mapped_value", + models.JSONField( + blank=True, + help_text="The value, such as choice value, foreign key or plain value, that the source value matched. A string, integer, decimal, choice key, string id or integer id.", + null=True, + ), + ), + ( + "bss_value_extractor", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + to="ingestion.bssvalueextractor", + ), + ), + ( + "livelihood_zone_baseline", + models.ForeignKey( + on_delete=django.db.models.deletion.DO_NOTHING, to="baseline.livelihoodzonebaseline" + ), + ), + ], + options={ + "abstract": False, + }, + ), + migrations.CreateModel( + name="ImportRun", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, editable=False, verbose_name="created" + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, editable=False, verbose_name="modified" + ), + ), + ( + "livelihood_zone_baseline", + models.ForeignKey( + on_delete=django.db.models.deletion.DO_NOTHING, to="baseline.livelihoodzonebaseline" + ), + ), + ], + options={ + "abstract": False, + }, + ), + migrations.CreateModel( + name="ImportLog", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, editable=False, verbose_name="created" + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, editable=False, verbose_name="modified" + ), + ), + ( + 
"log_level", + models.IntegerField( + choices=[ + (0, "Not Set"), + (10, "Debug"), + (20, "Info"), + (30, "Warn"), + (40, "Error"), + (50, "Critical"), + ], + default=0, + verbose_name="Log Level", + ), + ), + ("message", models.TextField()), + ("instance", models.JSONField(blank=True, null=True)), + ("sheet_name", models.CharField(blank=True, null=True)), + ("column", models.PositiveIntegerField(blank=True, null=True)), + ("row", models.PositiveIntegerField(blank=True, null=True)), + ("app_label", models.CharField(blank=True, null=True)), + ("model", models.CharField(blank=True, null=True)), + ("field", models.CharField(blank=True, null=True)), + ("source_value", models.CharField(blank=True, null=True)), + ("matching_re", models.CharField(blank=True, null=True)), + ( + "mapped_value", + models.JSONField( + blank=True, + help_text="The value, such as choice value, foreign key or plain value, that the source value matched. A string, integer, decimal, choice key, string id or integer id.", + null=True, + ), + ), + ("regex", models.CharField(blank=True, null=True)), + ("aliases", models.JSONField(blank=True, null=True)), + ("successful_mappings", models.JSONField(blank=True, null=True)), + ("failed_mappings", models.JSONField(blank=True, null=True)), + ( + "bss_value_extractor", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + to="ingestion.bssvalueextractor", + ), + ), + ], + options={ + "abstract": False, + }, + ), + ] diff --git a/apps/ingestion/migrations/0002_join_logs_to_import_run.py b/apps/ingestion/migrations/0002_join_logs_to_import_run.py new file mode 100644 index 00000000..c5a19df1 --- /dev/null +++ b/apps/ingestion/migrations/0002_join_logs_to_import_run.py @@ -0,0 +1,59 @@ +import django.db.models.deletion +from django.db import migrations, models + + +def forwards(apps, schema_editor): + # Delete all logs, because join to ImportRun is non-nullable. 
+ ScanLog = apps.get_model("ingestion", "ScanLog") + ScanLog.objects.all().delete() + ImportLog = apps.get_model("ingestion", "ImportLog") + ImportLog.objects.all().delete() + + +def backwards(apps, schema_editor): + pass + + +class Migration(migrations.Migration): + + # TODO: Squash migrations so that the defaults and table truncation are not necessary. + + dependencies = [ + ("ingestion", "0001_initial"), + ] + + operations = [ + migrations.RunPython(forwards, backwards), + migrations.RemoveField( + model_name="scanlog", + name="livelihood_zone_baseline", + ), + migrations.RemoveField( + model_name="spreadsheetlocation", + name="livelihood_zone_baseline", + ), + migrations.AddField( + model_name="spreadsheetlocation", + name="import_run", + field=models.ForeignKey( + default=1, on_delete=django.db.models.deletion.DO_NOTHING, to="ingestion.importrun" + ), + preserve_default=False, + ), + migrations.AddField( + model_name="importlog", + name="import_run", + field=models.ForeignKey( + default=1, on_delete=django.db.models.deletion.DO_NOTHING, to="ingestion.importrun" + ), + preserve_default=False, + ), + migrations.AddField( + model_name="scanlog", + name="import_run", + field=models.ForeignKey( + default=1, on_delete=django.db.models.deletion.DO_NOTHING, to="ingestion.importrun" + ), + preserve_default=False, + ), + ] diff --git a/apps/ingestion/migrations/__init__.py b/apps/ingestion/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/ingestion/models.py b/apps/ingestion/models.py new file mode 100644 index 00000000..44f11dcd --- /dev/null +++ b/apps/ingestion/models.py @@ -0,0 +1,255 @@ +import logging +import re + +from django.contrib.contenttypes.fields import GenericForeignKey +from django.contrib.contenttypes.models import ContentType +from django.db import models +from django.utils.translation import gettext_lazy as _ + +from common.models import Model + +logger = logging.getLogger(__name__) + + +class 
BssValueExtractor(Model): + app_label = models.CharField() + model = models.CharField() + field = models.CharField() + find_field = models.BooleanField( + default=False, + help_text=_( + "True if the scan was looking for a field name alias, eg, 'Quantity Produced' for " + "LivelihoodActivity.quantity_produced, as opposed to a value to be stored in that field." + ), + ) + regex = models.CharField( + blank=True, + help_text=_( + "Put the text ALIAS in the regex where the alias should be injected. " + "For regexes with multiple slots for multipart aliases, put the text ALIAS in the regex multiple times, " + "and use || in the alias to separate the alias parts. The number of parts must match. " + 'To match any chars, `(`, maybe spaces, alias, maybe spaces, `)`, any chars, use: ".+\(\s*ALIAS\s*\).*". ' + "It will not match if no value is extracted. " + "If no regex is needed, then no BssValueExtractor is needed, and the full cell contents will be matched." + "(Identical behaviour to regex being an empty string.)" + ), + ) + + # "Question mark cardinality suffix makes group non-greedy, for example the 3rd and final characters in the below example. " # NOQA: E501 + # 're.findall(".*?([a-c]+).*?", "fsgdabcbafd") returns ["abcba"]' + # "BssValueExtractor.parse will log a warning if more than one substring is extracted and return only the first. 
" # NOQA: E501 + + class Meta: + verbose_name = _("BSS Value Extractor") + verbose_name_plural = _("BSS Value Extractors") + + def match(self, source_value, sought_alias): + num_alias_slots = self.regex.count("ALIAS") + if num_alias_slots == 1: + escaped_alias = re.escape(sought_alias) + injected_re = self.regex.replace("ALIAS", escaped_alias) + else: + # regex expects a multipart alias - check if alias is suitable, split on || substring + aliases = sought_alias.split("||") + if num_alias_slots == len(aliases): + injected_re = self.regex + for alias in aliases: + escaped_alias = re.escape(alias) + injected_re = injected_re.replace("ALIAS", escaped_alias, 1) + else: + # regex expects a multipart alias, but alias has wrong number of parts, so this is not a match + return None + + candidates = re.findall(injected_re, source_value) + if not candidates: + # Most times this will be scanning an irrelevant cell looking for a match. + # logger.info( + # f"No value parsed for {self.app_label}.{self.model}.{self.field} for source value {source_value}." + # ) + return None + return injected_re + + +class ImportRun(Model): + livelihood_zone_baseline = models.ForeignKey("baseline.LivelihoodZoneBaseline", on_delete=models.DO_NOTHING) + + +class SpreadsheetLocation(Model): + bss_value_extractor = models.ForeignKey( + BssValueExtractor, + on_delete=models.DO_NOTHING, + null=True, + blank=True, + help_text=_( + "The extractor used to parse the cell value. Null if no extractor was used and the whole value was used." + ), + ) + app_label = models.CharField() + model = models.CharField() + field = models.CharField() + find_field = models.BooleanField( + default=False, + help_text=_( + "True if the scan was looking for a field name alias, eg, 'Quantity Produced' for " + "LivelihoodActivity.quantity_produced, as opposed to a value to be stored in that field." 
+ ), + ) + import_run = models.ForeignKey(ImportRun, on_delete=models.DO_NOTHING) + sheet_name = models.CharField() + column = models.PositiveIntegerField() + row = models.PositiveIntegerField() + regex = models.CharField(blank=True) # duplicate here on creation, in case regex is updated on BssValueExtractor + alias = models.CharField( + null=True, + blank=True, + help_text=_("Alias that successfully matched the source value."), + ) + + source_value = models.CharField(blank=True) # empty string may ingest as zero + matching_re = models.CharField(blank=True) # TODO: regex is without alias injected, matching_re is with? + mapped_value = models.JSONField( + null=True, blank=True, help_text=_("A string, integer, decimal, choice key, string id or integer id.") + ) + + # The below are all only populated after the destination instance has been saved: + instance_number = models.PositiveIntegerField(null=True, blank=True) + + # The below three are all set with just: destination_instance=any_django_instance + # It stores the instance that this spreadsheet value ends up being saved on + content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True, blank=True) + object_id = models.CharField(null=True, blank=True) # permits ints or char keys + destination_instance = GenericForeignKey("content_type", "object_id") + + class Meta: + verbose_name = _("Spreadsheet Location") + verbose_name_plural = _("Spreadsheet Locations") + + def __str__(self): + return " ".join( + f"{prop}:{getattr(self, prop)}" + for prop in ( + "sheet_name", + "column", + "row", + "source_value", + "mapped_value", + ) + if getattr(self, prop, None) + ) + + +class ChoiceAlias(Model): + app_label = models.CharField() + model = models.CharField() + field = models.CharField() + code = models.CharField() + alias = models.CharField(blank=True) + + class Meta: + verbose_name = _("Choice Alias") + verbose_name_plural = _("Choice Aliases") + + +class FieldNameAlias(Model): + app_label = 
models.CharField() + model = models.CharField() + field = models.CharField() + alias = models.CharField(blank=True) + + class Meta: + verbose_name = _("Field Name Alias") + verbose_name_plural = _("Field Name Aliases") + + +class LogLevel(models.IntegerChoices): + NOTSET = 0, _("Not Set") + DEBUG = 10, _("Debug") + INFO = 20, _("Info") + WARN = 30, _("Warn") + ERROR = 40, _("Error") + CRITICAL = 50, _("Critical") + + +class ImportLog(Model): + """ + Enables us to, for example, extract all logs for a specific cell with a problem value, + or all logs for a specific field, and see the state at the time of logging. + """ + + import_run = models.ForeignKey(ImportRun, on_delete=models.DO_NOTHING) + log_level = models.IntegerField(verbose_name=_("Log Level"), choices=LogLevel.choices, default=LogLevel.NOTSET) + message = models.TextField() + instance = models.JSONField(null=True, blank=True) + sheet_name = models.CharField(null=True, blank=True) + column = models.PositiveIntegerField(null=True, blank=True) + row = models.PositiveIntegerField(null=True, blank=True) + app_label = models.CharField(null=True, blank=True) + model = models.CharField(null=True, blank=True) + field = models.CharField(null=True, blank=True) + source_value = models.CharField(null=True, blank=True) + matching_re = models.CharField(null=True, blank=True) + mapped_value = models.JSONField( + null=True, + blank=True, + help_text=_( + "The value, such as choice value, foreign key or plain value, that the source value matched. " + "A string, integer, decimal, choice key, string id or integer id." 
+        ),
+    )
+    bss_value_extractor = models.ForeignKey(BssValueExtractor, on_delete=models.DO_NOTHING, null=True, blank=True)
+    regex = models.CharField(null=True, blank=True)
+    aliases = models.JSONField(null=True, blank=True)
+    successful_mappings = models.JSONField(null=True, blank=True)  # will need to monitor space used
+    failed_mappings = models.JSONField(null=True, blank=True)  # will need to monitor space used
+
+
+class ScanLog(Model):
+    """
+    Tracks every attempt to load each value from each cell.
+    Successful scans have the resulting alias, regex, injected regex and mapped value.
+    """
+
+    import_run = models.ForeignKey(ImportRun, on_delete=models.DO_NOTHING)
+    sheet_name = models.CharField()
+    column = models.PositiveIntegerField()
+    row = models.PositiveIntegerField()
+    app_label = models.CharField()
+    model = models.CharField()
+    field = models.CharField()
+    find_field = models.BooleanField(
+        default=False,
+        help_text=_(
+            "True if the scan was looking for a field name alias, eg, 'Quantity Produced' for "
+            "LivelihoodActivity.quantity_produced, as opposed to a value to be stored in that field."
+        ),
+    )
+    source_value = models.CharField(blank=True)
+
+    # Only populated for successful scans - alias used, regex with alias injected, original regex and mapped value:
+    bss_value_extractor = models.ForeignKey(BssValueExtractor, on_delete=models.DO_NOTHING, null=True, blank=True)
+    alias = models.CharField(
+        null=True,
+        blank=True,
+        help_text=_("Alias that successfully matched the source value."),
+    )
+    regex = models.CharField(
+        null=True,
+        blank=True,
+        help_text=_(
+            "The Regex that successfully matched the source value. "
+            "Duplicated here in case regex is updated on BssValueExtractor."
+ ), + ) + matching_re = models.CharField( + null=True, + blank=True, + help_text=_("The regex with the alias injected that successfully matched the source value."), + ) + mapped_value = models.JSONField( + null=True, + blank=True, + help_text=_( + "The value, such as choice value, foreign key or plain value, that the source value matched. " + "A string, integer, decimal, choice key, string id or integer id." + ), + ) diff --git a/apps/ingestion/tests.py b/apps/ingestion/tests.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/ingestion/views.py b/apps/ingestion/views.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/metadata/models.py b/apps/metadata/models.py index af75c7b0..9164b35d 100644 --- a/apps/metadata/models.py +++ b/apps/metadata/models.py @@ -36,7 +36,10 @@ class ReferenceData(common_models.Model): blank=True, null=True, verbose_name=_("aliases"), - help_text=_("A list of alternate names for the object."), + help_text=_( + "A list of alternate names for the object. If an alias has multiple parts, for a regex that has multiple " + "slots, use || in the alias to separate the alias parts. The number of parts must match." + ), ) def calculate_fields(self): diff --git a/hea/settings/base.py b/hea/settings/base.py index 46e38d6c..16cf2466 100644 --- a/hea/settings/base.py +++ b/hea/settings/base.py @@ -106,7 +106,12 @@ "django_extensions", "ddtrace.contrib.django", ] -PROJECT_APPS = ["common", "metadata", "baseline"] +PROJECT_APPS = [ + "common", + "metadata", + "baseline", + "ingestion", +] INSTALLED_APPS = EXTERNAL_APPS + PROJECT_APPS MIDDLEWARE = [