mirror of
https://github.com/redis/redis.git
synced 2026-05-28 04:02:46 -04:00
Tests: replication test.
This commit is contained in:
parent
854c7fdddb
commit
8f479b22b9
2 changed files with 128 additions and 0 deletions
36
test.py
36
test.py
|
|
@ -85,7 +85,12 @@ class TestCase:
|
|||
self.error_msg = None
|
||||
self.error_details = None
|
||||
self.test_key = f"test:{self.__class__.__name__.lower()}"
|
||||
# Primary Redis instance (default port)
|
||||
self.redis = redis.Redis()
|
||||
# Replica Redis instance (port 6380)
|
||||
self.replica = redis.Redis(port=6380)
|
||||
# Replication status
|
||||
self.replication_setup = False
|
||||
|
||||
def setup(self):
|
||||
self.redis.delete(self.test_key)
|
||||
|
|
@ -93,6 +98,36 @@ class TestCase:
|
|||
def teardown(self):
|
||||
self.redis.delete(self.test_key)
|
||||
|
||||
def setup_replication(self) -> bool:
|
||||
"""
|
||||
Setup replication between primary and replica Redis instances.
|
||||
Returns True if replication is successfully established, False otherwise.
|
||||
"""
|
||||
# Configure replica to replicate from primary
|
||||
self.replica.execute_command('REPLICAOF', '127.0.0.1', 6379)
|
||||
|
||||
# Wait for replication to be established
|
||||
max_attempts = 10
|
||||
for attempt in range(max_attempts):
|
||||
# Check replication info
|
||||
repl_info = self.replica.info('replication')
|
||||
|
||||
# Check if replication is established
|
||||
if (repl_info.get('role') == 'slave' and
|
||||
repl_info.get('master_host') == '127.0.0.1' and
|
||||
repl_info.get('master_port') == 6379 and
|
||||
repl_info.get('master_link_status') == 'up'):
|
||||
|
||||
self.replication_setup = True
|
||||
return True
|
||||
|
||||
# Wait before next attempt
|
||||
time.sleep(0.5)
|
||||
|
||||
# If we get here, replication wasn't established
|
||||
self.error_msg = "Failed to establish replication between primary and replica"
|
||||
return False
|
||||
|
||||
def test(self):
|
||||
raise NotImplementedError("Subclasses must implement test method")
|
||||
|
||||
|
|
@ -146,6 +181,7 @@ def run_tests():
|
|||
print("================================================\n"+
|
||||
"Make sure to have Redis running in the localhost\n"+
|
||||
"with --enable-debug-command yes\n"+
|
||||
"Both primary (6379) and replica (6380) instances\n"+
|
||||
"================================================\n")
|
||||
|
||||
tests = find_test_classes()
|
||||
|
|
|
|||
92
tests/replication.py
Normal file
92
tests/replication.py
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
from test import TestCase, generate_random_vector
|
||||
import struct
|
||||
import random
|
||||
import time
|
||||
|
||||
class ComprehensiveReplicationTest(TestCase):
|
||||
def getname(self):
|
||||
return "Comprehensive Replication Test with mixed operations"
|
||||
|
||||
def estimated_runtime(self):
|
||||
# This test will take longer than the default 100ms
|
||||
return 20.0 # 20 seconds estimate
|
||||
|
||||
def test(self):
|
||||
# Setup replication between primary and replica
|
||||
assert self.setup_replication(), "Failed to setup replication"
|
||||
|
||||
# Test parameters
|
||||
num_vectors = 5000
|
||||
vector_dim = 8
|
||||
delete_probability = 0.1
|
||||
cas_probability = 0.3
|
||||
|
||||
# Keep track of added items for potential deletion
|
||||
added_items = []
|
||||
|
||||
# Add vectors and occasionally delete
|
||||
for i in range(num_vectors):
|
||||
# Generate a random vector
|
||||
vec = generate_random_vector(vector_dim)
|
||||
vec_bytes = struct.pack(f'{vector_dim}f', *vec)
|
||||
item_name = f"{self.test_key}:item:{i}"
|
||||
|
||||
# Decide whether to use CAS or not
|
||||
use_cas = random.random() < cas_probability
|
||||
|
||||
if use_cas and added_items:
|
||||
# Get an existing item for CAS reference (if available)
|
||||
cas_item = random.choice(added_items)
|
||||
try:
|
||||
# Add with CAS
|
||||
result = self.redis.execute_command('VADD', self.test_key, 'FP32', vec_bytes,
|
||||
item_name, 'CAS')
|
||||
# Only add to our list if actually added (CAS might fail)
|
||||
if result == 1:
|
||||
added_items.append(item_name)
|
||||
except Exception as e:
|
||||
print(f" CAS VADD failed: {e}")
|
||||
else:
|
||||
try:
|
||||
# Add without CAS
|
||||
result = self.redis.execute_command('VADD', self.test_key, 'FP32', vec_bytes, item_name)
|
||||
# Only add to our list if actually added
|
||||
if result == 1:
|
||||
added_items.append(item_name)
|
||||
except Exception as e:
|
||||
print(f" VADD failed: {e}")
|
||||
|
||||
# Randomly delete items (with 10% probability)
|
||||
if random.random() < delete_probability and added_items:
|
||||
try:
|
||||
# Select a random item to delete
|
||||
item_to_delete = random.choice(added_items)
|
||||
# Delete the item using VREM (not VDEL)
|
||||
self.redis.execute_command('VREM', self.test_key, item_to_delete)
|
||||
# Remove from our list
|
||||
added_items.remove(item_to_delete)
|
||||
except Exception as e:
|
||||
print(f" VREM failed: {e}")
|
||||
|
||||
# Allow time for replication to complete
|
||||
time.sleep(2.0)
|
||||
|
||||
# Verify final VCARD matches
|
||||
primary_card = self.redis.execute_command('VCARD', self.test_key)
|
||||
replica_card = self.replica.execute_command('VCARD', self.test_key)
|
||||
assert primary_card == replica_card, f"Final VCARD mismatch: primary={primary_card}, replica={replica_card}"
|
||||
|
||||
# Verify VDIM matches
|
||||
primary_dim = self.redis.execute_command('VDIM', self.test_key)
|
||||
replica_dim = self.replica.execute_command('VDIM', self.test_key)
|
||||
assert primary_dim == replica_dim, f"VDIM mismatch: primary={primary_dim}, replica={replica_dim}"
|
||||
|
||||
# Verify digests match using DEBUG DIGEST
|
||||
primary_digest = self.redis.execute_command('DEBUG', 'DIGEST-VALUE', self.test_key)
|
||||
replica_digest = self.replica.execute_command('DEBUG', 'DIGEST-VALUE', self.test_key)
|
||||
assert primary_digest == replica_digest, f"Digest mismatch: primary={primary_digest}, replica={replica_digest}"
|
||||
|
||||
# Print summary
|
||||
print(f"\n Added and maintained {len(added_items)} vectors with dimension {vector_dim}")
|
||||
print(f" Final vector count: {primary_card}")
|
||||
print(f" Final digest: {primary_digest[0].decode()}")
|
||||
Loading…
Reference in a new issue