From 82635efe18707bd8e6963b210dc070aeec330a5d Mon Sep 17 00:00:00 2001 From: Raynor Chavez Date: Tue, 1 Oct 2024 08:40:39 +0800 Subject: [PATCH 1/7] initial --- .../large_scale_performance_tests.yml | 137 ++++++++++ .gitignore | 7 +- perf_tests/analyze_telemetry.py | 60 +++++ .../large_scale_performance_locustfile.py | 250 ++++++++++++++++++ 4 files changed, 453 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/large_scale_performance_tests.yml create mode 100644 perf_tests/analyze_telemetry.py create mode 100644 perf_tests/large_scale_performance_locustfile.py diff --git a/.github/workflows/large_scale_performance_tests.yml b/.github/workflows/large_scale_performance_tests.yml new file mode 100644 index 000000000..97c716871 --- /dev/null +++ b/.github/workflows/large_scale_performance_tests.yml @@ -0,0 +1,137 @@ +name: Large Scale Performance Tests + +on: + workflow_dispatch: + inputs: + test_suite: + type: choice + required: true + options: + - large_scale_performance_locustfile.py + description: > + Location of the Locust file that contains the test suite. + + number_of_users: + type: string + required: true + default: "100" + description: > + Total number of simulated users. + + user_spawn_rate: + type: string + required: true + default: "10" + description: > + Users spawned per second. + + duration: + type: string + required: true + default: "1h" + description: > + Duration of the test (e.g., 1h, 24h, 72h). + + marqo_host: + type: string + required: true + default: "https://your-marqo-instance.com" + description: > + Marqo host to run performance test on. + + index_name: + type: string + required: true + default: "locust-test" + description: > + Index name to test. + + marqo_cloud_api_key: + type: string + required: true + description: > + Marqo Cloud API key to use for the test. + +jobs: + Extract-Data: + name: Extract Required Data + runs-on: ubuntu-latest + + steps: + - name: Checkout Repository + uses: actions/checkout@v3 + + - name: Set up Python 3.8 + uses: actions/setup-python@v4 + with: + python-version: "3.8" + + - name: Install Dependencies + run: | + pip install pandas + + - name: Extract Required Data + run: | + python perf_tests/extract_required_data.py + + - name: Upload Extracted Data as Artifact + uses: actions/upload-artifact@v3 + with: + name: extracted-data + path: perf_tests/data/extracted_*.csv + + Run-Performance-Test: + name: Run Large Scale Performance Tests + needs: Extract-Data + runs-on: ubuntu-latest + + steps: + - name: Checkout Repository + uses: actions/checkout@v3 + + - name: Download Extracted Data + uses: actions/download-artifact@v3 + with: + name: extracted-data + path: perf_tests/data/ + + - name: Set up Python 3.8 + uses: actions/setup-python@v4 + with: + python-version: "3.8" + + - name: Install Dependencies + run: | + pip install marqo locust numpy pandas + + - name: Prepare Test Environment + run: | + mkdir -p perf_tests/data + cp perf_tests/large_scale_performance_locustfile.py perf_tests/ + cp perf_tests/bquxjob_5a025798_19241877e37.csv perf_tests/ + # Extracted data is already downloaded to perf_tests/data/ + + - name: Run Locust Test + working-directory: perf_tests + run: | + locust -f ${{ github.event.inputs.test_suite }} \ + -u ${{ github.event.inputs.number_of_users }} \ + -r ${{ github.event.inputs.user_spawn_rate }} \ + --run-time ${{ github.event.inputs.duration }} \ + --headless \ + --host ${{ github.event.inputs.marqo_host }} + env: + MARQO_INDEX_NAME: ${{ github.event.inputs.index_name }} + MARQO_CLOUD_API_KEY: ${{ github.event.inputs.marqo_cloud_api_key }} + + - name: Upload Telemetry Data + uses: actions/upload-artifact@v3 + with: + name: telemetry-data + path: perf_tests/telemetry_data_*.json + + - name: Upload Telemetry Summaries + uses: actions/upload-artifact@v3 + with: + name: telemetry-summaries + path: perf_tests/telemetry_summary_*.json diff --git a/.gitignore b/.gitignore index f1e4c26a1..906f848f1 100644 --- a/.gitignore +++ b/.gitignore @@ -150,4 +150,9 @@ dump.rdb .DS_Store # Tester app for unit tests -scripts/vespa_local/vespa_tester_app.zip \ No newline at end of file +scripts/vespa_local/vespa_tester_app.zip + +# Large scale performance testing +/perf_tests/*.csv +/perf_tests/*/*.csv +/perf_tests/*.json diff --git a/perf_tests/analyze_telemetry.py b/perf_tests/analyze_telemetry.py new file mode 100644 index 000000000..7a93e22aa --- /dev/null +++ b/perf_tests/analyze_telemetry.py @@ -0,0 +1,60 @@ +import json +import glob +import numpy as np +import pandas as pd +import os +from datetime import datetime + +def load_telemetry_data(pattern='telemetry_data_*.json'): + telemetry_files = glob.glob(pattern) + all_data = [] + + for file in telemetry_files: + with open(file, 'r') as f: + data = json.load(f) + all_data.extend(data) + + return all_data + +def analyze_telemetry(all_data): + if not all_data: + print("No telemetry data to analyze.") + return + + # Convert to DataFrame + df = pd.DataFrame(all_data) + + # Ensure 'total_time_ms' is numeric + df['total_time_ms'] = pd.to_numeric(df['total_time_ms'], errors='coerce') + + # Calculate metrics + avg_time = df['total_time_ms'].mean() + median_time = df['total_time_ms'].median() + p95_time = df['total_time_ms'].quantile(0.95) + total_requests = len(df) + successful_requests = df['total_time_ms'].notna().sum() + error_rate = ((total_requests - successful_requests) / total_requests) * 100 if total_requests > 0 else 0.0 + + summary = { + 'Average Response Time (ms)': avg_time, + 'Median Response Time (ms)': median_time, + '95th Percentile Response Time (ms)': p95_time, + 'Total Requests': total_requests, + 'Successful Requests': successful_requests, + 'Error Rate (%)': error_rate, + } + + # Save summary to JSON + timestamp = datetime.now().strftime('%Y%m%d%H%M%S') + summary_filename = f"telemetry_summary_{timestamp}.json" + with open(summary_filename, 'w') as f: + json.dump(summary, f, indent=4) + + # Print summary + print("Telemetry Summary:") + for key, value in summary.items(): + print(f"{key}: {value}") + +if __name__ == "__main__": + all_data = load_telemetry_data() + analyze_telemetry(all_data) diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py new file mode 100644 index 000000000..ca7927328 --- /dev/null +++ b/perf_tests/large_scale_performance_locustfile.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import random +import os +import csv +import time +import json +from datetime import datetime + +from locust import events, task, between, FastHttpUser, LoadTestShape +from locust.env import Environment +import marqo +import numpy as np +import pandas as pd + +# Load queries from the CSV file +def load_queries(csv_path): + queries = [] + df = pd.read_csv(csv_path) + for _, row in df.iterrows(): + queries.append({ + 'search_keywords': row['search_keywords'], + 'search_category': row['search_category'] + }) + return queries + +# Load necessary data from extracted CSV files +def load_data(data_dir): + data = {} + + # Load available product codes + data['available_product_codes'] = set() + available_product_codes_csv = os.path.join(data_dir, 'extracted_available_product_codes.csv') + df = pd.read_csv(available_product_codes_csv) + data['available_product_codes'] = set(df['available_ia_code'].dropna().unique()) + + # Load ia_codes + data['ia_codes'] = set() + ia_codes_csv = os.path.join(data_dir, 'extracted_ia_codes.csv') + df = pd.read_csv(ia_codes_csv) + data['ia_codes'] = set(df['ia_code'].dropna().unique()) + + # Load ro_queries + data['ro_queries'] = set() + ro_queries_csv = os.path.join(data_dir, 'extracted_ro_queries.csv') + df = pd.read_csv(ro_queries_csv) + data['ro_queries'] = set(df['query'].dropna().unique()) + + # Load truncated_tags + data['truncated_tags'] = set() + truncated_tags_csv = os.path.join(data_dir, 'extracted_truncated_tags.csv') + df = pd.read_csv(truncated_tags_csv) + data['truncated_tags'] = set(df['truncated_tags'].dropna().unique()) + + return data + +# Paths +DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') +QUERIES_CSV = os.path.join(os.path.dirname(__file__), 'bquxjob_5a025798_19241877e37.csv') + +# Load data +DATA = load_data(DATA_DIR) + +# Load queries +QUERIES = load_queries(QUERIES_CSV) +NUM_QUERIES = len(QUERIES) + +# Regions and their probabilities +REGIONS = ['US', 'AU', 'CA', 'GB', 'DE', 'FR', 'JP', 'ALL_WORLD'] +REGION_PROBABILITIES = [0.5, 0.15, 0.05, 0.05, 0.02, 0.02, 0.01, 0.2] + +# Index name from environment variable +INDEX_NAME = os.getenv('MARQO_INDEX_NAME', 'locust-test') + +class MarqoUser(FastHttpUser): + wait_time = between(0.5, 2) # Simulate user think time + client = None + telemetry_data = [] + + def on_start(self): + host = self.environment.host + api_key = os.getenv('MARQO_CLOUD_API_KEY', None) + if api_key: + self.client = marqo.Client(url=host, api_key=api_key, return_telemetry=True) + else: + self.client = marqo.Client(url=host, return_telemetry=True) + + @task + def perform_search(self): + # Randomly select a query + query_info = random.choice(QUERIES) + search_keywords = query_info['search_keywords'] + search_category = query_info['search_category'] + + # Randomly select a region based on probabilities + region = random.choices(REGIONS, weights=REGION_PROBABILITIES, k=1)[0] + + # Determine if the query is 'Topic Only' or 'Topic+product' + if search_category.strip().lower() == 'all-departments': + # Topic Only + q = search_keywords + # Randomly select an ia_code for the modifiers + ia_code = random.choice(list(DATA['ia_codes'])) + score_modifiers = { + "add_to_score": [ + {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)}, + ] + } + # No filters + filter_string = None + else: + # Topic+product + q = f"{search_category}: {search_keywords}" + # Use the search_category as the ia_code if available, else random + ia_code = search_category if search_category in DATA['available_product_codes'] else random.choice(list(DATA['ia_codes'])) + score_modifiers = { + "add_to_score": [ + {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)}, + ] + } + # Add filter on available_product_codes + filter_string = f"available_product_codes:{ia_code}" + + # Construct the search parameters + search_params = { + 'q': q, + 'limit': 20, + 'search_method': 'HYBRID', + 'hybrid_parameters': { + "retrievalMethod": "disjunction", + "rankingMethod": "rrf", + "alpha": 0.3, + "rrfK": 60, + "searchableAttributesLexical": ["tags"], + "scoreModifiersTensor": score_modifiers, + "scoreModifiersLexical": { + "add_to_score": [ + {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.1, 5)}, + {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.1, 5)}, + {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)}, + {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)}, + {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(1, 300)}, + ] + }, + }, + 'attributes_to_retrieve': ['work_id', 'tags', 'available_product_codes'], + } + if filter_string: + search_params['filter_string'] = filter_string + + # Perform the search and capture telemetry data + start_time = time.time() + try: + response = self.client.index(INDEX_NAME).search(**search_params) + # Print the response for debugging + #print("DEBUG: Search Response") + #print(json.dumps(response, indent=2)) + total_time = (time.time() - start_time) * 1000 # in ms + # Extract telemetry data + telemetry = response.get('telemetry', {}) + telemetry['total_time_ms'] = total_time + self.telemetry_data.append(telemetry) + # Record success + self.environment.events.request.fire( + request_type='SEARCH', + name='perform_search', + response_time=total_time, + response_length=len(json.dumps(response)), + exception=None, + context={}, + ) + except Exception as e: + print("DEBUG: Search Exception") + print(str(e)) + total_time = (time.time() - start_time) * 1000 # in ms + # Record failure + self.environment.events.request.fire( + request_type='SEARCH', + name='perform_search', + response_time=total_time, + response_length=0, + exception=e, + context={}, + ) + + def on_stop(self): + # Save telemetry data to a file + timestamp = datetime.now().strftime('%Y%m%d%H%M%S') + telemetry_filename = f"telemetry_data_{timestamp}.json" + summary_filename = f"telemetry_summary_{timestamp}.json" + with open(telemetry_filename, 'w') as f: + json.dump(self.telemetry_data, f) + + # Process telemetry data and output summary + if self.telemetry_data: + total_times = [t.get('total_time_ms', 0) for t in self.telemetry_data] + # Calculate average, median, percentiles, etc. + avg_time = np.mean(total_times) + median_time = np.median(total_times) + p95_time = np.percentile(total_times, 95) + # Calculate error rate + total_requests = len(total_times) + successful_requests = sum(1 for t in self.telemetry_data if 'total_time_ms' in t) + error_rate = ((total_requests - successful_requests) / total_requests) * 100 if total_requests > 0 else 0.0 + # Save summary + summary = { + 'avg_response_time_ms': avg_time, + 'median_response_time_ms': median_time, + '95th_percentile_response_time_ms': p95_time, + 'total_requests': total_requests, + 'successful_requests': successful_requests, + 'error_rate_percent': error_rate, + } + with open(summary_filename, 'w') as f: + json.dump(summary, f) + +# Event listener to ensure telemetry data is saved even if the test stops prematurely +@events.quitting.add_listener +def save_telemetry_on_quit(environment, **kw): + for user in environment.runner.user_classes: + if hasattr(user, 'telemetry_data') and user.telemetry_data: + timestamp = datetime.now().strftime('%Y%m%d%H%M%S') + telemetry_filename = f"telemetry_data_{timestamp}.json" + summary_filename = f"telemetry_summary_{timestamp}.json" + with open(telemetry_filename, 'w') as f: + json.dump(user.telemetry_data, f) + # Optionally, add summary processing here if needed + +# Optionally, define a LoadTestShape to simulate burst traffic patterns +class BurstLoadShape(LoadTestShape): + stages = [ + {"duration": 300, "users": 100, "spawn_rate": 10}, # First 5 minutes: ramp up to 100 users + {"duration": 600, "users": 500, "spawn_rate": 50}, # Next 10 minutes: ramp up to 500 users + {"duration": 900, "users": 100, "spawn_rate": 50}, # Next 15 minutes: ramp down to 100 users + ] + + def tick(self): + run_time = self.get_run_time() + for stage in self.stages: + if run_time < stage["duration"]: + return (stage["users"], stage["spawn_rate"]) + return None From 76fde3ced7c1a21b353e4ace9c625552351337f9 Mon Sep 17 00:00:00 2001 From: Raynor Chavez Date: Tue, 1 Oct 2024 09:39:31 +0800 Subject: [PATCH 2/7] change all_world --- perf_tests/large_scale_performance_locustfile.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py index ca7927328..9955a6600 100644 --- a/perf_tests/large_scale_performance_locustfile.py +++ b/perf_tests/large_scale_performance_locustfile.py @@ -66,8 +66,8 @@ def load_data(data_dir): NUM_QUERIES = len(QUERIES) # Regions and their probabilities -REGIONS = ['US', 'AU', 'CA', 'GB', 'DE', 'FR', 'JP', 'ALL_WORLD'] -REGION_PROBABILITIES = [0.5, 0.15, 0.05, 0.05, 0.02, 0.02, 0.01, 0.2] +REGIONS = ['US', 'AU', 'CA', 'GB', 'DE', 'FR', 'JP'] +REGION_PROBABILITIES = [0.7, 0.15, 0.05, 0.05, 0.02, 0.02, 0.01] # Index name from environment variable INDEX_NAME = os.getenv('MARQO_INDEX_NAME', 'locust-test') From 170e37e088c2a8abf738465756a452203aec33e5 Mon Sep 17 00:00:00 2001 From: Raynor Chavez Date: Tue, 1 Oct 2024 09:47:18 +0800 Subject: [PATCH 3/7] change burst stages --- perf_tests/large_scale_performance_locustfile.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py index 9955a6600..824241c79 100644 --- a/perf_tests/large_scale_performance_locustfile.py +++ b/perf_tests/large_scale_performance_locustfile.py @@ -237,8 +237,12 @@ def save_telemetry_on_quit(environment, **kw): # Optionally, define a LoadTestShape to simulate burst traffic patterns class BurstLoadShape(LoadTestShape): stages = [ - {"duration": 300, "users": 100, "spawn_rate": 10}, # First 5 minutes: ramp up to 100 users - {"duration": 600, "users": 500, "spawn_rate": 50}, # Next 10 minutes: ramp up to 500 users + {"duration": 300, "users": 20, "spawn_rate": 2}, # First 5 minutes: ramp up to 20 users + {"duration": 300, "users": 50, "spawn_rate": 5}, # Next 5 minutes: ramp up to 50 users + {"duration": 300, "users": 100, "spawn_rate": 10}, # Next 5 minutes: ramp up to 100 users + {"duration": 600, "users": 300, "spawn_rate": 30}, # Next 10 minutes: ramp up to 300 users + {"duration": 300, "users": 500, "spawn_rate": 50}, # Next 5 minutes: ramp up to 500 users + {"duration": 600, "users": 300, "spawn_rate": 30}, # Next 10 minutes: ramp down to 300 users {"duration": 900, "users": 100, "spawn_rate": 50}, # Next 15 minutes: ramp down to 100 users ] From 3eb5f26b85e1e1f468a8b1b95a6c877bfb150385 Mon Sep 17 00:00:00 2001 From: Raynor Chavez Date: Tue, 1 Oct 2024 13:18:07 +0800 Subject: [PATCH 4/7] add retries --- .../large_scale_performance_locustfile.py | 295 +++++++++++------- 1 file changed, 180 insertions(+), 115 deletions(-) diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py index 824241c79..dc6705349 100644 --- a/perf_tests/large_scale_performance_locustfile.py +++ b/perf_tests/large_scale_performance_locustfile.py @@ -10,8 +10,14 @@ from locust import events, task, between, FastHttpUser, LoadTestShape from locust.env import Environment import marqo +import marqo.errors import numpy as np import pandas as pd +import sys +import argparse +import os +import re + # Load queries from the CSV file def load_queries(csv_path): @@ -73,7 +79,7 @@ def load_data(data_dir): INDEX_NAME = os.getenv('MARQO_INDEX_NAME', 'locust-test') class MarqoUser(FastHttpUser): - wait_time = between(0.5, 2) # Simulate user think time + wait_time = between(1, 5) # Simulate user think time client = None telemetry_data = [] @@ -87,109 +93,128 @@ def on_start(self): @task def perform_search(self): - # Randomly select a query - query_info = random.choice(QUERIES) - search_keywords = query_info['search_keywords'] - search_category = query_info['search_category'] - - # Randomly select a region based on probabilities - region = random.choices(REGIONS, weights=REGION_PROBABILITIES, k=1)[0] - - # Determine if the query is 'Topic Only' or 'Topic+product' - if search_category.strip().lower() == 'all-departments': - # Topic Only - q = search_keywords - # Randomly select an ia_code for the modifiers - ia_code = random.choice(list(DATA['ia_codes'])) - score_modifiers = { - "add_to_score": [ - {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)}, - ] - } - # No filters - filter_string = None - else: - # Topic+product - q = f"{search_category}: {search_keywords}" - # Use the search_category as the ia_code if available, else random - ia_code = search_category if search_category in DATA['available_product_codes'] else random.choice(list(DATA['ia_codes'])) - score_modifiers = { - "add_to_score": [ - {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, - {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)}, - ] - } - # Add filter on available_product_codes - filter_string = f"available_product_codes:{ia_code}" - - # Construct the search parameters - search_params = { - 'q': q, - 'limit': 20, - 'search_method': 'HYBRID', - 'hybrid_parameters': { - "retrievalMethod": "disjunction", - "rankingMethod": "rrf", - "alpha": 0.3, - "rrfK": 60, - "searchableAttributesLexical": ["tags"], - "scoreModifiersTensor": score_modifiers, - "scoreModifiersLexical": { + max_retries = 3 + retry_delay = 1 # Start with 1 second + attempt = 0 + while attempt < max_retries: + attempt += 1 + # Randomly select a query + query_info = random.choice(QUERIES) + search_keywords = query_info['search_keywords'] + search_category = query_info['search_category'] + + # Randomly select a region based on probabilities + region = random.choices(REGIONS, weights=REGION_PROBABILITIES, k=1)[0] + + # Determine if the query is 'Topic Only' or 'Topic+product' + if search_category.strip().lower() == 'all-departments': + # Topic Only + q = search_keywords + # Randomly select an ia_code for the modifiers + ia_code = random.choice(list(DATA['ia_codes'])) + score_modifiers = { + "add_to_score": [ + {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)}, + ] + } + # No filters + filter_string = None + else: + # Topic+product + q = f"{search_category}: {search_keywords}" + # Use the search_category as the ia_code if available, else random + ia_code = search_category if search_category in DATA['available_product_codes'] else random.choice(list(DATA['ia_codes'])) + score_modifiers = { "add_to_score": [ - {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.1, 5)}, - {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.1, 5)}, - {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)}, - {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)}, - {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(1, 300)}, + {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)}, + {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)}, ] + } + # Add filter on available_product_codes + filter_string = f"available_product_codes:{ia_code}" + + # Construct the search parameters + search_params = { + 'q': q, + 'limit': 20, + 'search_method': 'HYBRID', + 'hybrid_parameters': { + "retrievalMethod": "disjunction", + "rankingMethod": "rrf", + "alpha": 0.3, + "rrfK": 60, + "searchableAttributesLexical": ["tags"], + "scoreModifiersTensor": score_modifiers, + "scoreModifiersLexical": { + "add_to_score": [ + {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.1, 5)}, + {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.1, 5)}, + {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)}, + {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)}, + {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(1, 300)}, + ] + }, }, - }, - 'attributes_to_retrieve': ['work_id', 'tags', 'available_product_codes'], - } - if filter_string: - search_params['filter_string'] = filter_string - - # Perform the search and capture telemetry data - start_time = time.time() - try: - response = self.client.index(INDEX_NAME).search(**search_params) - # Print the response for debugging - #print("DEBUG: Search Response") - #print(json.dumps(response, indent=2)) - total_time = (time.time() - start_time) * 1000 # in ms - # Extract telemetry data - telemetry = response.get('telemetry', {}) - telemetry['total_time_ms'] = total_time - self.telemetry_data.append(telemetry) - # Record success - self.environment.events.request.fire( - request_type='SEARCH', - name='perform_search', - response_time=total_time, - response_length=len(json.dumps(response)), - exception=None, - context={}, - ) - except Exception as e: - print("DEBUG: Search Exception") - print(str(e)) - total_time = (time.time() - start_time) * 1000 # in ms - # Record failure - self.environment.events.request.fire( - request_type='SEARCH', - name='perform_search', - response_time=total_time, - response_length=0, - exception=e, - context={}, - ) + 'attributes_to_retrieve': ['work_id', 'tags', 'available_product_codes'], + } + if filter_string: + search_params['filter_string'] = filter_string + + # Perform the search and capture telemetry data + start_time = time.time() + try: + response = self.client.index(INDEX_NAME).search(**search_params) + total_time = (time.time() - start_time) * 1000 # in ms + # Extract telemetry data + telemetry = response.get('telemetry', {}) + telemetry['total_time_ms'] = total_time + self.telemetry_data.append(telemetry) + # Record success + self.environment.events.request.fire( + request_type='SEARCH', + name='perform_search', + response_time=total_time, + response_length=len(json.dumps(response)), + exception=None, + context={}, + ) + break # Break out of the retry loop if successful + except marqo.errors.MarqoWebError as e: + if e.code == 'too_many_requests': + # Implement exponential backoff + time.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + total_time = (time.time() - start_time) * 1000 # in ms + # Record failure + self.environment.events.request.fire( + request_type='SEARCH', + name='perform_search', + response_time=total_time, + response_length=0, + exception=e, + context={}, + ) + break + except Exception as e: + total_time = (time.time() - start_time) * 1000 # in ms + # Record failure + self.environment.events.request.fire( + request_type='SEARCH', + name='perform_search', + response_time=total_time, + response_length=0, + exception=e, + context={}, + ) + break def on_stop(self): # Save telemetry data to a file @@ -234,21 +259,61 @@ def save_telemetry_on_quit(environment, **kw): json.dump(user.telemetry_data, f) # Optionally, add summary processing here if needed -# Optionally, define a LoadTestShape to simulate burst traffic patterns + class BurstLoadShape(LoadTestShape): - stages = [ - {"duration": 300, "users": 20, "spawn_rate": 2}, # First 5 minutes: ramp up to 20 users - {"duration": 300, "users": 50, "spawn_rate": 5}, # Next 5 minutes: ramp up to 50 users - {"duration": 300, "users": 100, "spawn_rate": 10}, # Next 5 minutes: ramp up to 100 users - {"duration": 600, "users": 300, "spawn_rate": 30}, # Next 10 minutes: ramp up to 300 users - {"duration": 300, "users": 500, "spawn_rate": 50}, # Next 5 minutes: ramp up to 500 users - {"duration": 600, "users": 300, "spawn_rate": 30}, # Next 10 minutes: ramp down to 300 users - {"duration": 900, "users": 100, "spawn_rate": 50}, # Next 15 minutes: ramp down to 100 users - ] + def __init__(self): + super().__init__() + self.stages = [ + {"duration": 120, "users": 10, "spawn_rate": 1}, # First 2 minutes: ramp up to 10 users + {"duration": 300, "users": 55, "spawn_rate": 2}, # Next 5 minutes: ramp up to 55 users + {"duration": 300, "users": 100, "spawn_rate": 5}, # Next 5 minutes: ramp up to 100 users + {"duration": 600, "users": 250, "spawn_rate": 7}, # Next 10 minutes: ramp up to 250 users + {"duration": 600, "users": 350, "spawn_rate": 10}, # Next 10 minutes: ramp up to 350 users + {"duration": 600, "users": 250, "spawn_rate": 7}, # Next 10 minutes: ramp down to 250 users + {"duration": 900, "users": 50, "spawn_rate": 5}, # Next 15 minutes: ramp down to 50 users + ] + self.stage_index = 0 + self.stage_start_time = 0 + + # Parse the -t flag to get the maximum duration + parser = argparse.ArgumentParser() + parser.add_argument("-t", "--run-time", type=str, help="Stop after given time. e.g. 72h") + args, _ = parser.parse_known_args() + self.max_duration = self.parse_run_time(args.run_time) if args.run_time else sum(stage["duration"] for stage in self.stages) + + def parse_run_time(self, run_time_str): + """ + Parse the run-time string from the -t flag and convert it to seconds. + Supports formats like '72h', '30m', '1h30m', etc. + """ + pattern = re.compile(r'^(?:(?P\d+)h)?(?:(?P\d+)m)?(?:(?P\d+)s)?$') + match = pattern.fullmatch(run_time_str.strip()) + if not match: + raise ValueError(f"Invalid run-time format: {run_time_str}") + + time_params = {name: int(value) if value else 0 for name, value in match.groupdict().items()} + total_seconds = time_params['hours'] * 3600 + time_params['minutes'] * 60 + time_params['seconds'] + if total_seconds == 0: + raise ValueError("Run-time must be greater than 0 seconds.") + return total_seconds def tick(self): run_time = self.get_run_time() - for stage in self.stages: - if run_time < stage["duration"]: - return (stage["users"], stage["spawn_rate"]) - return None + if run_time >= self.max_duration: + return None # Stop the test + + # Initialize stage start time + if self.stage_start_time == 0: + self.stage_start_time = run_time + + current_stage = self.stages[self.stage_index] + stage_elapsed = run_time - self.stage_start_time + + if stage_elapsed >= current_stage["duration"]: + # Move to the next stage + self.stage_index = (self.stage_index + 1) % len(self.stages) + self.stage_start_time = run_time + current_stage = self.stages[self.stage_index] + + return (current_stage["users"], current_stage["spawn_rate"]) + From b6954ca43f04a4ab0239e56adccaa0448a95a394 Mon Sep 17 00:00:00 2001 From: Raynor Chavez Date: Wed, 2 Oct 2024 12:44:12 +0800 Subject: [PATCH 5/7] mod test shape --- perf_tests/large_scale_performance_locustfile.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py index dc6705349..cacf8595a 100644 --- a/perf_tests/large_scale_performance_locustfile.py +++ b/perf_tests/large_scale_performance_locustfile.py @@ -268,9 +268,10 @@ def __init__(self): {"duration": 300, "users": 55, "spawn_rate": 2}, # Next 5 minutes: ramp up to 55 users {"duration": 300, "users": 100, "spawn_rate": 5}, # Next 5 minutes: ramp up to 100 users {"duration": 600, "users": 250, "spawn_rate": 7}, # Next 10 minutes: ramp up to 250 users - {"duration": 600, "users": 350, "spawn_rate": 10}, # Next 10 minutes: ramp up to 350 users + {"duration": 600, "users": 300, "spawn_rate": 10}, # Next 10 minutes: ramp up to 300 users {"duration": 600, "users": 250, "spawn_rate": 7}, # Next 10 minutes: ramp down to 250 users {"duration": 900, "users": 50, "spawn_rate": 5}, # Next 15 minutes: ramp down to 50 users + {"duration": 900, "users": 25, "spawn_rate": 2}, # Next 15 minutes: ramp down to 25 users ] self.stage_index = 0 self.stage_start_time = 0 From 00bfc2236a88b429aeeeeb8c2fcbbbe3fb7d468f Mon Sep 17 00:00:00 2001 From: Raynor Chavez Date: Thu, 3 Oct 2024 00:27:54 +0800 Subject: [PATCH 6/7] better memory management --- .gitignore | 1 + .../large_scale_performance_locustfile.py | 185 ++++++++++++------ 2 files changed, 129 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index 906f848f1..ba416f115 100644 --- a/.gitignore +++ b/.gitignore @@ -156,3 +156,4 @@ scripts/vespa_local/vespa_tester_app.zip /perf_tests/*.csv /perf_tests/*/*.csv /perf_tests/*.json +/perf_tests/*.json1 diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py index cacf8595a..f09fccce8 100644 --- a/perf_tests/large_scale_performance_locustfile.py +++ b/perf_tests/large_scale_performance_locustfile.py @@ -1,26 +1,26 @@ from __future__ import annotations -import random +import argparse +import json import os -import csv +import random +import re +import threading import time -import json from datetime import datetime -from locust import events, task, between, FastHttpUser, LoadTestShape -from locust.env import Environment import marqo import marqo.errors import numpy as np import pandas as pd -import sys -import argparse -import os -import re +from locust import FastHttpUser, LoadTestShape, between, events, task +# --------------------------- +# Data Loading Functions +# --------------------------- -# Load queries from the CSV file def load_queries(csv_path): + """Load search queries from a CSV file.""" queries = [] df = pd.read_csv(csv_path) for _, row in df.iterrows(): @@ -30,36 +30,36 @@ def load_queries(csv_path): }) return queries -# Load necessary data from extracted CSV files def load_data(data_dir): + """Load necessary data from extracted CSV files.""" data = {} # Load available product codes - data['available_product_codes'] = set() available_product_codes_csv = os.path.join(data_dir, 'extracted_available_product_codes.csv') df = pd.read_csv(available_product_codes_csv) data['available_product_codes'] = set(df['available_ia_code'].dropna().unique()) # Load ia_codes - data['ia_codes'] = set() ia_codes_csv = os.path.join(data_dir, 'extracted_ia_codes.csv') df = pd.read_csv(ia_codes_csv) data['ia_codes'] = set(df['ia_code'].dropna().unique()) # Load ro_queries - data['ro_queries'] = set() ro_queries_csv = os.path.join(data_dir, 'extracted_ro_queries.csv') df = pd.read_csv(ro_queries_csv) data['ro_queries'] = set(df['query'].dropna().unique()) # Load truncated_tags - data['truncated_tags'] = set() truncated_tags_csv = os.path.join(data_dir, 'extracted_truncated_tags.csv') df = pd.read_csv(truncated_tags_csv) data['truncated_tags'] = set(df['truncated_tags'].dropna().unique()) return data +# --------------------------- +# Configuration and Initialization +# --------------------------- + # Paths DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') QUERIES_CSV = os.path.join(os.path.dirname(__file__), 'bquxjob_5a025798_19241877e37.csv') @@ -78,25 +78,47 @@ def load_data(data_dir): # Index name from environment variable INDEX_NAME = os.getenv('MARQO_INDEX_NAME', 'locust-test') +# --------------------------- +# Locust User Class +# --------------------------- + class MarqoUser(FastHttpUser): + """ + Locust user that performs search operations against a Marqo index. + """ wait_time = between(1, 5) # Simulate user think time client = None - telemetry_data = [] + telemetry_filename = None + telemetry_file = None + telemetry_lock = threading.Lock() + summary_stats = { + 'total_times': [], + 'total_requests': 0, + 'successful_requests': 0, + 'failed_requests': 0, + } def on_start(self): + """Initialize Marqo client and telemetry file.""" host = self.environment.host api_key = os.getenv('MARQO_CLOUD_API_KEY', None) if api_key: self.client = marqo.Client(url=host, api_key=api_key, return_telemetry=True) else: self.client = marqo.Client(url=host, return_telemetry=True) + + # Initialize telemetry file + timestamp = datetime.now().strftime('%Y%m%d%H%M%S') + self.telemetry_filename = f"telemetry_data_{timestamp}.jsonl" + self.telemetry_file = open(self.telemetry_filename, 'a') # Use append mode @task def perform_search(self): + """Perform a search operation with retry mechanism and telemetry collection.""" max_retries = 3 - retry_delay = 1 # Start with 1 second + retry_delay = 2 # Initial delay in seconds attempt = 0 - while attempt < max_retries: + while attempt <= max_retries: attempt += 1 # Randomly select a query query_info = random.choice(QUERIES) @@ -172,11 +194,19 @@ def perform_search(self): try: response = self.client.index(INDEX_NAME).search(**search_params) total_time = (time.time() - start_time) * 1000 # in ms + # Extract telemetry data telemetry = response.get('telemetry', {}) telemetry['total_time_ms'] = total_time - self.telemetry_data.append(telemetry) - # Record success + + # Write telemetry to file and update summary stats + with self.telemetry_lock: + self.telemetry_file.write(json.dumps(telemetry) + '\n') + self.summary_stats['total_times'].append(total_time) + self.summary_stats['total_requests'] += 1 + self.summary_stats['successful_requests'] += 1 + + # Record success in Locust self.environment.events.request.fire( request_type='SEARCH', name='perform_search', @@ -185,15 +215,23 @@ def perform_search(self): exception=None, context={}, ) - break # Break out of the retry loop if successful + break # Exit loop on success + except marqo.errors.MarqoWebError as e: - if e.code == 'too_many_requests': - # Implement exponential backoff - time.sleep(retry_delay) + if e.code == 'too_many_requests' and attempt <= max_retries: + # Implement exponential backoff with jitter + jitter = random.uniform(0, 0.1) # Add up to 100ms jitter + sleep_time = retry_delay + jitter + time.sleep(sleep_time) retry_delay *= 2 # Exponential backoff else: total_time = (time.time() - start_time) * 1000 # in ms - # Record failure + # Record failure in telemetry + with self.telemetry_lock: + self.summary_stats['total_requests'] += 1 + self.summary_stats['failed_requests'] += 1 + + # Record failure in Locust self.environment.events.request.fire( request_type='SEARCH', name='perform_search', @@ -202,10 +240,15 @@ def perform_search(self): exception=e, context={}, ) - break + break # Exit loop on failure except Exception as e: total_time = (time.time() - start_time) * 1000 # in ms - # Record failure + # Record failure in telemetry + with self.telemetry_lock: + self.summary_stats['total_requests'] += 1 + self.summary_stats['failed_requests'] += 1 + + # Record failure in Locust self.environment.events.request.fire( request_type='SEARCH', name='perform_search', @@ -214,53 +257,76 @@ def perform_search(self): exception=e, context={}, ) - break + break # Exit loop on failure def on_stop(self): - # Save telemetry data to a file - timestamp = datetime.now().strftime('%Y%m%d%H%M%S') - telemetry_filename = f"telemetry_data_{timestamp}.json" - summary_filename = f"telemetry_summary_{timestamp}.json" - with open(telemetry_filename, 'w') as f: - json.dump(self.telemetry_data, f) - - # Process telemetry data and output summary - if self.telemetry_data: - total_times = [t.get('total_time_ms', 0) for t in self.telemetry_data] - # Calculate average, median, percentiles, etc. + """Finalize telemetry data and generate summary report.""" + # Close telemetry file + if self.telemetry_file: + self.telemetry_file.close() + + # Generate summary from summary_stats + if self.summary_stats['total_times']: + total_times = self.summary_stats['total_times'] avg_time = np.mean(total_times) median_time = np.median(total_times) p95_time = np.percentile(total_times, 95) - # Calculate error rate - total_requests = len(total_times) - successful_requests = sum(1 for t in self.telemetry_data if 'total_time_ms' in t) - error_rate = ((total_requests - successful_requests) / total_requests) * 100 if total_requests > 0 else 0.0 - # Save summary + error_rate = (self.summary_stats['failed_requests'] / self.summary_stats['total_requests']) * 100 if self.summary_stats['total_requests'] > 0 else 0.0 + summary = { 'avg_response_time_ms': avg_time, 'median_response_time_ms': median_time, '95th_percentile_response_time_ms': p95_time, - 'total_requests': total_requests, - 'successful_requests': successful_requests, + 'total_requests': self.summary_stats['total_requests'], + 'successful_requests': self.summary_stats['successful_requests'], 'error_rate_percent': error_rate, } + summary_filename = f"telemetry_summary_{datetime.now().strftime('%Y%m%d%H%M%S')}.json" with open(summary_filename, 'w') as f: - json.dump(summary, f) + json.dump(summary, f, indent=4) + +# --------------------------- +# Event Listener for Quitting +# --------------------------- -# Event listener to ensure telemetry data is saved even if the test stops prematurely @events.quitting.add_listener def save_telemetry_on_quit(environment, **kw): - for user in environment.runner.user_classes: - if hasattr(user, 'telemetry_data') and user.telemetry_data: - timestamp = datetime.now().strftime('%Y%m%d%H%M%S') - telemetry_filename = f"telemetry_data_{timestamp}.json" - summary_filename = f"telemetry_summary_{timestamp}.json" - with open(telemetry_filename, 'w') as f: - json.dump(user.telemetry_data, f) - # Optionally, add summary processing here if needed + """ + Event listener to save telemetry data when Locust quits unexpectedly. + """ + for user_class in environment.runner.user_classes: + if isinstance(user_class, MarqoUser): + user = user_class() + if hasattr(user, 'telemetry_file') and user.telemetry_file: + user.telemetry_file.close() + # Generate summary if possible + if user.summary_stats['total_times']: + total_times = user.summary_stats['total_times'] + avg_time = np.mean(total_times) + median_time = np.median(total_times) + p95_time = np.percentile(total_times, 95) + error_rate = (user.summary_stats['failed_requests'] / user.summary_stats['total_requests']) * 100 if user.summary_stats['total_requests'] > 0 else 0.0 + + summary = { + 'avg_response_time_ms': avg_time, + 'median_response_time_ms': median_time, + '95th_percentile_response_time_ms': p95_time, + 'total_requests': user.summary_stats['total_requests'], + 'successful_requests': user.summary_stats['successful_requests'], + 'error_rate_percent': error_rate, + } + summary_filename = f"telemetry_summary_{datetime.now().strftime('%Y%m%d%H%M%S')}.json" + with open(summary_filename, 'w') as f: + json.dump(summary, f, indent=4) +# --------------------------- +# Load Test Shape Class +# --------------------------- class BurstLoadShape(LoadTestShape): + """ + Defines a burst load pattern with multiple stages. + """ def __init__(self): super().__init__() self.stages = [ @@ -299,6 +365,9 @@ def parse_run_time(self, run_time_str): return total_seconds def tick(self): + """ + Determine the current stage based on elapsed time and return the user count and spawn rate. + """ run_time = self.get_run_time() if run_time >= self.max_duration: return None # Stop the test @@ -312,7 +381,9 @@ def tick(self): if stage_elapsed >= current_stage["duration"]: # Move to the next stage - self.stage_index = (self.stage_index + 1) % len(self.stages) + self.stage_index += 1 + if self.stage_index >= len(self.stages): + self.stage_index = len(self.stages) - 1 # Stay at the last stage self.stage_start_time = run_time current_stage = self.stages[self.stage_index] From 0385e538181af3c95fd10a58004e4f760f8bde79 Mon Sep 17 00:00:00 2001 From: Raynor Chavez Date: Thu, 3 Oct 2024 00:47:31 +0800 Subject: [PATCH 7/7] fix cyclical loads --- .../large_scale_performance_locustfile.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py index f09fccce8..17900b1ec 100644 --- a/perf_tests/large_scale_performance_locustfile.py +++ b/perf_tests/large_scale_performance_locustfile.py @@ -330,6 +330,11 @@ class BurstLoadShape(LoadTestShape): def __init__(self): super().__init__() self.stages = [ + #{"duration": 30, "users": 10, "spawn_rate": 1}, # First 2 minutes: ramp up to 10 users + #{"duration": 30, "users": 20, "spawn_rate": 1}, # First 2 minutes: ramp up to 10 users + #{"duration": 30, "users": 30, "spawn_rate": 1}, # First 2 minutes: ramp up to 10 users + #{"duration": 30, "users": 20, "spawn_rate": 1}, # First 2 minutes: ramp up to 10 users + #{"duration": 30, "users": 10, "spawn_rate": 1}, # First 2 minutes: ramp up to 10 users {"duration": 120, "users": 10, "spawn_rate": 1}, # First 2 minutes: ramp up to 10 users {"duration": 300, "users": 55, "spawn_rate": 2}, # Next 5 minutes: ramp up to 55 users {"duration": 300, "users": 100, "spawn_rate": 5}, # Next 5 minutes: ramp up to 100 users @@ -372,20 +377,19 @@ def tick(self): if run_time >= self.max_duration: return None # Stop the test - # Initialize stage start time - if self.stage_start_time == 0: - self.stage_start_time = run_time + # Calculate total duration of all stages + total_duration = sum(stage["duration"] for stage in self.stages) - current_stage = self.stages[self.stage_index] - stage_elapsed = run_time - self.stage_start_time + # Calculate time within current cycle + cycle_time = run_time % total_duration - if stage_elapsed >= current_stage["duration"]: - # Move to the next stage - self.stage_index += 1 - if self.stage_index >= len(self.stages): - self.stage_index = len(self.stages) - 1 # Stay at the last stage - self.stage_start_time = run_time - current_stage = self.stages[self.stage_index] + cumulative_duration = 0 + for stage in self.stages: + cumulative_duration += stage["duration"] + if cycle_time < cumulative_duration: + return (stage["users"], stage["spawn_rate"]) - return (current_stage["users"], current_stage["spawn_rate"]) + # Should not reach here, but return the last stage + last_stage = self.stages[-1] + return (last_stage["users"], last_stage["spawn_rate"])