Skip to content

Commit

Permalink
update flink
Browse files Browse the repository at this point in the history
  • Loading branch information
ha2hi committed Jan 16, 2025
1 parent d6aadb4 commit fac2ee5
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 1 deletion.
Binary file added Flink/__pycache__/select.cpython-38.pyc
Binary file not shown.
3 changes: 2 additions & 1 deletion Flink/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@

res_ds = t_env.to_data_stream(res_table)

res_ds.print()
res_ds.print()
env.execute()
47 changes: 47 additions & 0 deletions Flink/select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from pyflink.table import (
EnvironmentSettings, TableEnvironment
)

# Demo script: register a CSV-backed table of taxi trips and run a basic
# projection over it with the PyFlink Table API.
#
# NOTE(review): this file is named select.py, which may shadow the
# standard-library `select` module for anything run from this directory --
# consider renaming it; confirm against how the scripts are launched.

# Build a streaming-mode table environment and pin the default parallelism
# to 1.
streaming_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(streaming_settings)
flink_conf = table_env.get_config().get_configuration()
flink_conf.set_string("parallelism.default", "1")

# Location of the sample data; relative, so it is resolved against the
# working directory the script is launched from.
csv_path = "tripsdata/sample_trips.csv"

# DDL for a filesystem/CSV source table whose columns mirror the trip file.
# 'csv.ignore-parse-errors' asks the CSV format to tolerate records that
# fail to parse instead of failing the job (see the Flink CSV format docs).
create_source_sql = f"""
create table sample_trips (
VendorID INT,
tpep_pickup_datetime STRING,
tpep_dropoff_datetime STRING,
passenger_count INT,
trip_distance DOUBLE,
RatecodeID INT,
store_and_fwd_flag STRING,
PULocationID INT,
DOLocationID INT,
payment_type INT,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
improvement_surcharge DOUBLE,
total_amount DOUBLE,
congestion_surcharge DOUBLE
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '{csv_path}',
'csv.ignore-parse-errors' = 'true'
)
"""

# Register the source table in the catalog, then obtain a Table handle to it.
table_env.execute_sql(create_source_sql)
trips = table_env.from_path("sample_trips")

# Basic select: project two columns, renaming the pickup location column.
print("===========BASIC SELECT============")
pickup_location = trips.PULocationID.alias("pickup_location_id")
basic_selection = trips.select(pickup_location, trips.total_amount)
# to_pandas() runs the query and returns the result as a pandas DataFrame.
print(basic_selection.to_pandas())
3 changes: 3 additions & 0 deletions Flink/tripsdata/sample_trips.csv
Git LFS file not shown

0 comments on commit fac2ee5

Please sign in to comment.