From f2b7d35b818ffa670a99198bbb4dcdcc35b028a6 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Sun, 19 Jan 2025 19:36:07 -0800 Subject: [PATCH 01/23] trying cassandra connector --- .../cassandra_online_store.py | 55 ++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 1998de464a..96e4769e8f 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -44,7 +44,7 @@ from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr - +from pyspark.sql import DataFrame, Row, SparkSession from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore @@ -351,36 +351,51 @@ def online_write_batch( display progress. """ project = config.project - - def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]: + spark = SparkSession.builder.getOrCreate() + keyspace: str = self._keyspace + fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) + def prepare_rows() -> List[Row]: """ - We craft an iterable over all rows to be inserted (entities->features), - but this way we can call `progress` after each entity is done. + Transform data into a list of Spark Row objects for insertion. """ + rows = [] for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() + for feature_name, val in values.items(): - params: Tuple[str, bytes, str, datetime] = ( - feature_name, - val.SerializeToString(), - entity_key_bin, - timestamp, + row = Row( + feature_name=feature_name, + entity_key=entity_key_bin, + feature_value=val.SerializeToString(), + event_timestamp=timestamp, + created_timestamp=created_ts, ) - yield params - # this happens N-1 times, will be corrected outside: + rows.append(row) + if progress: - progress(1) + progress(1) # Report progress for each entity processed. - self._write_rows_concurrently( - config, - project, - table, - unroll_insertion_tuples(), - ) - # correction for the last missing call to `progress`: + return rows + + rows = prepare_rows() + if rows: + df = spark.createDataFrame(rows) + + # Add ScyllaDB table as the sink using the Cassandra connector. + ( + df.write.format("org.apache.spark.sql.cassandra") + .options( + table=fqtable, + keyspace=config.keyspace, + ) + .mode("append") + .save() + ) + + # Final progress update. 
if progress: progress(1) From be8c0a190633fdd7efa83297ac31841e098291fc Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Sun, 19 Jan 2025 19:53:29 -0800 Subject: [PATCH 02/23] fix options --- .../contrib/cassandra_online_store/cassandra_online_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 96e4769e8f..b26f276c8b 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -389,7 +389,7 @@ def prepare_rows() -> List[Row]: df.write.format("org.apache.spark.sql.cassandra") .options( table=fqtable, - keyspace=config.keyspace, + keyspace=keyspace, ) .mode("append") .save() From f1db9a61399d035194c744292c7bf5aeb765a10e Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Sun, 19 Jan 2025 19:57:20 -0800 Subject: [PATCH 03/23] fix: lint --- .../contrib/cassandra_online_store/cassandra_online_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index b26f276c8b..c7afeec538 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -44,7 +44,7 @@ from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr -from pyspark.sql import DataFrame, Row, SparkSession +from pyspark.sql import Row, SparkSession from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore From b741b08147ef67bb5524fea8d4988d7cd02045ac Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Sun, 19 Jan 2025 21:50:22 -0800 Subject: [PATCH 04/23] fix: linter --- .../contrib/cassandra_online_store/cassandra_online_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index c7afeec538..7ceb1bbcba 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -354,6 +354,7 @@ def online_write_batch( spark = SparkSession.builder.getOrCreate() keyspace: str = self._keyspace fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) + def prepare_rows() -> List[Row]: """ Transform data into a list of Spark Row objects for insertion. 
From 93bd2fc6f56449128fbe80e9fe5635924dca9331 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Sun, 19 Jan 2025 21:58:43 -0800 Subject: [PATCH 05/23] fix: linter --- .../contrib/cassandra_online_store/cassandra_online_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 7ceb1bbcba..dd75bad7a3 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -45,6 +45,7 @@ from cassandra.query import PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr from pyspark.sql import Row, SparkSession + from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore From facce236425b5c03122648ea8de936a2d41abfec Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 17:50:05 -0800 Subject: [PATCH 06/23] fix: sparksession --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 3 ++- .../contrib/cassandra_online_store/cassandra_online_store.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index d98366c1a4..cbda82e1ef 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -272,7 +272,7 @@ def _write_stream_data_expedia(self, df: StreamTable, to: PushMode): # TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. - def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): + def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys, spark_session): for pdf in iterator: ( feature_view, @@ -310,6 +310,7 @@ def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): feature_view, rows_to_write, lambda x: None, + spark=spark_session ) yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index dd75bad7a3..ca53dc4e96 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -336,6 +336,7 @@ def online_write_batch( Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], + spark: SparkSession ) -> None: """ Write a batch of features of several entities to the database. @@ -352,7 +353,6 @@ def online_write_batch( display progress. 
""" project = config.project - spark = SparkSession.builder.getOrCreate() keyspace: str = self._keyspace fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) From 39f955e73ab80c911e6d0bae24143ff92b793e18 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 17:55:57 -0800 Subject: [PATCH 07/23] fix: sparksession --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index cbda82e1ef..8227bf88a9 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -321,11 +321,12 @@ def batch_write( spark_serialized_artifacts, join_keys, feature_view, + spark_session, ): start_time = time.time() sdf.mapInPandas( lambda x: batch_write_pandas_df( - x, spark_serialized_artifacts, join_keys + x, spark_serialized_artifacts, join_keys, spark_session ), "status int", ).count() # dummy action to force evaluation @@ -344,6 +345,7 @@ def batch_write( self.spark_serialized_artifacts, self.join_keys, self.sfv, + self.spark, ) ) .start() From ace14f7e200c9756d6932167f06239980c3f9704 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 21:28:28 -0800 Subject: [PATCH 08/23] fix: revert --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 6 ++---- .../cassandra_online_store/cassandra_online_store.py | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 8227bf88a9..463b0180ba 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -272,7 +272,7 @@ def _write_stream_data_expedia(self, df: StreamTable, to: PushMode): # TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. 
- def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys, spark_session): + def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): for pdf in iterator: ( feature_view, @@ -310,7 +310,6 @@ def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys, spark feature_view, rows_to_write, lambda x: None, - spark=spark_session ) yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result @@ -321,12 +320,11 @@ def batch_write( spark_serialized_artifacts, join_keys, feature_view, - spark_session, ): start_time = time.time() sdf.mapInPandas( lambda x: batch_write_pandas_df( - x, spark_serialized_artifacts, join_keys, spark_session + x, spark_serialized_artifacts, join_keys ), "status int", ).count() # dummy action to force evaluation diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index ca53dc4e96..13493be5de 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -44,8 +44,7 @@ from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr -from pyspark.sql import Row, SparkSession - +from pyspark.sql import Row from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore @@ -336,7 +335,6 @@ def online_write_batch( Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], - spark: SparkSession ) -> None: """ Write a batch of features of several entities to the database. @@ -353,6 +351,7 @@ def online_write_batch( display progress. 
""" project = config.project + spark = SparkSession.builder.getOrCreate() keyspace: str = self._keyspace fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) From 2ed16f6c614cdf7b5ea74bbe66dc7dc7ab87e1c6 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 23:05:57 -0800 Subject: [PATCH 09/23] fix: spark changes --- .../infra/contrib/spark_kafka_processor.py | 90 +++++++++++++++++-- .../cassandra_online_store.py | 68 ++++++-------- 2 files changed, 109 insertions(+), 49 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 463b0180ba..4b088c6698 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -1,17 +1,18 @@ import time +from datetime import datetime from types import MethodType -from typing import List, Optional, Set, Union, no_type_check +from typing import List, Optional, Set, Union, no_type_check, Tuple, Dict, Callable, Any import pandas as pd import pyarrow from pyspark import SparkContext -from pyspark.sql import DataFrame, SparkSession +from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql.avro.functions import from_avro from pyspark.sql.column import Column, _to_java_column from pyspark.sql.functions import col, from_json from pyspark.sql.streaming import StreamingQuery -from feast import FeatureView +from feast import FeatureView, RepoConfig from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, StreamFormat from feast.data_source import KafkaSource, PushMode from feast.feature_store import FeatureStore @@ -20,10 +21,13 @@ StreamProcessor, StreamTable, ) +from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.materialization.contrib.spark.spark_materialization_engine import ( _SparkSerializedArtifacts, ) from feast.infra.provider import get_provider +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.stream_feature_view import StreamFeatureView from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping @@ -272,7 +276,79 @@ def _write_stream_data_expedia(self, df: StreamTable, to: PushMode): # TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. - def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): + def online_write_with_connector( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + spark: SparkSession, + ) -> None: + """ + Write a batch of features of several entities to the database. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + data: a list of quadruplets containing Feature data. Each + quadruplet contains an Entity Key, a dict containing feature + values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of + rows is written to the online store. Can be used to + display progress. 
+ """ + project = config.project + keyspace: str = self._keyspace + fqtable = f"{project}_{table.name}" + def prepare_rows() -> List[Row]: + """ + Transform data into a list of Spark Row objects for insertion. + """ + rows = [] + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + + for feature_name, val in values.items(): + row = Row( + feature_name=feature_name, + entity_key=entity_key_bin, + feature_value=val.SerializeToString(), + event_timestamp=timestamp, + created_timestamp=created_ts, + ) + rows.append(row) + + if progress: + progress(1) # Report progress for each entity processed. + + return rows + + rows = prepare_rows() + if rows: + df = spark.createDataFrame(rows) + + # Add ScyllaDB table as the sink using the Cassandra connector. + ( + df.write.format("org.apache.spark.sql.cassandra") + .options( + table=fqtable, + keyspace=keyspace, + ) + .mode("append") + .save() + ) + + # Final progress update. + if progress: + progress(1) + + def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys, spark_session): for pdf in iterator: ( feature_view, @@ -305,11 +381,12 @@ def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): rows_to_write = _convert_arrow_to_proto( table, feature_view, join_key_to_value_type ) - online_store.online_write_batch( + online_write_with_connector( repo_config, feature_view, rows_to_write, lambda x: None, + spark=spark_session, ) yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result @@ -320,11 +397,12 @@ def batch_write( spark_serialized_artifacts, join_keys, feature_view, + spark_session, ): start_time = time.time() sdf.mapInPandas( lambda x: batch_write_pandas_df( - x, spark_serialized_artifacts, join_keys + x, spark_serialized_artifacts, join_keys, spark_session ), "status int", ).count() # dummy action to force evaluation diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 13493be5de..8b03300f7b 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -44,7 +44,6 @@ from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr -from pyspark.sql import Row from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore @@ -328,13 +327,13 @@ def __del__(self): pass def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: """ Write a batch of features of several entities to the database. @@ -351,55 +350,38 @@ def online_write_batch( display progress. 
""" project = config.project - spark = SparkSession.builder.getOrCreate() - keyspace: str = self._keyspace - fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) - def prepare_rows() -> List[Row]: + def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]: """ - Transform data into a list of Spark Row objects for insertion. + We craft an iterable over all rows to be inserted (entities->features), + but this way we can call `progress` after each entity is done. """ - rows = [] for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() - for feature_name, val in values.items(): - row = Row( - feature_name=feature_name, - entity_key=entity_key_bin, - feature_value=val.SerializeToString(), - event_timestamp=timestamp, - created_timestamp=created_ts, + params: Tuple[str, bytes, str, datetime] = ( + feature_name, + val.SerializeToString(), + entity_key_bin, + timestamp, ) - rows.append(row) - + yield params + # this happens N-1 times, will be corrected outside: if progress: - progress(1) # Report progress for each entity processed. - - return rows + progress(1) - rows = prepare_rows() - if rows: - df = spark.createDataFrame(rows) - - # Add ScyllaDB table as the sink using the Cassandra connector. - ( - df.write.format("org.apache.spark.sql.cassandra") - .options( - table=fqtable, - keyspace=keyspace, - ) - .mode("append") - .save() - ) - - # Final progress update. + self._write_rows_concurrently( + config, + project, + table, + unroll_insertion_tuples(), + ) + # correction for the last missing call to `progress`: if progress: progress(1) - def online_read( self, config: RepoConfig, From 79a7222413deb60709f914b9d5a78d63ecafda43 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 23:11:26 -0800 Subject: [PATCH 10/23] fix: spark changes --- .../contrib/cassandra_online_store/cassandra_online_store.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 8b03300f7b..4e224b9e11 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -50,7 +50,6 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel - # Error messages E_CASSANDRA_UNEXPECTED_CONFIGURATION_CLASS = ( "Unexpected configuration object (not a CassandraOnlineStoreConfig instance)" From 0c353bab47010ba11409810194c4e7b5ef96689f Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 23:16:09 -0800 Subject: [PATCH 11/23] fix: formatting --- .../infra/contrib/spark_kafka_processor.py | 23 +++++++++++-------- .../cassandra_online_store.py | 16 +++++++------ 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 4b088c6698..dc47fd7df6 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -277,14 +277,16 @@ def _write_stream_data_expedia(self, df: StreamTable, to: PushMode): # 
Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. def online_write_with_connector( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], - spark: SparkSession, + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[ + EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime] + ] + ], + progress: Optional[Callable[[int], Any]], + spark: SparkSession, ) -> None: """ Write a batch of features of several entities to the database. @@ -303,6 +305,7 @@ def online_write_with_connector( project = config.project keyspace: str = self._keyspace fqtable = f"{project}_{table.name}" + def prepare_rows() -> List[Row]: """ Transform data into a list of Spark Row objects for insertion. @@ -348,7 +351,9 @@ def prepare_rows() -> List[Row]: if progress: progress(1) - def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys, spark_session): + def batch_write_pandas_df( + iterator, spark_serialized_artifacts, join_keys, spark_session + ): for pdf in iterator: ( feature_view, diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 4e224b9e11..68f2177610 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -50,6 +50,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel + # Error messages E_CASSANDRA_UNEXPECTED_CONFIGURATION_CLASS = ( "Unexpected configuration object (not a CassandraOnlineStoreConfig instance)" @@ -326,13 +327,13 @@ def __del__(self): pass def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: """ Write a batch of features of several entities to the database. 
@@ -381,6 +382,7 @@ def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]: # correction for the last missing call to `progress`: if progress: progress(1) + def online_read( self, config: RepoConfig, From fef0438d2d3aa1f7886ddda5f3410e281dc96d9b Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 23:25:47 -0800 Subject: [PATCH 12/23] fix: formatting --- .../contrib/cassandra_online_store/cassandra_online_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 68f2177610..1998de464a 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -44,6 +44,7 @@ from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr + from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore From 86a5e9575aedd1a52bc6f2806a36262d6feba048 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Mon, 20 Jan 2025 23:53:46 -0800 Subject: [PATCH 13/23] fix: formatting --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index dc47fd7df6..ac4fce541e 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -1,7 +1,7 @@ import time from datetime import datetime from types import MethodType -from typing import List, Optional, Set, Union, no_type_check, Tuple, Dict, Callable, Any +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, no_type_check import pandas as pd import pyarrow From 07ff8e36b383b64fb467fa938fd2ab0ee82437ea Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 21 Jan 2025 17:04:22 -0800 Subject: [PATCH 14/23] fix: changes to write_stream --- .../infra/contrib/spark_kafka_processor.py | 152 +++++------------- 1 file changed, 38 insertions(+), 114 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index ac4fce541e..26c45ca3b1 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -7,10 +7,12 @@ import pyarrow from pyspark import SparkContext from pyspark.sql import DataFrame, Row, SparkSession +from pyspark.sql import functions as F from pyspark.sql.avro.functions import from_avro from pyspark.sql.column import Column, _to_java_column from pyspark.sql.functions import col, from_json from pyspark.sql.streaming import StreamingQuery +from pyspark.sql.window import Window from feast import FeatureView, RepoConfig from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, StreamFormat @@ -259,54 +261,18 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: def _write_stream_data_expedia(self, df: StreamTable, to: PushMode): """ - Ensures materialization logic in sync with stream ingestion. - Support only write to online store. 
No support for preprocess_fn also. - In Spark 3.2.2, toPandas() is throwing error when the dataframe has Boolean columns. - To fix this error, we need spark 3.4.0 or numpy < 1.20.0 but feast needs numpy >= 1.22. - Switching to use mapInPandas to solve the problem for boolean columns and - toPandas() also load all data into driver's memory. - Error Message: - AttributeError: module 'numpy' has no attribute 'bool'. - `np.bool` was a deprecated alias for the builtin `bool`. - To avoid this error in existing code, use `bool` by itself. - Doing this will not modify any behavior and is safe. - If you specifically wanted the numpy scalar type, use `np.bool_` here. + Streamlines data writing logic """ - # TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method - - # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. - def online_write_with_connector( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[ - EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime] - ] - ], - progress: Optional[Callable[[int], Any]], - spark: SparkSession, - ) -> None: + def online_write_with_connector(config, table, data, progress, spark): """ - Write a batch of features of several entities to the database. - - Args: - config: The RepoConfig for the current FeatureStore. - table: Feast FeatureView. - data: a list of quadruplets containing Feature data. Each - quadruplet contains an Entity Key, a dict containing feature - values, an event timestamp for the row, and - the created timestamp for the row if it exists. - progress: Optional function to be called once every mini-batch of - rows is written to the online store. Can be used to - display progress. + Write a batch of features to the online store. """ project = config.project - keyspace: str = self._keyspace + keyspace = self._keyspace fqtable = f"{project}_{table.name}" - def prepare_rows() -> List[Row]: + def prepare_rows(): """ Transform data into a list of Spark Row objects for insertion. """ @@ -318,17 +284,18 @@ def prepare_rows() -> List[Row]: ).hex() for feature_name, val in values.items(): - row = Row( - feature_name=feature_name, - entity_key=entity_key_bin, - feature_value=val.SerializeToString(), - event_timestamp=timestamp, - created_timestamp=created_ts, + rows.append( + Row( + feature_name=feature_name, + entity_key=entity_key_bin, + feature_value=val.SerializeToString(), + event_timestamp=timestamp, + created_timestamp=created_ts, + ) ) - rows.append(row) if progress: - progress(1) # Report progress for each entity processed. + progress(1) return rows @@ -336,84 +303,40 @@ def prepare_rows() -> List[Row]: if rows: df = spark.createDataFrame(rows) - # Add ScyllaDB table as the sink using the Cassandra connector. + # Write to ScyllaDB using the Cassandra connector ( df.write.format("org.apache.spark.sql.cassandra") - .options( - table=fqtable, - keyspace=keyspace, - ) + .options(table=fqtable, keyspace=keyspace) .mode("append") .save() ) - # Final progress update. if progress: progress(1) - def batch_write_pandas_df( - iterator, spark_serialized_artifacts, join_keys, spark_session - ): - for pdf in iterator: - ( - feature_view, - online_store, - repo_config, - ) = spark_serialized_artifacts.unserialize() - - if isinstance(feature_view, StreamFeatureView): - ts_field = feature_view.timestamp_field - else: - ts_field = feature_view.stream_source.timestamp_field - - # Extract the latest feature values for each unique entity row (i.e. 
the join keys). - pdf = ( - pdf.sort_values(by=[*join_keys, ts_field], ascending=False) - .groupby(join_keys) - .nth(0) - ) - - table = pyarrow.Table.from_pandas(pdf) - if feature_view.batch_source.field_mapping is not None: - table = _run_pyarrow_field_mapping( - table, feature_view.batch_source.field_mapping - ) + def batch_write(sdf: DataFrame, batch_id: int, spark_serialized_artifacts, join_keys, feature_view, spark_session): + """ + Write each batch of data to the online store. + """ + start_time = time.time() - join_key_to_value_type = { - entity.name: entity.dtype.to_value_type() - for entity in feature_view.entity_columns - } - rows_to_write = _convert_arrow_to_proto( - table, feature_view, join_key_to_value_type - ) - online_write_with_connector( - repo_config, - feature_view, - rows_to_write, - lambda x: None, - spark=spark_session, + # Extract latest feature values per entity and write to the online store + latest_df = ( + sdf.withColumn( + "row_number", + F.row_number().over(Window.partitionBy(*join_keys).orderBy(F.desc(feature_view.timestamp_field))), ) + .filter(F.col("row_number") == 1) + .drop("row_number") + ) - yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result + rows_to_write = latest_df.collect() # Convert to rows for online_write_with_connector - def batch_write( - sdf: DataFrame, - batch_id: int, - spark_serialized_artifacts, - join_keys, - feature_view, - spark_session, - ): - start_time = time.time() - sdf.mapInPandas( - lambda x: batch_write_pandas_df( - x, spark_serialized_artifacts, join_keys, spark_session - ), - "status int", - ).count() # dummy action to force evaluation - print( - f"Time taken to write batch {batch_id} is: {(time.time() - start_time) * 1000:.2f} ms" - ) + # Deserialize artifacts and write the batch + feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize() + online_write_with_connector(repo_config, feature_view, rows_to_write, None, spark_session) + + print(f"Time taken to write batch {batch_id}: {(time.time() - start_time) * 1000:.2f} ms") query = ( df.writeStream.outputMode("update") @@ -435,6 +358,7 @@ def batch_write( query.awaitTermination(timeout=self.query_timeout) return query + def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery: # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. 
def batch_write(row: DataFrame, batch_id: int): From eebf532afdf0a7d65624876dcfb7a2e5f211ad4a Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 21 Jan 2025 17:09:46 -0800 Subject: [PATCH 15/23] fix: changes to write_stream --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 26c45ca3b1..40d0a82cc3 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -1,10 +1,8 @@ import time -from datetime import datetime from types import MethodType -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, no_type_check +from typing import List, Optional, Set, Union, no_type_check import pandas as pd -import pyarrow from pyspark import SparkContext from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql import functions as F @@ -14,7 +12,7 @@ from pyspark.sql.streaming import StreamingQuery from pyspark.sql.window import Window -from feast import FeatureView, RepoConfig +from feast import FeatureView from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, StreamFormat from feast.data_source import KafkaSource, PushMode from feast.feature_store import FeatureStore @@ -28,10 +26,7 @@ _SparkSerializedArtifacts, ) from feast.infra.provider import get_provider -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.stream_feature_view import StreamFeatureView -from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping class SparkProcessorConfig(ProcessorConfig): From c1e7659dbb2847397e171853e9339256df2ce466 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Tue, 21 Jan 2025 17:23:33 -0800 Subject: [PATCH 16/23] fix: formatting --- .../infra/contrib/spark_kafka_processor.py | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 40d0a82cc3..3d515d9dfd 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -309,7 +309,14 @@ def prepare_rows(): if progress: progress(1) - def batch_write(sdf: DataFrame, batch_id: int, spark_serialized_artifacts, join_keys, feature_view, spark_session): + def batch_write( + sdf: DataFrame, + batch_id: int, + spark_serialized_artifacts, + join_keys, + feature_view, + spark_session, + ): """ Write each batch of data to the online store. 
""" @@ -319,19 +326,31 @@ def batch_write(sdf: DataFrame, batch_id: int, spark_serialized_artifacts, join_ latest_df = ( sdf.withColumn( "row_number", - F.row_number().over(Window.partitionBy(*join_keys).orderBy(F.desc(feature_view.timestamp_field))), + F.row_number().over( + Window.partitionBy(*join_keys).orderBy( + F.desc(feature_view.timestamp_field) + ) + ), ) .filter(F.col("row_number") == 1) .drop("row_number") ) - rows_to_write = latest_df.collect() # Convert to rows for online_write_with_connector + rows_to_write = ( + latest_df.collect() + ) # Convert to rows for online_write_with_connector # Deserialize artifacts and write the batch - feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize() - online_write_with_connector(repo_config, feature_view, rows_to_write, None, spark_session) + feature_view, online_store, repo_config = ( + spark_serialized_artifacts.unserialize() + ) + online_write_with_connector( + repo_config, feature_view, rows_to_write, None, spark_session + ) - print(f"Time taken to write batch {batch_id}: {(time.time() - start_time) * 1000:.2f} ms") + print( + f"Time taken to write batch {batch_id}: {(time.time() - start_time) * 1000:.2f} ms" + ) query = ( df.writeStream.outputMode("update") @@ -353,7 +372,6 @@ def batch_write(sdf: DataFrame, batch_id: int, spark_serialized_artifacts, join_ query.awaitTermination(timeout=self.query_timeout) return query - def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery: # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. def batch_write(row: DataFrame, batch_id: int): From 0e0e8cf920edcc930e836082e8c89d70d94fc0c0 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Fri, 24 Jan 2025 14:06:41 -0800 Subject: [PATCH 17/23] Revert changes --- .../infra/contrib/spark_kafka_processor.py | 185 ++++++++---------- 1 file changed, 82 insertions(+), 103 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 3d515d9dfd..e3c00ec26f 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -3,14 +3,13 @@ from typing import List, Optional, Set, Union, no_type_check import pandas as pd +import pyarrow from pyspark import SparkContext -from pyspark.sql import DataFrame, Row, SparkSession -from pyspark.sql import functions as F +from pyspark.sql import DataFrame, SparkSession from pyspark.sql.avro.functions import from_avro from pyspark.sql.column import Column, _to_java_column from pyspark.sql.functions import col, from_json from pyspark.sql.streaming import StreamingQuery -from pyspark.sql.window import Window from feast import FeatureView from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, StreamFormat @@ -21,12 +20,12 @@ StreamProcessor, StreamTable, ) -from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.materialization.contrib.spark.spark_materialization_engine import ( _SparkSerializedArtifacts, ) from feast.infra.provider import get_provider from feast.stream_feature_view import StreamFeatureView +from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping class SparkProcessorConfig(ProcessorConfig): @@ -48,9 +47,9 @@ def _from_confluent_avro(column: Column, abris_config) -> Column: def _to_abris_config( - schema_registry_config: dict, - record_name: str, - record_namespace: str, + schema_registry_config: dict, + 
record_name: str, + record_namespace: str, ): """:return: za.co.absa.abris.config.FromAvroConfig""" topic = schema_registry_config["schema.registry.topic"] @@ -73,12 +72,12 @@ class SparkKafkaProcessor(StreamProcessor): join_keys: List[str] def __init__( - self, - *, - fs: FeatureStore, - sfv: Union[StreamFeatureView, FeatureView], - config: ProcessorConfig, - preprocess_fn: Optional[MethodType] = None, + self, + *, + fs: FeatureStore, + sfv: Union[StreamFeatureView, FeatureView], + config: ProcessorConfig, + preprocess_fn: Optional[MethodType] = None, ): if not isinstance(sfv.stream_source, KafkaSource): raise ValueError("data source is not kafka source") @@ -120,7 +119,7 @@ def __init__( def _create_infra_if_necessary(self): if self.fs.config.online_store is not None and getattr( - self.fs.config.online_store, "lazy_table_creation", False + self.fs.config.online_store, "lazy_table_creation", False ): print( f"Online store {self.fs.config.online_store.__class__.__name__} supports lazy table creation and it is enabled" @@ -136,7 +135,7 @@ def _create_infra_if_necessary(self): ) def ingest_stream_feature_view( - self, to: PushMode = PushMode.ONLINE + self, to: PushMode = PushMode.ONLINE ) -> StreamingQuery: self._create_infra_if_necessary() ingested_stream_df = self._ingest_stream_data() @@ -224,8 +223,8 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: if self.sfv.stream_source is not None: if self.sfv.stream_source.field_mapping is not None: for ( - field_mapping_key, - field_mapping_value, + field_mapping_key, + field_mapping_value, ) in self.sfv.stream_source.field_mapping.items(): df = df.withColumn(field_mapping_value, df[field_mapping_key]) @@ -256,100 +255,81 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: def _write_stream_data_expedia(self, df: StreamTable, to: PushMode): """ - Streamlines data writing logic + Ensures materialization logic in sync with stream ingestion. + Support only write to online store. No support for preprocess_fn also. + In Spark 3.2.2, toPandas() is throwing error when the dataframe has Boolean columns. + To fix this error, we need spark 3.4.0 or numpy < 1.20.0 but feast needs numpy >= 1.22. + Switching to use mapInPandas to solve the problem for boolean columns and + toPandas() also load all data into driver's memory. + Error Message: + AttributeError: module 'numpy' has no attribute 'bool'. + `np.bool` was a deprecated alias for the builtin `bool`. + To avoid this error in existing code, use `bool` by itself. + Doing this will not modify any behavior and is safe. + If you specifically wanted the numpy scalar type, use `np.bool_` here. """ - def online_write_with_connector(config, table, data, progress, spark): - """ - Write a batch of features to the online store. - """ - project = config.project - keyspace = self._keyspace - fqtable = f"{project}_{table.name}" - - def prepare_rows(): - """ - Transform data into a list of Spark Row objects for insertion. 
- """ - rows = [] - for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key( - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() - - for feature_name, val in values.items(): - rows.append( - Row( - feature_name=feature_name, - entity_key=entity_key_bin, - feature_value=val.SerializeToString(), - event_timestamp=timestamp, - created_timestamp=created_ts, - ) - ) - - if progress: - progress(1) - - return rows - - rows = prepare_rows() - if rows: - df = spark.createDataFrame(rows) - - # Write to ScyllaDB using the Cassandra connector + # TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method + + # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. + def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): + for pdf in iterator: ( - df.write.format("org.apache.spark.sql.cassandra") - .options(table=fqtable, keyspace=keyspace) - .mode("append") - .save() + feature_view, + online_store, + repo_config, + ) = spark_serialized_artifacts.unserialize() + + if isinstance(feature_view, StreamFeatureView): + ts_field = feature_view.timestamp_field + else: + ts_field = feature_view.stream_source.timestamp_field + + # Extract the latest feature values for each unique entity row (i.e. the join keys). + pdf = ( + pdf.sort_values(by=[*join_keys, ts_field], ascending=False) + .groupby(join_keys) + .nth(0) ) - if progress: - progress(1) - - def batch_write( - sdf: DataFrame, - batch_id: int, - spark_serialized_artifacts, - join_keys, - feature_view, - spark_session, - ): - """ - Write each batch of data to the online store. - """ - start_time = time.time() + table = pyarrow.Table.from_pandas(pdf) + if feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) - # Extract latest feature values per entity and write to the online store - latest_df = ( - sdf.withColumn( - "row_number", - F.row_number().over( - Window.partitionBy(*join_keys).orderBy( - F.desc(feature_view.timestamp_field) - ) - ), + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + rows_to_write = _convert_arrow_to_proto( + table, feature_view, join_key_to_value_type + ) + online_store.online_write_batch( + repo_config, + feature_view, + rows_to_write, + lambda x: None, ) - .filter(F.col("row_number") == 1) - .drop("row_number") - ) - - rows_to_write = ( - latest_df.collect() - ) # Convert to rows for online_write_with_connector - # Deserialize artifacts and write the batch - feature_view, online_store, repo_config = ( - spark_serialized_artifacts.unserialize() - ) - online_write_with_connector( - repo_config, feature_view, rows_to_write, None, spark_session - ) + yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result + def batch_write( + sdf: DataFrame, + batch_id: int, + spark_serialized_artifacts, + join_keys, + feature_view, + ): + start_time = time.time() + sdf.mapInPandas( + lambda x: batch_write_pandas_df( + x, spark_serialized_artifacts, join_keys + ), + "status int", + ).count() # dummy action to force evaluation print( - f"Time taken to write batch {batch_id}: {(time.time() - start_time) * 1000:.2f} ms" + f"Time taken to write batch {batch_id} is: {(time.time() - start_time) * 1000:.2f} ms" ) query = ( @@ -363,7 +343,6 @@ def batch_write( self.spark_serialized_artifacts, self.join_keys, 
self.sfv, - self.spark, ) ) .start() @@ -416,4 +395,4 @@ def batch_write(row: DataFrame, batch_id: int): ) query.awaitTermination(timeout=self.query_timeout) - return query + return query \ No newline at end of file From 59f376efe73240139969b87eed9d569b8c2ba677 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Fri, 24 Jan 2025 15:29:40 -0800 Subject: [PATCH 18/23] fix: Lint --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index e3c00ec26f..6e4d5f6782 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -395,4 +395,4 @@ def batch_write(row: DataFrame, batch_id: int): ) query.awaitTermination(timeout=self.query_timeout) - return query \ No newline at end of file + return query From 21960e557c8c3312eb338a687a0b435922781b89 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Fri, 24 Jan 2025 15:37:50 -0800 Subject: [PATCH 19/23] fix: Lint --- .../infra/contrib/spark_kafka_processor.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 6e4d5f6782..d98366c1a4 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -47,9 +47,9 @@ def _from_confluent_avro(column: Column, abris_config) -> Column: def _to_abris_config( - schema_registry_config: dict, - record_name: str, - record_namespace: str, + schema_registry_config: dict, + record_name: str, + record_namespace: str, ): """:return: za.co.absa.abris.config.FromAvroConfig""" topic = schema_registry_config["schema.registry.topic"] @@ -72,12 +72,12 @@ class SparkKafkaProcessor(StreamProcessor): join_keys: List[str] def __init__( - self, - *, - fs: FeatureStore, - sfv: Union[StreamFeatureView, FeatureView], - config: ProcessorConfig, - preprocess_fn: Optional[MethodType] = None, + self, + *, + fs: FeatureStore, + sfv: Union[StreamFeatureView, FeatureView], + config: ProcessorConfig, + preprocess_fn: Optional[MethodType] = None, ): if not isinstance(sfv.stream_source, KafkaSource): raise ValueError("data source is not kafka source") @@ -119,7 +119,7 @@ def __init__( def _create_infra_if_necessary(self): if self.fs.config.online_store is not None and getattr( - self.fs.config.online_store, "lazy_table_creation", False + self.fs.config.online_store, "lazy_table_creation", False ): print( f"Online store {self.fs.config.online_store.__class__.__name__} supports lazy table creation and it is enabled" @@ -135,7 +135,7 @@ def _create_infra_if_necessary(self): ) def ingest_stream_feature_view( - self, to: PushMode = PushMode.ONLINE + self, to: PushMode = PushMode.ONLINE ) -> StreamingQuery: self._create_infra_if_necessary() ingested_stream_df = self._ingest_stream_data() @@ -223,8 +223,8 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: if self.sfv.stream_source is not None: if self.sfv.stream_source.field_mapping is not None: for ( - field_mapping_key, - field_mapping_value, + field_mapping_key, + field_mapping_value, ) in self.sfv.stream_source.field_mapping.items(): df = df.withColumn(field_mapping_value, df[field_mapping_key]) @@ -315,11 +315,11 @@ def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): yield 
pd.DataFrame([pd.Series(range(1, 2))]) # dummy result def batch_write( - sdf: DataFrame, - batch_id: int, - spark_serialized_artifacts, - join_keys, - feature_view, + sdf: DataFrame, + batch_id: int, + spark_serialized_artifacts, + join_keys, + feature_view, ): start_time = time.time() sdf.mapInPandas( From 9a6bb0fd93dabb02cba5fb1d0e902134cf6098a6 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Sun, 26 Jan 2025 18:36:24 -0800 Subject: [PATCH 20/23] trying cassandra connector --- .../infra/contrib/spark_kafka_processor.py | 108 +++++++++++++++++- 1 file changed, 105 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index d98366c1a4..c2ddf7836a 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -1,6 +1,18 @@ import time +from datetime import datetime from types import MethodType -from typing import List, Optional, Set, Union, no_type_check +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + Union, + no_type_check, +) import pandas as pd import pyarrow @@ -10,8 +22,15 @@ from pyspark.sql.column import Column, _to_java_column from pyspark.sql.functions import col, from_json from pyspark.sql.streaming import StreamingQuery +from pyspark.sql.types import ( + StructType, + StructField, + StringType, + BinaryType, + TimestampType, +) -from feast import FeatureView +from feast import FeatureView, RepoConfig from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, StreamFormat from feast.data_source import KafkaSource, PushMode from feast.feature_store import FeatureStore @@ -20,10 +39,16 @@ StreamProcessor, StreamTable, ) +from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.materialization.contrib.spark.spark_materialization_engine import ( _SparkSerializedArtifacts, ) +from feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store import ( + CassandraOnlineStore, +) from feast.infra.provider import get_provider +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.stream_feature_view import StreamFeatureView from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping @@ -272,6 +297,83 @@ def _write_stream_data_expedia(self, df: StreamTable, to: PushMode): # TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. + def online_write_with_connector( + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[ + EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime] + ] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of features of several entities to the database using Spark Cassandra Connector. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + data: a list of quadruplets containing Feature data. Each + quadruplet contains an Entity Key, a dict containing feature + values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of + rows is written to the online store. Can be used to + display progress. 
+ """ + keyspace = config.online_store.keyspace + + fqtable = CassandraOnlineStore._fq_table_name( + keyspace, config.project, table + ) + cassandra_keyspace = keyspace + cassandra_table = fqtable + + def create_spark_dataframe(): + """ + Convert the data into a Spark DataFrame. + """ + rows = [] + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for feature_name, val in values.items(): + rows.append( + ( + feature_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + created_ts, + ) + ) + + schema = StructType( + [ + StructField("feature_name", StringType(), False), + StructField("feature_value", BinaryType(), False), + StructField("entity_key", StringType(), False), + StructField("event_timestamp", TimestampType(), False), + StructField("created_timestamp", TimestampType(), True), + ] + ) + + return self.spark.createDataFrame(rows, schema) + + # Create a DataFrame from the input data + df = create_spark_dataframe() + + # Write DataFrame to Cassandra + df.write.format("org.apache.spark.sql.cassandra").options( + keyspace=cassandra_keyspace, table=cassandra_table + ).mode("append").save() + + # Call progress function if provided + if progress: + progress(len(data)) + def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): for pdf in iterator: ( @@ -305,7 +407,7 @@ def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys): rows_to_write = _convert_arrow_to_proto( table, feature_view, join_key_to_value_type ) - online_store.online_write_batch( + online_write_with_connector( repo_config, feature_view, rows_to_write, From 65551800bd67238f42529e32fd4e776322b9d075 Mon Sep 17 00:00:00 2001 From: vbhagwat Date: Sun, 26 Jan 2025 18:46:29 -0800 Subject: [PATCH 21/23] fix: Formatting --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index c2ddf7836a..a76cb52e8a 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -5,7 +5,6 @@ Any, Callable, Dict, - Iterable, List, Optional, Set, @@ -23,10 +22,10 @@ from pyspark.sql.functions import col, from_json from pyspark.sql.streaming import StreamingQuery from pyspark.sql.types import ( - StructType, - StructField, - StringType, BinaryType, + StringType, + StructField, + StructType, TimestampType, ) From 89fee3b3efde0926a33492e3153f2d48b1c13124 Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Mon, 27 Jan 2025 19:33:13 -0800 Subject: [PATCH 22/23] Update spark_kafka_processor.py --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index a76cb52e8a..00363e4c78 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -497,3 +497,4 @@ def batch_write(row: DataFrame, batch_id: int): query.awaitTermination(timeout=self.query_timeout) return query + From 7c7d9d4fc8f36e6eabfb823abe9455f970dde88f Mon Sep 17 00:00:00 2001 From: vanitabhagwat <92561664+vanitabhagwat@users.noreply.github.com> Date: Mon, 27 Jan 
2025 19:36:11 -0800 Subject: [PATCH 23/23] Update spark_kafka_processor.py --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 00363e4c78..a76cb52e8a 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -497,4 +497,3 @@ def batch_write(row: DataFrame, batch_id: int): query.awaitTermination(timeout=self.query_timeout) return query -
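
Taken together, the write path that PATCH 01 through PATCH 09 (and again PATCH 20) experiment with reduces to building a Spark DataFrame of serialized feature rows and appending it to a Cassandra/ScyllaDB table through the spark-cassandra-connector. A minimal, self-contained sketch of that pattern follows; the connection host, keyspace and table names, and the literal entity-key/value bytes are illustrative placeholders rather than values from the patches, and a reachable cluster plus the connector package on the Spark classpath are assumed.

from datetime import datetime, timezone

from pyspark.sql import Row, SparkSession

# Assumes the spark-cassandra-connector is on the classpath and a
# Cassandra/ScyllaDB node is reachable at the configured (placeholder) host.
spark = (
    SparkSession.builder.appName("cassandra-write-sketch")
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .getOrCreate()
)

# One row per (entity, feature) pair, mirroring the shape built in
# prepare_rows(); the hex entity key and value bytes are made-up placeholders.
rows = [
    Row(
        feature_name="conv_rate",
        entity_key="0200000064726976",
        feature_value=b"\x1a\x04\x08\x01",
        event_timestamp=datetime.now(timezone.utc),
        created_timestamp=datetime.now(timezone.utc),
    )
]

df = spark.createDataFrame(rows)

# Append to the fully qualified feature table via the Cassandra connector sink.
(
    df.write.format("org.apache.spark.sql.cassandra")
    .options(table="demo_project_driver_stats", keyspace="feast_keyspace")
    .mode("append")
    .save()
)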
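
PATCH 14 briefly swaps the mapInPandas deduplication for a window function that keeps only the newest row per entity before writing. Independent of Feast, the idea is a plain row_number() over a window partitioned by the join keys and ordered by the timestamp column, descending. A sketch with made-up column names (driver_id standing in for the join keys, event_timestamp for the feature view's timestamp_field):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("latest-row-dedup-sketch").getOrCreate()

# Two updates for the same entity; only the newest one should be written.
df = spark.createDataFrame(
    [
        (1001, 0.80, "2025-01-20 10:00:00"),
        (1001, 0.95, "2025-01-20 11:00:00"),
        (1002, 0.50, "2025-01-20 09:30:00"),
    ],
    ["driver_id", "conv_rate", "event_timestamp"],
)

# row_number() == 1 within (partition by join key, newest timestamp first)
# selects the latest row per entity, matching the PATCH 14 approach.
latest = (
    df.withColumn(
        "row_number",
        F.row_number().over(
            Window.partitionBy("driver_id").orderBy(F.desc("event_timestamp"))
        ),
    )
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

latest.show()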
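
The structure that survives to the end of the series (restored in PATCH 17) is the standard Structured Streaming foreachBatch loop: each micro-batch is handed to a plain Python function that performs the online-store write and logs its latency. Stripped of the Feast-specific pieces, the wiring looks roughly like the sketch below; the rate source, checkpoint path, trigger interval, and timeout are stand-ins for the real Kafka source and processor configuration.

import time

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName("foreachbatch-sketch").getOrCreate()

# A built-in rate source stands in for the Kafka stream used in the patches.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()


def batch_write(sdf: DataFrame, batch_id: int) -> None:
    # In the patches this is where mapInPandas + the online-store write happen;
    # counting the rows is a stand-in for the real per-batch write.
    start_time = time.time()
    written = sdf.count()
    print(
        f"batch {batch_id}: {written} rows in "
        f"{(time.time() - start_time) * 1000:.2f} ms"
    )


query = (
    stream_df.writeStream.outputMode("update")
    .option("checkpointLocation", "/tmp/feast-stream-checkpoint")
    .trigger(processingTime="30 seconds")
    .foreachBatch(batch_write)
    .start()
)

query.awaitTermination(timeout=60)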