diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cdf1bdb9..a8db56a6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,7 +56,7 @@ jobs: - uses: actions/setup-python@v2 with: python-version: ${{ matrix.python }} - - name: Run Zookeeper and Kafka + - name: Run Kafka run: sh scripts/run-kafka.sh - name: Install dependencies run: | diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 2b181326..36c92899 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -26,7 +26,7 @@ jobs: steps: - uses: actions/checkout@v3 name: Checkout code - - name: Run Zookeeper and Kafka + - name: Run Kafka run: sh scripts/run-kafka.sh - name: Run tests run: cargo test diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index c8665f3a..23953816 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -157,13 +157,13 @@ def __init__( KafkaError.REQUEST_TIMED_OUT, KafkaError.NOT_COORDINATOR, KafkaError._WAIT_COORD, + KafkaError.STALE_MEMBER_EPOCH, # kip-848 ), ) configuration = dict(configuration) - self.__is_incremental = ( + self.__is_cooperative_sticky = ( configuration.get("partition.assignment.strategy") == "cooperative-sticky" - or configuration.get("group.protocol") == "consumer" ) auto_offset_reset = configuration.get("auto.offset.reset", "largest") @@ -463,7 +463,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None: ConfluentTopicPartition(partition.topic.name, partition.index, offset) for partition, offset in offsets.items() ] - if self.__is_incremental: + if self.__is_cooperative_sticky: self.__consumer.incremental_assign(partitions) else: self.__consumer.assign(partitions) diff --git a/requirements.txt b/requirements.txt index dad6095b..670c67de 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -confluent-kafka>=2.3.0 +confluent-kafka>=2.7.0 diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index 21d8ae83..a97b268b 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -563,6 +563,7 @@ mod tests { #[test] fn test_tell() { let topic = TestTopic::create("test-tell"); + sleep(Duration::from_secs(1)); let configuration = KafkaConfig::new_consumer_config( vec![std::env::var("DEFAULT_BROKERS").unwrap_or("127.0.0.1:9092".to_string())], "my-group-1".to_string(), diff --git a/scripts/run-kafka.sh b/scripts/run-kafka.sh old mode 100644 new mode 100755 index f22d0fec..adb06c12 --- a/scripts/run-kafka.sh +++ b/scripts/run-kafka.sh @@ -1,18 +1,18 @@ #!/bin/sh docker run \ - --name sentry_zookeeper \ + --name arroyo_kafka \ -d --network host \ - -e ZOOKEEPER_CLIENT_PORT=2181 \ - confluentinc/cp-zookeeper:6.2.0 - -docker run \ - --name sentry_kafka \ - -d --network host \ - -e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \ - -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 \ - -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://127.0.0.1:9093,EXTERNAL://127.0.0.1:9092 \ - -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \ - -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \ + -e KAFKA_PROCESS_ROLES=broker,controller \ + -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \ + -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ + -e KAFKA_NODE_ID=1 \ + -e CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk \ + -e KAFKA_LISTENERS=PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:9093 \ + -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \ + -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \ + -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ - confluentinc/cp-kafka:6.2.0 + -e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer \ + -e KAFKA_TRANSACTION_PARTITION_VERIFICATION_ENABLE=false \ + confluentinc/cp-kafka:7.8.0 diff --git a/tests/backends/mixins.py b/tests/backends/mixins.py index 7fb9f92e..fcee2043 100644 --- a/tests/backends/mixins.py +++ b/tests/backends/mixins.py @@ -15,6 +15,7 @@ class StreamsTestMixin(ABC, Generic[TStrategyPayload]): cooperative_sticky = False + kip_848 = False @abstractmethod def get_topic(self, partitions: int = 1) -> ContextManager[Topic]: @@ -421,7 +422,7 @@ def test_pause_resume_rebalancing(self) -> None: def wait_until_rebalancing( from_consumer: Consumer[Any], to_consumer: Consumer[Any] ) -> None: - for _ in range(10): + for _ in range(20): assert from_consumer.poll(0) is None if to_consumer.poll(1.0) is not None: return @@ -453,9 +454,10 @@ def wait_until_rebalancing( wait_until_rebalancing(consumer_a, consumer_b) - if self.cooperative_sticky: + if self.cooperative_sticky or self.kip_848: # within incremental rebalancing, only one partition should have been reassigned to the consumer_b, and consumer_a should remain paused - assert consumer_a.paused() == [Partition(topic, 1)] + # Either partition 0 or 1 might be the paused one + assert len(consumer_a.paused()) == 1 assert consumer_a.poll(10.0) is None else: # The first consumer should have had its offsets rolled back, as @@ -481,20 +483,28 @@ def wait_until_rebalancing( assert len(consumer_b.tell()) == 2 - if self.cooperative_sticky: + if self.cooperative_sticky or self.kip_848: + consumer_a_on_assign.assert_has_calls( + [ + mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}), + ] + ) - assert consumer_a_on_assign.mock_calls == [ - mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}), - ] - assert consumer_a_on_revoke.mock_calls == [ - mock.call([Partition(topic, 0)]), - mock.call([Partition(topic, 1)]), - ] + consumer_a_on_revoke.assert_has_calls( + [ + mock.call([Partition(topic, 0)]), + mock.call([Partition(topic, 1)]), + ], + any_order=True, + ) - assert consumer_b_on_assign.mock_calls == [ - mock.call({Partition(topic, 0): 0}), - mock.call({Partition(topic, 1): 0}), - ] + consumer_b_on_assign.assert_has_calls( + [ + mock.call({Partition(topic, 0): 0}), + mock.call({Partition(topic, 1): 0}), + ], + any_order=True, + ) assert consumer_b_on_revoke.mock_calls == [] else: assert consumer_a_on_assign.mock_calls == [ diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 17364f86..6a21c872 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -271,10 +271,13 @@ class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams): cooperative_sticky = True -@pytest.mark.skip("kip-848 not functional yet") class TestKafkaStreamsKip848(TestKafkaStreams): kip_848 = True + @pytest.mark.xfail(reason="To be fixed") + def test_pause_resume_rebalancing(self) -> None: + super().test_pause_resume_rebalancing() + def test_commit_codec() -> None: commit = Commit( diff --git a/tests/test_kip848_e2e.py b/tests/test_kip848_e2e.py new file mode 100644 index 00000000..2cc4a5dc --- /dev/null +++ b/tests/test_kip848_e2e.py @@ -0,0 +1,114 @@ +from typing import Any + +import time +import contextlib +from contextlib import closing +import os +import threading +import logging +from typing import Iterator, Mapping + +from confluent_kafka.admin import AdminClient, NewTopic +from arroyo.types import Commit, Message, Partition, Topic +from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration +from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload +from arroyo.processing.strategies import RunTask, CommitOffsets, ProcessingStrategy +from arroyo.processing.strategies.abstract import ProcessingStrategyFactory +from arroyo.processing.processor import StreamProcessor +from arroyo.backends.kafka import KafkaProducer + +logging.basicConfig(level=logging.INFO) + +TOPIC = "test-kip848" + + +@contextlib.contextmanager +def get_topic( + configuration: Mapping[str, Any], partitions_count: int +) -> Iterator[Topic]: + name = TOPIC + configuration = dict(configuration) + client = AdminClient(configuration) + [[key, future]] = client.create_topics( + [NewTopic(name, num_partitions=partitions_count, replication_factor=1)] + ).items() + assert key == name + assert future.result() is None + try: + yield Topic(name) + finally: + [[key, future]] = client.delete_topics([name]).items() + assert key == name + assert future.result() is None + + +def test_kip848_e2e() -> None: + counter = 0 + + def print_msg(message: Message[Any]) -> Message[Any]: + nonlocal counter + ((partition, offset),) = message.committable.items() + print(f"message: {partition.index}-{offset}") + counter += 1 + return message + + class Strat(RunTask[Any, Any]): + def join(self, *args: Any, **kwargs: Any) -> None: + print("joining strategy, sleeping 5 seconds") + time.sleep(5) + print("joining strategy, sleeping 5 seconds -- DONE") + return super().join(*args, **kwargs) + + class Factory(ProcessingStrategyFactory[KafkaPayload]): + def create_with_partitions( + self, commit: Commit, partitions: Mapping[Partition, int] + ) -> ProcessingStrategy[KafkaPayload]: + print("assign: ", [p.index for p in partitions]) + return Strat(print_msg, CommitOffsets(commit)) + + default_config = { + "bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092") + } + + with get_topic(default_config, 2) as topic: + producer = KafkaProducer(default_config) + + with closing(producer): + for i in range(30): + message = KafkaPayload(None, i.to_bytes(1, "big"), []) + producer.produce(topic, message).result() + + consumer_config = build_kafka_consumer_configuration( + default_config, + group_id="kip848", + ) + + consumer_config["group.protocol"] = "consumer" + consumer_config.pop("session.timeout.ms", None) + consumer_config.pop("max.poll.interval.ms", None) + consumer_config.pop("partition.assignment.strategy", None) + consumer_config.pop("group.protocol.type", None) + consumer_config.pop("heartbeat.interval.ms", None) + + consumer = KafkaConsumer(consumer_config) + + processor = StreamProcessor( + consumer=consumer, topic=Topic(TOPIC), processor_factory=Factory() + ) + + def shutdown() -> None: + for i in range(100): + time.sleep(0.1) + if counter == 30: + break + print("shutting down") + processor.signal_shutdown() + + t = threading.Thread(target=shutdown) + t.start() + + processor.run() + + assert counter == 30 + + t.join()