Skip to content

Commit f0b8fe1

Browse files
committed
Refactoring the logic related to SMIGRATED notification format. Applying the new format.
1 parent a0e5aea commit f0b8fe1

File tree

5 files changed

+188
-102
lines changed

5 files changed

+188
-102
lines changed

redis/_parsers/base.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,14 @@ def parse_oss_maintenance_start_msg(response):
193193
@staticmethod
194194
def parse_oss_maintenance_completed_msg(response):
195195
# Expected message format is:
196-
# SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
196+
# SMIGRATED <seq_number> [<host:port> <slot, range1-range2,...>, ...]
197197
id = response[1]
198-
node_address = safe_str(response[2])
199-
slots = response[3]
198+
nodes_to_slots_mapping_data = response[2]
199+
nodes_to_slots_mapping = {}
200+
for node, slots in nodes_to_slots_mapping_data:
201+
nodes_to_slots_mapping[safe_str(node)] = safe_str(slots)
200202

201-
return OSSNodeMigratedNotification(id, node_address, slots)
203+
return OSSNodeMigratedNotification(id, nodes_to_slots_mapping)
202204

203205
@staticmethod
204206
def parse_maintenance_start_msg(response, notification_type):

redis/maint_notifications.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import threading
66
import time
77
from abc import ABC, abstractmethod
8-
from typing import TYPE_CHECKING, List, Literal, Optional, Union
8+
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union
99

1010
from redis.typing import Number
1111

@@ -463,31 +463,26 @@ class OSSNodeMigratedNotification(MaintenanceNotification):
463463
464464
Args:
465465
id (int): Unique identifier for this notification
466-
node_address (Optional[str]): Address of the node that has completed migration
467-
in the format "host:port"
468-
slots (Optional[List[int]]): List of slots that have been migrated
466+
nodes_to_slots_mapping (Dict[str, str]): Mapping of node addresses to slots
469467
"""
470468

471469
DEFAULT_TTL = 30
472470

473471
def __init__(
474472
self,
475473
id: int,
476-
node_address: str,
477-
slots: Optional[List[int]] = None,
474+
nodes_to_slots_mapping: Dict[str, str],
478475
):
479476
super().__init__(id, OSSNodeMigratedNotification.DEFAULT_TTL)
480-
self.node_address = node_address
481-
self.slots = slots
477+
self.nodes_to_slots_mapping = nodes_to_slots_mapping
482478

483479
def __repr__(self) -> str:
484480
expiry_time = self.creation_time + self.ttl
485481
remaining = max(0, expiry_time - time.monotonic())
486482
return (
487483
f"{self.__class__.__name__}("
488484
f"id={self.id}, "
489-
f"node_address={self.node_address}, "
490-
f"slots={self.slots}, "
485+
f"nodes_to_slots_mapping={self.nodes_to_slots_mapping}, "
491486
f"ttl={self.ttl}, "
492487
f"creation_time={self.creation_time}, "
493488
f"expires_at={expiry_time}, "
@@ -999,10 +994,15 @@ def handle_oss_maintenance_completed_notification(
999994

1000995
# Updates the cluster slots cache with the new slots mapping
1001996
# This will also update the nodes cache with the new nodes mapping
1002-
new_node_host, new_node_port = notification.node_address.split(":")
997+
additional_startup_nodes_info = []
998+
for node_address, _ in notification.nodes_to_slots_mapping.items():
999+
new_node_host, new_node_port = node_address.split(":")
1000+
additional_startup_nodes_info.append(
1001+
(new_node_host, int(new_node_port))
1002+
)
10031003
self.cluster_client.nodes_manager.initialize(
10041004
disconnect_startup_nodes_pools=False,
1005-
additional_startup_nodes_info=[(new_node_host, int(new_node_port))],
1005+
additional_startup_nodes_info=additional_startup_nodes_info,
10061006
)
10071007
# mark for reconnect all in use connections to the node - this will force them to
10081008
# disconnect after they complete their current commands

tests/maint_notifications/proxy_server_helpers.py

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,51 @@ class RespTranslator:
1111
"""Helper class to translate between RESP and other encodings."""
1212

1313
@staticmethod
14-
def str_or_list_to_resp(txt: str) -> str:
15-
"""
16-
Convert specific string or list to RESP format.
17-
"""
18-
if re.match(r"^<.*>$", txt):
19-
items = txt[1:-1].split(",")
20-
return f"*{len(items)}\r\n" + "\r\n".join(
21-
f"${len(x)}\r\n{x}" for x in items
14+
def oss_maint_notification_to_resp(txt: str) -> str:
15+
"""Convert query to RESP format."""
16+
if txt.startswith("SMIGRATED"):
17+
# Format: SMIGRATED SeqID host:port slot1,range1-range2 host1:port1 slot2,range3-range4
18+
# SMIGRATED 93923 abc.com:6789 123,789-1000 abc.com:4545 1000-2000 abc.com:4323 900,910,920
19+
# SMIGRATED - simple string
20+
# SeqID - integer
21+
# host and slots info are provided as array of arrays
22+
# host:port - simple string
23+
# slots - simple string
24+
25+
parts = txt.split()
26+
notification = parts[0]
27+
seq_id = parts[1]
28+
hosts_and_slots = parts[2:]
29+
resp = (
30+
">3\r\n" # Push message with 3 elements
31+
f"+{notification}\r\n" # Element 1: Command
32+
f":{seq_id}\r\n" # Element 2: SeqID
33+
f"*{len(hosts_and_slots) // 2}\r\n" # Element 3: Array of host:port, slots pairs
2234
)
35+
for i in range(0, len(hosts_and_slots), 2):
36+
resp += "*2\r\n"
37+
resp += f"+{hosts_and_slots[i]}\r\n"
38+
resp += f"+{hosts_and_slots[i + 1]}\r\n"
2339
else:
24-
return f"${len(txt)}\r\n{txt}"
25-
26-
@staticmethod
27-
def cluster_slots_to_resp(resp: str) -> str:
28-
"""Convert query to RESP format."""
29-
return (
30-
f"*{len(resp.split())}\r\n"
31-
+ "\r\n".join(f"${len(x)}\r\n{x}" for x in resp.split())
32-
+ "\r\n"
33-
)
34-
35-
@staticmethod
36-
def oss_maint_notification_to_resp(resp: str) -> str:
37-
"""Convert query to RESP format."""
38-
return (
39-
f">{len(resp.split())}\r\n"
40-
+ "\r\n".join(
41-
f"{RespTranslator.str_or_list_to_resp(x)}" for x in resp.split()
40+
# SMIGRATING
41+
# Format: SMIGRATING SeqID slot,range1-range2
42+
# SMIGRATING 93923 123,789-1000
43+
# SMIGRATING - simple string
44+
# SeqID - integer
45+
# slots - simple string
46+
47+
parts = txt.split()
48+
notification = parts[0]
49+
seq_id = parts[1]
50+
slots = parts[2]
51+
52+
resp = (
53+
">3\r\n" # Push message with 3 elements
54+
f"+{notification}\r\n" # Element 1: Command
55+
f":{seq_id}\r\n" # Element 2: SeqID
56+
f"+{slots}\r\n" # Element 3: Array of [host:port, slots] pairs
4257
)
43-
+ "\r\n"
44-
)
58+
return resp
4559

4660

4761
@dataclass

0 commit comments

Comments
 (0)