Skip to content

Commit f05dcff

Browse files
authored
Add handling for parallel slot migrations with maintenance notifications flow for OSS Cluster API (#3869)
* Adding handling of parallel slot migrations when OSS cluster api is used * Applying review comments
1 parent f0b8fe1 commit f05dcff

File tree

4 files changed

+90
-46
lines changed

4 files changed

+90
-46
lines changed

redis/connection.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,8 @@ def __init__(
319319
oss_cluster_maint_notifications_handler,
320320
parser,
321321
)
322+
self._processed_start_maint_notifications = set()
323+
self._skipped_end_maint_notifications = set()
322324

323325
@abstractmethod
324326
def _get_parser(self) -> Union[_HiredisParser, _RESP3Parser]:
@@ -667,6 +669,22 @@ def maintenance_state(self) -> MaintenanceState:
667669
def maintenance_state(self, state: "MaintenanceState"):
668670
self._maintenance_state = state
669671

672+
def add_maint_start_notification(self, id: int):
673+
self._processed_start_maint_notifications.add(id)
674+
675+
def get_processed_start_notifications(self) -> set:
676+
return self._processed_start_maint_notifications
677+
678+
def add_skipped_end_notification(self, id: int):
679+
self._skipped_end_maint_notifications.add(id)
680+
681+
def get_skipped_end_notifications(self) -> set:
682+
return self._skipped_end_maint_notifications
683+
684+
def reset_received_notifications(self):
685+
self._processed_start_maint_notifications.clear()
686+
self._skipped_end_maint_notifications.clear()
687+
670688
def getpeername(self):
671689
"""
672690
Returns the peer name of the connection.

redis/maint_notifications.py

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -894,12 +894,14 @@ def handle_notification(self, notification: MaintenanceNotification):
894894
return
895895

896896
if notification_type:
897-
self.handle_maintenance_start_notification(MaintenanceState.MAINTENANCE)
897+
self.handle_maintenance_start_notification(
898+
MaintenanceState.MAINTENANCE, notification
899+
)
898900
else:
899901
self.handle_maintenance_completed_notification()
900902

901903
def handle_maintenance_start_notification(
902-
self, maintenance_state: MaintenanceState
904+
self, maintenance_state: MaintenanceState, notification: MaintenanceNotification
903905
):
904906
if (
905907
self.connection.maintenance_state == MaintenanceState.MOVING
@@ -913,6 +915,11 @@ def handle_maintenance_start_notification(
913915
)
914916
# extend the timeout for all created connections
915917
self.connection.update_current_socket_timeout(self.config.relaxed_timeout)
918+
if isinstance(notification, OSSNodeMigratingNotification):
919+
# add the notification id to the set of processed start maint notifications
920+
# this is used to skip the unrelaxing of the timeouts if we have received more than
921+
# one start notification before the the final end notification
922+
self.connection.add_maint_start_notification(notification.id)
916923

917924
def handle_maintenance_completed_notification(self):
918925
# Only reset timeouts if state is not MOVING and relaxed timeouts are enabled
@@ -926,6 +933,9 @@ def handle_maintenance_completed_notification(self):
926933
# timeouts by providing -1 as the relaxed timeout
927934
self.connection.update_current_socket_timeout(-1)
928935
self.connection.maintenance_state = MaintenanceState.NONE
936+
# reset the sets that keep track of received start maint
937+
# notifications and skipped end maint notifications
938+
self.connection.reset_received_notifications()
929939

930940

931941
class OSSMaintNotificationsHandler:
@@ -1004,35 +1014,45 @@ def handle_oss_maintenance_completed_notification(
10041014
disconnect_startup_nodes_pools=False,
10051015
additional_startup_nodes_info=additional_startup_nodes_info,
10061016
)
1007-
# mark for reconnect all in use connections to the node - this will force them to
1008-
# disconnect after they complete their current commands
1009-
# Some of them might be used by sub sub and we don't know which ones - so we disconnect
1010-
# all in flight connections after they are done with current command execution
1011-
for conn in (
1012-
current_node.redis_connection.connection_pool._get_in_use_connections()
1013-
):
1014-
conn.mark_for_reconnect()
1017+
with current_node.redis_connection.connection_pool._lock:
1018+
# mark for reconnect all in use connections to the node - this will force them to
1019+
# disconnect after they complete their current commands
1020+
# Some of them might be used by sub sub and we don't know which ones - so we disconnect
1021+
# all in flight connections after they are done with current command execution
1022+
for conn in current_node.redis_connection.connection_pool._get_in_use_connections():
1023+
conn.mark_for_reconnect()
10151024

1016-
if (
1017-
current_node
1018-
not in self.cluster_client.nodes_manager.nodes_cache.values()
1019-
):
1020-
# disconnect all free connections to the node - this node will be dropped
1021-
# from the cluster, so we don't need to revert the timeouts
1022-
for conn in current_node.redis_connection.connection_pool._get_free_connections():
1023-
conn.disconnect()
1024-
else:
1025-
if self.config.is_relaxed_timeouts_enabled():
1026-
# reset the timeouts for the node to which the connection is connected
1027-
# TODO: add check if other maintenance ops are in progress for the same node - CAE-1038
1028-
# and if so, don't reset the timeouts
1029-
for conn in (
1030-
*current_node.redis_connection.connection_pool._get_in_use_connections(),
1031-
*current_node.redis_connection.connection_pool._get_free_connections(),
1032-
):
1033-
conn.reset_tmp_settings(reset_relaxed_timeout=True)
1034-
conn.update_current_socket_timeout(relaxed_timeout=-1)
1035-
conn.maintenance_state = MaintenanceState.NONE
1025+
if (
1026+
current_node
1027+
not in self.cluster_client.nodes_manager.nodes_cache.values()
1028+
):
1029+
# disconnect all free connections to the node - this node will be dropped
1030+
# from the cluster, so we don't need to revert the timeouts
1031+
for conn in current_node.redis_connection.connection_pool._get_free_connections():
1032+
conn.disconnect()
1033+
else:
1034+
if self.config.is_relaxed_timeouts_enabled():
1035+
# reset the timeouts for the node to which the connection is connected
1036+
# Perform check if other maintenance ops are in progress for the same node
1037+
# and if so, don't reset the timeouts and wait for the last maintenance
1038+
# to complete
1039+
for conn in (
1040+
*current_node.redis_connection.connection_pool._get_in_use_connections(),
1041+
*current_node.redis_connection.connection_pool._get_free_connections(),
1042+
):
1043+
if (
1044+
len(conn.get_processed_start_notifications())
1045+
> len(conn.get_skipped_end_notifications()) + 1
1046+
):
1047+
# we have received more start notifications than end notifications
1048+
# for this connection - we should not reset the timeouts
1049+
# and add the notification id to the set of skipped end notifications
1050+
conn.add_skipped_end_notification(notification.id)
1051+
else:
1052+
conn.reset_tmp_settings(reset_relaxed_timeout=True)
1053+
conn.update_current_socket_timeout(relaxed_timeout=-1)
1054+
conn.maintenance_state = MaintenanceState.NONE
1055+
conn.reset_received_notifications()
10361056

10371057
# mark the notification as processed
10381058
self._processed_notifications.add(notification)

tests/maint_notifications/test_cluster_maint_notifications_handling.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,17 +1006,17 @@ def test_smigrating_smigrated_on_the_same_node_two_slot_ranges(
10061006
self.cluster.set("anyprefix:{3}:k", "VAL")
10071007
# this functionality is part of CAE-1038 and will be added later
10081008
# validate the timeout is still relaxed
1009-
# self._validate_connections_states(
1010-
# self.cluster,
1011-
# [
1012-
# ConnectionStateExpectation(
1013-
# node_port=NODE_PORT_1,
1014-
# changed_connections_count=1,
1015-
# state=MaintenanceState.MAINTENANCE,
1016-
# relaxed_timeout=self.config.relaxed_timeout,
1017-
# ),
1018-
# ],
1019-
# )
1009+
self._validate_connections_states(
1010+
self.cluster,
1011+
[
1012+
ConnectionStateExpectation(
1013+
node_port=NODE_PORT_1,
1014+
changed_connections_count=1,
1015+
state=MaintenanceState.MAINTENANCE,
1016+
relaxed_timeout=self.config.relaxed_timeout,
1017+
),
1018+
],
1019+
)
10201020
smigrated_node_1_2 = RespTranslator.oss_maint_notification_to_resp(
10211021
"SMIGRATED 15 0.0.0.0:15381 3000-4000"
10221022
)

tests/maint_notifications/test_maint_notifications.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,9 @@ def test_handle_notification_migrating(self):
864864
self.handler, "handle_maintenance_start_notification"
865865
) as mock_handle:
866866
self.handler.handle_notification(notification)
867-
mock_handle.assert_called_once_with(MaintenanceState.MAINTENANCE)
867+
mock_handle.assert_called_once_with(
868+
MaintenanceState.MAINTENANCE, notification
869+
)
868870

869871
def test_handle_notification_migrated(self):
870872
"""Test handling of NodeMigratedNotification."""
@@ -884,7 +886,9 @@ def test_handle_notification_failing_over(self):
884886
self.handler, "handle_maintenance_start_notification"
885887
) as mock_handle:
886888
self.handler.handle_notification(notification)
887-
mock_handle.assert_called_once_with(MaintenanceState.MAINTENANCE)
889+
mock_handle.assert_called_once_with(
890+
MaintenanceState.MAINTENANCE, notification
891+
)
888892

889893
def test_handle_notification_failed_over(self):
890894
"""Test handling of NodeFailedOverNotification."""
@@ -911,7 +915,7 @@ def test_handle_maintenance_start_notification_disabled(self):
911915
handler = MaintNotificationsConnectionHandler(self.mock_connection, config)
912916

913917
result = handler.handle_maintenance_start_notification(
914-
MaintenanceState.MAINTENANCE
918+
MaintenanceState.MAINTENANCE, NodeMigratingNotification(id=1, ttl=5)
915919
)
916920

917921
assert result is None
@@ -922,7 +926,7 @@ def test_handle_maintenance_start_notification_moving_state(self):
922926
self.mock_connection.maintenance_state = MaintenanceState.MOVING
923927

924928
result = self.handler.handle_maintenance_start_notification(
925-
MaintenanceState.MAINTENANCE
929+
MaintenanceState.MAINTENANCE, NodeMigratingNotification(id=1, ttl=5)
926930
)
927931
assert result is None
928932
self.mock_connection.update_current_socket_timeout.assert_not_called()
@@ -931,7 +935,9 @@ def test_handle_maintenance_start_notification_success(self):
931935
"""Test successful maintenance start notification handling for migrating."""
932936
self.mock_connection.maintenance_state = MaintenanceState.NONE
933937

934-
self.handler.handle_maintenance_start_notification(MaintenanceState.MAINTENANCE)
938+
self.handler.handle_maintenance_start_notification(
939+
MaintenanceState.MAINTENANCE, NodeMigratingNotification(id=1, ttl=5)
940+
)
935941

936942
assert self.mock_connection.maintenance_state == MaintenanceState.MAINTENANCE
937943
self.mock_connection.update_current_socket_timeout.assert_called_once_with(20)

0 commit comments

Comments
 (0)