Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions samples/Z0DAN/zodan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1531,6 +1531,40 @@ BEGIN
END IF;
END;

-- Check: all nodes, included in the cluster, have only enabled subscriptions.
--
-- Connect to each node in the cluster and pass through the spock.subscription
-- table to check subscriptions statuses. Using it we try to avoid cases
-- when somewhere in the middle a crash or disconnection happens that may
-- be aggravated by add_node.
DECLARE
status_rec record;
dsn_rec record;
dsns_sql text;
sub_status_sql text;
BEGIN
dsns_sql := 'SELECT if_dsn,node_name
FROM spock.node JOIN spock.node_interface
ON (if_nodeid = node_id)
WHERE node_id NOT IN (SELECT node_id FROM spock.local_node)';
sub_status_sql := 'SELECT sub_name, sub_enabled FROM spock.subscription';

FOR dsn_rec IN SELECT * FROM dblink(src_dsn, dsns_sql)
AS t(dsn text, node name)
LOOP
FOR status_rec IN SELECT * FROM dblink(dsn_rec.dsn, sub_status_sql)
AS t(name text, status text)
LOOP
IF status_rec.status != 't' THEN
RAISE EXCEPTION ' [FAILED] %', rpad('Node ' || dsn_rec.node || ' has disabled subscription ' || status_rec.name, 60, ' ');
ELSIF verb THEN
RAISE NOTICE ' OK: %', rpad('Node with DSN ' || dsn_rec.dsn || ' has enabled subscription ' || status_rec.name, 120, ' ');
END IF;
END LOOP;
END LOOP;
RAISE NOTICE ' OK: %', rpad('Checking each Spock node has only active subscriptions', 120, ' ');
END;

-- Validating new node prerequisites
SELECT count(*) INTO new_exists FROM spock.node WHERE node_name = new_node_name;
IF new_exists > 0 THEN
Expand Down Expand Up @@ -2317,8 +2351,7 @@ BEGIN
RAISE NOTICE ' OK: %', rpad('Waiting for sync event from ' || src_node_name || ' on new node ' || new_node_name || '...', 120, ' ');
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE ' ✗ %', rpad('Unable to wait for sync event from ' || src_node_name || ' on new node ' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
RAISE;
RAISE EXCEPTION ' ✗ %', rpad('Unable to wait for sync event from ' || src_node_name || ' on new node ' || new_node_name || ' (error: ' || SQLERRM || ')', 120, ' ');
END;
END;
$$;
Expand Down
38 changes: 34 additions & 4 deletions sql/spock--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,12 @@ RETURNS void RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME'
CREATE FUNCTION spock.sync_event()
RETURNS pg_lsn RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_create_sync_event';

CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin_id oid, lsn pg_lsn, timeout int DEFAULT 0)
AS $$
CREATE PROCEDURE spock.wait_for_sync_event(
OUT result bool,
origin_id oid,
lsn pg_lsn,
timeout int DEFAULT 0
) AS $$
DECLARE
target_id oid;
elapsed_time numeric := 0;
Expand All @@ -402,6 +406,17 @@ BEGIN
target_id := node_id FROM spock.node_info();

WHILE true LOOP
-- If an unresolvable issue occurs with the apply worker, the LR
-- progress gets stuck, and we need to check the subscription's state
-- carefully.
IF NOT EXISTS (SELECT * FROM spock.subscription
WHERE sub_origin = origin_id AND
sub_target = target_id AND
sub_enabled = true) THEN
RAISE EXCEPTION 'Replication % => % does not have any enabled subscription yet',
origin_id, target_id;
END IF;

SELECT INTO progress_lsn remote_commit_lsn
FROM spock.progress
WHERE node_id = target_id AND remote_node_id = origin_id;
Expand All @@ -421,8 +436,12 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

CREATE PROCEDURE spock.wait_for_sync_event(OUT result bool, origin name, lsn pg_lsn, timeout int DEFAULT 0)
AS $$
CREATE PROCEDURE spock.wait_for_sync_event(
OUT result bool,
origin name,
lsn pg_lsn,
timeout int DEFAULT 0
) AS $$
DECLARE
origin_id oid;
target_id oid;
Expand All @@ -436,6 +455,17 @@ BEGIN
target_id := node_id FROM spock.node_info();

WHILE true LOOP
-- If an unresolvable issue occurs with the apply worker, the LR
-- progress gets stuck, and we need to check the subscription's state
-- carefully.
IF NOT EXISTS (SELECT * FROM spock.subscription
WHERE sub_origin = origin_id AND
sub_target = target_id AND
sub_enabled = true) THEN
RAISE EXCEPTION 'Replication % => % does not have any enabled subscription yet',
origin_id, target_id;
END IF;

SELECT INTO progress_lsn remote_commit_lsn
FROM spock.progress
WHERE node_id = target_id AND remote_node_id = origin_id;
Expand Down
6 changes: 3 additions & 3 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1108,11 +1108,11 @@ _PG_init(void)
"This setting is deprecated and has no effect. "
"The replay queue now dynamically allocates memory as needed.",
&spock_replay_queue_size,
4194304,
4,
0,
INT_MAX,
MAX_KILOBYTES / 1024,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is reason to change that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just because of boring complain in the log. We assign this GUC in terms of megabytes, so we need to declare units and proper limits too.

PGC_SIGHUP,
0,
GUC_UNIT_MB,
NULL,
NULL,
NULL);
Expand Down
1 change: 1 addition & 0 deletions tests/tap/schedule
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ test: 004_non_default_repset
test: 008_rmgr
test: 009_zodan_add_remove_nodes
test: 010_zodan_add_remove_python
test: 012_zodan_basics

# Tests, consuming too much time to be launched on each check:
#test: 011_zodan_sync_third
Expand Down
124 changes: 124 additions & 0 deletions tests/tap/t/012_zodan_basics.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use strict;
use warnings;
use Test::More;
use lib '.';
use lib 't';
use SpockTest qw(create_cluster destroy_cluster get_test_config psql_or_bail scalar_query);

my ($result);

create_cluster(3, 'Create basic Spock test cluster');

# Get cluster configuration
my $config = get_test_config();
my $node_count = $config->{node_count};
my $node_ports = $config->{node_ports};
my $host = $config->{host};
my $dbname = $config->{db_name};
my $db_user = $config->{db_user};
my $db_password = $config->{db_password};
my $pg_bin = $config->{pg_bin};

psql_or_bail(2, "SELECT spock.node_drop('n2')");
psql_or_bail(3, "SELECT spock.node_drop('n3')");
psql_or_bail(1, "CREATE EXTENSION amcheck");
psql_or_bail(2, "CREATE EXTENSION dblink");
psql_or_bail(3, "CREATE EXTENSION dblink");
psql_or_bail(2, "\\i ../../samples/Z0DAN/zodan.sql");
psql_or_bail(3, "\\i ../../samples/Z0DAN/zodan.sql");
psql_or_bail(1, "CREATE TABLE test(x serial PRIMARY KEY)");
psql_or_bail(1, "INSERT INTO test DEFAULT VALUES");

print STDERR "All supporting stuff has been installed successfully\n";

# ##############################################################################
#
# Basic check that Z0DAN correctly add node to the single-node cluster
#
# ##############################################################################

print STDERR "Call Z0DAN: n2 => n1\n";
psql_or_bail(2, "
CALL spock.add_node(
src_node_name := 'n1',
src_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password',
new_node_name := 'n2',
new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password',
verb := false
)");
print STDERR "Z0DAN (n2 => n1) has finished the attach process\n";
$result = scalar_query(2, "SELECT x FROM test");
print STDERR "Check result: $result\n";
ok($result eq '1', "Check state of the test table after the attachment");

psql_or_bail(1, "SELECT spock.sub_disable('sub_n1_n2')");

# ##############################################################################
#
# Z0DAN reject node addition if some subscriptions are disabled
#
# ##############################################################################

print STDERR "Call Z0DAN: n3 => n2\n";
scalar_query(3, "
CALL spock.add_node(
src_node_name := 'n2',
src_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password',
new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password',
verb := false)");

$result = scalar_query(3, "SELECT count(*) FROM spock.local_node");
ok($result eq '0', "N3 is not in the cluster yet");
print STDERR "Z0DAN should fail because of a disabled subscription\n";

psql_or_bail(1, "SELECT spock.sub_enable('sub_n1_n2')");
psql_or_bail(3, "
CALL spock.add_node(
src_node_name := 'n2',
src_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password',
new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password',
verb := true)");

$result = scalar_query(3, "SELECT count(*) FROM spock.local_node");
ok($result eq '1', "N3 is in the cluster");
$result = scalar_query(3, "SELECT x FROM test");
print STDERR "Check result: $result\n";
ok($result eq '1', "Check state of the test table on N3 after the attachment");
print STDERR "Z0DAN should add N3 to the cluster\n";

# ##############################################################################
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason for what exactly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the reason?

Do you mean, reason for a new test or some check inside?

#
# Test that Z0DAN correctly doesn't add node to the cluster if something happens
# during the SYNC process.
#
# ##############################################################################

# Remove node from the cluster and data leftovers.
psql_or_bail(3, "\\i ../../samples/Z0DAN/zodremove.sql");
psql_or_bail(3, "CALL spock.remove_node(target_node_name := 'n3',
target_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password',
verbose_mode := true)");
psql_or_bail(3, "DROP TABLE test");

psql_or_bail(1, "CREATE FUNCTION fake_fn() RETURNS integer LANGUAGE sql AS \$\$ SELECT 1\$\$");
psql_or_bail(3, "CREATE FUNCTION fake_fn() RETURNS integer LANGUAGE sql AS \$\$ SELECT 1\$\$");
scalar_query(3, "
CALL spock.add_node(
src_node_name := 'n2',
src_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password',
new_node_name := 'n3', new_node_dsn := 'host=$host dbname=$dbname port=$node_ports->[2] user=$db_user password=$db_password',
verb := true)");

# TODO:
# It seems that add_node keeps remnants after unsuccessful execution. It is
# happened because we have commited some intermediate results before.
# It would be better to keep remote transaction opened until the end of the
# operation or just remove these remnants at the end pretending to be a
# distributed transaction.
#
# $result = scalar_query(3, "SELECT count(*) FROM spock.local_node");
# ok($result eq '0', "N3 is not in the cluster");

# Clean up
destroy_cluster('Destroy test cluster');
done_testing();
2 changes: 1 addition & 1 deletion tests/tap/t/SpockTest.pm
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ sub create_postgresql_conf {
print $conf "spock.exception_behaviour=sub_disable\n";
print $conf "spock.conflict_resolution=last_update_wins\n";
print $conf "track_commit_timestamp=on\n";
print $conf "spock.exception_replay_queue_size=1MB\n";
print $conf "spock.exception_replay_queue_size='1MB'\n";
print $conf "spock.enable_spill=on\n";
print $conf "port=$port\n";
print $conf "listen_addresses='*'\n";
Expand Down