diff --git a/test.py b/test.py index afc5d7fc7..2e38ba013 100755 --- a/test.py +++ b/test.py @@ -85,7 +85,12 @@ class TestCase: self.error_msg = None self.error_details = None self.test_key = f"test:{self.__class__.__name__.lower()}" + # Primary Redis instance (default port) self.redis = redis.Redis() + # Replica Redis instance (port 6380) + self.replica = redis.Redis(port=6380) + # Replication status + self.replication_setup = False def setup(self): self.redis.delete(self.test_key) @@ -93,6 +98,36 @@ class TestCase: def teardown(self): self.redis.delete(self.test_key) + def setup_replication(self) -> bool: + """ + Setup replication between primary and replica Redis instances. + Returns True if replication is successfully established, False otherwise. + """ + # Configure replica to replicate from primary + self.replica.execute_command('REPLICAOF', '127.0.0.1', 6379) + + # Wait for replication to be established + max_attempts = 10 + for attempt in range(max_attempts): + # Check replication info + repl_info = self.replica.info('replication') + + # Check if replication is established + if (repl_info.get('role') == 'slave' and + repl_info.get('master_host') == '127.0.0.1' and + repl_info.get('master_port') == 6379 and + repl_info.get('master_link_status') == 'up'): + + self.replication_setup = True + return True + + # Wait before next attempt + time.sleep(0.5) + + # If we get here, replication wasn't established + self.error_msg = "Failed to establish replication between primary and replica" + return False + def test(self): raise NotImplementedError("Subclasses must implement test method") @@ -146,6 +181,7 @@ def run_tests(): print("================================================\n"+ "Make sure to have Redis running in the localhost\n"+ "with --enable-debug-command yes\n"+ + "Both primary (6379) and replica (6380) instances\n"+ "================================================\n") tests = find_test_classes() diff --git a/tests/replication.py b/tests/replication.py new file mode 100644 index 000000000..91dfdf736 --- /dev/null +++ b/tests/replication.py @@ -0,0 +1,92 @@ +from test import TestCase, generate_random_vector +import struct +import random +import time + +class ComprehensiveReplicationTest(TestCase): + def getname(self): + return "Comprehensive Replication Test with mixed operations" + + def estimated_runtime(self): + # This test will take longer than the default 100ms + return 20.0 # 20 seconds estimate + + def test(self): + # Setup replication between primary and replica + assert self.setup_replication(), "Failed to setup replication" + + # Test parameters + num_vectors = 5000 + vector_dim = 8 + delete_probability = 0.1 + cas_probability = 0.3 + + # Keep track of added items for potential deletion + added_items = [] + + # Add vectors and occasionally delete + for i in range(num_vectors): + # Generate a random vector + vec = generate_random_vector(vector_dim) + vec_bytes = struct.pack(f'{vector_dim}f', *vec) + item_name = f"{self.test_key}:item:{i}" + + # Decide whether to use CAS or not + use_cas = random.random() < cas_probability + + if use_cas and added_items: + # Get an existing item for CAS reference (if available) + cas_item = random.choice(added_items) + try: + # Add with CAS + result = self.redis.execute_command('VADD', self.test_key, 'FP32', vec_bytes, + item_name, 'CAS') + # Only add to our list if actually added (CAS might fail) + if result == 1: + added_items.append(item_name) + except Exception as e: + print(f" CAS VADD failed: {e}") + else: + try: + # Add without CAS + result = self.redis.execute_command('VADD', self.test_key, 'FP32', vec_bytes, item_name) + # Only add to our list if actually added + if result == 1: + added_items.append(item_name) + except Exception as e: + print(f" VADD failed: {e}") + + # Randomly delete items (with 10% probability) + if random.random() < delete_probability and added_items: + try: + # Select a random item to delete + item_to_delete = random.choice(added_items) + # Delete the item using VREM (not VDEL) + self.redis.execute_command('VREM', self.test_key, item_to_delete) + # Remove from our list + added_items.remove(item_to_delete) + except Exception as e: + print(f" VREM failed: {e}") + + # Allow time for replication to complete + time.sleep(2.0) + + # Verify final VCARD matches + primary_card = self.redis.execute_command('VCARD', self.test_key) + replica_card = self.replica.execute_command('VCARD', self.test_key) + assert primary_card == replica_card, f"Final VCARD mismatch: primary={primary_card}, replica={replica_card}" + + # Verify VDIM matches + primary_dim = self.redis.execute_command('VDIM', self.test_key) + replica_dim = self.replica.execute_command('VDIM', self.test_key) + assert primary_dim == replica_dim, f"VDIM mismatch: primary={primary_dim}, replica={replica_dim}" + + # Verify digests match using DEBUG DIGEST + primary_digest = self.redis.execute_command('DEBUG', 'DIGEST-VALUE', self.test_key) + replica_digest = self.replica.execute_command('DEBUG', 'DIGEST-VALUE', self.test_key) + assert primary_digest == replica_digest, f"Digest mismatch: primary={primary_digest}, replica={replica_digest}" + + # Print summary + print(f"\n Added and maintained {len(added_items)} vectors with dimension {vector_dim}") + print(f" Final vector count: {primary_card}") + print(f" Final digest: {primary_digest[0].decode()}")