Skip to content
251 changes: 245 additions & 6 deletions cleverdict/cleverdict.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import os
import json
from __future__ import annotations

import csv
import inspect
import keyword
import itertools
import json
import keyword
import os
import types
from datetime import datetime
from pathlib import Path
from pprint import pprint
from datetime import datetime
import types
import inspect
from typing import Iterable, List, Union

"""
Change log
Expand Down Expand Up @@ -300,6 +303,35 @@ def make_set(arg):
return make_set(ignore) | make_set(exclude) | CleverDict.ignore, only


def _preprocess_csv(file_path: Union[str, Path], delimiter: str):
    """Read a CSV file and return its rows as a list of lists of strings.

    Parameters
    ----------
    file_path : str | pathlib.Path
        Location of the CSV file to read.
    delimiter : str
        Field separator handed to ``csv.reader``.

    Returns
    -------
    list
        One list of string fields per CSV row, in file order.

    Raises
    ------
    ValueError
        - If ``file_path`` does not point to an existing regular file
        - If the file contains no rows
    """
    file_path = Path(file_path)
    # is_file() (rather than exists()) also rejects directories, which would
    # otherwise fail later inside open() with an unrelated OSError.
    if not file_path.is_file():
        raise ValueError("File not found")

    # newline="" is what the csv module asks for: without it, universal
    # newline translation rewrites line breaks embedded in quoted fields.
    with open(file_path, "r", encoding="utf-8", newline="") as file:
        csv_data = list(csv.reader(file, delimiter=delimiter))

    if not csv_data:
        raise ValueError("File is empty")

    return csv_data


def _write_csv(file_path: Path, data: "List[CleverDict]", delimiter: str) -> Union[Path, None]:
    """Write a list of mapping objects to a CSV file.

    The keys of the first item in ``data`` become the header row; every item
    is then written as one row through ``csv.DictWriter``.

    Parameters
    ----------
    file_path : pathlib.Path | str
        Destination of the CSV file. An existing file is overwritten.
    data : list
        Row mappings to write. Must contain at least one item.
    delimiter : str
        Field separator handed to ``csv.DictWriter``.

    Returns
    -------
    pathlib.Path | None
        Absolute path of the written file, or None if the file does not
        exist after writing.

    Raises
    ------
    ValueError
        If ``data`` is empty. Raised before the destination is opened, so an
        existing file is not truncated.
    """
    if not data:
        raise ValueError("No data to write")

    file_path = Path(file_path)
    fieldnames = list(data[0].keys())

    # utf-8 mirrors the encoding used when reading CSV files back in, so a
    # write/read round trip does not depend on the platform default encoding.
    with open(file_path, "w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames, delimiter=delimiter)
        writer.writeheader()
        writer.writerows(data)

    if file_path.exists():
        return file_path.absolute()
    return None


class Expand:
def __init__(self, ok):
"""
Expand Down Expand Up @@ -1045,6 +1077,213 @@ def from_json(
else:
return cls(data, **kwargs)

@classmethod
def from_csv(
    cls,
    file_path: Union[Path, str, None] = None,
    skip_rows: int = None,
    nrows: int = None,
    header: bool = True,
    names: list = None,
    delimiter: str = ',',
    ignore: Union[str, list] = None,
    exclude: Union[str, list] = None,
    only: Union[str, list] = None
) -> "CleverDict":
    """Creates a new CleverDict object from a CSV file.
    Each row is also encoded as a CleverDict object with the key being the row number starting from zero

    Parameters:
    -----------
    file_path: str | pathlib.Path
        The path to the csv file

    skip_rows: int
        Number of rows to skip from the beginning of the file
        Does not count the header if header is True

    nrows: int
        Number of rows to read
        Does not count the header if header is True

    header: bool
        Parses the first row of the csv file as headers

    names: list
        List of names to be used as keys instead of csv headers
        Cannot be used if header is True
        The number of items in list must be the same as number of columns
        If neither header nor names is given, column positions (0, 1, ...) are used as keys

    delimiter: str
        The delimiter used in the csv file

    ignore: str | list
        Any keys to ignore from output.

    exclude: iterable | str
        Alias for ignore

    only: iterable | str
        Only return output with the specified keys

    Returns
    -------
    New CleverDict: CleverDict

    Raises:
    -------
    TypeError:
        If more than one of ignore, exclude, and only are specified

    ValueError:
        - If file path is not provided or is invalid
        - If names are specified without header=False
        - If ignore, exclude or only is used when there are no column names (header=False and no names)
        - If number of items in names is not the same as number of columns
        - If names, whether specified or read from the csv file, contains one or more duplicate values

    Example:
    --------
    >>> rows = [
    ...     ['id', 'name', 'color'],
    ...     [1, 'Banana', 'yellow'],
    ...     [2, 'Apple', 'green']
    ... ]
    >>> with open('test_csv.csv', 'w') as f:
    ...     _ = f.write('\\n'.join(','.join(str(k) for k in i) for i in rows))
    >>> data = CleverDict.from_csv('test_csv.csv')
    >>> print(data)
    CleverDict(
        {
            0: CleverDict({'id': '1', 'name': 'Banana', 'color': 'yellow'}, _aliases={}, _vars={}),
            1: CleverDict({'id': '2', 'name': 'Apple', 'color': 'green'}, _aliases={}, _vars={})
        }, _aliases={'_0': 0, '_False': 0, '_1': 1, '_True': 1}, _vars={}
    )
    """

    if file_path is None:
        raise ValueError("File path is not provided")

    # exclude is an alias for ignore, so it needs column names just as much.
    if not header and not names and (ignore or exclude or only):
        raise ValueError('Ignore, Exclude, and Only cannot be used without column names')
    ignore, only = _preprocess_options(ignore, exclude, only)
    kwargs = {"ignore": ignore, "only": only}

    # Purely an argument conflict, so report it before touching the file.
    if header and names:
        raise ValueError("Names cannot be specified if header is True.\n"
                         "Hint: To specify custom names for CSV with headers, set header=False and skip_rows=1")

    csv_data = _preprocess_csv(file_path, delimiter)
    first_row = csv_data[0]

    # The header row, when present, is never counted as a data row.
    start_row = (skip_rows or 0) + (1 if header else 0)
    end_row = None if nrows is None else start_row + nrows

    if not names:
        names = first_row if header else list(range(len(first_row)))

    if len(names) != len(first_row):
        raise ValueError("Number of items in names does not match the number of columns")

    if len(names) != len(set(names)):
        raise ValueError("Names contain one or more duplicate values")

    # zip() pairs names with fields positionally and stops at the shorter of
    # the two, so a row with fewer fields yields fewer keys.
    data = {
        idx: cls(dict(zip(names, row)), **kwargs)
        for idx, row in enumerate(csv_data[start_row:end_row])
    }

    return cls(data)

def to_csv(
    self,
    file_path: Union[Path, str, None] = None,
    delimiter: str = ',',
    ignore: Union[Iterable, str] = None,
    exclude: Union[Iterable, str] = None,
    only: Union[Iterable, str] = None
) -> Union[Path, None]:
    """Write a nested CleverDict object to a CSV file
    Only CleverDicts consisting of CleverDicts can be written to a CSV file
    The input object should have the same format as the output of from_csv

    Parameters
    ----------
    file_path : Path | str
        Path for the output csv file
    delimiter: str, default ','
        The delimiter to use in the csv file

    ignore : Iterable | str, optional
        Keys to ignore from the subitem CleverDicts

    exclude : Iterable | str, optional
        alias for ignore

    only : Iterable | str, optional
        Only include these keys in the output csv file

    Returns
    -------
    Returns a pathlib.Path object containing the path to the output file.

    Raises
    ------
    ValueError
        - If the file path is not provided
        - If there are no subitems to write
        - If the subitems contain different lengths or keys

    TypeError
        - If the underlying items are not CleverDicts
        - If any of the values in the sub-items are iterables

    Example
    -------
    >>> my_list = [
    ...     {'id': ''.join(random.sample(string.ascii_lowercase, 6)),
    ...      'value': random.randint(10, 100)}
    ...     for i in range(3)]
    >>> c_dict = CleverDict({i: CleverDict(j) for i, j in enumerate(my_list)})
    >>> print(c_dict)
    CleverDict(
        {
            0: CleverDict({'id': 'argyso', 'value': 61}, _aliases={}, _vars={}),
            1: CleverDict({'id': 'xnsjcu', 'value': 70}, _aliases={}, _vars={}),
            2: CleverDict({'id': 'fabxvc', 'value': 91}, _aliases={}, _vars={})
        }, _aliases={'_0': 0, '_False': 0, '_1': 1, '_True': 1, '_2': 2}, _vars={}
    )
    >>> c_dict.to_csv('my_csv.csv')
    WindowsPath('C:/example/my_csv.csv')
    """

    if file_path is None:
        raise ValueError("File path not provided")
    file_path = Path(file_path)

    ignore, only = _preprocess_options(ignore, exclude, only)

    subitems = [v for _, v in self.items()]

    if any(not isinstance(v, CleverDict) for v in subitems):
        raise TypeError("Parent object should only contain CleverDict objects for CSV conversion.")

    # Bail out before the writer opens (and thereby truncates) the target.
    if not subitems:
        raise ValueError("No data to write")

    data_list = [v._filtered_mapping(ignore, only) for v in subitems]

    # Use the first subitem as the reference instead of assuming a key 0
    # exists in the parent; the comparison is on the unfiltered keys.
    expected_keys = subitems[0].keys()
    if any(v.keys() != expected_keys for v in subitems):
        raise ValueError("All subitems should have the same keys")

    for row in data_list:
        for _, val in row.items():
            # Strings are the one iterable type that maps onto a single cell.
            if (hasattr(val, '__iter__') or hasattr(val, '__getitem__')) and not isinstance(val, str):
                raise TypeError("Values to be written cannot be iterables")

    return _write_csv(file_path, data_list, delimiter=delimiter)

@classmethod
def get_new_save_path(cls):
"""
Expand Down
Loading