|
| 1 | +<!--- Copyright (c) 2025, NVIDIA CORPORATION. |
| 2 | +SPDX-License-Identifier: BSD-3-Clause --> |
| 3 | + |
| 4 | + |
| 5 | +(crude-data)= |
| 6 | +# Crude Datasets and Auxiliary Data |
| 7 | + |
| 8 | +As explained in [](sample-loading), the user has several options to choose how energon converts the raw (crude) data inside the tar files into Python sample objects (instances of a `Sample` dataclass) during loading. |
| 9 | + |
| 10 | +When using crude datasets, this conversion happens through so-called "cookers", i.e. user-defined functions defined in the task encoder, as explained below. |
| 11 | +In this case, the dataset on disk will specify neither the resulting sample type nor a sample loader for conversion, hence we call it "crude". |
| 12 | +All of the conversion will happen in the user's code base. |
| 13 | + |
| 14 | +## Setting Up a Crude Dataset with Cookers |
| 15 | +Let's try it. |
| 16 | +When you run `energon prepare` to prepare your dataset, you can pick "Crude sample" as the sample type. |
| 17 | +If you already have an existing energon-compliant data set, you can modify it as follows (or create a copy of your `dataset.yaml` specifically for this use case and keep the original). |
| 18 | + |
| 19 | +Original `dataset.yaml`: |
| 20 | +```yaml |
| 21 | +sample_type: |
| 22 | + __module__: megatron.energon |
| 23 | + __class__: OCRSample |
| 24 | +sample_loader: sample_loader.py:sample_loader |
| 25 | +part_filter: sample_loader.py:part_filter |
| 26 | +``` |
| 27 | +
|
| 28 | +New `dataset.yaml` (or `crude.yaml` of you like): |
| 29 | +```yaml |
| 30 | +__module__: megatron.energon |
| 31 | +__class__: CrudeWebdataset |
| 32 | +subflavors: |
| 33 | + example_crude_selector: foobar |
| 34 | +``` |
| 35 | + |
| 36 | +As shown above, [subflavors](sect-subflavors) can be assigned directly in the dataset yaml, however we recommend |
| 37 | +to define them in the [Metadataset](../basic/metadataset) instead. |
| 38 | +Subflavors can be used to differentiate the data origin when you mix different crude datasets or crude with non-crude data using a [Metadataset](../basic/metadataset). |
| 39 | +You can then use the subflavors to determine how each sample shall be processed, as explained below. |
| 40 | + |
| 41 | +Let's see how we set up our [Task Encoder](../basic/task_encoder) to cook the crude samples and turn them into real samples like {py:class}`TextSample <megatron.energon.TextSample>`. |
| 42 | + |
| 43 | +A crude sample is essentially a dictionary ({py:class}`CrudeSample <megatron.energon.CrudeSample>` inherits from `dict`) and the loader will look for a {py:class}`Cooker <megatron.energon.Cooker>` that can handle and convert your crude samples. |
| 44 | +You can provide cookers as follows: |
| 45 | + |
| 46 | +```python |
| 47 | +from megatron.energon import Cooker, basic_sample_keys |
| 48 | +
|
| 49 | +# ... |
| 50 | +
|
| 51 | +# We recommend to place your cooker functions in a separate file (esp. if they are larger) |
| 52 | +def cook_text(sample: dict) -> TextSample: |
| 53 | + return TextSample( |
| 54 | + **basic_sample_keys(sample), |
| 55 | + text=f">{sample['txt'].decode()}<", |
| 56 | + ) |
| 57 | +
|
| 58 | +
|
| 59 | +class MyTaskEncoder(DefaultTaskEncoder[TextSample, TextSample, TextRawBatch, TextBatch]): |
| 60 | + cookers = [ |
| 61 | + Cooker(cook_text, has_subflavors={"example_crude_selector": "foobar"}), |
| 62 | + Cooker(...) # other cookers for other crude data if needed |
| 63 | + ] |
| 64 | +
|
| 65 | + # ... |
| 66 | +``` |
| 67 | + |
| 68 | +In the example above, the cooker acts on all crude samples that have a subflavor `example_crude_selector` set to `foobar`. |
| 69 | +If you leave out the `has_subflavors` argument, the cooker will apply to any sample. |
| 70 | + |
| 71 | +The cooker will convert the dictionary to a {py:class}`TextSample <megatron.energon.TextSample>` by decoding the raw bytes and decorating the text with some nice angle brackets. |
| 72 | +Probably you noticed the {py:meth}`basic_sample_keys <megatron.energon.task_encoder.cooking.basic_sample_keys>` helper that we inserted. |
| 73 | +All it does, is to forward the key, restore key and flavors from the dict to the real sample. You will always need to forward these, or your dataset will not be restorable. |
| 74 | + |
| 75 | +In a real use-case you will want to do a lot more here and we recommend keeping the cook methods in separate files and importing them where you define your TaskEncoder. |
| 76 | + |
| 77 | +(aux-data)= |
| 78 | +## Auxiliary Data for Polylithic Datasets |
| 79 | + |
| 80 | +Using a crude dataset allows you to benefit from two other features of energon: |
| 81 | + |
| 82 | +* Auxiliary Data |
| 83 | +* Cache Pools |
| 84 | + |
| 85 | +Both of which are often used in combination. A typical use case is online packing. |
| 86 | + |
| 87 | +An **auxiliary data source** is an additional data source that supports random access and can be used to load data on-demand using its filename. |
| 88 | +It is typically used with polylithic datasets where you have one primary dataset that contains only the text-based sample data |
| 89 | +and one or more additional auxiliary data sources that contain the (larger) media data such as images or videos. |
| 90 | + |
| 91 | +An auxiliary data source can be either |
| 92 | + |
| 93 | +* Another energon-prepared WebDataset |
| 94 | +* A folder on the local or a remote file system |
| 95 | + |
| 96 | +You can specify it in your [metadataset](../basic/metadataset) yaml as follows (look at the `aux:` section) |
| 97 | +```yaml |
| 98 | +__module__: megatron.energon |
| 99 | +__class__: MetadatasetV2 |
| 100 | +splits: |
| 101 | + train: |
| 102 | + path: ./my_pimary_ds |
| 103 | + aux: |
| 104 | + foo_bar_source: ./aux_ds123 |
| 105 | + fs_source: filesystem://./images |
| 106 | + fs_source_abs: filesystem:///absolute/path/to/images |
| 107 | + remote_source: msc://mybucket/path/ds |
| 108 | + subflavors: |
| 109 | + crude_type: my_dual_aux_example |
| 110 | +``` |
| 111 | + |
| 112 | +The format is like |
| 113 | +```yaml |
| 114 | +aux: |
| 115 | + NAME: PATH_OR_URL |
| 116 | + NAME: PATH_OR_URL |
| 117 | + ... |
| 118 | +``` |
| 119 | + |
| 120 | +You can specify multiple aux sources each of which can be one of |
| 121 | + |
| 122 | +* Relative or absolute path to a local prepared energon dataset |
| 123 | +* Relative or absolute path to a local folder (use the prefix `filesystem://`) |
| 124 | +* Path to a remote prepared energon dataset (use prefix `msc://`) |
| 125 | +* *[Planned future feature]*: Path to a remote folder (use prefix `filesystem+msc://`) |
| 126 | + |
| 127 | +In your code, the cooker will automatically receive a {py:class}`FileStore <megatron.energon.FileStore>` reference to the data source as a keyword argument: |
| 128 | + |
| 129 | +```python |
| 130 | +from megatron.energon import FileStore |
| 131 | +
|
| 132 | +# ... |
| 133 | +
|
| 134 | +def cook_text(sample: dict, foo_bar_source: FileStore) -> TextSample: |
| 135 | + additional_text = foo_bar_source.get(sample['add_txt_fname']) |
| 136 | + return TextSample( |
| 137 | + **basic_sample_keys(sample), |
| 138 | + text=f"{sample['txt'].decode()} + {additional_text.decode()}", |
| 139 | + ) |
| 140 | +
|
| 141 | +# ... |
| 142 | +``` |
| 143 | + |
| 144 | +You can use multiple sources. You'll have to specify a cooker argument for each source that was defined in the metadataset. |
| 145 | + |
| 146 | +For easier debugging, you should always keep track of all the sources you used. The `get` method takes care of this if you pass it the sample like this: |
| 147 | + |
| 148 | +```python |
| 149 | +additional_text = foo_bar_source.get(sample['add_txt_fname'], sample) |
| 150 | +``` |
| 151 | + |
| 152 | +This will update the sample-internal `__sources__` list with the aux dataset you used. |
| 153 | + |
| 154 | +If you want, you can even use your primary dataset as an auxiliary dataset and look up files by name, yes! If you want to do that, you specify it in the cooker decorator and add an arg: |
| 155 | + |
| 156 | +```python |
| 157 | +from megatron.energon import cooker, FileStore |
| 158 | +
|
| 159 | +# ... |
| 160 | +
|
| 161 | +@cooker(need_primary=True) |
| 162 | +def cook_text(sample: dict, primary: FileStore, foo_bar_source: FileStore) -> TextSample: |
| 163 | + # ... |
| 164 | +``` |
| 165 | + |
| 166 | +You can then retrieve files by their names from the primary dataset. |
| 167 | + |
| 168 | +(cache-pools)= |
| 169 | +## Cache Pools |
| 170 | + |
| 171 | +Cache pools allow the user to defer the data transfer if the content will be needed at some point in the future but not immediately. |
| 172 | +This is only needed if the media data is rather large and does not reside on a local disk, but rather on a network file system (e.g. lustre) or a remote file system (e.g. object storage). |
| 173 | + |
| 174 | +Cache pools are especially beneficial if you are using buffers in your pipeline, like a shuffle buffer or a packing buffer. For example, when using [online packing](../advanced/packing), we may need to keep a buffer of several thousand samples to optimize for the best packing, |
| 175 | +but we cannot keep several thousand images in memory, also we don't need the actual image content to optimize the packing. |
| 176 | +Hence we will use auxiliary datasets as explained above. |
| 177 | + |
| 178 | +However, at the time of filling the buffer, **we already know** that we **will need the image content in the future**, so cache pools can be used to **prefetch it in the background**. |
| 179 | + |
| 180 | +Initially we want to load some information about the sample and its image but not the actual image pixels. |
| 181 | +Later, when the packing is computed, we need to retrieve the pixel values. |
| 182 | + |
| 183 | +In practice, this means the cooker will use a cache pool to queue the data retrieval from an auxiliary data source and obtain a lazy object (a handle to this future data). In a later stage (like {py:meth}`pack_selected_samples <megatron.energon.TaskEncoder.pack_selected_samples>`), the lazy object can be used to retrieve the content. |
| 184 | +Ideally, in the mean-time, the cache pool has already downloaded the data to a local SSD. |
| 185 | + |
| 186 | +### Using a Cache Pool |
| 187 | + |
| 188 | +When calling {py:func}`get_savable_loader <megatron.energon.get_savable_loader>`, |
| 189 | +we pass a cache pool as an additional argument: |
| 190 | + |
| 191 | +```python |
| 192 | +from megatron.energon import FileStoreCachePool |
| 193 | +
|
| 194 | +# ... |
| 195 | +
|
| 196 | +loader = get_savable_loader( |
| 197 | + my_ds, |
| 198 | + ..., |
| 199 | + cache_pool=FileStoreCachePool( |
| 200 | + parent_cache_dir="/local_scratch/cache", |
| 201 | + num_workers=1, |
| 202 | + ), |
| 203 | +) |
| 204 | +``` |
| 205 | + |
| 206 | +Then we tell the cooker decorator that we need access to the cache pool and use it to get a lazy object: |
| 207 | + |
| 208 | +```python |
| 209 | +from megatron.energon import cooker, FileStore, CachePool |
| 210 | +
|
| 211 | +# ... |
| 212 | +
|
| 213 | +@edataclass |
| 214 | +class LazyTextSample(Sample): |
| 215 | + text: str |
| 216 | + secondary_text: Lazy[str] |
| 217 | +
|
| 218 | +
|
| 219 | +@cooker(need_cache=True) |
| 220 | +def cook_text(sample: dict, foo_bar_source: FileStore, cache: CachePool) -> LazyTextSample: |
| 221 | + # Previous non-cached version: |
| 222 | + # additional_text = foo_bar_source.get(sample['add_txt_fname']) |
| 223 | +
|
| 224 | + # Cached version: |
| 225 | + additional_text = cache.get_lazy(foo_bar_source, sample['add_txt_fname']) |
| 226 | +
|
| 227 | + return TextSample( |
| 228 | + **basic_sample_keys(sample), |
| 229 | + text=f"{sample['txt'].decode()} + {additional_text.decode()}", |
| 230 | + secondary_text=additional_text, # Pass the lazy object on |
| 231 | + ) |
| 232 | +``` |
| 233 | + |
| 234 | +Later down the data processing pipeline, we can retrieve the data, for example here: |
| 235 | + |
| 236 | +```python |
| 237 | +@stateless |
| 238 | +def pack_selected_samples(self, samples: List[LazyTextSample]) -> TextSample: |
| 239 | + # Get the real object now: |
| 240 | + secondary_txt = samples[0].secondary_txt.get(samples[0]) |
| 241 | + |
| 242 | + return TextSample.derive_from( |
| 243 | + samples[0], |
| 244 | + text=samples[0].txt + "|" + secondary_txt, |
| 245 | + ) |
| 246 | +``` |
0 commit comments