diff --git a/.github/workflows/large_scale_performance_tests.yml b/.github/workflows/large_scale_performance_tests.yml
new file mode 100644
index 000000000..97c716871
--- /dev/null
+++ b/.github/workflows/large_scale_performance_tests.yml
@@ -0,0 +1,137 @@
+name: Large Scale Performance Tests
+
+on:
+  workflow_dispatch:
+    inputs:
+      test_suite:
+        type: choice
+        required: true
+        options:
+          - large_scale_performance_locustfile.py
+        description: >
+          Location of the Locust file that contains the test suite.
+
+      number_of_users:
+        type: string
+        required: true
+        default: "100"
+        description: >
+          Total number of simulated users.
+
+      user_spawn_rate:
+        type: string
+        required: true
+        default: "10"
+        description: >
+          Users spawned per second.
+
+      duration:
+        type: string
+        required: true
+        default: "1h"
+        description: >
+          Duration of the test (e.g., 1h, 24h, 72h).
+
+      marqo_host:
+        type: string
+        required: true
+        default: "https://your-marqo-instance.com"
+        description: >
+          Marqo host to run the performance test against.
+
+      index_name:
+        type: string
+        required: true
+        default: "locust-test"
+        description: >
+          Index name to test.
+
+      marqo_cloud_api_key:
+        type: string
+        required: true
+        description: >
+          Marqo Cloud API key to use for the test.
+
+jobs:
+  Extract-Data:
+    name: Extract Required Data
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout Repository
+        uses: actions/checkout@v3
+
+      - name: Set up Python 3.8
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.8"
+
+      - name: Install Dependencies
+        run: |
+          pip install pandas
+
+      - name: Extract Required Data
+        run: |
+          python perf_tests/extract_required_data.py
+
+      - name: Upload Extracted Data as Artifact
+        uses: actions/upload-artifact@v3
+        with:
+          name: extracted-data
+          path: perf_tests/data/extracted_*.csv
+
+  Run-Performance-Test:
+    name: Run Large Scale Performance Tests
+    needs: Extract-Data
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout Repository
+        uses: actions/checkout@v3
+
+      - name: Download Extracted Data
+        uses: actions/download-artifact@v3
+        with:
+          name: extracted-data
+          path: perf_tests/data/
+
+      - name: Set up Python 3.8
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.8"
+
+      - name: Install Dependencies
+        run: |
+          pip install marqo locust numpy pandas
+
+      - name: Prepare Test Environment
+        run: |
+          mkdir -p perf_tests/data
+          # The locustfile ships in perf_tests/, the query CSV must already be
+          # present there (it is gitignored), and the extracted data was
+          # downloaded to perf_tests/data/ above.
+
+      - name: Run Locust Test
+        working-directory: perf_tests
+        run: |
+          locust -f ${{ github.event.inputs.test_suite }} \
+            -u ${{ github.event.inputs.number_of_users }} \
+            -r ${{ github.event.inputs.user_spawn_rate }} \
+            --run-time ${{ github.event.inputs.duration }} \
+            --headless \
+            --host ${{ github.event.inputs.marqo_host }}
+        env:
+          MARQO_INDEX_NAME: ${{ github.event.inputs.index_name }}
+          MARQO_CLOUD_API_KEY: ${{ github.event.inputs.marqo_cloud_api_key }}
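+
+      # Note: the locustfile defines a LoadTestShape (BurstLoadShape), which
+      # drives user counts and spawn rates itself and parses --run-time for its
+      # maximum duration, so the -u and -r values above may be ignored once the
+      # shape takes over.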
+
+      - name: Upload Telemetry Data
+        uses: actions/upload-artifact@v3
+        with:
+          name: telemetry-data
+          path: perf_tests/telemetry_data_*.jsonl
+
+      - name: Upload Telemetry Summaries
+        uses: actions/upload-artifact@v3
+        with:
+          name: telemetry-summaries
+          path: perf_tests/telemetry_summary_*.json
diff --git a/.gitignore b/.gitignore
index f1e4c26a1..ba416f115 100644
--- a/.gitignore
+++ b/.gitignore
@@ -150,4 +150,10 @@ dump.rdb
 .DS_Store
 
 # Tester app for unit tests
-scripts/vespa_local/vespa_tester_app.zip
\ No newline at end of file
+scripts/vespa_local/vespa_tester_app.zip
+
+# Large scale performance testing
+/perf_tests/*.csv
+/perf_tests/*/*.csv
+/perf_tests/*.json
+/perf_tests/*.jsonl
diff --git a/perf_tests/analyze_telemetry.py b/perf_tests/analyze_telemetry.py
new file mode 100644
index 000000000..7a93e22aa
--- /dev/null
+++ b/perf_tests/analyze_telemetry.py
@@ -0,0 +1,60 @@
+import json
+import glob
+import pandas as pd
+from datetime import datetime
+
+def load_telemetry_data(pattern='telemetry_data_*.jsonl'):
+    """Load telemetry records from the JSON Lines files written by the locustfile."""
+    telemetry_files = glob.glob(pattern)
+    all_data = []
+
+    for file in telemetry_files:
+        with open(file, 'r') as f:
+            # One JSON object per line; skip blank lines.
+            for line in f:
+                line = line.strip()
+                if line:
+                    all_data.append(json.loads(line))
+
+    return all_data
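+
+# For reference, each record written by large_scale_performance_locustfile.py
+# is Marqo's per-request telemetry dict plus a client-side total; only
+# 'total_time_ms' is required by the analysis below, e.g.:
+#   {"total_time_ms": 123.4, ...}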
+
+def analyze_telemetry(all_data):
+    if not all_data:
+        print("No telemetry data to analyze.")
+        return
+
+    # Convert to DataFrame
+    df = pd.DataFrame(all_data)
+
+    # Ensure 'total_time_ms' is numeric
+    df['total_time_ms'] = pd.to_numeric(df['total_time_ms'], errors='coerce')
+
+    # Calculate metrics
+    avg_time = df['total_time_ms'].mean()
+    median_time = df['total_time_ms'].median()
+    p95_time = df['total_time_ms'].quantile(0.95)
+    total_requests = len(df)
+    # Cast to int: numpy integers are not JSON serializable
+    successful_requests = int(df['total_time_ms'].notna().sum())
+    error_rate = ((total_requests - successful_requests) / total_requests) * 100 if total_requests > 0 else 0.0
+
+    summary = {
+        'Average Response Time (ms)': avg_time,
+        'Median Response Time (ms)': median_time,
+        '95th Percentile Response Time (ms)': p95_time,
+        'Total Requests': total_requests,
+        'Successful Requests': successful_requests,
+        'Error Rate (%)': error_rate,
+    }
+
+    # Save summary to JSON
+    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
+    summary_filename = f"telemetry_summary_{timestamp}.json"
+    with open(summary_filename, 'w') as f:
+        json.dump(summary, f, indent=4)
+
+    # Print summary
+    print("Telemetry Summary:")
+    for key, value in summary.items():
+        print(f"{key}: {value}")
+
+if __name__ == "__main__":
+    all_data = load_telemetry_data()
+    analyze_telemetry(all_data)
diff --git a/perf_tests/large_scale_performance_locustfile.py b/perf_tests/large_scale_performance_locustfile.py
new file mode 100644
index 000000000..17900b1ec
--- /dev/null
+++ b/perf_tests/large_scale_performance_locustfile.py
@@ -0,0 +1,395 @@
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import random
+import re
+import threading
+import time
+from datetime import datetime
+
+import marqo
+import marqo.errors
+import numpy as np
+import pandas as pd
+from locust import FastHttpUser, LoadTestShape, between, events, task
+
+# ---------------------------
+# Data Loading Functions
+# ---------------------------
+
+def load_queries(csv_path):
+    """Load search queries from a CSV file."""
+    queries = []
+    df = pd.read_csv(csv_path)
+    for _, row in df.iterrows():
+        queries.append({
+            'search_keywords': row['search_keywords'],
+            'search_category': row['search_category']
+        })
+    return queries
+
+def load_data(data_dir):
+    """Load the necessary data from the extracted CSV files."""
+    data = {}
+
+    # Load available product codes
+    available_product_codes_csv = os.path.join(data_dir, 'extracted_available_product_codes.csv')
+    df = pd.read_csv(available_product_codes_csv)
+    data['available_product_codes'] = set(df['available_ia_code'].dropna().unique())
+
+    # Load ia_codes
+    ia_codes_csv = os.path.join(data_dir, 'extracted_ia_codes.csv')
+    df = pd.read_csv(ia_codes_csv)
+    data['ia_codes'] = set(df['ia_code'].dropna().unique())
+
+    # Load ro_queries
+    ro_queries_csv = os.path.join(data_dir, 'extracted_ro_queries.csv')
+    df = pd.read_csv(ro_queries_csv)
+    data['ro_queries'] = set(df['query'].dropna().unique())
+
+    # Load truncated_tags
+    truncated_tags_csv = os.path.join(data_dir, 'extracted_truncated_tags.csv')
+    df = pd.read_csv(truncated_tags_csv)
+    data['truncated_tags'] = set(df['truncated_tags'].dropna().unique())
+
+    return data
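+
+# The four extracted_*.csv files read above are produced by
+# perf_tests/extract_required_data.py (run by the Extract-Data job in the
+# workflow); the columns expected here are available_ia_code, ia_code,
+# query, and truncated_tags respectively.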
+
+# ---------------------------
+# Configuration and Initialization
+# ---------------------------
+
+# Paths
+DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
+QUERIES_CSV = os.path.join(os.path.dirname(__file__), 'bquxjob_5a025798_19241877e37.csv')
+
+# Load data
+DATA = load_data(DATA_DIR)
+
+# Load queries
+QUERIES = load_queries(QUERIES_CSV)
+NUM_QUERIES = len(QUERIES)
+
+# Regions and their probabilities
+REGIONS = ['US', 'AU', 'CA', 'GB', 'DE', 'FR', 'JP']
+REGION_PROBABILITIES = [0.7, 0.15, 0.05, 0.05, 0.02, 0.02, 0.01]
+
+# Index name from environment variable
+INDEX_NAME = os.getenv('MARQO_INDEX_NAME', 'locust-test')
+
+# ---------------------------
+# Locust User Class
+# ---------------------------
+
+class MarqoUser(FastHttpUser):
+    """
+    Locust user that performs search operations against a Marqo index.
+    """
+    wait_time = between(1, 5)  # Simulate user think time
+    client = None
+    telemetry_filename = None
+    telemetry_file = None
+    # Class-level, shared by all users in this process: the lock serialises
+    # telemetry writes, and summary_stats aggregates results for the report.
+    telemetry_lock = threading.Lock()
+    summary_stats = {
+        'total_times': [],
+        'total_requests': 0,
+        'successful_requests': 0,
+        'failed_requests': 0,
+    }
+
+    def on_start(self):
+        """Initialize the Marqo client and the telemetry file."""
+        host = self.environment.host
+        api_key = os.getenv('MARQO_CLOUD_API_KEY', None)
+        if api_key:
+            self.client = marqo.Client(url=host, api_key=api_key, return_telemetry=True)
+        else:
+            self.client = marqo.Client(url=host, return_telemetry=True)
+
+        # Initialize telemetry file (JSON Lines; append mode, so users that
+        # start in the same second can share a file)
+        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
+        self.telemetry_filename = f"telemetry_data_{timestamp}.jsonl"
+        self.telemetry_file = open(self.telemetry_filename, 'a')
+
+    @task
+    def perform_search(self):
+        """Perform a search operation with a retry mechanism and telemetry collection."""
+        max_retries = 3
+        retry_delay = 2  # Initial delay in seconds
+        attempt = 0
+        while attempt <= max_retries:
+            attempt += 1
+            # Randomly select a query
+            query_info = random.choice(QUERIES)
+            search_keywords = query_info['search_keywords']
+            search_category = query_info['search_category']
+
+            # Randomly select a region based on probabilities
+            region = random.choices(REGIONS, weights=REGION_PROBABILITIES, k=1)[0]
+
+            # Determine if the query is 'Topic Only' or 'Topic+product'
+            if search_category.strip().lower() == 'all-departments':
+                # Topic Only
+                q = search_keywords
+                # Randomly select an ia_code for the modifiers
+                ia_code = random.choice(list(DATA['ia_codes']))
+                score_modifiers = {
+                    "add_to_score": [
+                        {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)},
+                    ]
+                }
+                # No filters
+                filter_string = None
+            else:
+                # Topic+product
+                q = f"{search_category}: {search_keywords}"
+                # Use the search_category as the ia_code if available, else pick one at random
+                ia_code = search_category if search_category in DATA['available_product_codes'] else random.choice(list(DATA['ia_codes']))
+                score_modifiers = {
+                    "add_to_score": [
+                        {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.0001, 0.005)},
+                        {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(0.0001, 0.005)},
+                    ]
+                }
+                # Add filter on available_product_codes
+                filter_string = f"available_product_codes:{ia_code}"
+
+            # Construct the search parameters
+            search_params = {
+                'q': q,
+                'limit': 20,
+                'search_method': 'HYBRID',
+                'hybrid_parameters': {
+                    "retrievalMethod": "disjunction",
+                    "rankingMethod": "rrf",
+                    "alpha": 0.3,
+                    "rrfK": 60,
+                    "searchableAttributesLexical": ["tags"],
+                    "scoreModifiersTensor": score_modifiers,
+                    "scoreModifiersLexical": {
+                        "add_to_score": [
+                            {"field_name": f"artist_sales_scores.{ia_code}", "weight": random.uniform(0.1, 5)},
+                            {"field_name": f"recent_sales_scores.{ia_code}+{region}", "weight": random.uniform(0.1, 5)},
+                            {"field_name": f"recent_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)},
+                            {"field_name": f"all_time_sales_scores.{ia_code}+ALL_WORLD", "weight": random.uniform(0.1, 5)},
+                            {"field_name": f"ro_scores.{search_keywords}+{ia_code}", "weight": random.uniform(1, 300)},
+                        ]
+                    },
+                },
+                'attributes_to_retrieve': ['work_id', 'tags', 'available_product_codes'],
+            }
+            if filter_string:
+                search_params['filter_string'] = filter_string
+
+            # Perform the search and capture telemetry data
+            start_time = time.time()
+            try:
+                response = self.client.index(INDEX_NAME).search(**search_params)
+                total_time = (time.time() - start_time) * 1000  # in ms
+
+                # Extract telemetry data
+                telemetry = response.get('telemetry', {})
+                telemetry['total_time_ms'] = total_time
+
+                # Write telemetry to file and update summary stats
+                with self.telemetry_lock:
+                    self.telemetry_file.write(json.dumps(telemetry) + '\n')
+                    self.summary_stats['total_times'].append(total_time)
+                    self.summary_stats['total_requests'] += 1
+                    self.summary_stats['successful_requests'] += 1
+
+                # Record success in Locust
+                self.environment.events.request.fire(
+                    request_type='SEARCH',
+                    name='perform_search',
+                    response_time=total_time,
+                    response_length=len(json.dumps(response)),
+                    exception=None,
+                    context={},
+                )
+                break  # Exit loop on success
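+
+            # With max_retries=3 and retry_delay doubling on each retry, a
+            # request that keeps receiving 'too_many_requests' below sleeps
+            # roughly 2s, 4s, then 8s (plus up to 100 ms of jitter) before
+            # giving up.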
+            except marqo.errors.MarqoWebError as e:
+                if e.code == 'too_many_requests' and attempt <= max_retries:
+                    # Exponential backoff with jitter
+                    jitter = random.uniform(0, 0.1)  # Add up to 100 ms of jitter
+                    sleep_time = retry_delay + jitter
+                    time.sleep(sleep_time)
+                    retry_delay *= 2  # Exponential backoff
+                else:
+                    total_time = (time.time() - start_time) * 1000  # in ms
+                    # Record failure in telemetry
+                    with self.telemetry_lock:
+                        self.summary_stats['total_requests'] += 1
+                        self.summary_stats['failed_requests'] += 1
+
+                    # Record failure in Locust
+                    self.environment.events.request.fire(
+                        request_type='SEARCH',
+                        name='perform_search',
+                        response_time=total_time,
+                        response_length=0,
+                        exception=e,
+                        context={},
+                    )
+                    break  # Exit loop on failure
+            except Exception as e:
+                total_time = (time.time() - start_time) * 1000  # in ms
+                # Record failure in telemetry
+                with self.telemetry_lock:
+                    self.summary_stats['total_requests'] += 1
+                    self.summary_stats['failed_requests'] += 1
+
+                # Record failure in Locust
+                self.environment.events.request.fire(
+                    request_type='SEARCH',
+                    name='perform_search',
+                    response_time=total_time,
+                    response_length=0,
+                    exception=e,
+                    context={},
+                )
+                break  # Exit loop on failure
+
+    def on_stop(self):
+        """Finalize telemetry data and generate a summary report."""
+        # Close telemetry file
+        if self.telemetry_file:
+            self.telemetry_file.close()
+
+        # Generate summary from the shared summary_stats
+        if self.summary_stats['total_times']:
+            total_times = self.summary_stats['total_times']
+            avg_time = np.mean(total_times)
+            median_time = np.median(total_times)
+            p95_time = np.percentile(total_times, 95)
+            error_rate = (self.summary_stats['failed_requests'] / self.summary_stats['total_requests']) * 100 if self.summary_stats['total_requests'] > 0 else 0.0
+
+            summary = {
+                'avg_response_time_ms': avg_time,
+                'median_response_time_ms': median_time,
+                '95th_percentile_response_time_ms': p95_time,
+                'total_requests': self.summary_stats['total_requests'],
+                'successful_requests': self.summary_stats['successful_requests'],
+                'error_rate_percent': error_rate,
+            }
+            summary_filename = f"telemetry_summary_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
+            with open(summary_filename, 'w') as f:
+                json.dump(summary, f, indent=4)
+
+# ---------------------------
+# Event Listener for Quitting
+# ---------------------------
+
+@events.quitting.add_listener
+def save_telemetry_on_quit(environment, **kw):
+    """
+    Save a final telemetry summary when Locust quits unexpectedly.
+
+    user_classes holds classes, not instances, so this reads the class-level
+    summary_stats shared by all MarqoUser instances in this process
+    (per-user telemetry files are closed in MarqoUser.on_stop).
+    """
+    for user_class in environment.runner.user_classes:
+        if issubclass(user_class, MarqoUser) and user_class.summary_stats['total_times']:
+            stats = user_class.summary_stats
+            total_times = stats['total_times']
+            avg_time = np.mean(total_times)
+            median_time = np.median(total_times)
+            p95_time = np.percentile(total_times, 95)
+            error_rate = (stats['failed_requests'] / stats['total_requests']) * 100 if stats['total_requests'] > 0 else 0.0
+
+            summary = {
+                'avg_response_time_ms': avg_time,
+                'median_response_time_ms': median_time,
+                '95th_percentile_response_time_ms': p95_time,
+                'total_requests': stats['total_requests'],
+                'successful_requests': stats['successful_requests'],
+                'error_rate_percent': error_rate,
+            }
+            summary_filename = f"telemetry_summary_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
+            with open(summary_filename, 'w') as f:
+                json.dump(summary, f, indent=4)
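+
+# The telemetry_data_*.jsonl files written above can be aggregated offline with
+# perf_tests/analyze_telemetry.py, which recomputes average/median/p95 response
+# times and an error rate across all files from a run.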
+
+# ---------------------------
+# Load Test Shape Class
+# ---------------------------
+
+class BurstLoadShape(LoadTestShape):
+    """
+    Defines a burst load pattern with multiple stages.
+    """
+    def __init__(self):
+        super().__init__()
+        self.stages = [
+            {"duration": 120, "users": 10, "spawn_rate": 1},    # First 2 minutes: ramp up to 10 users
+            {"duration": 300, "users": 55, "spawn_rate": 2},    # Next 5 minutes: ramp up to 55 users
+            {"duration": 300, "users": 100, "spawn_rate": 5},   # Next 5 minutes: ramp up to 100 users
+            {"duration": 600, "users": 250, "spawn_rate": 7},   # Next 10 minutes: ramp up to 250 users
+            {"duration": 600, "users": 300, "spawn_rate": 10},  # Next 10 minutes: ramp up to 300 users
+            {"duration": 600, "users": 250, "spawn_rate": 7},   # Next 10 minutes: ramp down to 250 users
+            {"duration": 900, "users": 50, "spawn_rate": 5},    # Next 15 minutes: ramp down to 50 users
+            {"duration": 900, "users": 25, "spawn_rate": 2},    # Next 15 minutes: ramp down to 25 users
+        ]
+        self.stage_index = 0
+        self.stage_start_time = 0
+
+        # Parse the -t flag to get the maximum duration
+        parser = argparse.ArgumentParser()
+        parser.add_argument("-t", "--run-time", type=str, help="Stop after the given time, e.g. 72h")
+        args, _ = parser.parse_known_args()
+        self.max_duration = self.parse_run_time(args.run_time) if args.run_time else sum(stage["duration"] for stage in self.stages)
+
+    def parse_run_time(self, run_time_str):
+        """
+        Parse the run-time string from the -t flag and convert it to seconds.
+        Supports formats like '72h', '30m', '1h30m', etc.
+        """
+        pattern = re.compile(r'^(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?$')
+        match = pattern.fullmatch(run_time_str.strip())
+        if not match:
+            raise ValueError(f"Invalid run-time format: {run_time_str}")
+
+        time_params = {name: int(value) if value else 0 for name, value in match.groupdict().items()}
+        total_seconds = time_params['hours'] * 3600 + time_params['minutes'] * 60 + time_params['seconds']
+        if total_seconds == 0:
+            raise ValueError("Run-time must be greater than 0 seconds.")
+        return total_seconds
+
+    def tick(self):
+        """
+        Determine the current stage from the elapsed time and return the target
+        user count and spawn rate; the stage cycle repeats until max_duration.
+        """
+        run_time = self.get_run_time()
+        if run_time >= self.max_duration:
+            return None  # Stop the test
+
+        # Calculate the total duration of all stages
+        total_duration = sum(stage["duration"] for stage in self.stages)
+
+        # Calculate the time within the current cycle
+        cycle_time = run_time % total_duration
+
+        cumulative_duration = 0
+        for stage in self.stages:
+            cumulative_duration += stage["duration"]
+            if cycle_time < cumulative_duration:
+                return (stage["users"], stage["spawn_rate"])
+
+        # Should not be reached, but fall back to the last stage
+        last_stage = self.stages[-1]
+        return (last_stage["users"], last_stage["spawn_rate"])
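+
+# A minimal local sanity check for the shape arithmetic (a sketch only: Locust
+# imports this module and instantiates BurstLoadShape itself, and running this
+# file directly still requires the extracted CSVs loaded at import time).
+if __name__ == "__main__":
+    shape = BurstLoadShape()
+    assert shape.parse_run_time("1h30m") == 5400
+    assert shape.parse_run_time("72h") == 259200
+    cycle = sum(stage["duration"] for stage in shape.stages)
+    print(f"Stage cycle: {cycle}s; max duration: {shape.max_duration}s")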