PartitionedDataset - Allow for parallelization when saving and allow logging of exceptions #928

Open
crash4 opened this issue Feb 11, 2022 · 5 comments
Labels
Community Issue/PR opened by the open-source community

Comments

@crash4

crash4 commented Feb 11, 2022

Description

Lately I have been working a lot with PartitionedDataset in a setting where I am processing many small files (think 30k+ files, more than 30 GB in total). Processing them sequentially in a node would require loading each file into memory, processing it, and then keeping it in memory while all the other files are processed, only to return everything in a dict at the end so that all files are saved at once.

To solve this problem, I only return functions, which are called when the dataset is saved (to avoid memory problems). Since, by definition, the files in a PartitionedDataset should be independent (i.e. processing one file should not influence the processing of the others), we can save several at a time rather than saving them sequentially, as PartitionedDataset does right now.
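For illustration, a minimal sketch of that lazy-saving pattern (the node name, the pandas usage and the `dropna` step are placeholders, not from the original issue): when a node returns a dict of callables, PartitionedDataset invokes each callable only at save time.

```python
from typing import Any, Callable, Dict

import pandas as pd


def process_partitions(partitions: Dict[str, Callable[[], pd.DataFrame]]) -> Dict[str, Any]:
    """Kedro node: receive lazy loaders, return lazy savers.

    Each returned value is a callable, so nothing is materialised
    until PartitionedDataset saves that particular partition.
    """

    def _make_processor(load_partition: Callable[[], pd.DataFrame]) -> Callable[[], pd.DataFrame]:
        def _process() -> pd.DataFrame:
            df = load_partition()   # load one small file
            return df.dropna()      # placeholder for the real processing
        return _process

    return {
        partition_id: _make_processor(load_partition)
        for partition_id, load_partition in partitions.items()
    }
```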

Another pain point: processing the files this way (only returning functions which do the processing at save time) doesn't allow me to drop a file if its processing fails (imagine an assert inside the processing function). Right now, if this happens, the whole save fails for all files that have not yet been processed. Instead, we could wrap the call to the processing function in a try-except that attempts the processing and, if it fails, logs the exception and skips that partition.

Context

This change would significantly speed up processing of PartitionedDatasets and handle several pain points I am having (described above).

Possible Implementation

I believe all of this can be done inside the PartitionedDataset class. I have "hacked" my own implementation using joblib, which mostly works. Unfortunately, joblib doesn't play well with the logging module, so it breaks logging functionality (the global logger is not propagated to the child workers that joblib spawns).

A minimum working example (reimplementation of the PartitionedDataset._save method):


```python
import logging
from copy import deepcopy
from typing import Any, Dict

from joblib import Parallel, delayed


def _save_partition(self, partition_data, partition_id):
    kwargs = deepcopy(self._dataset_config)
    partition = self._partition_to_path(partition_id)
    # join the protocol back since tools like PySpark may rely on it
    kwargs[self._filepath_arg] = self._join_protocol(partition)
    dataset = self._dataset_type(**kwargs)  # type: ignore
    if callable(partition_data):
        try:
            partition_data = partition_data()
        except Exception as exc:
            # log the failure and skip this partition instead of
            # failing the whole save
            logging.error(exc)
            return
    dataset.save(partition_data)


def _save(self, data: Dict[str, Any]) -> None:
    if self._overwrite and self._filesystem.exists(self._normalized_path):
        self._filesystem.rm(self._normalized_path, recursive=True)
    Parallel(n_jobs=1, verbose=10)(
        delayed(self._save_partition)(partition_data, partition_id)
        for partition_id, partition_data in sorted(data.items())
    )
    self._invalidate_caches()
```

The n_jobs parameter specifies how many CPU cores to use (it is set to 1 in the snippet above; -1 would use all available cores). As I mentioned before, joblib breaks the logging functionality and this would have to be solved (I have only tried joblib; multiprocessing or other libraries may work better).

Also: a partition should only be saved when `partition_data = partition_data()` doesn't fail, which is why the snippet above logs the exception and returns without calling `dataset.save`.

Possible Alternatives

Using other parallelization libraries, such as the standard-library multiprocessing or concurrent.futures.
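For example, a rough sketch of the same `_save` using a standard-library `concurrent.futures` thread pool instead of joblib (the worker count is an arbitrary assumption; threads share the parent process, so standard logging keeps working, but they mainly help when the per-partition work is I/O-bound):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict


def _save(self, data: Dict[str, Any]) -> None:
    if self._overwrite and self._filesystem.exists(self._normalized_path):
        self._filesystem.rm(self._normalized_path, recursive=True)
    # Threads run in the same process, so the global logging
    # configuration is visible to every worker.
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [
            executor.submit(self._save_partition, partition_data, partition_id)
            for partition_id, partition_data in sorted(data.items())
        ]
        for future in futures:
            future.result()  # surface any unexpected error
    self._invalidate_caches()
```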

@datajoely
Contributor

Hi @crash4, I completely get your reasoning here and I also like your solution. In general, parallelism in Python can be a pain, and my fear is that it would be really difficult to mix this with the ParallelRunner.

For now I think implementing a custom dataset is exactly the right thing, and thank you for sharing your approach with the community. Our view is that in cases where users need to go a little off-piste from the 'general case', custom/derived datasets are absolutely the right call. From the Kedro core side this feels like something we won't implement centrally unless lots of people start asking for it on this issue!
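For anyone taking this route, a hedged sketch of what such a derived dataset could look like (the class name is made up, and the exact import path of the partitioned dataset class depends on the Kedro / kedro-datasets version):

```python
from typing import Any, Dict

from kedro.io import PartitionedDataSet  # import path varies across Kedro versions


class ParallelPartitionedDataSet(PartitionedDataSet):
    """Derived dataset that overrides _save with the parallel,
    exception-tolerant implementation sketched in the issue above."""

    def _save(self, data: Dict[str, Any]) -> None:
        # delegate to the parallel implementation from the issue,
        # e.g. the joblib- or thread-pool-based _save shown earlier
        ...
```

It could then be referenced in the catalog by its full import path, like any other custom dataset.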

@roumail

roumail commented Feb 28, 2022

+1 for having the option to enable parallelism for partitioned datasets.

@merelcht merelcht added the Community Issue/PR opened by the open-source community label Mar 7, 2022
@edhenry

edhenry commented Oct 12, 2024

Just ran into this issue over the last few weeks (again) and just want to give this a +1. :)

@Galileo-Galilei
Member

Galileo-Galilei commented Oct 12, 2024

As @datajoely said, it feels unlikely that this will end up in the central codebase in the short run, but we would definitely accept a contribution to kedro_datasets as an experimental dataset. The contribution process is much lighter, and I think your code could almost be released "as is". It would be shipped quickly and we could gather feedback before considering making it more "official".

EDIT: just saw that the issue is almost 3 years old ^^' but the comment still stands if someone wants to contribute the above code.

@astrojuanlu astrojuanlu transferred this issue from kedro-org/kedro Nov 8, 2024
@astrojuanlu
Member

My understanding is that the ask is very specific.

I'm adding this to our Inbox so that we decide whether this is something we'll do ourselves or let the community do it. In the meantime, as @Galileo-Galilei says, a contribution as an experimental dataset is more than welcome.
