Replies: 23 comments
-
I found the following approach using an RDD cartesian product and then filtering down to N choose K, but I didn't see a cartesian product in Spark .NET (https://stackoverflow.com/questions/26557873/spark-produce-rddx-x-of-all-possible-combinations-from-rddx): `val combinations = rdd.cartesian(rdd).filter{ case (a,b) => a < b }`. I don't know how performant this would be, and there would need to be a user function called to process each set.
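The same shape can be expressed with the DataFrame API that Spark .NET does expose: cross-join a DataFrame with a renamed copy of itself and keep only the ordered pairs. A minimal sketch, assuming a single-column DataFrame `df` with a comparable column `id` (both names are hypothetical):

```csharp
using Microsoft.Spark.Sql;

// Rename the column on each side so the pair is unambiguous after the join.
DataFrame left = df.WithColumnRenamed("id", "a");
DataFrame right = df.WithColumnRenamed("id", "b");

// a < b keeps each unordered pair exactly once and drops self-pairs,
// mirroring the RDD filter in the Scala snippet above.
DataFrame combinations = left.CrossJoin(right)
    .Filter(Functions.Col("a") < Functions.Col("b"));
```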
-
You can use `CrossJoin`: see spark/src/csharp/Microsoft.Spark/Sql/DataFrame.cs, lines 234 to 243 at commit 5c5c249.
-
I was actually looking at that based on this combination example using SQL: https://sqlspellbook.com/2017/02/21/use-sql-to-create-all-possible-unique-combinations/. I just did a cross join and I have the combinations. I am now putting in a user-defined function to compute something so I can get some benchmarks. I don't know if this is the best way for performance, but it's a start.
-
How do I set up the names of the columns when doing a `CrossJoin`? When doing a `Join` there is an option to disambiguate columns, but it doesn't appear to be available for `CrossJoin`. I am looking at this to remove the ambiguity of column names so that I can filter out duplicates and create the final list of N choose K rows. Also, if you know the proper filter syntax, that would help. Example: `funds.CrossJoin(funds).CrossJoin(funds).Show();`. The output has identically named columns, and from the cross join only one of the rows below is valid for N choose K. I am guessing a filter on each cross join, with unique column names, can remove the unneeded rows.
-
How about something like: `funds.WithColumnRenamed("ETF_Code", "c1").CrossJoin(funds.WithColumnRenamed("ETF_Code", "c2")).CrossJoin(funds.WithColumnRenamed("ETF_Code", "c3")).Show();`
-
Yes, that worked. Thanks for the help.
-
While the rename worked on the column, the subsequent filter did not find the renamed column (maybe I have the wrong syntax): `var cj_df1 = funds.WithColumnRenamed("ETF_Code", "C1").CrossJoin(funds.WithColumnRenamed("ETF_Code", "C2"));`. I get the following error: any ideas what the issue might be? I am trying to filter out the duplicates to get only N choose K items.
-
Note that
-
Sorry, I should have seen that. For N choose 2, the following worked: `var cj_df1 = funds.WithColumnRenamed("ETF_Code", "C1").CrossJoin(funds.WithColumnRenamed("ETF_Code", "C2"));`. Now I am looking to see if I can continue the pattern for N choose 3 and beyond.
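The dedup filter itself isn't shown above; a sketch of what it likely looked like, assuming string comparison on the renamed columns (`Col` comes from `using static Microsoft.Spark.Sql.Functions;`):

```csharp
using static Microsoft.Spark.Sql.Functions;

// Keep rows where C1 sorts strictly before C2: each unordered pair
// {x, y} survives exactly once and self-pairs (x, x) are dropped.
DataFrame nChoose2 = cj_df1.Filter(Col("C1") < Col("C2"));
nChoose2.Show();
```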
-
The below worked for N choose 3: I used the results from N choose 2 and did another cross join (a sketch of the pattern follows). If this pattern continues to work, then the concern is memory: if I don't need a DataFrame anymore, I would want to dispose of it.
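The code blocks themselves didn't survive in the thread; a sketch of the chained pattern being described, reusing the `funds` and column names from the earlier comments (anything else is hypothetical):

```csharp
using static Microsoft.Spark.Sql.Functions;

// N choose 2: ordered pairs, as in the previous comment.
DataFrame nChoose2 = funds.WithColumnRenamed("ETF_Code", "C1")
    .CrossJoin(funds.WithColumnRenamed("ETF_Code", "C2"))
    .Filter(Col("C1") < Col("C2"));

// N choose 3: cross-join the pairs with a third renamed copy and extend
// the ordering constraint so each unordered triple appears exactly once.
DataFrame nChoose3 = nChoose2
    .CrossJoin(funds.WithColumnRenamed("ETF_Code", "C3"))
    .Filter(Col("C2") < Col("C3"));
```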
-
How can I get data into a DataFrame if I have an in-memory object like a C# object? For example, could I serialize the object into a JSON string and do `spark.Read().Json(some-serialized-string)`? In Spark (Scala/Python) an example would be to use an RDD. Is this possible in Spark .NET? I am getting data from Azure Blob Storage using the Azure Blob Storage APIs.
-
Have you tried looking at `SparkSession.CreateDataFrame`? See spark/src/csharp/Microsoft.Spark/Sql/SparkSession.cs, lines 153 to 163 at commit 6bdc9be, for examples.
-
Works great, thanks.

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

// Collect one GenericRow per fund code.
var data = new List<GenericRow>();
foreach (var fund in fundSparkTestList)
{
    object[] item = new object[1] { fund.ETF_Code };
    data.Add(new GenericRow(item));
}

// Describe the single string column, then build the DataFrame.
var schema = new StructType(new List<StructField>()
{
    new StructField("ETF_Code", new StringType())
});
DataFrame df_etfcodes = spark.CreateDataFrame(data, schema);
```
-
Is there a way to "dispose" of a DataFrame once you're done using it, to release memory? I have a sample that is working locally and within Azure Databricks for N choose 8, and I probably need to clean up memory as I stage each cross join. I do have a comparison of 50 choose 8 with Spark .NET locally vs. Azure Databricks with 2 workers: my local PC took over 1 minute and Azure Databricks took 8 seconds. I still need bigger numbers, but I want to start being more efficient with resources now.
-
I am getting an exception using a UDF. I have a DataFrame "funds" that looks like this:

```text
+--------+
|ETF_Code|
+--------+
|    AAXJ|
|    ACWI|
|    ACWV|
|    ACWX|
|     AGG|
|    AGGY|
|     AGZ|
|     AIA|
+--------+
```

My UDF looks like this:

```csharp
string s1 = "hello";
Func<Column, Column> udf = Microsoft.Spark.Sql.Functions.Udf<string, string>(str => $"{s1} {str}");
DataFrame udfResult = funds.Select(udf(funds["ETF_Code"]));
udfResult.Show();
```

The exception only happens when udfResult.Show() is called; otherwise, funds.Show() works fine.

```text
20/08/31 17:19:16 INFO CodeGenerator: Code generated in 11.1558 ms
20/08/31 17:19:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Cannot run program "c:\spark\netcore31\Microsoft.Spark.Worker.exe": CreateProcess error=2, The system cannot find the file specified
    at java.lang.ProcessBuilder.start(Unknown Source)
    ...
[2020-08-31T21:19:18.5955127Z] [DESKTOP-GKQU782] [Exception] [JvmBridge] JVM method execution failed: Nonstatic method showString failed for class 17 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
    at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
```

I was following this guide: https://docs.microsoft.com/en-us/dotnet/spark/how-to-guides/udf-guide

Any ideas on why adding this UDF caused an exception?
-
@RobHogue make sure you have set your `DOTNET_WORKER_DIR` (the executor log shows that `Microsoft.Spark.Worker.exe` could not be found).
-
Your constant string s1 is defined outside the function lambda and is thus probably passed as a third argument to your function. The reason you only get the error with .Show() is that, before that point, you are only creating the op-tree, which does not do the check that causes the error.
Try one of the following:
1. Make s1 an explicit argument of the lambda expression. Since it is a constant, you will have to pass it as a Lit(s1):
```csharp
// Lit comes from Microsoft.Spark.Sql.Functions; the literal goes first to
// match the lambda's parameter order (s1, str).
Func<Column, Column, Column> udf = Microsoft.Spark.Sql.Functions.Udf<string, string, string>((s1, str) => $"{s1} {str}");
DataFrame udfResult = funds.Select(udf(Lit(s1), funds["ETF_Code"]));
```
2. Make the string part of an explicit function body instead of a lambda.
3. Use the constant inside your lambda.
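A minimal sketch of option 3 (an illustration, not from the original reply): define the constant inside the lambda so nothing external is captured:

```csharp
Func<Column, Column> udf = Microsoft.Spark.Sql.Functions.Udf<string, string>(
    str =>
    {
        string s1 = "hello"; // the constant lives inside the lambda, so there is no closure capture
        return $"{s1} {str}";
    });
DataFrame udfResult = funds.Select(udf(funds["ETF_Code"]));
```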
Cheers
Michael
-
@suhsteve, I was able to get the sample to run, but there was one issue that was not obvious. DOTNET_WORKER_DIR was set properly to the worker exe, but spark-submit had a problem finding my app DLL via the environment path. For example, I wanted to do this: `set DOTNET_WORKER_DIR=c:\spark\netcore31\Microsoft.Spark.Worker-0.8.0;c:\spark\source\mySparkApp\bin\Debug\netcoreapp3.1` (the second path is the location of my app). Are there other environment variables where the worker will look for mySparkApp.dll, so that I can run spark-submit from any location? At the moment, I have to run spark-submit from the bin\Debug\netcoreapp3.1 folder for it to find my app DLL. Note: MikeRys's change worked (thanks).
-
@suhsteve, perfect, thanks. `DOTNET_ASSEMBLY_SEARCH_PATHS` worked.
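For anyone following along, the combined setup presumably looked something like this on Windows (paths taken from the earlier comment, so treat them as illustrative):

```
set DOTNET_WORKER_DIR=c:\spark\netcore31\Microsoft.Spark.Worker-0.8.0
set DOTNET_ASSEMBLY_SEARCH_PATHS=c:\spark\source\mySparkApp\bin\Debug\netcoreapp3.1
```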
-
@suhsteve, can I pass a reference to a C# List into a UDF? I am trying to speed up a computation over the combinations that have been generated. For example, I have a DataFrame that looks like this: `+----+----+----+----+----+----+---+` (seven fund-code columns). I have a UDF that looks like this: `Func<Column, Column, Column, Column, Column, Column, Column, Column> addition = Microsoft.Spark.Sql.Functions.Udf<string, string, string, string, string, string, string, string>(`, where the function can be `public static string FundUDF(string C1, string C2, etc...)`. The question is: within FundUDF, how can I pass in a C# object (a list of funds with closing prices) to make the lookup and average very quick? If this is not possible, there would need to be some type of join operation, which would be expensive on a large number of combinations.
-
@suhsteve, here is how I solved the issue, but I don't know if it's a good solution. The registration is the same `Func<Column, Column, Column, Column, Column, Column, Column, Column> addition = Microsoft.Spark.Sql.Functions.Udf<string, string, string, string, string, string, string, string>(` as before, and the function now takes the list as its first parameter: `public static string FundUDF(List<ETF_Fundamental_Model> rawData, string fund1, string fund2)`. While access to the data can be a lot quicker, I noticed a serialization step which could slow things down a lot. But I guess the answer to my question is simply to pass the data through the function.
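A minimal sketch of the closure pattern being described, shown with two fund columns for brevity (the thread's version used seven; `LoadFundamentals` and `combos` are hypothetical names):

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;

// Reference data loaded once on the driver (hypothetical helper).
List<ETF_Fundamental_Model> rawData = LoadFundamentals();

// The lambda captures rawData, so the serialized UDF that is shipped to the
// workers carries the whole list with it. That shipping step is the
// serialization cost noted above.
Func<Column, Column, Column> addition =
    Functions.Udf<string, string, string>(
        (fund1, fund2) => FundUDF(rawData, fund1, fund2));

DataFrame result = combos.Select(addition(combos["C1"], combos["C2"]));
```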
-
I added a dictionary and passed it into a UDF, which worked great locally, but when I published it to Azure Databricks it could not find the dictionary class (see the exception below). The local deployment that works: https://docs.microsoft.com/en-us/dotnet/spark/how-to-guides/deploy-worker-udf-binaries. I followed the Azure Databricks deployment guide, which worked until I tried the UDF with the generic dictionary. I see that System.Collections.dll is in the published folder. Is there some other setup needed for UDFs and Azure Databricks?

```text
System.Runtime.Serialization.SerializationException: Unable to load type System.Collections.Generic.Dictionary
```

Update 9/3/2020: I had to follow section 3 of the Azure Databricks deployment guide more closely; there are additional steps for UDF scenarios.
-
I am working on a project doing 100 choose 20 combinations. I have been in contact with more than four Microsoft big data partners, and none of them were willing to help because the project was "too small". I discovered Spark .NET and set it up. This opened up a lot of doors for me, as I am familiar with .NET and Visual Studio, and there are deployment options into a Databricks environment. This means I could try to do this project myself.
The problem is that I don't understand the programming model well enough yet to properly build an N choose K distributed loop using a DataFrame, where a user-defined function is called to do some work on each set.
I have looked at the other examples, which have helped a lot, but it is still not clear how to perform an N choose K scenario.
It would be incredibly helpful to me and others to show a combination example using Spark .NET. I am not sure if DataFrame can support N choose K via SQL-type statements or if it requires some other coding techniques.
I have posted questions on other forums but received no responses. I am hoping someone could provide this type of example for documentation and educational purposes. Once someone has this combinatorial distributed loop, they can build all kinds of user-defined workers around it.
Thank you for your consideration. Spark .NET has opened up a lot of options to us common developers who are not data science gurus.