"""Storage utility to work with S3, GCS and minio.

HOW TO USE:
* Make sure that credentials are configured the way boto3 expects.
* Extra setup is needed to use this module with GCS:
  - Generate GCS HMAC credentials and set them as the AWS credentials.
  - Make sure that the endpoint URL is set to 'https://storage.googleapis.com'.
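
For example (a sketch; GCS_HMAC_ACCESS_ID and GCS_HMAC_SECRET are
placeholders for your own HMAC credentials, not names this module defines):

    s3 = boto3.resource(
        "s3",
        endpoint_url="https://storage.googleapis.com",
        aws_access_key_id=GCS_HMAC_ACCESS_ID,
        aws_secret_access_key=GCS_HMAC_SECRET,
    )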
| 8 | +""" |
| 9 | +import boto3 |
| 10 | +import botocore |

s3 = boto3.resource("s3")

class StoragePath:
    """The StoragePath class provides a pathlib.Path-like interface for
    storage.

    USAGE:
        root = StoragePath(bucket_name, "alpha")
        path = root.join("datasets", "customer-master", "template.csv")
        text = path.read_text()
    """
    def __init__(self, bucket: str, path: str):
        self.bucket = bucket
        self.path = path

    @property
    def _object(self):
        # The boto3 Object resource for this bucket/key.
        return s3.Object(bucket_name=self.bucket, key=self.path)

    def exists(self):
        """Tells whether the storage path exists or not.

        Checks existence by loading the object's metadata, which issues a
        HEAD request and raises a 404 ClientError when the key is missing.
        """
        obj = self._object
        try:
            obj.load()
            return True
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                return False
            raise

    def delete(self, del_dir: bool = False):
        """Deletes the file at the storage path.

        Pass del_dir=True to delete every object under this path, treating
        it as a directory prefix.
        """
        obj = self._object
        if del_dir:
            bucket = obj.Bucket()
            bucket.objects.filter(Prefix=self.path).delete()
        else:
            obj.delete()

    def download(self, local_path):
        """Downloads the contents of the storage file to local_path."""
        obj = self._object
        obj.download_file(local_path)

    def upload(self, local_path):
        """Uploads the file at local_path to the storage path."""
        obj = self._object
        obj.upload_file(local_path)

    def read_text(self):
        """Reads the contents of the path as text."""
        obj = self._object
        # The body streams back as bytes; decode so the method returns text.
        return obj.get()['Body'].read().decode('utf-8')

    def _get_presigned_url(self, client_method, expires=600, content_type=None):
        """Returns a presigned URL for upload or download.

        The client_method should be one of get_object or put_object.
        """
        params = {
            'Bucket': self.bucket,
            'Key': self.path,
        }
        if content_type:
            params['ContentType'] = content_type

        return s3.meta.client.generate_presigned_url(
            client_method, Params=params, ExpiresIn=expires)

    def get_presigned_url_for_download(self, expires=3600):
        """Returns a presigned URL for download.

        The default expiry is one hour (3600 seconds).
        """
        return self._get_presigned_url(client_method='get_object', expires=expires)
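
    # Usage sketch (assumes the `requests` package, which this module does
    # not itself depend on):
    #
    #   url = path.get_presigned_url_for_download()
    #   response = requests.get(url)  # plain HTTP GET, no AWS credentials needed
    #   data = response.content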

    def get_presigned_url_for_upload(self, expires=600, content_type="text/csv"):
        """Returns a presigned URL for upload.

        The default expiry is 10 minutes (600 seconds).
        """
        return self._get_presigned_url(client_method='put_object',
                                       expires=expires,
                                       content_type=content_type)
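
    # Usage sketch (again assuming `requests`): the Content-Type of the PUT
    # must match the content_type the URL was signed with, or the signature
    # check fails.
    #
    #   url = path.get_presigned_url_for_upload(content_type="text/csv")
    #   with open("data.csv", "rb") as f:
    #       requests.put(url, data=f, headers={"Content-Type": "text/csv"})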

    def list(self):
        """Lists the keys of all objects under this path prefix.

        boto3 collections paginate transparently, so this returns every
        matching key, not just the first page of results.
        """
        bucket = self._object.Bucket()
        collection = bucket.objects.filter(Prefix=self.path)
        return [obj.key for obj in collection]

    def read_dataframe(self):
        """TODO: Support csv and parquet."""
        raise NotImplementedError

    def copy_to(self, dest_path):
        """Copies the file to the destination path within the same bucket."""
        dest = s3.Object(bucket_name=self.bucket, key=dest_path)
        dest.copy_from(CopySource={'Bucket': self.bucket, 'Key': self.path})
        return StoragePath(self.bucket, dest_path)

    def join(self, *parts):
        """Combines the storage path with one or more parts and returns a
        new path.
        """
        path = "/".join([self.path] + list(parts))
        return StoragePath(self.bucket, path)

    def __repr__(self):
        return f'<StoragePath {self.path}>'