-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproject.go
246 lines (207 loc) · 6.84 KB
/
project.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// project implements a project expression in relational algebra
package rel
import (
"reflect"
)
// projectExpr represents a relational project operation: it exposes the
// tuples of source1 re-typed to the (usually narrower) tuple type of zero.
type projectExpr struct {
	// the input relation
	source1 Relation
	// zero value of the projected tuple type; its reflect.Type is the
	// element type of the channels produced by TupleChan, and Zero()
	// returns it as-is
	zero interface{}
	// first error encountered during construction or evaluation
	err error
}
// TupleChan sends each tuple of the projection to the channel t and returns a
// cancel channel. t must satisfy EnsureChan against r1.zero (presumably a
// channel whose element type is the projected tuple type — see EnsureChan).
//
// Each projected tuple is built by copying the fields that FieldMap pairs
// between the source tuple type and the projected tuple type. If no candidate
// key of the source survives the projection, distinct source tuples may
// collapse into identical projected tuples, so duplicates are filtered out
// before being sent.
//
// Closing the returned cancel channel stops the projection and closes the
// source's cancel channel as well; t is not closed in that case. When the
// source is exhausted, any source error is recorded and t is closed. If r1
// already holds an error, t is closed immediately and nothing is sent.
func (r1 *projectExpr) TupleChan(t interface{}) chan<- struct{} {
	cancel := make(chan struct{})
	// reflect on the channel
	chv := reflect.ValueOf(t)
	if err := EnsureChan(chv.Type(), r1.zero); err != nil {
		// keep the first error, per the contract of the err field
		if r1.err == nil {
			r1.err = err
		}
		return cancel
	}
	if r1.err != nil {
		chv.Close()
		return cancel
	}

	// source tuple type and projected tuple type
	e1 := reflect.TypeOf(r1.source1.Zero())
	e2 := reflect.TypeOf(r1.zero)

	// create the channel of tuples from source
	// TODO(jonlawlor): restrict the channel direction
	body := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, e1), 0)
	bcancel := r1.source1.TupleChan(body.Interface())

	// figure out which fields stay, and where they are in each tuple type
	fMap := FieldMap(e1, e2)

	// Deduplication is only needed when the projection drops every
	// candidate key of the source.
	// TODO(jonlawlor): refactor with the code in the CKeys() method, or
	// include in an isDistinct field?
	cKeys := SubsetCandidateKeys(r1.source1.CKeys(), Heading(r1.source1), fMap)
	distinct := len(cKeys) == 0

	// TODO(jonlawlor) add parallelism here
	go func(body, res reflect.Value) {
		// seen records projected tuples already sent; it stays nil (and
		// unused) when no deduplication is required.
		// NOTE(review): using tuples as map keys requires the projected
		// tuple type to be comparable; slice/map fields would panic here.
		var seen map[interface{}]struct{}
		if distinct {
			seen = map[interface{}]struct{}{}
		}

		canSel := reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(cancel)}
		inCases := []reflect.SelectCase{canSel, {Dir: reflect.SelectRecv, Chan: body}}
		outCases := []reflect.SelectCase{canSel, {Dir: reflect.SelectSend, Chan: res}}

		for {
			chosen, tup, ok := reflect.Select(inCases)
			if chosen == 0 {
				// cancel has been closed, so stop the source too
				close(bcancel)
				return
			}
			if !ok {
				// source channel was closed
				break
			}

			// set the fields of the new tuple from the matching source fields
			tup2 := reflect.Indirect(reflect.New(e2))
			for _, fm := range fMap {
				tup2.Field(fm.J).Set(tup.Field(fm.I))
			}

			if seen != nil {
				key := tup2.Interface()
				if _, dup := seen[key]; dup {
					continue
				}
				seen[key] = struct{}{}
			}

			outCases[1].Send = tup2
			if sent, _, _ := reflect.Select(outCases); sent == 0 {
				close(bcancel)
				return
			}
		}
		if err := r1.source1.Err(); err != nil {
			r1.err = err
		}
		res.Close()
	}(body, chv)
	return cancel
}
// Zero returns the blank tuple of the projected type that this projection
// was constructed with.
func (r1 *projectExpr) Zero() interface{} { return r1.zero }
// CKeys returns the candidate keys of the projection.
//
// When the source tuple type is assignable to the projected tuple type, the
// source's keys are returned unchanged. Otherwise they are narrowed with
// SubsetCandidateKeys using the field mapping between the two tuple types,
// and if nothing remains DefaultKeys(r1.zero) is used instead.
func (r1 *projectExpr) CKeys() CandKeys {
	srcZero := r1.source1.Zero()
	keys := r1.source1.CKeys()

	srcType, dstType := reflect.TypeOf(srcZero), reflect.TypeOf(r1.zero)
	if srcType.AssignableTo(dstType) {
		// same shape of tuple: the keys carry over as-is
		return keys
	}

	keys = SubsetCandidateKeys(keys, Heading(r1.source1), FieldMap(srcType, dstType))
	if len(keys) != 0 {
		return keys
	}
	// every relation except dee and dum has at least one candidate key
	return DefaultKeys(r1.zero)
}
// text representation
// GoString renders the projection as Go-like method-chain syntax: the
// source's GoString followed by a .Project call listing the projected heading.
func (r1 *projectExpr) GoString() string {
	src := r1.source1.GoString()
	return src + ".Project(" + HeadingString(r1) + ")"
}
// String renders the projection in relational-algebra notation,
// π{heading}(source).
func (r1 *projectExpr) String() string {
	heading := HeadingString(r1)
	return "π{" + heading + "}(" + r1.source1.String() + ")"
}
// Project creates a new relation with less than or equal degree, whose tuple
// type is that of z2 (expected to be a subdomain of this relation's tuples).
// A projection of a projection collapses into a single projection of the
// original source, skipping the intermediate step.
func (r1 *projectExpr) Project(z2 interface{}) Relation { return NewProject(r1.source1, z2) }
// Restrict creates a new relation with less than or equal cardinality, keeping
// only tuples that satisfy p. The restriction is applied to the source first
// and the result is re-projected to this projection's tuple type, which moves
// the restrict beneath the project in the expression tree.
func (r1 *projectExpr) Restrict(p Predicate) Relation {
	restricted := r1.source1.Restrict(p)
	return NewProject(restricted, r1.zero)
}
// Rename wraps the projection in a rename expression that gives its columns
// new names; z2 has to be a struct with the same number of fields as this
// relation.
func (r1 *projectExpr) Rename(z2 interface{}) Relation { return NewRename(r1, z2) }
// Union returns a relation whose body is the union of this projection's body
// and r2's body.
func (r1 *projectExpr) Union(r2 Relation) Relation { return NewUnion(r1, r2) }
// Diff returns a relation holding the set difference of this projection
// minus r2.
func (r1 *projectExpr) Diff(r2 Relation) Relation { return NewDiff(r1, r2) }
// Join returns the natural join of this projection with r2, producing tuples
// of zero's type.
func (r1 *projectExpr) Join(r2 Relation, zero interface{}) Relation { return NewJoin(r1, r2, zero) }
// GroupBy returns a relation formed by grouping this projection's tuples and
// applying the user-defined function gfcn to each group; t2 describes the
// resulting tuple type.
func (r1 *projectExpr) GroupBy(t2, gfcn interface{}) Relation { return NewGroupBy(r1, t2, gfcn) }
// Map returns a relation formed by applying mfcn to every tuple of this
// projection; ckeystr is forwarded to NewMap to describe the candidate keys
// of the result.
func (r1 *projectExpr) Map(mfcn interface{}, ckeystr [][]string) Relation {
	return NewMap(r1, mfcn, ckeystr)
}
// Err reports the error recorded on this projection, if any, whether it
// arose while constructing the expression or while streaming its tuples.
func (r1 *projectExpr) Err() error { return r1.err }