-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathparser_tuple.c
133 lines (110 loc) · 3.28 KB
/
parser_tuple.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
* pg_bulkload: lib/parser_tuple.c
*
* Copyright (c) 2009-2025, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
*/
/**
* @file
* @brief Binary HeapTuple format handling module implementation.
*/
#include "pg_bulkload.h"
#include "access/htup.h"
#include "utils/rel.h"
#include "reader.h"
#include "pg_profile.h"
#include "pgut/pgut-ipc.h"
typedef struct TupleParser
{
Parser base;
Queue *queue;
HeapTupleData tuple;
char *buffer;
uint32 buflen;
} TupleParser;
static void TupleParserInit(TupleParser *self, Checker *checker, const char *infile, TupleDesc desc, bool multi_process, Oid collation);
static HeapTuple TupleParserRead(TupleParser *self, Checker *checker);
static int64 TupleParserTerm(TupleParser *self);
static bool TupleParserParam(TupleParser *self, const char *keyword, char *value);
static void TupleParserDumpParams(TupleParser *self);
static void TupleParserDumpRecord(TupleParser *self, FILE *fp, char *filename);
/**
* @brief Create a new binary parser.
*/
Parser *
CreateTupleParser(void)
{
TupleParser *self = palloc0(sizeof(TupleParser));
self->base.init = (ParserInitProc) TupleParserInit;
self->base.read = (ParserReadProc) TupleParserRead;
self->base.term = (ParserTermProc) TupleParserTerm;
self->base.param = (ParserParamProc) TupleParserParam;
self->base.dumpParams = (ParserDumpParamsProc) TupleParserDumpParams;
self->base.dumpRecord = (ParserDumpRecordProc) TupleParserDumpRecord;
return (Parser *)self;
}
static void
TupleParserInit(TupleParser *self, Checker *checker, const char *infile, TupleDesc desc, bool multi_process, Oid collation)
{
unsigned key;
char junk[2];
if (checker->check_constraints)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("does not support parameter \"CHECK_CONSTRAINTS\" in \"TYPE = TUPLE\"")));
if (checker->encoding != -1)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("does not support parameter \"ENCODING\" in \"TYPE = TUPLE\"")));
checker->tchecker = NULL;
if (sscanf(infile, ":%u%1s", &key, junk) != 1)
elog(ERROR, "invalid shmem key format: %s", infile);
self->queue = QueueOpen(key);
self->buflen = BLCKSZ;
self->buffer = palloc(self->buflen);
}
static int64
TupleParserTerm(TupleParser *self)
{
if (self->queue)
QueueClose(self->queue);
if (self->buffer)
pfree(self->buffer);
pfree(self);
return 0;
}
static HeapTuple
TupleParserRead(TupleParser *self, Checker *checker)
{
uint32 len;
BULKLOAD_PROFILE(&prof_reader_parser);
if (QueueRead(self->queue, &len, sizeof(uint32), false) == sizeof(uint32) && len > 0)
{
if (self->buflen < len)
{
self->buffer = repalloc(self->buffer, len);
self->buflen = len;
}
if (QueueRead(self->queue, self->buffer, len, false) == len)
{
BULKLOAD_PROFILE(&prof_reader_source);
self->tuple.t_len = len;
self->tuple.t_data = (HeapTupleHeader) self->buffer;
return &self->tuple;
}
}
return NULL;
}
static bool
TupleParserParam(TupleParser *self, const char *keyword, char *value)
{
/* TupleParser does not support OFFSET */
return false; /* no parameters supported */
}
static void
TupleParserDumpParams(TupleParser *self)
{
/* no parameters supported */
}
static void
TupleParserDumpRecord(TupleParser *self, FILE *fp, char *filename)
{
/* parse error does not happen in TupleParser. */
}