Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/spock_output_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ typedef struct SpockOutputData
const char *spock_version;
int spock_version_num;

/* Protocol version negotiation */
uint32 negotiated_proto_version;

/* List of origin names */
List *forward_origins;
/* List of SpockRepSet */
Expand Down
9 changes: 7 additions & 2 deletions include/spock_output_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@
* we can support. SPOCK_PROTO_MIN_VERSION_NUM is the oldest version we
* have backwards compatibility for. We negotiate protocol versions during the
* startup handshake. See the protocol documentation for details.
*
* SPOCK_MIN_VERSION_NUM_FOR_MULTI_PROTO is the minimum Spock version that
* supports multi-protocol mode (protocol versions 4 and 5). Both nodes must
* be at least this version to use protocol version 4.
*/
#define SPOCK_PROTO_VERSION_NUM 4
#define SPOCK_PROTO_MIN_VERSION_NUM 3
#define SPOCK_PROTO_VERSION_NUM 5
#define SPOCK_PROTO_MIN_VERSION_NUM 4
#define SPOCK_MIN_VERSION_NUM_FOR_MULTI_PROTO 50000

/*
* The startup parameter format is versioned separately to the rest of the wire
Expand Down
12 changes: 11 additions & 1 deletion include/spock_proto_native.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ extern TimestampTz spock_read_commit_order(StringInfo in);
extern void spock_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
TimestampTz *committime, TransactionId *remote_xid);
extern void spock_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
XLogRecPtr *end_lsn, TimestampTz *committime);
XLogRecPtr *end_lsn, TimestampTz *committime,
XLogRecPtr *remote_insert_lsn);
extern RepOriginId spock_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern uint32 spock_read_rel(StringInfo in);
extern SpockRelation *spock_read_insert(StringInfo in, LOCKMODE lockmode,
Expand All @@ -66,4 +67,13 @@ extern void spock_write_message(StringInfo out, TransactionId xid, XLogRecPtr ls
bool transactional, const char *prefix, Size sz,
const char *message);

/* Protocol version management for publisher */
extern void spock_set_proto_version(uint32 version);
extern uint32 spock_get_proto_version(void);

/* Protocol version management for subscriber */
extern void spock_apply_set_proto_version(uint32 version);
extern uint32 spock_apply_get_proto_version(void);

#endif /* SPOCK_PROTO_NATIVE_H */

49 changes: 47 additions & 2 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "spock_conflict.h"
#include "spock_executor.h"
#include "spock_node.h"
#include "spock_proto_native.h"
#include "spock_queue.h"
#include "spock_relcache.h"
#include "spock_repset.h"
Expand Down Expand Up @@ -224,6 +225,7 @@ static void maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send,
static void append_feedback_position(XLogRecPtr recvpos);
static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos,
XLogRecPtr *flushpos, XLogRecPtr *max_recvpos);
static void UpdateWorkerStats(XLogRecPtr last_received, XLogRecPtr last_inserted);

/* Wrapper for latch for waiting for previous transaction to commit */
void
Expand Down Expand Up @@ -661,11 +663,20 @@ handle_commit(StringInfo s)
XLogRecPtr commit_lsn;
XLogRecPtr end_lsn;
TimestampTz commit_time;
XLogRecPtr remote_insert_lsn;

errcallback_arg.action_name = "COMMIT";
xact_action_counter++;

spock_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
spock_read_commit(s, &commit_lsn, &end_lsn, &commit_time, &remote_insert_lsn);

/*
* For protocol version 4, remote_insert_lsn is read from the end of
* COMMIT message. For protocol version 5+, it's read at the beginning of
* all messages in apply_work.
*/
if (remote_insert_lsn != InvalidXLogRecPtr)
UpdateWorkerStats(end_lsn, remote_insert_lsn);

Assert(commit_time == replorigin_session_origin_timestamp);

Expand Down Expand Up @@ -1724,6 +1735,32 @@ handle_startup_param(const char *key, const char *value)
MySubscription->name, fwd ? "t" : "f");
}

/*
* Extract the negotiated protocol version from the publisher. This is the
* protocol version the publisher decided to use based on what both sides
* support. The subscriber must use this version when reading messages.
*/
if (strcmp(key, "proto_version") == 0)
{
uint32 proto_version;
char *endptr;

errno = 0;
proto_version = strtoul(value, &endptr, 10);

if (errno != 0 || *endptr != '\0')
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("SPOCK %s: invalid proto_version value: %s",
MySubscription->name, value)));

/* Set the protocol version for the subscriber */
spock_apply_set_proto_version(proto_version);

elog(LOG, "SPOCK %s: using protocol version %u from publisher",
MySubscription->name, proto_version);
}

/*
* We just ignore a bunch of parameters here because we specify what we
* require when we send our params to the upstream. It's required to ERROR
Expand Down Expand Up @@ -2791,8 +2828,16 @@ apply_work(PGconn *streamConn)
/*
* Update statistics before applying the record to let the
* apply machinery to check consistency of these values.
*
* Protocol version 5+ includes remote_insert_lsn at the
* beginning of all messages. Protocol version 4 only
* includes it at the end of COMMIT messages (handled in
* handle_commit).
*/
last_inserted = pq_getmsgint64(msg);
if (spock_apply_get_proto_version() >= 5)
last_inserted = pq_getmsgint64(msg);
else
last_inserted = last_received;
UpdateWorkerStats(last_received, last_inserted);

replication_handler(msg);
Expand Down
3 changes: 3 additions & 0 deletions src/spock_output_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,9 @@ prepare_startup_message(SpockOutputData *data)
l = add_startup_msg_i(l, "max_proto_version", SPOCK_PROTO_VERSION_NUM);
l = add_startup_msg_i(l, "min_proto_version", SPOCK_PROTO_MIN_VERSION_NUM);

/* Send the negotiated protocol version to the subscriber */
l = add_startup_msg_i(l, "proto_version", data->negotiated_proto_version);

/* We don't support understand column types yet */
l = add_startup_msg_b(l, "coltypes", false);

Expand Down
55 changes: 54 additions & 1 deletion src/spock_output_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "spock_executor.h"
#include "spock_node.h"
#include "spock_output_proto.h"
#include "spock_proto_native.h"
#include "spock_queue.h"
#include "spock_repset.h"
#include "spock_worker.h"
Expand Down Expand Up @@ -239,7 +240,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ALLOCSET_DEFAULT_SIZES);
data->allow_internal_basetypes = false;
data->allow_binary_basetypes = false;

data->negotiated_proto_version = SPOCK_PROTO_VERSION_NUM;

ctx->output_plugin_private = data;

Expand Down Expand Up @@ -331,6 +332,58 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("client sent max_proto_version=%d but we only support protocol %d or higher",
data->client_max_proto_version, SPOCK_PROTO_MIN_VERSION_NUM)));

/*
* Negotiate protocol version.
*
* The negotiated protocol version is the minimum of what the server
* supports (SPOCK_PROTO_VERSION_NUM) and what the client can handle
* (client_max_proto_version).
*/
data->negotiated_proto_version = Min(SPOCK_PROTO_VERSION_NUM,
data->client_max_proto_version);

/* Validate the negotiated version is within acceptable range */
if (data->negotiated_proto_version < SPOCK_PROTO_MIN_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot negotiate acceptable protocol version"),
errdetail("Client supports versions %d-%d, server supports %d-%d",
data->client_min_proto_version,
data->client_max_proto_version,
SPOCK_PROTO_MIN_VERSION_NUM,
SPOCK_PROTO_VERSION_NUM)));

/*
* Protocol version 4 requires both nodes to be at least Spock 5.0.0
* (SPOCK_MIN_VERSION_NUM_FOR_MULTI_PROTO) because that's when
* multi-protocol support was added. This ensures both sides can
* handle the protocol correctly.
*/
if (data->negotiated_proto_version == 4)
{
if (SPOCK_VERSION_NUM < SPOCK_MIN_VERSION_NUM_FOR_MULTI_PROTO ||
data->spock_version_num < SPOCK_MIN_VERSION_NUM_FOR_MULTI_PROTO)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("protocol version 4 requires both nodes to be at least Spock 5.0.0"),
errdetail("Server version: %d, client version: %d, required: %d",
SPOCK_VERSION_NUM,
data->spock_version_num,
SPOCK_MIN_VERSION_NUM_FOR_MULTI_PROTO)));
}

/* Set the negotiated protocol version for the publisher */
spock_set_proto_version(data->negotiated_proto_version);

elog(DEBUG1, "spock negotiated protocol version %d (server: %d-%d, client: %d-%d, spock client: %d, spock server: %d)",
data->negotiated_proto_version,
SPOCK_PROTO_MIN_VERSION_NUM,
SPOCK_PROTO_VERSION_NUM,
data->client_min_proto_version,
data->client_max_proto_version,
data->spock_version_num,
SPOCK_VERSION_NUM);

/*
* Set correct protocol format.
*
Expand Down
Loading