from __future__ import annotations

import csv
import inspect
import itertools
import json
import keyword
import os
import types
from datetime import datetime
from pathlib import Path
from pprint import pprint
from typing import Iterable, List, Optional, Union


def _preprocess_csv(file_path: Union[str, Path], delimiter: str) -> List[List[str]]:
    """Read a CSV file and return its rows as a list of lists of strings.

    Parameters:
    -----------
    file_path: str | pathlib.Path
        Location of the CSV file

    delimiter: str
        Field separator handed to csv.reader

    Returns
    -------
    list of list of str
        One inner list per row of the file, first (header) row included

    Raises:
    -------
    ValueError:
        - If file_path is not an existing regular file
        - If the file contains no rows
    """
    file_path = Path(file_path)
    # is_file() rather than exists(): a directory "exists" but open() fails on it
    if not file_path.is_file():
        raise ValueError("File not found")

    # newline="" leaves line endings inside quoted fields untouched for the csv
    # module; utf-8-sig decodes plain UTF-8 and drops a leading byte-order mark
    # so it does not end up glued to the first column name.
    with open(file_path, "r", encoding="utf-8-sig", newline="") as file:
        csv_data = list(csv.reader(file, delimiter=delimiter))

    if not csv_data:
        raise ValueError("File is empty")
    return csv_data


def _write_csv(file_path: Path, data: List[CleverDict], delimiter: str) -> Union[Path, None]:
    """Write a list of mappings to a CSV file, one row per mapping.

    The keys of the first mapping become the header row and fix the column
    order for every following row.

    Parameters:
    -----------
    file_path: pathlib.Path | str
        Destination file; overwritten if it already exists

    data: list
        Mappings (CleverDict or dict) sharing the same keys

    delimiter: str
        Field separator handed to csv.DictWriter

    Returns
    -------
    pathlib.Path | None
        Absolute path of the written file, or None if the file does not exist
        after writing

    Raises:
    -------
    ValueError:
        If data is empty (there is no first row to take the header from)
    """
    if not data:
        raise ValueError("No rows to write")
    file_path = Path(file_path)

    # Written as UTF-8 so the result can be read back by _preprocess_csv
    # regardless of the platform's default encoding.
    with open(file_path, "w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=list(data[0].keys()), delimiter=delimiter)
        writer.writeheader()
        writer.writerows(data)

    if file_path.exists():
        return file_path.absolute()
    return None


def _csv_row_window(header: bool, skip_rows: Optional[int], nrows: Optional[int]) -> slice:
    """Return the slice of file rows selected by header, skip_rows and nrows."""
    for label, value in (("skip_rows", skip_rows), ("nrows", nrows)):
        if value is not None and value < 0:
            raise ValueError(f"{label} cannot be negative")
    # The header row, when present, is never part of the data rows
    start = (1 if header else 0) + (skip_rows or 0)
    stop = None if nrows is None else start + nrows
    return slice(start, stop)


def _resolve_csv_names(first_row: list, header: bool, names: Optional[list]) -> list:
    """Return validated column names: given names, the header row, or 0..n-1."""
    if not names:
        names = first_row if header else range(len(first_row))
    names = list(names)
    if len(names) != len(first_row):
        raise ValueError("Number of items in names does not match the number of columns")
    if len(names) != len(set(names)):
        raise ValueError("Names contain one or more duplicate values")
    return names


def _validate_csv_records(records: list) -> None:
    """Check that all records share the first record's keys and hold no iterables."""
    if not records:
        raise ValueError("No rows to write")

    expected = set(records[0].keys())
    if any(set(record.keys()) != expected for record in records):
        raise ValueError("All subitems should have the same keys")

    for record in records:
        for value in record.values():
            is_container = hasattr(value, '__iter__') or hasattr(value, '__getitem__')
            if is_container and not isinstance(value, str):
                raise TypeError("Values to be written cannot be iterables")


# ---------------------------------------------------------------------------
# The two functions below are methods of CleverDict. The class statement is
# not part of the visible patch, so they are shown without the class-level
# indent; indent them one level when placing them inside the class body.
# ---------------------------------------------------------------------------

@classmethod
def from_csv(
    cls,
    file_path: Optional[Union[Path, str]] = None,
    skip_rows: Optional[int] = None,
    nrows: Optional[int] = None,
    header: bool = True,
    names: Optional[list] = None,
    delimiter: str = ',',
    ignore: Optional[Union[str, list]] = None,
    exclude: Optional[Union[str, list]] = None,
    only: Optional[Union[str, list]] = None
) -> CleverDict:
    """Creates a new CleverDict object from a CSV file.
    Each row is also encoded as a CleverDict object with the key being the row number starting from zero

    Parameters:
    -----------
    file_path: str | pathlib.Path
        The path to the csv file

    skip_rows: int
        Number of rows to skip from the beginning of the file
        Does not count the header if header is True

    nrows: int
        Number of rows to read
        Does not count the header if header is True

    header: bool
        Parses the first row of the csv file as headers

    names: list
        List of names to be used as keys instead of csv headers
        Cannot be used if header is True
        The number of items in list must be the same as number of columns
        If header is False and names is not given, columns are keyed 0, 1, 2...

    delimiter: str
        The delimiter used in the csv file

    ignore: str | list
        Any keys to ignore from output.

    exclude: iterable | str
        Alias for ignore

    only: iterable | str
        Only return output with the specified keys

    Returns
    -------
    New CleverDict: CleverDict

    Raises:
    -------
    TypeError:
        If more than one of ignore, exclude, and only are specified

    ValueError:
        - If file path is not provided or is invalid
        - If the file is empty
        - If ignore, exclude or only is used without column names
          (header=False and no names)
        - If names are specified without header=False
        - If skip_rows or nrows is negative
        - If number of items in names is not the same as number of columns
        - If names, whether specified or read from the csv file, contains one or more duplicate values

    Example:
    --------
    >>> data = [
    ...     ['id', 'name', 'color'],
    ...     [1, 'Banana', 'yellow'],
    ...     [2, 'Apple', 'green']
    ... ]
    >>> with open('test_csv.csv', 'w') as f:
    ...     f.write('\\n'.join(','.join(str(k) for k in i) for i in data))
    >>> data = CleverDict.from_csv('test_csv.csv')
    >>> print(data)
    CleverDict(
        {
            0: CleverDict({'id': '1', 'name': 'Banana', 'color': 'yellow'}, _aliases={}, _vars={}),
            1: CleverDict({'id': '2', 'name': 'Apple', 'color': 'green'}, _aliases={}, _vars={})
        }, _aliases={'_0': 0, '_False': 0, '_1': 1, '_True': 1}, _vars={}
    )
    """

    if file_path is None:
        raise ValueError("File path is not provided")

    # Filters refer to column names, so they need either a header row or names.
    # exclude is an alias for ignore and has to be covered by the same rule.
    if not header and not names and (ignore or exclude or only):
        raise ValueError('Ignore, Exclude, and Only cannot be used without column names')
    ignore, only = _preprocess_options(ignore, exclude, only)

    # Argument conflicts are reported before the file is opened and read
    if header and names:
        raise ValueError("Names cannot be specified if header is True.\n"
                         "Hint: To specify custom names for CSV with headers, set header=False and skip_rows=1")
    window = _csv_row_window(header, skip_rows, nrows)

    csv_data = _preprocess_csv(file_path, delimiter)
    names = _resolve_csv_names(csv_data[0], header, names)

    # zip() stops at the shorter side, so a short row yields fewer keys and
    # surplus cells in a long row are dropped
    rows = {
        idx: cls(dict(zip(names, row)), ignore=ignore, only=only)
        for idx, row in enumerate(csv_data[window])
    }
    return cls(rows)


def to_csv(
    self,
    file_path: Optional[Union[Path, str]] = None,
    delimiter: str = ',',
    ignore: Optional[Union[Iterable, str]] = None,
    exclude: Optional[Union[Iterable, str]] = None,
    only: Optional[Union[Iterable, str]] = None
) -> Union[Path, None]:
    """Write a nested CleverDict object to a CSV file
    Only CleverDicts consisting of CleverDicts can be written to a CSV file
    The input object should have the same format as the output of from_csv

    Parameters
    ----------
    file_path : Path | str
        Path for the output csv file

    delimiter: str, default ','
        The delimiter to use in the csv file

    ignore : Iterable | str, optional
        Keys to ignore from the subitem CleverDicts

    exclude : Iterable | str, optional
        alias for ignore

    only : Iterable | str, optional
        Only include these keys in the output csv file

    Returns
    -------
    Returns a pathlib.Path object containing the absolute path to the output file.

    Raises
    ------
    ValueError
        - If the file path is not provided
        - If there are no subitems to write
        - If the subitems, after ignore/exclude/only are applied, do not all
          have the same keys

    TypeError
        - If more than one of ignore, exclude, and only are specified
        - If the underlying items are not CleverDicts
        - If any of the values in the sub-items are iterables

    Example
    -------
    >>> my_list = [
    ...     {'id': ''.join(random.sample(string.ascii_lowercase, 6)),
    ...      'value': random.randint(10, 100)}
    ...     for i in range(3)]
    >>> c_dict = CleverDict({i: CleverDict(j) for i, j in enumerate(my_list)})
    >>> print(c_dict)
    CleverDict(
        {
            0: CleverDict({'id': 'argyso', 'value': 61}, _aliases={}, _vars={}),
            1: CleverDict({'id': 'xnsjcu', 'value': 70}, _aliases={}, _vars={}),
            2: CleverDict({'id': 'fabxvc', 'value': 91}, _aliases={}, _vars={})
        }, _aliases={'_0': 0, '_False': 0, '_1': 1, '_True': 1, '_2': 2}, _vars={}
    )
    >>> c_dict.to_csv('my_csv.csv')
    WindowsPath('C:/example/my_csv.csv')
    """

    if file_path is None:
        raise ValueError("File path not provided")

    ignore, only = _preprocess_options(ignore, exclude, only)

    children = [child for _, child in self.items()]
    if any(not isinstance(child, CleverDict) for child in children):
        raise TypeError("Parent object should only contain CleverDict objects for CSV conversion.")

    records = [child._filtered_mapping(ignore, only) for child in children]
    # Rows are compared with the first record, not with self[0]: the parent is
    # not required to contain a key 0.
    _validate_csv_records(records)

    return _write_csv(Path(file_path), records, delimiter=delimiter)
class Test_From_CSV:
    """Tests for CleverDict.from_csv, run against a five-row fruit table."""

    def create_csv(self, tmpdir, delimiter):
        """Write the fruit table to <tmpdir>/test_csv.csv and return that path."""
        data = [
            ['id', 'name', 'color'],
            [1, 'Banana', 'yellow'],
            [2, 'Apple', 'green'],
            [3, 'Blueberry', 'blue'],
            [4, 'Kinnow', 'orange'],
            [5, 'Kiwi', 'brown']
        ]
        path = f'{tmpdir}/test_csv.csv'
        # Explicit encoding: from_csv reads the file back as UTF-8
        with open(path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(delimiter.join(str(k) for k in i) for i in data))
        return path

    def test_missing_file(self, tmpdir):
        """A missing path argument or a non-existent file raises ValueError"""

        with pytest.raises(ValueError):
            CleverDict.from_csv()
        # Looked up inside tmpdir so the result does not depend on what
        # happens to be in the current working directory
        with pytest.raises(ValueError):
            CleverDict.from_csv(f'{tmpdir}/does_not_exist.csv')

    def test_header_names(self, tmpdir):
        """Keys come from the header row, from names, or default to 0..n-1"""
        csv_path = self.create_csv(tmpdir, delimiter=',')

        data = CleverDict.from_csv(csv_path)
        assert 0 in data.keys()
        assert len(data.keys()) == 5
        assert data._0.name == 'Banana'

        data = CleverDict.from_csv(csv_path, header=False, skip_rows=1, names=['sl', 'fruit', 'appearance'])
        assert 'color' not in data._0.keys()
        assert len(data._0.keys()) == 3
        assert len(data) == 5
        assert 'fruit' in data._0.keys()
        assert data._0.fruit == 'Banana'

        data = CleverDict.from_csv(csv_path, header=False, skip_rows=1)
        assert 'color' not in data._0.keys()
        assert len(data._0.keys()) == 3
        assert len(data) == 5
        assert 1 in data._0.keys()
        assert data._0._1 == 'Banana'

        # Filters need column names, which header=False without names lacks
        with pytest.raises(ValueError):
            CleverDict.from_csv(csv_path, header=False, ignore='id')
        with pytest.raises(ValueError):
            CleverDict.from_csv(csv_path, header=False, only='id')

    def test_ignore_only(self, tmpdir):
        """ignore and only filter the keys of each row and cannot be combined"""
        csv_path = self.create_csv(tmpdir, delimiter=',')

        with pytest.raises(TypeError):
            CleverDict.from_csv(csv_path, ignore='id', only='name')

        data = CleverDict.from_csv(csv_path, ignore='id')
        assert 'id' not in data._0.keys()

        data = CleverDict.from_csv(csv_path, only='name')
        assert 'color' not in data._0.keys()
        assert len(data._0.keys()) == 1

        data = CleverDict.from_csv(csv_path, ignore=['id', 'name'])
        assert 'name' not in data._0.keys()
        assert len(data._0.keys()) == 1

    def test_skiprows_nrows(self, tmpdir):
        """nrows limits the row count; skip_rows counts from the first data row"""
        csv_path = self.create_csv(tmpdir, delimiter=',')

        data = CleverDict.from_csv(csv_path, nrows=3)
        assert len(data) == 3
        assert data._0.name == 'Banana'

        data = CleverDict.from_csv(csv_path, skip_rows=1, nrows=3)
        assert len(data) == 3
        assert data._0.name == 'Apple'

    def test_delimiter(self, tmpdir):
        """Pipe- and tab-separated files are parsed with a matching delimiter"""
        for delimiter in ('|', '\t'):
            csv_path = self.create_csv(tmpdir, delimiter=delimiter)

            data = CleverDict.from_csv(csv_path, delimiter=delimiter)
            assert 0 in data.keys()
            assert len(data.keys()) == 5
            assert data._0.name == 'Banana'


class Test_To_CSV:
    """Tests for CleverDict.to_csv, run against a CleverDict of five fruit rows."""

    def create_data(self):
        """Return a CleverDict mapping 0..4 to one CleverDict per fruit row."""
        keys = ['id', 'name', 'color']
        data = [
            [1, 'Banana', 'yellow'],
            [2, 'Apple', 'green'],
            [3, 'Blueberry', 'blue'],
            [4, 'Kinnow', 'orange'],
            [5, 'Kiwi', 'brown']
        ]
        data_list = [CleverDict(zip(keys, i)) for i in data]
        c_dict = CleverDict({i: v for i, v in enumerate(data_list)})
        return c_dict

    def test_invalid_params(self, tmpdir):
        """A missing path raises ValueError; combined filters raise TypeError"""
        c_dict = self.create_data()
        with pytest.raises(ValueError):
            c_dict.to_csv()
        with pytest.raises(TypeError):
            c_dict.to_csv(f'{tmpdir}/test.csv', exclude='name', only='id')

    def test_bad_dicts(self, tmpdir):
        """Rows with mismatched keys, non-CleverDict rows and list values are rejected"""
        # One row carries an extra key
        c_dict = self.create_data()
        c_dict._0.name2 = 'temp'
        with pytest.raises(ValueError):
            c_dict.to_csv(f'{tmpdir}/test.csv')

        # One row has entirely different keys
        c_dict = self.create_data()
        c_dict[6] = CleverDict([('key1', 'temp'), ('key2', 'temp')])
        with pytest.raises(ValueError):
            c_dict.to_csv(f'{tmpdir}/test.csv')

        # One row is a plain dict rather than a CleverDict
        c_dict = self.create_data()
        c_dict[6] = dict(key1='temp', key2='temp')
        with pytest.raises(TypeError):
            c_dict.to_csv(f'{tmpdir}/test.csv')

        # One cell holds a list
        c_dict = self.create_data()
        c_dict._1.color = ['green', 'red']
        with pytest.raises(TypeError):
            c_dict.to_csv(f'{tmpdir}/test.csv')

    def test_file_creation(self, tmpdir):
        """to_csv returns the path of a file that exists under the requested name"""
        c_dict = self.create_data()
        file_path = c_dict.to_csv(f'{tmpdir}/test1.csv')
        assert file_path.exists()
        assert file_path.suffix == '.csv'
        assert file_path.name == 'test1.csv'

    def test_written_file(self, tmpdir):
        """A written file can be read back with from_csv"""
        c_dict = self.create_data()
        file_path = c_dict.to_csv(f'{tmpdir}/test2.csv')
        data = CleverDict.from_csv(file_path)
        assert data._0.name == 'Banana'
        assert len(data.keys()) == 5

    def test_delimiter(self, tmpdir):
        """A custom delimiter survives a to_csv / from_csv round trip"""
        c_dict = self.create_data()
        delimiter = '|'
        file_path = c_dict.to_csv(f'{tmpdir}/test3.csv', delimiter=delimiter)
        data = CleverDict.from_csv(file_path, delimiter=delimiter)
        assert data._0.name == 'Banana'
        assert len(data._0.keys()) == 3
        assert len(data.keys()) == 5

    def test_ignore_only(self, tmpdir):
        """ignore and only decide which columns reach the written file"""
        c_dict = self.create_data()
        file_path = c_dict.to_csv(f'{tmpdir}/test4.csv', ignore='id')
        data = CleverDict.from_csv(file_path)
        assert 'id' not in data._0
        assert data._1.name == 'Apple'
        assert list(data._1) == ['name', 'color']

        file_path = c_dict.to_csv(f'{tmpdir}/test5.csv', only='id')
        data = CleverDict.from_csv(file_path)
        assert 'id' in data._0
        assert 'color' not in data._0
        # Values come back from CSV as strings
        assert data._1.id == '2'
        assert list(data._1) == ['id']