Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Rebalancing fixes for kip-848 #419

Merged
merged 33 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
- uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
- name: Run Zookeeper and Kafka
- name: Run Kafka
run: sh scripts/run-kafka.sh
- name: Install dependencies
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
steps:
- uses: actions/checkout@v3
name: Checkout code
- name: Run Zookeeper and Kafka
- name: Run Kafka
run: sh scripts/run-kafka.sh
- name: Run tests
run: cargo test
6 changes: 3 additions & 3 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ def __init__(
KafkaError.REQUEST_TIMED_OUT,
KafkaError.NOT_COORDINATOR,
KafkaError._WAIT_COORD,
KafkaError.STALE_MEMBER_EPOCH, # kip-848
),
)

configuration = dict(configuration)
self.__is_incremental = (
self.__is_cooperative_sticky = (
configuration.get("partition.assignment.strategy") == "cooperative-sticky"
or configuration.get("group.protocol") == "consumer"
)
auto_offset_reset = configuration.get("auto.offset.reset", "largest")

Expand Down Expand Up @@ -463,7 +463,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None:
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
for partition, offset in offsets.items()
]
if self.__is_incremental:
if self.__is_cooperative_sticky:
self.__consumer.incremental_assign(partitions)
else:
self.__consumer.assign(partitions)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
confluent-kafka>=2.3.0
confluent-kafka>=2.7.0
1 change: 1 addition & 0 deletions rust-arroyo/src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ mod tests {
#[test]
fn test_tell() {
let topic = TestTopic::create("test-tell");
sleep(Duration::from_secs(1));
let configuration = KafkaConfig::new_consumer_config(
vec![std::env::var("DEFAULT_BROKERS").unwrap_or("127.0.0.1:9092".to_string())],
"my-group-1".to_string(),
Expand Down
26 changes: 13 additions & 13 deletions scripts/run-kafka.sh
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#!/bin/sh

docker run \
--name sentry_zookeeper \
--name arroyo_kafka \
-d --network host \
-e ZOOKEEPER_CLIENT_PORT=2181 \
confluentinc/cp-zookeeper:6.2.0

docker run \
--name sentry_kafka \
-d --network host \
-e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \
-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 \
-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://127.0.0.1:9093,EXTERNAL://127.0.0.1:9092 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e [email protected]:9093 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_NODE_ID=1 \
-e CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk \
-e KAFKA_LISTENERS=PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:6.2.0
-e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer \
-e KAFKA_TRANSACTION_PARTITION_VERIFICATION_ENABLE=false \
confluentinc/cp-kafka:7.8.0
40 changes: 25 additions & 15 deletions tests/backends/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

class StreamsTestMixin(ABC, Generic[TStrategyPayload]):
cooperative_sticky = False
kip_848 = False

@abstractmethod
def get_topic(self, partitions: int = 1) -> ContextManager[Topic]:
Expand Down Expand Up @@ -421,7 +422,7 @@ def test_pause_resume_rebalancing(self) -> None:
def wait_until_rebalancing(
from_consumer: Consumer[Any], to_consumer: Consumer[Any]
) -> None:
for _ in range(10):
for _ in range(20):
assert from_consumer.poll(0) is None
if to_consumer.poll(1.0) is not None:
return
Expand Down Expand Up @@ -453,9 +454,10 @@ def wait_until_rebalancing(

wait_until_rebalancing(consumer_a, consumer_b)

if self.cooperative_sticky:
if self.cooperative_sticky or self.kip_848:
# within incremental rebalancing, only one partition should have been reassigned to the consumer_b, and consumer_a should remain paused
assert consumer_a.paused() == [Partition(topic, 1)]
# Either partition 0 or 1 might be the paused one
assert len(consumer_a.paused()) == 1
assert consumer_a.poll(10.0) is None
else:
# The first consumer should have had its offsets rolled back, as
Expand All @@ -481,20 +483,28 @@ def wait_until_rebalancing(

assert len(consumer_b.tell()) == 2

if self.cooperative_sticky:
if self.cooperative_sticky or self.kip_848:
consumer_a_on_assign.assert_has_calls(
[
mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
]
)

assert consumer_a_on_assign.mock_calls == [
mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
]
assert consumer_a_on_revoke.mock_calls == [
mock.call([Partition(topic, 0)]),
mock.call([Partition(topic, 1)]),
]
consumer_a_on_revoke.assert_has_calls(
[
mock.call([Partition(topic, 0)]),
mock.call([Partition(topic, 1)]),
],
any_order=True,
)

assert consumer_b_on_assign.mock_calls == [
mock.call({Partition(topic, 0): 0}),
mock.call({Partition(topic, 1): 0}),
]
consumer_b_on_assign.assert_has_calls(
[
mock.call({Partition(topic, 0): 0}),
mock.call({Partition(topic, 1): 0}),
],
any_order=True,
)
assert consumer_b_on_revoke.mock_calls == []
else:
assert consumer_a_on_assign.mock_calls == [
Expand Down
5 changes: 4 additions & 1 deletion tests/backends/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,13 @@ class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams):
cooperative_sticky = True


@pytest.mark.skip("kip-848 not functional yet")
class TestKafkaStreamsKip848(TestKafkaStreams):
kip_848 = True

@pytest.mark.xfail(reason="To be fixed")
def test_pause_resume_rebalancing(self) -> None:
super().test_pause_resume_rebalancing()


def test_commit_codec() -> None:
commit = Commit(
Expand Down
114 changes: 114 additions & 0 deletions tests/test_kip848_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from typing import Any

import time
import contextlib
from contextlib import closing
import os
import threading
import logging
from typing import Iterator, Mapping

from confluent_kafka.admin import AdminClient, NewTopic
from arroyo.types import Commit, Message, Partition, Topic
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
from arroyo.processing.strategies import RunTask, CommitOffsets, ProcessingStrategy
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
from arroyo.processing.processor import StreamProcessor
from arroyo.backends.kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)

TOPIC = "test-kip848"


@contextlib.contextmanager
def get_topic(
configuration: Mapping[str, Any], partitions_count: int
) -> Iterator[Topic]:
name = TOPIC
configuration = dict(configuration)
client = AdminClient(configuration)
[[key, future]] = client.create_topics(
[NewTopic(name, num_partitions=partitions_count, replication_factor=1)]
).items()
assert key == name
assert future.result() is None
try:
yield Topic(name)
finally:
[[key, future]] = client.delete_topics([name]).items()
assert key == name
assert future.result() is None


def test_kip848_e2e() -> None:
counter = 0

def print_msg(message: Message[Any]) -> Message[Any]:
nonlocal counter
((partition, offset),) = message.committable.items()
print(f"message: {partition.index}-{offset}")
counter += 1
return message

class Strat(RunTask[Any, Any]):
def join(self, *args: Any, **kwargs: Any) -> None:
print("joining strategy, sleeping 5 seconds")
time.sleep(5)
print("joining strategy, sleeping 5 seconds -- DONE")
return super().join(*args, **kwargs)

class Factory(ProcessingStrategyFactory[KafkaPayload]):
def create_with_partitions(
self, commit: Commit, partitions: Mapping[Partition, int]
) -> ProcessingStrategy[KafkaPayload]:
print("assign: ", [p.index for p in partitions])
return Strat(print_msg, CommitOffsets(commit))

default_config = {
"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")
}

with get_topic(default_config, 2) as topic:
producer = KafkaProducer(default_config)

with closing(producer):
for i in range(30):
message = KafkaPayload(None, i.to_bytes(1, "big"), [])
producer.produce(topic, message).result()

consumer_config = build_kafka_consumer_configuration(
default_config,
group_id="kip848",
)

consumer_config["group.protocol"] = "consumer"
consumer_config.pop("session.timeout.ms", None)
consumer_config.pop("max.poll.interval.ms", None)
consumer_config.pop("partition.assignment.strategy", None)
consumer_config.pop("group.protocol.type", None)
consumer_config.pop("heartbeat.interval.ms", None)

consumer = KafkaConsumer(consumer_config)

processor = StreamProcessor(
consumer=consumer, topic=Topic(TOPIC), processor_factory=Factory()
)

def shutdown() -> None:
for i in range(100):
time.sleep(0.1)
if counter == 30:
break
print("shutting down")
processor.signal_shutdown()

t = threading.Thread(target=shutdown)
t.start()

processor.run()

assert counter == 30

t.join()
Loading