From c384bba1b5284e78f4f7bf61f87ca8de288fdbc4 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Thu, 4 Dec 2025 07:36:32 -0800 Subject: [PATCH] Adopt PostgreSQL native conflict types for better compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Spock's custom SpockConflictType enum with PostgreSQL's ConflictType enum from replication/conflict.h. This provides more granular conflict classification and better alignment with PostgreSQL's built-in logical replication conflict handling. Key changes: - Rename conflict types to match PostgreSQL conventions: • update_update → update_exists (same-value conflicts) • update_update → update_origin_differs (different-origin conflicts) • update_delete → update_missing • delete_delete → delete_missing - Add migration for existing spock.resolutions.conflict_type data - Fix header guard typo (SPOCK_CONGLICT_H → SPOCK_CONFLICT_H) - Expand test coverage for all conflict scenarios including insert_exists, update_missing, and delete_missing cases The migration ensures existing conflict resolution data uses the new naming convention. --- include/spock_conflict.h | 56 +++++++++++++++----- sql/spock--5.0.4--6.0.0-devel.sql | 10 ++++ src/spock_apply_heap.c | 15 ++++-- src/spock_conflict.c | 69 ++++++++++++++----------- tests/regress/expected/tuple_origin.out | 65 ++++++++++++++++++++--- tests/regress/sql/tuple_origin.sql | 28 +++++++++- 6 files changed, 188 insertions(+), 55 deletions(-) diff --git a/include/spock_conflict.h b/include/spock_conflict.h index 324eb51b..ef417778 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -9,11 +9,10 @@ * *------------------------------------------------------------------------- */ -#ifndef SPOCK_CONGLICT_H -#define SPOCK_CONGLICT_H +#ifndef SPOCK_CONFLICT_H +#define SPOCK_CONFLICT_H #include "nodes/execnodes.h" - #include "utils/guc.h" #include "spock_proto_native.h" @@ -44,13 +43,45 @@ extern int spock_conflict_resolver; extern int spock_conflict_log_level; extern bool spock_save_resolutions; -typedef enum SpockConflictType +/* Avoid conflicts with PG's conflict.h via pgstat.h */ +#ifndef CONFLICT_H +/* + * From include/replication/conflict.h in PostgreSQL + * Once we only support >= PG17, we may just include it. + * + * Conflict types that could occur while applying remote changes. + */ +typedef enum { - CONFLICT_INSERT_EXISTS, - CONFLICT_UPDATE_UPDATE, - CONFLICT_UPDATE_DELETE, - CONFLICT_DELETE_DELETE -} SpockConflictType; + /* The row to be inserted violates unique constraint */ + CT_INSERT_EXISTS, + + /* The row to be updated was modified by a different origin */ + CT_UPDATE_ORIGIN_DIFFERS, + + /* The updated row value violates unique constraint */ + CT_UPDATE_EXISTS, + + /* The row to be updated is missing */ + CT_UPDATE_MISSING, + + /* The row to be deleted was modified by a different origin */ + CT_DELETE_ORIGIN_DIFFERS, + + /* The row to be deleted is missing */ + CT_DELETE_MISSING + + /* + * Other conflicts, such as exclusion constraint violations, involve more + * complex rules than simple equality checks. These conflicts are left for + * future improvements. + */ +} ConflictType; +#endif + +extern int spock_conflict_resolver; +extern int spock_conflict_log_level; +extern bool spock_save_resolutions; extern bool get_tuple_origin(SpockRelation *rel, HeapTuple local_tuple, ItemPointer tid, TransactionId *xmin, @@ -61,8 +92,7 @@ extern bool try_resolve_conflict(Relation rel, HeapTuple localtuple, RepOriginId local_origin, TimestampTz local_ts, SpockConflictResolution *resolution); - -extern void spock_report_conflict(SpockConflictType conflict_type, +extern void spock_report_conflict(ConflictType conflict_type, SpockRelation *rel, HeapTuple localtuple, SpockTupleData *oldkey, @@ -75,7 +105,7 @@ extern void spock_report_conflict(SpockConflictType conflict_type, TimestampTz local_tuple_timestamp, Oid conflict_idx_id); -extern void spock_conflict_log_table(SpockConflictType conflict_type, +extern void spock_conflict_log_table(ConflictType conflict_type, SpockRelation *rel, HeapTuple localtuple, SpockTupleData *oldkey, @@ -95,4 +125,4 @@ extern bool spock_conflict_resolver_check_hook(int *newval, void **extra, extern void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple); -#endif /* SPOCK_CONGLICT_H */ +#endif /* SPOCK_CONFLICT_H */ diff --git a/sql/spock--5.0.4--6.0.0-devel.sql b/sql/spock--5.0.4--6.0.0-devel.sql index e46a6064..5a54e744 100644 --- a/sql/spock--5.0.4--6.0.0-devel.sql +++ b/sql/spock--5.0.4--6.0.0-devel.sql @@ -57,3 +57,13 @@ CREATE VIEW spock.lag_tracker AS -- Source for sub_id values. CREATE SEQUENCE spock.sub_id_generator AS integer MINVALUE 1 CYCLE START WITH 1 OWNED BY spock.subscription.sub_id; + +-- Migrate spock.resolutions to the new conflict types +-- insert_exists stays the same +UPDATE spock.resolutions +SET conflict_type = CASE conflict_type + WHEN 'update_update' THEN 'update_exists' + WHEN 'update_delete' THEN 'update_missing' + WHEN 'delete_delete' THEN 'delete_missing' + ELSE conflict_type +END; diff --git a/src/spock_apply_heap.c b/src/spock_apply_heap.c index abf6ca84..6bdf4c8e 100644 --- a/src/spock_apply_heap.c +++ b/src/spock_apply_heap.c @@ -705,6 +705,10 @@ finish_apply_exec_state(ApplyExecState *aestate) pfree(aestate); } +/** + * This is called when there is a potential conflict that may be able to be resolved + * according to resolution rules + */ static bool spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate, TupleTableSlot *localslot, TupleTableSlot *remoteslot, @@ -760,9 +764,10 @@ spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate, } /* - * See if we need to log any conflict + * See if we need to log any conflict to the server log and spock.resolutions + * Calling this does not necessarily mean that there is a conflict */ - spock_report_conflict(is_insert ? CONFLICT_INSERT_EXISTS : CONFLICT_UPDATE_UPDATE, + spock_report_conflict(is_insert ? CT_INSERT_EXISTS : CT_UPDATE_EXISTS, rel, TTS_TUP(localslot), oldtup, remotetuple, applytuple, resolution, xmin, local_origin_found, local_origin, @@ -1066,6 +1071,9 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, * Perform the UPDATE if Tuple found. * * Note this will fail if there are other conflicting unique indexes. + * + * spock_handle_conflict_and_apply is a misnomer as it is called for + * the normal UPDATE case, too. */ if (found) { @@ -1075,6 +1083,7 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, } else { + /* CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */ SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index]; /* @@ -1207,7 +1216,7 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup) exception_log->local_tuple = NULL; remotetuple = heap_form_tuple(RelationGetDescr(rel->rel), oldtup->values, oldtup->nulls); - spock_report_conflict(CONFLICT_DELETE_DELETE, + spock_report_conflict(CT_DELETE_MISSING, rel, NULL, oldtup, remotetuple, NULL, SpockResolution_Skip, InvalidTransactionId, false, InvalidRepOriginId, diff --git a/src/spock_conflict.c b/src/spock_conflict.c index b8bff6f1..f1d62b5f 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -55,6 +55,18 @@ #include "spock_node.h" #include "spock_worker.h" + +/* From src/backend/replication/logical/conflict.c */ +static const char *const ConflictTypeNames[] = { + [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", + [CT_UPDATE_EXISTS] = "update_exists", + [CT_UPDATE_MISSING] = "update_missing", + [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", + [CT_DELETE_MISSING] = "delete_missing" +}; + + int spock_conflict_resolver = SPOCK_RESOLVE_LAST_UPDATE_WINS; int spock_conflict_log_level = LOG; bool spock_save_resolutions = false; @@ -274,25 +286,6 @@ try_resolve_conflict(Relation rel, HeapTuple localtuple, HeapTuple remotetuple, return apply; } -static char * -conflict_type_to_string(SpockConflictType conflict_type) -{ - switch (conflict_type) - { - case CONFLICT_INSERT_EXISTS: - return "insert_exists"; - case CONFLICT_UPDATE_UPDATE: - return "update_update"; - case CONFLICT_UPDATE_DELETE: - return "update_delete"; - case CONFLICT_DELETE_DELETE: - return "delete_delete"; - } - - /* Unreachable */ - return NULL; -} - static char * conflict_resolution_to_string(SpockConflictResolution resolution) { @@ -313,6 +306,8 @@ conflict_resolution_to_string(SpockConflictResolution resolution) /* * Log the conflict to server log. * + * If configured to do so, also log to the spock.resolutions table + * * There are number of tuples passed: * * - The local tuple we conflict with or NULL if not found [localtuple]; @@ -333,7 +328,7 @@ conflict_resolution_to_string(SpockConflictResolution resolution) * we still try to free the big chunks as we go. */ void -spock_report_conflict(SpockConflictType conflict_type, +spock_report_conflict(ConflictType conflict_type, SpockRelation *rel, HeapTuple localtuple, SpockTupleData *oldkey, @@ -355,7 +350,7 @@ spock_report_conflict(SpockConflictType conflict_type, /* Ignore update-update conflict for same origin */ - if (conflict_type == CONFLICT_UPDATE_UPDATE) + if (conflict_type == CT_UPDATE_EXISTS) { /* * If updating a row that came from the same origin, do not report it @@ -368,13 +363,16 @@ spock_report_conflict(SpockConflictType conflict_type, if (local_tuple_origin == InvalidRepOriginId && TransactionIdEquals(local_tuple_xid, GetTopTransactionId())) return; + + /* Differing origin */ + conflict_type = CT_UPDATE_ORIGIN_DIFFERS; } /* Count statistics */ handle_stats_counter(rel->rel, MyApplyWorker->subid, SPOCK_STATS_CONFLICT_COUNT, 1); - /* If configured log resolution to table */ + /* If configured log resolution to spock.resolutions table */ spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, remotetuple, applytuple, resolution, local_tuple_xid, found_local_origin, @@ -388,7 +386,10 @@ spock_report_conflict(SpockConflictType conflict_type, MAXDATELEN); initStringInfo(&remotetup); - tuple_to_stringinfo(&remotetup, desc, remotetuple); + + /* Check for old tuple to in case handling DELETE case */ + if (remotetuple) + tuple_to_stringinfo(&remotetup, desc, remotetuple); if (localtuple != NULL) { @@ -413,15 +414,18 @@ spock_report_conflict(SpockConflictType conflict_type, * This deliberately somewhat overlaps with the context info we log with * log_error_verbosity=verbose because we don't necessarily have all that * info enabled. + * + * Handling for CT_DELETE_ORIGIN_DIFFERS will be added separately. */ switch (conflict_type) { - case CONFLICT_INSERT_EXISTS: - case CONFLICT_UPDATE_UPDATE: + case CT_INSERT_EXISTS: + case CT_UPDATE_EXISTS: + case CT_UPDATE_ORIGIN_DIFFERS: ereport(spock_conflict_log_level, (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), errmsg("CONFLICT: remote %s on relation %s (local index %s). Resolution: %s.", - conflict_type == CONFLICT_INSERT_EXISTS ? "INSERT EXISTS" : "UPDATE", + ConflictTypeNames[conflict_type], qualrelname, idxname, conflict_resolution_to_string(resolution)), errdetail("existing local tuple {%s} xid=%u,origin=%d,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X", @@ -434,12 +438,12 @@ spock_report_conflict(SpockConflictType conflict_type, (uint32) (replorigin_session_origin_lsn << 32), (uint32) replorigin_session_origin_lsn))); break; - case CONFLICT_UPDATE_DELETE: - case CONFLICT_DELETE_DELETE: + case CT_UPDATE_MISSING: + case CT_DELETE_MISSING: ereport(spock_conflict_log_level, (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), errmsg("CONFLICT: remote %s on relation %s replica identity index %s (tuple not found). Resolution: %s.", - conflict_type == CONFLICT_UPDATE_DELETE ? "UPDATE" : "DELETE", + ConflictTypeNames[conflict_type], qualrelname, idxname, conflict_resolution_to_string(resolution)), errdetail("remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X", @@ -449,6 +453,9 @@ spock_report_conflict(SpockConflictType conflict_type, (uint32) (replorigin_session_origin_lsn << 32), (uint32) replorigin_session_origin_lsn))); break; + case CT_DELETE_ORIGIN_DIFFERS: + /* keep compiler happy; handling will be added separately */ + break; } } @@ -475,7 +482,7 @@ spock_report_conflict(SpockConflictType conflict_type, * we still try to free the big chunks as we go. */ void -spock_conflict_log_table(SpockConflictType conflict_type, +spock_conflict_log_table(ConflictType conflict_type, SpockRelation *rel, HeapTuple localtuple, SpockTupleData *oldkey, @@ -536,7 +543,7 @@ spock_conflict_log_table(SpockConflictType conflict_type, nulls[4] = true; /* conflict type */ - values[5] = CStringGetTextDatum(conflict_type_to_string(conflict_type)); + values[5] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); /* conflict_resolution */ values[6] = CStringGetTextDatum(conflict_resolution_to_string(resolution)); /* local_origin */ diff --git a/tests/regress/expected/tuple_origin.out b/tests/regress/expected/tuple_origin.out index fbdf32e1..584d59c0 100644 --- a/tests/regress/expected/tuple_origin.out +++ b/tests/regress/expected/tuple_origin.out @@ -56,17 +56,49 @@ SELECT COUNT(*) FROM spock.resolutions -- DELETE the row from subscriber first, in order to create a conflict DELETE FROM users where id = 3; +TRUNCATE spock.resolutions; +TRUNCATE spock.exception_log; +\c :provider_dsn +-- This will create a update_missing conflict on the subscriber, row does not exist +UPDATE users SET mgr_id = 99 WHERE id = 3; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Expect 0 rows in spock.resolutions +SELECT COUNT(*) FROM spock.resolutions; + count +------- + 0 +(1 row) + +-- Expect 1 row in spock.exception_log +SELECT operation, table_name FROM spock.exception_log; + operation | table_name +-----------+------------ + UPDATE | users +(1 row) + \c :provider_dsn -- This will create a conflict on the subscriber DELETE FROM users where id = 3; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + \c :subscriber_dsn -- Expect 1 row in spock.resolutions with NULL local_timestamp -SELECT COUNT(*) FROM spock.resolutions +SELECT conflict_type FROM spock.resolutions WHERE relname='public.users' AND local_timestamp IS NULL; - count -------- - 1 + conflict_type +---------------- + delete_missing (1 row) -- More tests @@ -153,9 +185,9 @@ SELECT * FROM basic_conflict ORDER BY id; -- We should now see a conflict SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; - relname | conflict_type ------------------------+--------------- - public.basic_conflict | update_update + relname | conflict_type +-----------------------+----------------------- + public.basic_conflict | update_origin_differs (1 row) -- Clean @@ -187,7 +219,26 @@ SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.bas ---------+--------------- (0 rows) +-- insert_exists check. Add a row to conflict with +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +INSERT INTO basic_conflict VALUES (4, 'D'); \c :provider_dsn +INSERT INTO basic_conflict VALUES (4, 'DD'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- The insert gets converted into an update, conflict type insert_exists +SELECT conflict_type, conflict_resolution, remote_tuple FROM spock.resolutions; + conflict_type | conflict_resolution | remote_tuple +---------------+---------------------+---------------------- + insert_exists | apply_remote | {"id":4,"data":"DD"} +(1 row) + -- cleanup \c :provider_dsn SELECT * FROM spock.repset_remove_table('default', 'users'); diff --git a/tests/regress/sql/tuple_origin.sql b/tests/regress/sql/tuple_origin.sql index 7de870e3..e27130c8 100644 --- a/tests/regress/sql/tuple_origin.sql +++ b/tests/regress/sql/tuple_origin.sql @@ -33,18 +33,33 @@ SELECT COUNT(*) FROM spock.resolutions -- DELETE the row from subscriber first, in order to create a conflict DELETE FROM users where id = 3; +TRUNCATE spock.resolutions; +TRUNCATE spock.exception_log; + +\c :provider_dsn +-- This will create a update_missing conflict on the subscriber, row does not exist +UPDATE users SET mgr_id = 99 WHERE id = 3; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn +-- Expect 0 rows in spock.resolutions +SELECT COUNT(*) FROM spock.resolutions; +-- Expect 1 row in spock.exception_log +SELECT operation, table_name FROM spock.exception_log; \c :provider_dsn -- This will create a conflict on the subscriber DELETE FROM users where id = 3; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); \c :subscriber_dsn -- Expect 1 row in spock.resolutions with NULL local_timestamp -SELECT COUNT(*) FROM spock.resolutions +SELECT conflict_type FROM spock.resolutions WHERE relname='public.users' AND local_timestamp IS NULL; -- More tests + \c :provider_dsn SELECT spock.replicate_ddl($$ CREATE TABLE basic_conflict ( @@ -111,7 +126,18 @@ SELECT * FROM basic_conflict ORDER BY id; -- We should not see a conflict SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; +-- insert_exists check. Add a row to conflict with +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +INSERT INTO basic_conflict VALUES (4, 'D'); + \c :provider_dsn +INSERT INTO basic_conflict VALUES (4, 'DD'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn +-- The insert gets converted into an update, conflict type insert_exists +SELECT conflict_type, conflict_resolution, remote_tuple FROM spock.resolutions; -- cleanup \c :provider_dsn