Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist manually versioned intermediate state / decoupling development on two parts of the pipeline #195

Open
windiana42 opened this issue May 24, 2024 · 0 comments
Labels
enhancement New feature or request usability

Comments

@windiana42
Copy link
Member

Background:
Pydiverse pipedag is currently more used at the beginning of pipelines before training actual models. In the process of increasing adoption further down, a demand was raised for decoupling the experimentation with model training from the early part of preparing the input. The model training part may also include a thin layer of feature engineering which is more about selecting, encoding, and combining base feature data. We call it "feature encoding" here. A general pipeline might be split as follows:

input preparation:
raw ingestion -> cleaning for inspection -> best possible representation for economic reasoning -> feature engineering

model training:
-> feature encoding(s5) -> model training(s6) -> model evaluation(s7)

Idea:
Similar to the local table cache, the pipedag configuration could include another table store called "persistence_table_store". When having a flow f and a task output t, one could actively create a manually persisted version of t with version_id = f.persist([t]). When executing model training code, one could run f.run([s5,s6,s7], persisted_inputs={t: version_id}). Assuming the persistence_table_store is configured as parquet storage in a certain directory or s3 bucket, the version_id would be a monotonically increasing string (i.e date and time) which makes it reasonably easy to manually delete all persisted versions older than 3 months for example. Since it is done manually, keeping a few or all versions long term is no problem.

Interaction:

Questions:

  1. It is unclear whether f.run([s5,s6,s7], persisted_inputs={t: version_id}) should check whether version_id is cache valid with the code in the currently running repo checkout or whether it leaves this check up to the caller. The most common usecase here is to perform this manually versioned persistence on a flatfile representation. It can be assumed that new code typically just adds columns to the flatfile representation. As a consequence, enforcing strict cache validity would be an annoying restriction.
  2. Enforcing identical position_hash for task that produced t would be another option, but it has the same downsides as checking full cache validity
  3. In case persistence_table_store uses parquet, should the default behavior be that dematerialization for SQL based tasks throws and error, or that it should be tried using duckdb. In combination with a parquet based local_table_cache it would even be possible to execute queries spanning both persisted tables and non-persisted tables.
@windiana42 windiana42 added enhancement New feature or request usability labels May 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request usability
Projects
None yet
Development

No branches or pull requests

1 participant