Delete rows function from table where data exists in another dataframe #57

Open
ilyasse05 opened this issue Mar 2, 2023 · 20 comments

@ilyasse05

Hello,
An interesting function would be one that deletes rows from a table when the values of certain columns exist in a DataFrame. I searched for a function like that in Scala/Spark and haven't found one, but it is possible to do it in Spark SQL with "WHERE EXISTS", and it works very well and performs well: there is no need for an intermediate DataFrame or table, or for adding flags to delete after a merge...

Example SQL:

DELETE FROM table1 AS A
WHERE EXISTS (
  SELECT 1 FROM table2 AS B
  WHERE A.col1 = B.col1
    AND B.col2 = A.col2
)
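
A minimal sketch of running this from Spark (assuming table1 is a Delta table registered in the metastore and table2 is available as a view; the names are illustrative, and subquery support in DELETE depends on the Delta Lake version):

spark.sql("""
  DELETE FROM table1 AS A
  WHERE EXISTS (
    SELECT 1 FROM table2 AS B
    WHERE A.col1 = B.col1
      AND B.col2 = A.col2
  )
""")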

What do you think about this function?

@ilyasse05
Author

Example of the function signature:
delete(targetTable, sourceTable, [condition columns])

@puneetsharma04

puneetsharma04 commented Mar 9, 2023

@ilyasse05 : If you are trying to delete rows from the target table by comparing columns between source and target, then the code below should work.
Let me know if that works for you.

from pyspark.sql.functions import col

def delete(target_df, source_df, condition_columns):
    # Join the target and source DataFrames on the condition columns
    join_condition = [col(f"t.{c}") == col(f"s.{c}") for c in condition_columns]
    joined_df = target_df.alias('t').join(source_df.alias('s'), join_condition, 'leftsemi')

    # Drop the matching rows from the target DataFrame
    result_df = target_df.subtract(joined_df)

    return result_df
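
For comparison, a rough Scala sketch of the same idea using a left_anti join (targetDF, sourceDF, and conditionColumns are placeholder names), which keeps only the target rows that have no match in the source:

// Build the join condition from the list of columns to compare
val joinCondition = conditionColumns
  .map(c => targetDF(c) === sourceDF(c))
  .reduce(_ && _)

// left_anti keeps only target rows without a matching source row
val resultDF = targetDF.join(sourceDF, joinCondition, "left_anti")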

@ilyasse05
Author

@puneetsharma04, we need to do it in the target table itself, without an intermediate dataframe and without returning a new dataframe.

@puneetsharma04

puneetsharma04 commented Mar 9, 2023

@ilyasse05 & @MrPowers :
Can you check the code below and see whether it serves the purpose?

from delta.tables import DeltaTable

def delete(target_table_path, source_df, condition_columns):
    # Load the source data as a Delta table (path is hard-coded here)
    source_table = DeltaTable.forPath(spark, "source_table")
    
    # Generate the delete condition
    delete_condition = ""
    for col in condition_columns:
        delete_condition += f"{col} = source_table.{col} AND "
    delete_condition = delete_condition[:-5]  # remove the last " AND "
    
    # Delete the matching rows from the target table
    target_table = DeltaTable.forPath(spark, target_table_path)
    target_table.delete(delete_condition)

@ilyasse05
Author

ilyasse05 commented Mar 9, 2023

@MrPowers & @puneetsharma04
I'm not comfortable with Scala, but I have coded this and tested it:

import io.delta.tables._

def deleteFromAnotherDataframe(
    targetTableDelta: io.delta.tables.DeltaTable,
    sourceDF: org.apache.spark.sql.DataFrame,
    attrColNames: Seq[String]
): String = {

  val stagedUpdatesAttrs = attrColNames.map(attr => f"target.$attr = source.$attr").mkString(" AND ")
  targetTableDelta.alias("target")
    .merge(
      sourceDF.alias("source"),
      stagedUpdatesAttrs)
    .whenMatched().delete()
    .execute()
  targetTableDelta.history(1).select("operationMetrics.numTargetRowsDeleted").as[String].first
}
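
A rough usage sketch (the path, DataFrame, and column names below are placeholders):

val targetTable = io.delta.tables.DeltaTable.forPath(spark, "/tmp/target_table")
val sourceDF = spark.read.format("delta").load("/tmp/source_table")

// Delete target rows whose col1 and col2 match a row in the source DataFrame
val deletedCount = deleteFromAnotherDataframe(targetTable, sourceDF, Seq("col1", "col2"))
println(s"Rows deleted: $deletedCount")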

The function returns the number of deleted rows, taken from the operation metrics.
I think we should change the name of the function and also the names of the parameters,
and we also need to add error handling.

What do you think?

@puneetsharma04

@MrPowers & @ilyasse05

I am not sure about the above code; however, the same can be implemented in Scala as below:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import io.delta.tables.DeltaTable

def delete(target_table_path: String, source_df: DataFrame, condition_columns: List[String]): Unit = {
    try {
        // Convert the source DataFrame to a Delta table
        val source_table = DeltaTable.forPath(spark, "source_table")
        
        // Generate the delete condition
        val delete_condition = condition_columns.map(col(_))
          .zip(condition_columns.map(c => s"source_table.$c"))
          .map{ case (col, source_col) => col.eqNullSafe(source_col) }
          .reduce(_ && _)
        
        // Delete the matching rows from the target table
        val target_table = DeltaTable.forPath(spark, target_table_path)
        target_table.delete(delete_condition)
    } catch {
        case e: Exception => println("An error occurred: " + e.getMessage)
    }
}

Python code:

from delta.tables import DeltaTable
def delete(target_table_path, source_df, condition_columns):
    try:
        # Load the source data as a Delta table (path is hard-coded here)
        source_table = DeltaTable.forPath(spark, "source_table")
        
        # Generate the delete condition
        delete_condition = ""
        for col in condition_columns:
            delete_condition += f"{col} = source_table.{col} AND "
        delete_condition = delete_condition[:-5]  # remove the last " AND "
        
        # Delete the matching rows from the target table
        target_table = DeltaTable.forPath(spark, target_table_path)
        target_table.delete(delete_condition)
        
    except Exception as e:
        print(f"An error occurred: {e}")

In this modified code, I've wrapped the main block of code inside a try block. If an exception is thrown during the execution of the code, it will be caught by the except block, which will print an error message to the console.

Of course, this is just an example, and you may want to handle errors differently depending on your specific use case.

@ilyasse05
Author

@puneetsharma04 I have tested this code and it works correctly:
def deleteFromAnotherDataframe(
    targetTableDelta: io.delta.tables.DeltaTable,
    sourceDF: org.apache.spark.sql.DataFrame,
    attrColNames: Seq[String]
): String = {

  val stagedUpdatesAttrs = attrColNames.map(attr => f"target.$attr = source.$attr").mkString(" AND ")
  targetTableDelta.alias("target")
    .merge(
      sourceDF.alias("source"),
      stagedUpdatesAttrs)
    .whenMatched().delete()
    .execute()
  targetTableDelta.history(1).select("operationMetrics.numTargetRowsDeleted").as[String].first
}

@puneetsharma04

@ilyasse05 : Then in that case, you just need to add an error-handling mechanism to the code.
However, can you test the code that I shared above and let me know the outcome?

@MrPowers
Collaborator

@puneetsharma04 @ilyasse05 - can you please put code in pull requests? The code should be checked with unit tests. That will make it easier for other people to run the code and make tweaks. Thanks!

@brayanjuls
Collaborator

@puneetsharma04 @ilyasse05 - hey! I am glad you want to contribute to the project. This feature is interesting. Which one of you wants to send the pull request, so I can assign the issue to you?

Regarding the function, one observation: other users might want the same functionality but with a different comparison operator. It would be good if we could make the function flexible enough that one can delete using the >, <, =, !=, etc. operators.

@puneetsharma04

@MrPowers & @brayanjuls : Thanks for the recognition.
Could you check the input and output below, so I can confirm that my understanding is correct?
Input :
target_df = spark.createDataFrame([ ("Alice", 25), ("Bob", 30), ("Charlie", 35)], ["name", "age"])

source_df = spark.createDataFrame([ ("Alice", 26), ("David", 40), ("Charlie", 35)], ["name", "age"])

Output should be:
+------+---+
| name|age|
+------+---+
|Alice | 26|
|David | 40|
+------+---+
Is it correct that the record with name 'Charlie' and age 35, which exists in both source and target, has to be deleted?
Can you confirm this assumption, and then I can ask you to assign the issue to me.

@brayanjuls
Collaborator

brayanjuls commented Mar 11, 2023

@puneetsharma04 - My understanding is that the idea @ilyasse05 is proposing is to delete only from the Delta table based on an input dataframe. If we assume that in your example the Delta table is source_df and the comparison operator is equals (=), then the final state in your example is correct. But again, I think we should make the comparison operator flexible enough to support all of them (=, >, <, !=, etc.).

@ilyasse05
Author

@brayanjuls & @puneetsharma04
Here is a proposed implementation:

import io.delta.tables._
import scala.util.{Try, Success, Failure}

/**
 * Deletes rows from a Delta table that match rows of a source DataFrame, joining on one
 * or more columns with a comparison operator ("=", ">", "!=", ...), and returns the
 * number of deleted rows.
 *
 * @param targetTableDelta the target Delta table
 * @param sourceDF the source DataFrame
 * @param attrColNameOper Map of ("column name" -> "operator")
 */
def deleteFromAnotherDataframe(
    targetTableDelta: io.delta.tables.DeltaTable,
    sourceDF: org.apache.spark.sql.DataFrame,
    attrColNameOper: Map[String, String]
): String = {
  Try {
    val stagedUpdatesAttrs = attrColNameOper
      .map(attr => "target." + attr._1 + " " + attr._2 + " source." + attr._1)
      .mkString(" AND ")

    targetTableDelta.alias("target")
      .merge(
        sourceDF.alias("source"),
        stagedUpdatesAttrs)
      .whenMatched().delete()
      .execute()

    targetTableDelta.history(1).select("operationMetrics.numTargetRowsDeleted").as[String].first
  } match {
    case Success(numDeleted) => numDeleted
    case Failure(e) =>
      throw new Exception(s"Check the params of deleteFromAnotherDataframe: ${e}")
  }
}
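
A rough usage sketch of this version (the paths and column/operator pairs are placeholders):

val targetTable = io.delta.tables.DeltaTable.forPath(spark, "/tmp/target_table")
val sourceDF = spark.read.format("delta").load("/tmp/source_table")

// Delete target rows whose col1 equals the source col1 and whose col2 is greater than the source col2
val result = deleteFromAnotherDataframe(targetTable, sourceDF, Map("col1" -> "=", "col2" -> ">"))
println(s"numTargetRowsDeleted: $result")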

@brayanjuls
Collaborator

@ilyasse05 - could you please open a pull request with this code, so we can iterate on it more easily?

@ilyasse05
Author

@brayanjuls I'm not sure if I have access to do it!

@brayanjuls
Collaborator

@ilyasse05 - yes you can. You need to:

  1. fork this repository to your account
  2. create a new branch from main in your forked repository.
  3. apply the changes (make sure you write unit tests for your function) to your forked repository.
  4. open a pull request from your branch pointing to this repository.

That should do the job. If you need help, let me know.

@ilyasse05
Author

Thank you @brayanjuls, I will try that.

I have an issue using this code:
targetTableDelta.history(1).select("operationMetrics.numTargetRowsDeleted").as[String].first

Error:
Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Any idea?

@brayanjuls
Collaborator

@ilyasse05 - when you use dataframe.as[...] you are telling Spark that you want to convert your DataFrame into a Dataset; that's why it is asking for an encoder. What you probably want to do instead is call an action that triggers your query and returns the desired value, for example something like this:

targetTableDelta.history(1).select("operationMetrics.numTargetRowsDeleted").head()(0).asInstanceOf[String]
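
An alternative sketch, following the hint in the error message, is to keep .as[String] and bring the implicit encoders into scope (assuming a SparkSession value named spark):

import spark.implicits._  // provides the implicit Encoder[String]

targetTableDelta.history(1)
  .select("operationMetrics.numTargetRowsDeleted")
  .as[String]
  .first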

@ilyasse05
Author

ilyasse05 commented Mar 12, 2023

@brayanjuls I have opened a pull request with a unit test and README update: #66

But I'm still not able to run the unit tests locally; do you have a tutorial for that, please?

@brayanjuls
Collaborator

@ilyasse05 - You need to make sure you have installed sbt and Scala with the versions mentioned in the readme of this repository.

Regarding a tutorial on how to test, please take a look at the official sbt documentation: https://www.scala-sbt.org/1.x/docs/Testing.html. I will write something more concrete for this project in the following weeks, but for now that documentation should help.
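
For reference, a typical sbt workflow from the project root looks something like this (the suite name below is hypothetical):

sbt test                                        # run the whole test suite
sbt "testOnly *DeleteFromAnotherDataframeSpec"  # run a single suite by name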
