Skip to content

Commit

Permalink
dual_selector: impl cache
Browse files Browse the repository at this point in the history
  • Loading branch information
IrineSistiana committed Nov 1, 2023
1 parent 3ef1ca1 commit d8960af
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 30 deletions.
11 changes: 11 additions & 0 deletions plugin/executable/dual_selector/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dual_selector

import "hash/maphash"

type key string

var seed = maphash.MakeSeed()

func (k key) Sum() uint64 {
return maphash.String(seed, string(k))
}
89 changes: 63 additions & 26 deletions plugin/executable/dual_selector/dual_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package dual_selector

import (
"context"
"io"
"time"

"github.com/IrineSistiana/mosdns/v5/pkg/cache"
"github.com/IrineSistiana/mosdns/v5/pkg/dnsutils"
"github.com/IrineSistiana/mosdns/v5/pkg/pool"
"github.com/IrineSistiana/mosdns/v5/pkg/query_context"
Expand All @@ -34,6 +36,11 @@ import (
const (
referenceWaitTimeout = time.Millisecond * 500
defaultSubRoutineTimeout = time.Second * 5

// TODO: Make cache configurable?
cacheSize = 64 * 1024
cacheTlt = time.Hour
cacheGcInterval = time.Minute
)

func init() {
Expand All @@ -46,10 +53,13 @@ func init() {
}

var _ sequence.RecursiveExecutable = (*Selector)(nil)
var _ io.Closer = (*Selector)(nil)

type Selector struct {
sequence.BQ
prefer uint16 // dns.TypeA or dns.TypeAAAA

preferTypOkCache *cache.Cache[key, bool]
}

// Exec implements handler.Executable.
Expand All @@ -60,30 +70,47 @@ func (s *Selector) Exec(ctx context.Context, qCtx *query_context.Context, next s
}

qtype := q.Question[0].Qtype
// skip queries that have preferred type or have other unrelated types.
if qtype == s.prefer || (qtype != dns.TypeA && qtype != dns.TypeAAAA) {
// skip queries that have other unrelated types.
if qtype != dns.TypeA && qtype != dns.TypeAAAA {
return next.ExecNext(ctx, qCtx)
}

// start reference goroutine
qCtxRef := qCtx.Copy()
var refQtype uint16
if qtype == dns.TypeA {
refQtype = dns.TypeAAAA
} else {
refQtype = dns.TypeA
qName := key(q.Question[0].Name)
if qtype == s.prefer {
err := next.ExecNext(ctx, qCtx)
if err != nil {
return err
}

if r := qCtx.R(); r != nil && msgAnsHasRR(r, s.prefer) {
s.preferTypOkCache.Store(qName, true, time.Now().Add(cacheTlt))
}
return nil
}

// Qtype is not the preferred type.
preferredTypOk, _, _ := s.preferTypOkCache.Get(qName)
if preferredTypOk {
// We know that domain has preferred type so this qtype can be blocked
// right away.
r := dnsutils.GenEmptyReply(q, dns.RcodeSuccess)
qCtx.SetResponse(r)
return nil
}
qCtxRef.Q().Question[0].Qtype = refQtype

ddl, ok := ctx.Deadline()
if !ok {
// async check whether domain has the preferred type
qCtxPreferred := qCtx.Copy()
qCtxPreferred.Q().Question[0].Qtype = s.prefer

ddl, cacheOk := ctx.Deadline()
if !cacheOk {
ddl = time.Now().Add(defaultSubRoutineTimeout)
}

shouldBlock := make(chan struct{}, 0)
shouldPass := make(chan struct{}, 0)
shouldBlock := make(chan struct{})
shouldPass := make(chan struct{})
go func() {
qCtx := qCtxRef
qCtx := qCtxPreferred
ctx, cancel := context.WithDeadline(context.Background(), ddl)
defer cancel()
err := next.ExecNext(ctx, qCtx)
Expand All @@ -92,13 +119,13 @@ func (s *Selector) Exec(ctx context.Context, qCtx *query_context.Context, next s
close(shouldPass)
return
}
if r := qCtx.R(); r != nil && msgAnsHasRR(r, refQtype) {
// Target domain has reference type.
if r := qCtx.R(); r != nil && msgAnsHasRR(r, s.prefer) {
// Target domain has preferred type.
s.preferTypOkCache.Store(qName, true, time.Now().Add(cacheTlt))
close(shouldBlock)
return
}
close(shouldPass)
return
}()

// start original query goroutine
Expand All @@ -114,11 +141,11 @@ func (s *Selector) Exec(ctx context.Context, qCtx *query_context.Context, next s
select {
case <-ctx.Done():
return context.Cause(ctx)
case <-shouldBlock: // Reference indicates we should block this query before the original query finished.
case <-shouldBlock: // Domain has preferred type. Block this type now.
r := dnsutils.GenEmptyReply(q, dns.RcodeSuccess)
qCtx.SetResponse(r)
return nil
case err := <-doneChan: // The original query finished. Waiting for reference.
case err := <-doneChan: // The original query finished. Waiting for preferred type check.
waitTimeoutTimer := pool.GetTimer(referenceWaitTimeout)
defer pool.ReleaseTimer(waitTimeoutTimer)
select {
Expand All @@ -140,17 +167,27 @@ func (s *Selector) Exec(ctx context.Context, qCtx *query_context.Context, next s
}
}

func (s *Selector) Close() error {
s.preferTypOkCache.Close()
return nil
}

func NewPreferIpv4(bq sequence.BQ) *Selector {
return &Selector{
BQ: bq,
prefer: dns.TypeA,
}
return newSelector(bq, dns.TypeA)
}

func NewPreferIpv6(bq sequence.BQ) *Selector {
return newSelector(bq, dns.TypeAAAA)
}

func newSelector(bq sequence.BQ, preferType uint16) *Selector {
if preferType != dns.TypeA && preferType != dns.TypeAAAA {
panic("dual_selector: invalid dns qtype")
}
return &Selector{
BQ: bq,
prefer: dns.TypeAAAA,
BQ: bq,
prefer: preferType,
preferTypOkCache: cache.New[key, bool](cache.Opts{Size: cacheSize, CleanerInterval: cacheGcInterval}),
}
}

Expand Down
5 changes: 1 addition & 4 deletions plugin/executable/dual_selector/dual_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ func TestSelector_Exec(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Selector{
BQ: sequence.NewBQ(coremain.NewTestMosdnsWithPlugins(nil), zap.NewNop()),
prefer: tt.prefer,
}
s := newSelector(sequence.NewBQ(coremain.NewTestMosdnsWithPlugins(nil), zap.NewNop()), tt.prefer)

q := new(dns.Msg)
q.SetQuestion("example.", tt.qtype)
Expand Down

0 comments on commit d8960af

Please # to comment.