From fac2ee5e2ef867ebbb7055cf96e32c4f171217be Mon Sep 17 00:00:00 2001 From: HyukSangCho Date: Thu, 16 Jan 2025 23:35:51 +0900 Subject: [PATCH] update flink --- Flink/__pycache__/select.cpython-38.pyc | Bin 0 -> 1310 bytes Flink/convert.py | 3 +- Flink/select.py | 47 ++++++++++++++++++++++++ Flink/tripsdata/sample_trips.csv | 3 ++ 4 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 Flink/__pycache__/select.cpython-38.pyc create mode 100644 Flink/select.py create mode 100644 Flink/tripsdata/sample_trips.csv diff --git a/Flink/__pycache__/select.cpython-38.pyc b/Flink/__pycache__/select.cpython-38.pyc new file mode 100644 index 0000000000000000000000000000000000000000..14499de11a0bea19408e53848a3c529b264a845d GIT binary patch literal 1310 zcmZuw&u`;I6t*2Fo4C!=ZWn=894v7NmA1)N+5-oQV7CQTs=8>?a>&J+^*kr0ta`~ms(;UVw-R3ZK42hjgvXSvIJE1zzUR>5Qs9>U)8@ye?h zJQ{oZTkj|nZ$EH*C-OQsN){k-42Zd$TDNn--Uw(|7JHf*CIpDw7DEmhD+N0LaPb69 zt}SQCMwS&?KvDz4)Lygaf#*eWoEl&VF;*etCMo`mta1E3NUqH3aeR7y@w7%zgHCjw zUX?nb(13XX@%ZBP>G`(}0yj#lEZfY`%vz9BFiEOPiH+ElZ^CnnOr{VYzq)*JcCrro znqEp3hh`=gm4Spwo@8%n3pTx%$9I=!DrJ}}*>pO7wF%X1UiCP^xrV!RGFHVe>KvQN zzzrI9FTDa;g6!rVhE&T2XH-JiO@_SCMqNXlhb>JyV`h5438;{XO|Ge;ZrZBUy_vl?3r-$P^_=2J;C zI3WF5H~?dmu{8@#34`kC$Za=OwfOYtZoT+=eEKpTpPZe%ytqS;j@+YLd5PQkB{@Ie z|IqdIJQKOR8rEtxBw)|9=y092irsEb^&=^|pdTeENG`683CL>~O#w-4GRvoK_wHVr zS|2X5WF}8?b5SmJiAhy3;fe+U3FpFX!40G(LSlauu9q2AtT`=wBV1dPFS?EeLA`a` zP3^fh6FIYPyE#4=m_ye?MKL62_7BfIsmiI2zdj#bS}=B0EYfF(2eWyZURfrm=}e7| z!Cs-#BR$8NlA~{`c8u1K4)xr1zo4Q^0blwQ$HsK*grVR0Hw+#Jp&xt{yz71v`BAIu LhkoRT|AgUxbgGm( literal 0 HcmV?d00001 diff --git a/Flink/convert.py b/Flink/convert.py index 5d03f97..ebda585 100644 --- a/Flink/convert.py +++ b/Flink/convert.py @@ -13,4 +13,5 @@ res_ds = t_env.to_data_stream(res_table) -res_ds.print() \ No newline at end of file +res_ds.print() +env.execute() \ No newline at end of file diff --git a/Flink/select.py b/Flink/select.py new file mode 100644 index 0000000..70daaba --- /dev/null +++ 
b/Flink/select.py @@ -0,0 +1,47 @@ +from pyflink.table import ( + EnvironmentSettings, TableEnvironment +) + +t_env = TableEnvironment.create( + EnvironmentSettings.in_streaming_mode()) +t_env.get_config().get_configuration().set_string("parallelism.default", "1") + +input_path = "tripsdata/sample_trips.csv" +source_ddl = f""" + create table sample_trips ( + VendorID INT, + tpep_pickup_datetime STRING, + tpep_dropoff_datetime STRING, + passenger_count INT, + trip_distance DOUBLE, + RatecodeID INT, + store_and_fwd_flag STRING, + PULocationID INT, + DOLocationID INT, + payment_type INT, + fare_amount DOUBLE, + extra DOUBLE, + mta_tax DOUBLE, + tip_amount DOUBLE, + tolls_amount DOUBLE, + improvement_surcharge DOUBLE, + total_amount DOUBLE, + congestion_surcharge DOUBLE + ) with ( + 'connector' = 'filesystem', + 'format' = 'csv', + 'path' = '{input_path}', + 'csv.ignore-parse-errors' = 'true' + ) +""" + +t_env.execute_sql(source_ddl) +tbl = t_env.from_path("sample_trips") + +# Basic select: project PULocationID (aliased to pickup_location_id) and total_amount +print("===========BASIC SELECT============") +r1 = tbl.select( + tbl.PULocationID.alias("pickup_location_id"), + tbl.total_amount +) +print(r1.to_pandas()) \ No newline at end of file diff --git a/Flink/tripsdata/sample_trips.csv b/Flink/tripsdata/sample_trips.csv new file mode 100644 index 0000000..d1a9a1f --- /dev/null +++ b/Flink/tripsdata/sample_trips.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:c04db3ea0381d39d027660a7fe79ac555fcf58d80ef681d8d5d1f5a0e74007ab +size 92728