From 0c9b57444a329a241989ade5ffcf902c1cc300b9 Mon Sep 17 00:00:00 2001 From: Brennan Lamey <66885902+brennanjl@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:30:33 -0500 Subject: [PATCH] pg: recognize streaming prepared tx msg (#1071) (#1072) Co-authored-by: jchappelow --- internal/sql/pg/repl.go | 2 +- internal/sql/pg/repl_msgs.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/sql/pg/repl.go b/internal/sql/pg/repl.go index 4c4ab7223..258bede0b 100644 --- a/internal/sql/pg/repl.go +++ b/internal/sql/pg/repl.go @@ -480,7 +480,7 @@ func decodeWALData(hasher hash.Hash, walData []byte, relations map[uint32]*pglog hasher.Reset() - // v2 Stream control messages. Not expected for kwil + // v2 Stream control messages. Only expected with large transactions. case *pglogrepl.StreamStartMessageV2: *inStream = true logger.Warnf(" [msg] StreamStartMessageV2: xid %d, first segment? %d", logicalMsg.Xid, logicalMsg.FirstSegment) diff --git a/internal/sql/pg/repl_msgs.go b/internal/sql/pg/repl_msgs.go index 7f0ff7661..d412e8254 100644 --- a/internal/sql/pg/repl_msgs.go +++ b/internal/sql/pg/repl_msgs.go @@ -14,6 +14,7 @@ const ( MessageTypeBeginPrepare pglogrepl.MessageType = 'b' MessageTypeCommitPrepared pglogrepl.MessageType = 'K' MessageTypeRollbackPrepared pglogrepl.MessageType = 'r' + MessageTypeStreamPrepare pglogrepl.MessageType = 'p' ) // msgTypeToString is helpful for debugging, but normally unused. @@ -21,6 +22,8 @@ func msgTypeToString(t pglogrepl.MessageType) string { //nolint:unused switch t { case MessageTypePrepare: return "Prepare" + case MessageTypeStreamPrepare: + return "Stream Prepared" case MessageTypeBeginPrepare: return "Begin Prepared" case MessageTypeCommitPrepared: @@ -39,7 +42,7 @@ func parseV3(data []byte, inStream bool) (m pglogrepl.Message, err error) { var decoder pglogrepl.MessageDecoder // v1 and v3 have same Decode signature (stream not relevant) switch msgType { - case MessageTypePrepare: + case MessageTypePrepare, MessageTypeStreamPrepare: // same encoding decoder = new(PrepareMessageV3) case MessageTypeBeginPrepare: decoder = new(BeginPrepareMessageV3)