-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Trying BatchStatement instead of execute_concurrent_with_args #163
base: master
Are you sure you want to change the base?
Conversation
futures = [] | ||
for batch in batches: | ||
futures.append(session.execute_async(batch)) | ||
if len(futures) >= config.online_store.write_concurrency: | ||
# Raises exception if at least one of the batch fails | ||
try: | ||
for future in futures: | ||
future.result() | ||
futures = [] | ||
except Exception as exc: | ||
logger.error(f"Error writing a batch: {exc}") | ||
print(f"Error writing a batch: {exc}") | ||
raise Exception("Error writing a batch") from exc | ||
|
||
if len(futures) > 0: | ||
try: | ||
for future in futures: | ||
future.result() | ||
futures = [] | ||
except Exception as exc: | ||
logger.error(f"Error writing a batch: {exc}") | ||
print(f"Error writing a batch: {exc}") | ||
raise Exception("Error writing a batch") from exc | ||
|
||
# execute_concurrent_with_args( | ||
# session, | ||
# insert_cql, | ||
# rows, | ||
# concurrency=config.online_store.write_concurrency, | ||
# ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This no longer allows for write_concurrency
to be set in the feature_store.yaml
then, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still using that. Refer Line feast-dev#508
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, missed that. thanks
…ssandraOnlineStore
…g in SparkKafkaProcessor
…e feature_value to value
…oper shutdown in SparkKafkaProcessor
…t it to zero in CassandraOnlineStore
… CassandraOnlineStore
…g in CassandraOnlineStore
What this PR does / why we need it:
fix: Trying BatchStatement instead of execute_concurrent_with_args
Which issue(s) this PR fixes:
execute_concurrent_with_args taking longer to inserts records. Using BatchStatement to write all records specific to an entity_key as a batch. This should avoid the network time. If we group different entity_keys in a single batch, it will run as BatchType.LOGGED mode. Based on the docs, it has performance impact. Not sure how that impacts. So implemented as entity_key per batch which runs as BatchType.UNLOGGED mode. Based on the performance, we will try out the multiple entity_keys in single batch option.
Misc