From a3151c653277a80c914b0c383d254ef5977cf6d6 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Wed, 10 Dec 2025 14:38:27 +0100 Subject: [PATCH] Cleanup the Z0DAN add_node script. Remove the following unused routines: - get_commit_timestamp - advance_replication_slot - monitor_replication_lag(text, boolean) - monitor_replication_lag_wait - monitor_multiple_replication_lags - wait_for_n3_sync - check_node_lag In the procedure monitor_lag_with_dblink, use the function clock_timestamp instead of now(). The now() function is stable and doesn't change the value within the monitoring loop. So, the volatile clock_timestamp looks more correct to obtain fresh value of time. --- samples/Z0DAN/zodan.sql | 514 +------------------------------- samples/Z0DAN/zodan_cleanup.sql | 13 +- 2 files changed, 5 insertions(+), 522 deletions(-) diff --git a/samples/Z0DAN/zodan.sql b/samples/Z0DAN/zodan.sql index 38baba40..514e8cde 100644 --- a/samples/Z0DAN/zodan.sql +++ b/samples/Z0DAN/zodan.sql @@ -570,131 +570,6 @@ BEGIN END; $$; - --- ============================================================================ - - --- ============================================================================ --- Procedure: get_commit_timestamp --- Purpose : Retrieves the commit timestamp for replication lag between two nodes. --- Arguments: --- node_dsn - DSN string to connect to the remote node. --- n1 - Origin node name. --- n2 - Receiver node name. --- verb - Verbose output flag --- commit_ts - OUT parameter to receive the commit timestamp --- Usage : CALL get_commit_timestamp(node_dsn, 'n1', 'n2', true, NULL); --- ============================================================================ - -CREATE OR REPLACE PROCEDURE spock.get_commit_timestamp( - node_dsn text, - n1 text, - n2 text, - verb boolean, - INOUT commit_ts timestamp DEFAULT NULL -) -LANGUAGE plpgsql -AS -$$ -DECLARE - ts_rec RECORD; - remotesql text; -BEGIN - -- Build remote SQL to fetch commit timestamp from lag_tracker - remotesql := format( - 'SELECT commit_timestamp FROM spock.lag_tracker WHERE origin_name = %L AND receiver_name = %L', - n1, n2 - ); - - IF verb THEN - RAISE NOTICE '[QUERY] %', remotesql; - END IF; - - -- Execute remote SQL and capture the commit timestamp - SELECT * FROM dblink(node_dsn, remotesql) AS t(commit_timestamp timestamp) INTO ts_rec; - - IF verb THEN - RAISE NOTICE E'[STEP] Commit timestamp for lag between "%" and "%": %', n1, n2, ts_rec.commit_timestamp; - END IF; - - commit_ts := ts_rec.commit_timestamp; -END; -$$; - - --- ============================================================================ - - --- ============================================================================ --- Procedure: advance_replication_slot --- Purpose : Advances a logical replication slot to a specific commit timestamp on a remote node via dblink. --- Arguments: --- node_dsn - DSN string to connect to the remote node. --- slot_name - Name of the replication slot to advance. --- sync_timestamp- Commit timestamp to advance the slot to. --- verb - Verbose output flag --- Usage : CALL advance_replication_slot(node_dsn, slot_name, sync_timestamp, true); --- ============================================================================ - -CREATE OR REPLACE PROCEDURE spock.advance_replication_slot( - node_dsn text, - slot_name text, - sync_timestamp timestamp, - verb boolean -) -LANGUAGE plpgsql -AS -$$ -DECLARE - remotesql text; - slot_advance_result RECORD; -BEGIN - -- ============================================================================ - -- Step 1: Check if sync_timestamp is NULL - -- ============================================================================ - IF sync_timestamp IS NULL THEN - IF verb THEN - RAISE NOTICE E' - [STEP 1] Commit timestamp is NULL, skipping slot advance for slot "%". - ', slot_name; - END IF; - RETURN; - END IF; - - -- ============================================================================ - -- Step 2: Build remote SQL for advancing replication slot - -- ============================================================================ - remotesql := format( - 'WITH lsn_cte AS ( - SELECT spock.get_lsn_from_commit_ts(%L, %L::timestamp) AS lsn - ) - SELECT pg_replication_slot_advance(%L, lsn) FROM lsn_cte;', - slot_name, sync_timestamp::text, slot_name - ); - - IF verb THEN - RAISE NOTICE '[QUERY] %', remotesql; - END IF; - IF verb THEN - RAISE NOTICE E' - [STEP 2] Remote node DSN: % - ', node_dsn; - END IF; - - -- ============================================================================ - -- Step 3: Execute slot advance on remote node using dblink - -- ============================================================================ - SELECT * FROM dblink(node_dsn, remotesql) AS t(result text) INTO slot_advance_result; - - IF verb THEN - RAISE NOTICE E' - [STEP 3] Replication slot "%" advanced to commit timestamp % on remote node: %', - slot_name, sync_timestamp, node_dsn; - END IF; -END; -$$; - - -- ============================================================================ @@ -976,361 +851,6 @@ BEGIN END; $$; --- ============================================================================ --- Procedure: monitor_replication_lag --- Purpose : Monitors replication lag between nodes on a remote cluster via dblink. --- Arguments: --- node_dsn - DSN string to connect to the remote node. --- verb - Verbose output flag --- Usage : CALL monitor_replication_lag(node_dsn, true); --- ============================================================================ - -CREATE OR REPLACE PROCEDURE spock.monitor_replication_lag(node_dsn text, verb boolean) -LANGUAGE plpgsql -AS -$$ -DECLARE - remotesql text; - node_query text; - node_list text := ''; - lag_vars text := ''; - lag_assignments text := ''; - lag_log text := ''; - lag_conditions text := ''; - node_rec record; - node_count integer := 0; -BEGIN - -- ============================================================================ - -- Step 1: Get all nodes from the remote cluster - -- ============================================================================ - IF verb THEN - RAISE NOTICE '[STEP] Getting nodes from remote cluster: %', node_dsn; - END IF; - - -- Get all nodes except the newest one (assuming it's the receiver) - FOR node_rec IN - SELECT * FROM dblink(node_dsn, 'SELECT node_name FROM spock.node ORDER BY node_id') - AS t(node_name text) - LOOP - node_count := node_count + 1; - IF node_count > 1 THEN - node_list := node_list || ', '; - END IF; - node_list := node_list || '''' || node_rec.node_name || ''''; - - -- Build lag variable declarations - IF node_count > 1 THEN - lag_vars := lag_vars || E'\n '; - END IF; - lag_vars := lag_vars || 'lag_' || node_rec.node_name || ' interval;'; - lag_vars := lag_vars || E'\n lag_' || node_rec.node_name || '_bytes bigint;'; - - -- Build lag assignments - IF node_count > 1 THEN - lag_assignments := lag_assignments || E'\n\n '; - END IF; - lag_assignments := lag_assignments || '-- Calculate lag from ' || node_rec.node_name || ' to newest node'; - lag_assignments := lag_assignments || E'\n SELECT now() - commit_timestamp, replication_lag_bytes INTO lag_' || node_rec.node_name || ', lag_' || node_rec.node_name || '_bytes'; - lag_assignments := lag_assignments || E'\n FROM spock.lag_tracker'; - lag_assignments := lag_assignments || E'\n WHERE origin_name = ''''' || node_rec.node_name || ''''' AND receiver_name = (SELECT node_name FROM spock.node ORDER BY node_id DESC LIMIT 1);'; - - -- Build lag log message - IF node_count > 1 THEN - lag_log := lag_log || ', '; - END IF; - lag_log := lag_log || node_rec.node_name || ' → newest lag: % (bytes: %)'; - - -- Build lag conditions - IF node_count > 1 THEN - lag_conditions := lag_conditions || E'\n AND '; - END IF; - lag_conditions := lag_conditions || 'lag_' || node_rec.node_name || ' IS NOT NULL'; - lag_conditions := lag_conditions || E'\n AND (extract(epoch FROM lag_' || node_rec.node_name || ') < 59 OR lag_' || node_rec.node_name || '_bytes = 0)'; - END LOOP; - - IF node_count <= 1 THEN - RAISE NOTICE '[STEP] Only one node found, skipping lag monitoring'; - RETURN; - END IF; - - -- ============================================================================ - -- Step 2: Build dynamic remote SQL for monitoring replication lag - -- ============================================================================ - -- Build COALESCE parameters for the log message - DECLARE - coalesce_params text := ''; - node_rec2 record; - BEGIN - FOR node_rec2 IN - SELECT * FROM dblink(node_dsn, 'SELECT node_name FROM spock.node ORDER BY node_id') - AS t(node_name text) - LOOP - IF coalesce_params != '' THEN - coalesce_params := coalesce_params || ', '; - END IF; - coalesce_params := coalesce_params || 'COALESCE(lag_' || node_rec2.node_name || '::text, ''NULL''), COALESCE(lag_' || node_rec2.node_name || '_bytes::text, ''NULL'')'; - END LOOP; - - remotesql := format($sql$ - DO ' - DECLARE%s - BEGIN - LOOP%s - - -- Log current lag values - RAISE NOTICE ''[MONITOR] %s'', - %s; - - -- Exit loop when all lags are below 59 seconds - EXIT WHEN %s; - - -- Sleep for 1 second before next check - PERFORM pg_sleep(1); - END LOOP; - END - '; - $sql$, - lag_vars, - lag_assignments, - lag_log, - coalesce_params, - lag_conditions - ); - END; - - IF verb THEN - RAISE NOTICE '[STEP] Generated monitoring SQL for % nodes: %', node_count, node_list; - RAISE NOTICE '[QUERY] %', remotesql; - END IF; - - -- ============================================================================ - -- Step 3: Execute remote monitoring SQL via dblink - -- ============================================================================ - IF verb THEN - RAISE NOTICE E'[STEP] monitor_replication_lag: Executing remote monitoring SQL on node: %', node_dsn; - END IF; - PERFORM dblink(node_dsn, remotesql); - - -- ============================================================================ - -- Step 4: Log completion of monitoring - -- ============================================================================ - IF verb THEN - RAISE NOTICE E'[STEP] monitor_replication_lag: Monitoring replication lag completed on remote node: %', node_dsn; - END IF; -END; -$$; - --- ============================================================================ --- Procedure to monitor replication lag between nodes --- ============================================================================ - -CREATE OR REPLACE PROCEDURE spock.monitor_replication_lag_wait( - origin_node text, - receiver_node text, - max_lag_seconds integer DEFAULT 59, - check_interval_seconds integer DEFAULT 1, - verb boolean DEFAULT true -) -LANGUAGE plpgsql -AS $$ -DECLARE - lag_interval interval; - lag_bytes bigint; - start_time timestamp := now(); - elapsed_interval interval; -BEGIN - IF verb THEN - RAISE NOTICE 'Monitoring replication lag from % to % (max lag: % seconds)', - origin_node, receiver_node, max_lag_seconds; - END IF; - - LOOP - -- Get current lag time and bytes - SELECT now() - commit_timestamp, replication_lag_bytes - INTO lag_interval, lag_bytes - FROM spock.lag_tracker - WHERE origin_name = origin_node AND receiver_name = receiver_node; - - -- Calculate elapsed time - elapsed_interval := now() - start_time; - - IF verb THEN - RAISE NOTICE '% → % lag: % (bytes: %, elapsed: %)', - origin_node, receiver_node, - COALESCE(lag_interval::text, 'NULL'), - COALESCE(lag_bytes::text, 'NULL'), - elapsed_interval::text; - END IF; - - -- Exit when lag is within acceptable limits OR when lag_bytes is zero - EXIT WHEN lag_interval IS NOT NULL - AND (extract(epoch FROM lag_interval) < max_lag_seconds OR lag_bytes = 0); - - -- Sleep before next check - PERFORM pg_sleep(check_interval_seconds); - END LOOP; - - IF verb THEN - IF lag_bytes = 0 THEN - RAISE NOTICE 'Replication lag from % to % completed (lag_bytes = 0)', - origin_node, receiver_node; - ELSE - RAISE NOTICE 'Replication lag from % to % is now within acceptable limits (% seconds)', - origin_node, receiver_node, max_lag_seconds; - END IF; - END IF; -END; -$$; - --- ============================================================================ --- Procedure to monitor multiple replication paths simultaneously --- ============================================================================ - -CREATE OR REPLACE PROCEDURE spock.monitor_multiple_replication_lags( - lag_configs jsonb, - max_lag_seconds integer DEFAULT 59, - check_interval_seconds integer DEFAULT 1, - verb boolean DEFAULT true -) -LANGUAGE plpgsql -AS $$ -DECLARE - lag_record record; - lag_interval interval; - all_within_limits boolean; - start_time timestamp := now(); - elapsed_interval interval; - config jsonb; - max_wait_seconds integer := 60; - wait_count integer := 0; - lag_data record; -BEGIN - IF verb THEN - RAISE NOTICE 'Monitoring multiple replication lags (max lag: % seconds, timeout: % seconds)', - max_lag_seconds, max_wait_seconds; - RAISE NOTICE 'Monitoring % paths', jsonb_array_length(lag_configs); - END IF; - - -- Wait for initial data to appear - WHILE NOT EXISTS (SELECT 1 FROM spock.lag_tracker LIMIT 1) AND wait_count < 10 LOOP - IF verb THEN - RAISE NOTICE 'Waiting for lag_tracker data to appear... (attempt %/10)', wait_count + 1; - END IF; - PERFORM pg_sleep(2); - wait_count := wait_count + 1; - END LOOP; - - IF NOT EXISTS (SELECT 1 FROM spock.lag_tracker LIMIT 1) THEN - RAISE NOTICE 'No lag_tracker data available after waiting - skipping lag monitoring'; - RETURN; - END IF; - - LOOP - all_within_limits := true; - - IF verb THEN - RAISE NOTICE 'Checking lag for % paths...', jsonb_array_length(lag_configs); - END IF; - - -- Check each replication path - FOR config IN SELECT * FROM jsonb_array_elements(lag_configs) - LOOP - IF verb THEN - RAISE NOTICE 'Checking path: % → %', config->>'origin', config->>'receiver'; - END IF; - - SELECT now() - commit_timestamp INTO lag_interval - FROM spock.lag_tracker - WHERE origin_name = config->>'origin' - AND receiver_name = config->>'receiver'; - - IF verb THEN - RAISE NOTICE '% → % lag: %', - config->>'origin', config->>'receiver', - COALESCE(lag_interval::text, 'NULL'); - END IF; - - -- Check if this path is within limits - IF lag_interval IS NULL OR extract(epoch FROM lag_interval) >= max_lag_seconds THEN - all_within_limits := false; - END IF; - END LOOP; - - -- Also show all available lag data for debugging - IF verb THEN - RAISE NOTICE 'All available lag data:'; - FOR lag_data IN SELECT origin_name, receiver_name, commit_timestamp, replication_lag FROM spock.lag_tracker LOOP - RAISE NOTICE ' % → %: commit_ts=%s, lag=%s', - lag_data.origin_name, lag_data.receiver_name, - lag_data.commit_timestamp, lag_data.replication_lag; - END LOOP; - END IF; - - -- Calculate elapsed time - elapsed_interval := now() - start_time; - - IF verb THEN - RAISE NOTICE 'All paths within limits: % (elapsed: %)', - all_within_limits, elapsed_interval::text; - END IF; - - -- Exit when all paths are within acceptable limits - EXIT WHEN all_within_limits; - - -- Exit if we've been waiting too long - IF extract(epoch FROM elapsed_interval) > max_wait_seconds THEN - IF verb THEN - RAISE NOTICE 'Timeout reached (% seconds) - exiting lag monitoring', max_wait_seconds; - END IF; - EXIT; - END IF; - - -- Sleep before next check - PERFORM pg_sleep(check_interval_seconds); - END LOOP; - - IF verb THEN - IF all_within_limits THEN - RAISE NOTICE 'All replication lags are now within acceptable limits (% seconds)', max_lag_seconds; - ELSE - RAISE NOTICE 'Lag monitoring completed with timeout - some paths may still have high lag'; - END IF; - END IF; -END; -$$; - --- ============================================================================ --- Example usage procedure (equivalent to the workflow logic) --- ============================================================================ - -CREATE OR REPLACE PROCEDURE spock.wait_for_n3_sync( - max_lag_seconds integer DEFAULT 59, - check_interval_seconds integer DEFAULT 1, - verb boolean DEFAULT true -) -LANGUAGE plpgsql -AS $$ -DECLARE - lag_configs jsonb; -BEGIN - -- Define the replication paths to monitor - lag_configs := '[ - {"origin": "n1", "receiver": "n3"}, - {"origin": "n2", "receiver": "n3"} - ]'::jsonb; - - -- Monitor both paths - CALL spock.monitor_multiple_replication_lags( - lag_configs, - max_lag_seconds, - check_interval_seconds, - verb - ); -END; -$$; - --- ============================================================================ - -- ============================================================================ -- Procedure to monitor lag using dblink -- ============================================================================ @@ -1347,7 +867,7 @@ DECLARE lag_interval interval; lag_bytes bigint; max_wait_seconds integer := 60; - start_time timestamp := now(); + start_time timestamp := clock_timestamp(); elapsed_interval interval; loop_count integer := 0; lag_sql text; @@ -1368,7 +888,7 @@ BEGIN lag_interval := lag_result.lag_interval; lag_bytes := lag_result.lag_bytes; - elapsed_interval := now() - start_time; + elapsed_interval := clock_timestamp() - start_time; RAISE NOTICE '% → % lag: % (bytes: %, elapsed: %, loop: %)', src_node_name, new_node_name, @@ -2245,36 +1765,6 @@ BEGIN END; $$; --- ============================================================================ --- Simple procedure to check lag between specific nodes (simplified) --- ============================================================================ - -CREATE OR REPLACE PROCEDURE spock.check_node_lag( - origin_node text, - receiver_node text, - verb boolean DEFAULT true, - INOUT lag_interval interval DEFAULT NULL -) -LANGUAGE plpgsql -AS $$ -BEGIN - -- Check for the specific path - SELECT now() - commit_timestamp INTO lag_interval - FROM spock.lag_tracker - WHERE origin_name = origin_node AND receiver_name = receiver_node; - - IF lag_interval IS NOT NULL THEN - IF verb THEN - RAISE NOTICE '% → % lag: %', origin_node, receiver_node, lag_interval; - END IF; - ELSE - IF verb THEN - RAISE NOTICE '% → % lag: NULL (no data)', origin_node, receiver_node; - END IF; - END IF; -END; -$$; - -- ============================================================================ -- Procedure to trigger sync on source node and wait for it on new node using sync_event and wait_for_sync_event -- ============================================================================ diff --git a/samples/Z0DAN/zodan_cleanup.sql b/samples/Z0DAN/zodan_cleanup.sql index c92416b1..bd20d072 100644 --- a/samples/Z0DAN/zodan_cleanup.sql +++ b/samples/Z0DAN/zodan_cleanup.sql @@ -1,7 +1,7 @@ -- ============================================================================ -- ZODAN Cleanup Script -- Purpose: Drops everything created by zodan.sql --- +-- -- Note: Updated to match the corrected zodan.sql file with proper phase numbering -- and all duplicate definitions removed. -- ============================================================================ @@ -12,14 +12,8 @@ DROP PROCEDURE IF EXISTS spock.create_sub(text, text, text, text, boolean, boole DROP PROCEDURE IF EXISTS spock.create_replication_slot(text, text, boolean, text); DROP PROCEDURE IF EXISTS spock.sync_event(text, boolean, pg_lsn); DROP PROCEDURE IF EXISTS spock.create_node(text, text, boolean, text, text, jsonb); -DROP PROCEDURE IF EXISTS spock.get_commit_timestamp(text, text, text, boolean, timestamp); -DROP PROCEDURE IF EXISTS spock.advance_replication_slot(text, text, timestamp, boolean); DROP PROCEDURE IF EXISTS spock.enable_sub(text, text, boolean, boolean); -DROP PROCEDURE IF EXISTS spock.monitor_replication_lag(text, boolean); DROP PROCEDURE IF EXISTS spock.monitor_replication_lag(text, text, text, boolean); -DROP PROCEDURE IF EXISTS spock.monitor_replication_lag_wait(text, text, integer, integer, boolean); -DROP PROCEDURE IF EXISTS spock.monitor_multiple_replication_lags(jsonb, integer, integer, boolean); -DROP PROCEDURE IF EXISTS spock.wait_for_n3_sync(integer, integer, boolean); DROP PROCEDURE IF EXISTS spock.monitor_lag_with_dblink(text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.verify_node_prerequisites(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.create_nodes_only(text, text, text, text, boolean, text, text, jsonb, integer); @@ -31,7 +25,6 @@ DROP PROCEDURE IF EXISTS spock.create_new_to_source_subscription(text, text, tex DROP PROCEDURE IF EXISTS spock.create_source_to_new_subscription(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.trigger_sync_on_other_nodes_and_wait_on_source(text, text, text, text, boolean); DROP PROCEDURE IF EXISTS spock.check_commit_timestamp_and_advance_slot(text, text, text, text, boolean); -DROP PROCEDURE IF EXISTS spock.check_node_lag(text, text, boolean, interval); DROP PROCEDURE IF EXISTS spock.present_final_cluster_state(integer, boolean); DROP PROCEDURE IF EXISTS spock.add_node(text, text, text, text, boolean, text, text, jsonb); @@ -43,10 +36,10 @@ DO $$ BEGIN -- Drop any remaining temporary tables that might have been created DROP TABLE IF EXISTS temp_spock_nodes CASCADE; - + -- Clean up any temporary schemas or objects -- (This is a safety measure in case any temporary objects were created) - + RAISE NOTICE 'ZODAN cleanup completed successfully'; EXCEPTION WHEN OTHERS THEN