Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions include/spock_conflict.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
*
*-------------------------------------------------------------------------
*/
#ifndef SPOCK_CONGLICT_H
#define SPOCK_CONGLICT_H
#ifndef SPOCK_CONFLICT_H
#define SPOCK_CONFLICT_H

#include "nodes/execnodes.h"

#include "utils/guc.h"

#include "spock_proto_native.h"
Expand Down Expand Up @@ -44,13 +43,45 @@ extern int spock_conflict_resolver;
extern int spock_conflict_log_level;
extern bool spock_save_resolutions;

typedef enum SpockConflictType
/* Avoid conflicts with PG's conflict.h via pgstat.h */
#ifndef CONFLICT_H
/*
* From include/replication/conflict.h in PostgreSQL
* Once we only support >= PG17, we may just include it.
*
* Conflict types that could occur while applying remote changes.
*/
typedef enum
{
CONFLICT_INSERT_EXISTS,
CONFLICT_UPDATE_UPDATE,
CONFLICT_UPDATE_DELETE,
CONFLICT_DELETE_DELETE
} SpockConflictType;
/* The row to be inserted violates unique constraint */
CT_INSERT_EXISTS,

/* The row to be updated was modified by a different origin */
CT_UPDATE_ORIGIN_DIFFERS,

/* The updated row value violates unique constraint */
CT_UPDATE_EXISTS,

/* The row to be updated is missing */
CT_UPDATE_MISSING,

/* The row to be deleted was modified by a different origin */
CT_DELETE_ORIGIN_DIFFERS,

/* The row to be deleted is missing */
CT_DELETE_MISSING

/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
* future improvements.
*/
} ConflictType;
#endif

extern int spock_conflict_resolver;
extern int spock_conflict_log_level;
extern bool spock_save_resolutions;

extern bool get_tuple_origin(SpockRelation *rel, HeapTuple local_tuple,
ItemPointer tid, TransactionId *xmin,
Expand All @@ -61,8 +92,7 @@ extern bool try_resolve_conflict(Relation rel, HeapTuple localtuple,
RepOriginId local_origin, TimestampTz local_ts,
SpockConflictResolution *resolution);


extern void spock_report_conflict(SpockConflictType conflict_type,
extern void spock_report_conflict(ConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand All @@ -75,7 +105,7 @@ extern void spock_report_conflict(SpockConflictType conflict_type,
TimestampTz local_tuple_timestamp,
Oid conflict_idx_id);

extern void spock_conflict_log_table(SpockConflictType conflict_type,
extern void spock_conflict_log_table(ConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand All @@ -95,4 +125,4 @@ extern bool spock_conflict_resolver_check_hook(int *newval, void **extra,
extern void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc,
HeapTuple tuple);

#endif /* SPOCK_CONGLICT_H */
#endif /* SPOCK_CONFLICT_H */
10 changes: 10 additions & 0 deletions sql/spock--5.0.4--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,13 @@ CREATE VIEW spock.lag_tracker AS
-- Source for sub_id values.
CREATE SEQUENCE spock.sub_id_generator AS integer MINVALUE 1 CYCLE START WITH 1
OWNED BY spock.subscription.sub_id;

-- Migrate spock.resolutions to the new conflict types
-- insert_exists stays the same
UPDATE spock.resolutions
SET conflict_type = CASE conflict_type
WHEN 'update_update' THEN 'update_exists'
WHEN 'update_delete' THEN 'update_missing'
WHEN 'delete_delete' THEN 'delete_missing'
ELSE conflict_type
END;
15 changes: 12 additions & 3 deletions src/spock_apply_heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,10 @@ finish_apply_exec_state(ApplyExecState *aestate)
pfree(aestate);
}

/**
* This is called when there is a potential conflict that may be able to be resolved
* according to resolution rules
*/
static bool
spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Expand Down Expand Up @@ -760,9 +764,10 @@ spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate,
}

/*
* See if we need to log any conflict
* See if we need to log any conflict to the server log and spock.resolutions
* Calling this does not necessarily mean that there is a conflict
*/
spock_report_conflict(is_insert ? CONFLICT_INSERT_EXISTS : CONFLICT_UPDATE_UPDATE,
spock_report_conflict(is_insert ? CT_INSERT_EXISTS : CT_UPDATE_EXISTS,
rel, TTS_TUP(localslot), oldtup,
remotetuple, applytuple, resolution,
xmin, local_origin_found, local_origin,
Expand Down Expand Up @@ -1066,6 +1071,9 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup,
* Perform the UPDATE if Tuple found.
*
* Note this will fail if there are other conflicting unique indexes.
*
* spock_handle_conflict_and_apply is a misnomer as it is called for
* the normal UPDATE case, too.
*/
if (found)
{
Expand All @@ -1075,6 +1083,7 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup,
}
else
{
/* CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */
SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index];

/*
Expand Down Expand Up @@ -1207,7 +1216,7 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
exception_log->local_tuple = NULL;
remotetuple = heap_form_tuple(RelationGetDescr(rel->rel),
oldtup->values, oldtup->nulls);
spock_report_conflict(CONFLICT_DELETE_DELETE,
spock_report_conflict(CT_DELETE_MISSING,
rel, NULL, oldtup,
remotetuple, NULL, SpockResolution_Skip,
InvalidTransactionId, false, InvalidRepOriginId,
Expand Down
69 changes: 38 additions & 31 deletions src/spock_conflict.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@
#include "spock_node.h"
#include "spock_worker.h"


/* From src/backend/replication/logical/conflict.c */
static const char *const ConflictTypeNames[] = {
[CT_INSERT_EXISTS] = "insert_exists",
[CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
[CT_DELETE_MISSING] = "delete_missing"
};


int spock_conflict_resolver = SPOCK_RESOLVE_LAST_UPDATE_WINS;
int spock_conflict_log_level = LOG;
bool spock_save_resolutions = false;
Expand Down Expand Up @@ -274,25 +286,6 @@ try_resolve_conflict(Relation rel, HeapTuple localtuple, HeapTuple remotetuple,
return apply;
}

static char *
conflict_type_to_string(SpockConflictType conflict_type)
{
switch (conflict_type)
{
case CONFLICT_INSERT_EXISTS:
return "insert_exists";
case CONFLICT_UPDATE_UPDATE:
return "update_update";
case CONFLICT_UPDATE_DELETE:
return "update_delete";
case CONFLICT_DELETE_DELETE:
return "delete_delete";
}

/* Unreachable */
return NULL;
}

static char *
conflict_resolution_to_string(SpockConflictResolution resolution)
{
Expand All @@ -313,6 +306,8 @@ conflict_resolution_to_string(SpockConflictResolution resolution)
/*
* Log the conflict to server log.
*
* If configured to do so, also log to the spock.resolutions table
*
* There are number of tuples passed:
*
* - The local tuple we conflict with or NULL if not found [localtuple];
Expand All @@ -333,7 +328,7 @@ conflict_resolution_to_string(SpockConflictResolution resolution)
* we still try to free the big chunks as we go.
*/
void
spock_report_conflict(SpockConflictType conflict_type,
spock_report_conflict(ConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand All @@ -355,7 +350,7 @@ spock_report_conflict(SpockConflictType conflict_type,


/* Ignore update-update conflict for same origin */
if (conflict_type == CONFLICT_UPDATE_UPDATE)
if (conflict_type == CT_UPDATE_EXISTS)
{
/*
* If updating a row that came from the same origin, do not report it
Expand All @@ -368,13 +363,16 @@ spock_report_conflict(SpockConflictType conflict_type,
if (local_tuple_origin == InvalidRepOriginId &&
TransactionIdEquals(local_tuple_xid, GetTopTransactionId()))
return;

/* Differing origin */
conflict_type = CT_UPDATE_ORIGIN_DIFFERS;
}

/* Count statistics */
handle_stats_counter(rel->rel, MyApplyWorker->subid,
SPOCK_STATS_CONFLICT_COUNT, 1);

/* If configured log resolution to table */
/* If configured log resolution to spock.resolutions table */
spock_conflict_log_table(conflict_type, rel, localtuple, oldkey,
remotetuple, applytuple, resolution,
local_tuple_xid, found_local_origin,
Expand All @@ -388,7 +386,10 @@ spock_report_conflict(SpockConflictType conflict_type,
MAXDATELEN);

initStringInfo(&remotetup);
tuple_to_stringinfo(&remotetup, desc, remotetuple);

/* Check for old tuple to in case handling DELETE case */
if (remotetuple)
tuple_to_stringinfo(&remotetup, desc, remotetuple);

if (localtuple != NULL)
{
Expand All @@ -413,15 +414,18 @@ spock_report_conflict(SpockConflictType conflict_type,
* This deliberately somewhat overlaps with the context info we log with
* log_error_verbosity=verbose because we don't necessarily have all that
* info enabled.
*
* Handling for CT_DELETE_ORIGIN_DIFFERS will be added separately.
*/
switch (conflict_type)
{
case CONFLICT_INSERT_EXISTS:
case CONFLICT_UPDATE_UPDATE:
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
case CT_UPDATE_ORIGIN_DIFFERS:
ereport(spock_conflict_log_level,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: remote %s on relation %s (local index %s). Resolution: %s.",
conflict_type == CONFLICT_INSERT_EXISTS ? "INSERT EXISTS" : "UPDATE",
ConflictTypeNames[conflict_type],
qualrelname, idxname,
conflict_resolution_to_string(resolution)),
errdetail("existing local tuple {%s} xid=%u,origin=%d,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
Expand All @@ -434,12 +438,12 @@ spock_report_conflict(SpockConflictType conflict_type,
(uint32) (replorigin_session_origin_lsn << 32),
(uint32) replorigin_session_origin_lsn)));
break;
case CONFLICT_UPDATE_DELETE:
case CONFLICT_DELETE_DELETE:
case CT_UPDATE_MISSING:
case CT_DELETE_MISSING:
ereport(spock_conflict_log_level,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: remote %s on relation %s replica identity index %s (tuple not found). Resolution: %s.",
conflict_type == CONFLICT_UPDATE_DELETE ? "UPDATE" : "DELETE",
ConflictTypeNames[conflict_type],
qualrelname, idxname,
conflict_resolution_to_string(resolution)),
errdetail("remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
Expand All @@ -449,6 +453,9 @@ spock_report_conflict(SpockConflictType conflict_type,
(uint32) (replorigin_session_origin_lsn << 32),
(uint32) replorigin_session_origin_lsn)));
break;
case CT_DELETE_ORIGIN_DIFFERS:
/* keep compiler happy; handling will be added separately */
break;
}
}

Expand All @@ -475,7 +482,7 @@ spock_report_conflict(SpockConflictType conflict_type,
* we still try to free the big chunks as we go.
*/
void
spock_conflict_log_table(SpockConflictType conflict_type,
spock_conflict_log_table(ConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand Down Expand Up @@ -536,7 +543,7 @@ spock_conflict_log_table(SpockConflictType conflict_type,
nulls[4] = true;

/* conflict type */
values[5] = CStringGetTextDatum(conflict_type_to_string(conflict_type));
values[5] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
/* conflict_resolution */
values[6] = CStringGetTextDatum(conflict_resolution_to_string(resolution));
/* local_origin */
Expand Down
Loading