-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path5_partitioned_watermark.sql
149 lines (129 loc) · 3.34 KB
/
5_partitioned_watermark.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/*
-- Create outbox database
*/
CREATE DATABASE [outbox]
CONTAINMENT = NONE
ON PRIMARY (
NAME = N'outbox', FILENAME = N'F:\data\outbox.mdf' , SIZE = 2105344KB , FILEGROWTH = 524288KB
)
LOG ON (
NAME = N'outbox_log', FILENAME = N'G:\log\outbox_log.ldf' , SIZE = 1048576KB , FILEGROWTH = 262144KB
)
GO
ALTER DATABASE [outbox] SET AUTO_UPDATE_STATISTICS_ASYNC ON
GO
ALTER DATABASE [outbox] SET RECOVERY SIMPLE
GO
ALTER DATABASE [outbox] SET MULTI_USER
GO
ALTER DATABASE [outbox] SET PAGE_VERIFY CHECKSUM
GO
ALTER DATABASE [outbox] SET TARGET_RECOVERY_TIME = 60 SECONDS
GO
ALTER DATABASE [outbox] SET DELAYED_DURABILITY = DISABLED
GO
/*
-- Change database context
*/
USE [outbox]
GO
/*
-- Create outbox table
*/
CREATE TABLE [dbo].[outbox](
[id] [bigint] IDENTITY(1,1) NOT NULL,
[topic] [nvarchar](512) NOT NULL,
[payload] [nvarchar](max) NOT NULL,
[hashid] AS CONVERT(tinyint, ABS(id % 16)) PERSISTED NOT NULL ,
CONSTRAINT [pk_outbox] PRIMARY KEY NONCLUSTERED
(
[id] ASC
) WITH (
PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF,
ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON
)
)
GO
/*
-- Add partition function and scheme
-- Test with different values (for example allign with cpu architecture, file lay-out), be sure to also change the hashid column
-- Idea from: https://docs.microsoft.com/en-US/troubleshoot/sql/performance/resolve-pagelatch-ex-contention#method-5
*/
CREATE PARTITION FUNCTION pf_hash (tinyint) AS RANGE LEFT
FOR VALUES (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16)
GO
CREATE PARTITION SCHEME ps_hash AS PARTITION pf_hash ALL TO ([PRIMARY])
GO
ALTER TABLE [dbo].[outbox] WITH CHECK ADD CONSTRAINT [ck_outbox_payload] CHECK (([payload]<>''))
GO
ALTER TABLE [dbo].[outbox] CHECK CONSTRAINT [ck_outbox_payload]
GO
ALTER TABLE [dbo].[outbox] WITH CHECK ADD CONSTRAINT [ck_outbox_topic] CHECK (([topic]<>''))
GO
ALTER TABLE [dbo].[outbox] CHECK CONSTRAINT [ck_outbox_topic]
GO
CREATE UNIQUE CLUSTERED INDEX cix_id ON [outbox] (id, hashid) ON ps_hash(hashid)
GO
/*
-- Create watermark tracking table
-- Idea from: https://forrestmcdaniel.com/2021/06/30/fixing-queues-with-watermarks
-- When reading with multiple threads and pullingh larger batch sizes, out of sync watermark table can occur.
*/
CREATE TABLE dbo.outboxtrack --watermark table
(
id INT PRIMARY KEY
);
GO
INSERT dbo.outboxtrack
VALUES (0);
GO
/*
-- Stored procedure to write towards the outbox table
*/
CREATE OR ALTER PROCEDURE [dbo].[push]
@topic nvarchar(512),
@payload nvarchar(max)
AS
BEGIN
SET NOCOUNT ON
INSERT INTO dbo.outbox VALUES(@topic, @payload);
END;
GO
/*
-- Stored procedure to read and delete towards the outbox table
*/
CREATE OR ALTER PROCEDURE [dbo].[pull]
@batchsize int
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRANSACTION
DECLARE @d TABLE
(
[id] [bigint] NOT NULL
--, [topic] [nvarchar](512) NOT NULL
--, [payload] [nvarchar](max) NOT NULL
);
DECLARE @wm INT = (SELECT TOP(1) ID FROM [dbo].[outboxtrack]);
;WITH T AS (
SELECT TOP (@batchsize)
[id]
, [topic]
, [payload]
FROM [dbo].[outbox] WITH (READPAST,ROWLOCK)
WHERE ID >= @wm
ORDER BY id ASC
)
DELETE FROM T
OUTPUT deleted.id
--, deleted.topic
--, deleted.payload
INTO @d;
DECLARE @t INT = (SELECT MAX(ID) FROM @d)
IF @t % 100 = 0
BEGIN
UPDATE [dbo].[outboxtrack]
SET ID = @t - 100;
END;
COMMIT TRANSACTION
END;