
Commit

feat: Implement max pending queue size to limit spooler size and remove receive count restriction on streams
driskell committed Feb 28, 2025
1 parent 1cc837a commit 46dcea5
Showing 21 changed files with 213 additions and 62 deletions.
27 changes: 26 additions & 1 deletion docs/log-carver/Configuration.md
@@ -61,6 +61,7 @@
- [`enabled` (receiver)](#enabled-receiver)
- [`listen`](#listen)
- [`max pending payloads` (receiver)](#max-pending-payloads-receiver)
- [`max queue size` (receiver)](#max-queue-size-receiver)
- [`max tls version` (receiver)](#max-tls-version-receiver)
- [`min tls version` (receiver)](#min-tls-version-receiver)
- [`name` (receiver)](#name-receiver)
@@ -728,7 +729,10 @@ required)
### `max pending payloads` (receiver)
Number. Optional. Default: 10
Since **alpha** (not yet released)
Since 2.7.0
Only applicable to protocol-based transports such as "tls" and "tcp" that
support acknowledgements.
The maximum number of spools that can be in process from a connection at any
one time. Each spool will be kept in memory until it is fully processed and
@@ -742,6 +746,27 @@ to retry.
*You should only change this value if you changed the equivalent value on a
Log Courier client.*
### `max queue size` (receiver)
Number. Optional. Default: 134217728 (128 MiB)
Since 2.13.0
The maximum number of bytes that can be received and queued from clients at any
one time.
If events are received faster than they can be processed, this queue will grow.
When the queue is full, any data received from a connection that cannot be added
to the queue is discarded and the connection is closed.
A warning is logged when this happens, no more frequently than once per minute,
to note that events were discarded.
For protocol-based transports that support acknowledgement, no data loss occurs,
as the client knows to resubmit the data on a retried connection attempt. In the
Log Courier case, each retry backs off for longer, allowing Log Carver to catch up.
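For illustration, a receiver might raise this limit as follows. This is a sketch
only: the surrounding layout and keys are assumptions and should be adapted to
match the receiver examples elsewhere in this documentation.
```yaml
receivers:
  - listen:
      - "0.0.0.0:12345"
    # Allow up to 256 MiB of received event data to queue before new data
    # is discarded and the offending connection is closed (sketch only)
    max queue size: 268435456
```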
### `max tls version` (receiver)
String. Optional. Default: ""
31 changes: 30 additions & 1 deletion lc-lib/admin/api/data.go
@@ -44,12 +44,41 @@ func (n Number) HumanReadable(string) ([]byte, error) {
return []byte(strconv.FormatInt(int64(n), 10)), nil
}

// Bytes represents a number of bytes in the API
type Bytes int64

// HumanReadable returns the Bytes as a string with a human readable suffix such as KiB, MiB, GiB or TiB
func (n Bytes) HumanReadable(string) ([]byte, error) {
var suffix string
var size float64

switch {
case n < 1024:
suffix = " B"
size = float64(n)
case n < 1024*1024:
suffix = " KiB"
size = float64(n) / 1024
case n < 1024*1024*1024:
suffix = " MiB"
size = float64(n) / 1024 / 1024
case n < 1024*1024*1024*1024:
suffix = " GiB"
size = float64(n) / 1024 / 1024 / 1024
default:
suffix = " TiB"
size = float64(n) / 1024 / 1024 / 1024 / 1024
}

return []byte(strconv.FormatFloat(size, 'g', 2, 64) + suffix), nil
}

// Float represents a floating point number in the API
type Float float64

// HumanReadable returns the Float as a string
func (f Float) HumanReadable(string) ([]byte, error) {
return []byte(strconv.FormatFloat(float64(f), 'g', -1, 64)), nil
return []byte(strconv.FormatFloat(float64(f), 'g', 2, 64)), nil
}

// String represents a string in the API
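A hypothetical usage sketch (not part of the commit) showing how the new
`api.Bytes` type renders a byte count; the import path follows the file path
above and the printed value is what the thresholds in the diff would produce.
```go
package main

import (
	"fmt"

	"github.com/driskell/log-courier/lc-lib/admin/api"
)

func main() {
	// 5 MiB falls into the MiB bucket, so it is divided down twice by 1024
	out, err := api.Bytes(5 * 1024 * 1024).HumanReadable("")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // expected to print "5 MiB"
}
```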
18 changes: 17 additions & 1 deletion lc-lib/harvester/linereader.go
@@ -19,6 +19,8 @@ package harvester
import (
"bytes"
"io"

"github.com/driskell/log-courier/lc-lib/transports/tcp"
)

// LineReader is a read interface that tails and returns lines
@@ -164,7 +166,21 @@ func (lr *LineReader) fill() error {
return lr.err
}

n, err := lr.rd.Read(lr.buf[lr.end:])
// Loop until we receive data or an error occurs
// Avoids the outer loop in ReadItem which would otherwise do unnecessary checks
var n int
var err error
for {
n, err = lr.rd.Read(lr.buf[lr.end:])
if err == tcp.ErrIOWouldBlock {
// Ignore incomplete reads - we will try again
err = nil
}
if n > 0 || err != nil {
break
}
}

lr.end += n
if err != nil {
// Remember last error - so we can continue processing current buffer and once
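The change above loops on reads that return `tcp.ErrIOWouldBlock` so incomplete
reads are retried instead of bubbling up. Here is a hypothetical, self-contained
sketch of the same pattern; the reader and the `errWouldBlock` sentinel are
stand-ins, not the repository's types.
```go
package main

import (
	"errors"
	"fmt"
)

// errWouldBlock stands in for tcp.ErrIOWouldBlock in this sketch
var errWouldBlock = errors.New("would block")

// blockyReader returns errWouldBlock twice before yielding data
type blockyReader struct{ attempts int }

func (r *blockyReader) Read(p []byte) (int, error) {
	if r.attempts < 2 {
		r.attempts++
		return 0, errWouldBlock
	}
	return copy(p, []byte("hello")), nil
}

func main() {
	buf := make([]byte, 16)
	r := &blockyReader{}
	var n int
	var err error
	for {
		n, err = r.Read(buf)
		if err == errWouldBlock {
			// Ignore incomplete reads - we will try again
			err = nil
		}
		if n > 0 || err != nil {
			break
		}
	}
	fmt.Println(n, err, string(buf[:n])) // 5 <nil> hello
}
```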
8 changes: 4 additions & 4 deletions lc-lib/publisher/endpoint/endpoint.go
@@ -171,9 +171,9 @@ func (e *Endpoint) queuePayload(payload *payload.Payload) error {
}

if payload.Resending {
log.Debugf("[E %s] Resending payload %x with %d events", e.Server(), payload.Nonce, payload.Size())
log.Debugf("[E %s] Resending payload %x with %d events", e.Server(), payload.Nonce, payload.Len())
} else {
log.Debugf("[E %s] Sending payload %x with %d events", e.Server(), payload.Nonce, payload.Size())
log.Debugf("[E %s] Sending payload %x with %d events", e.Server(), payload.Nonce, payload.Len())
}

if err := e.transport.SendEvents(payload.Nonce, payload.Events()); err != nil {
@@ -232,7 +232,7 @@ func (e *Endpoint) ReduceLatency() {
func (e *Endpoint) updateEstDelTime() {
e.estDelTime = time.Now()
for _, payload := range e.pendingPayloads {
e.estDelTime = e.estDelTime.Add(time.Duration(e.averageLatency) * time.Duration(payload.Size()))
e.estDelTime = e.estDelTime.Add(time.Duration(e.averageLatency) * time.Duration(payload.Len()))
}
}

@@ -274,7 +274,7 @@ func (e *Endpoint) processAck(ack transports.AckEvent, onAck func(*Endpoint, *pa
1,
5,
e.averageLatency,
float64(time.Since(e.transmissionStart))/float64(payload.Size()),
float64(time.Since(e.transmissionStart))/float64(payload.Len()),
)

e.updateEstDelTime()
2 changes: 1 addition & 1 deletion lc-lib/publisher/endpoint/sink_endpoint.go
@@ -44,7 +44,7 @@ func (s *Sink) QueuePayload(payload *payload.Payload) (*Endpoint, error) {
return nil, nil
}

events := time.Duration(payload.Size())
events := time.Duration(payload.Len())
bestEndpoint := entry.Value.(*Endpoint)
bestEDT := bestEndpoint.EstDelTime().Add(bestEndpoint.AverageLatency() * events)

4 changes: 2 additions & 2 deletions lc-lib/publisher/payload/payload.go
@@ -57,8 +57,8 @@ func (pp *Payload) Init() {
pp.ResendElement.Value = pp
}

// Size returns the original size of this payload
func (pp *Payload) Size() int {
// Len returns the original size of this payload
func (pp *Payload) Len() int {
return pp.sequenceLen
}

2 changes: 2 additions & 0 deletions lc-lib/receiver/api.go
@@ -31,6 +31,8 @@ func (a *apiStatus) Update() error {
// Update the values and pass through to node
a.r.connectionLock.RLock()
a.SetEntry("activeConnections", api.Number(len(a.r.connectionStatus)))
a.SetEntry("queuePayloads", api.Number(len(a.r.spool)))
a.SetEntry("queueSize", api.Number(a.r.spoolSize))
a.r.connectionLock.RUnlock()

return nil
72 changes: 53 additions & 19 deletions lc-lib/receiver/pool.go
@@ -38,6 +38,11 @@ const (
poolContextEventPosition poolContext = "eventpos"
)

type spoolEntry struct {
events []*event.Event
size int
}

// Pool manages a list of receivers
type Pool struct {
// Pipeline
@@ -53,6 +58,8 @@ type Pool struct {
scheduler *scheduler.Scheduler
connectionLock sync.RWMutex
connectionStatus map[interface{}]*poolConnectionStatus
spool []*spoolEntry
spoolSize int64

apiConfig *admin.Config
apiConnections api.Array
@@ -104,16 +111,15 @@ func (r *Pool) Init(cfg *config.Config) error {

// Run starts listening
func (r *Pool) Run() {
var spool [][]*event.Event
var spoolChan chan<- []*event.Event
eventChan := r.eventChan
shutdownChan := r.shutdownChan

ReceiverLoop:
for {
var nextSpool []*event.Event = nil
if len(spool) != 0 {
nextSpool = spool[0]
var nextSpool *spoolEntry = nil
if len(r.spool) != 0 {
nextSpool = r.spool[0]
}

select {
@@ -182,30 +188,52 @@ ReceiverLoop:
// We replace because a reconnect on the same port could occur before we get around to handling the disconnection, and we're keyed by port
r.apiConnections.ReplaceEntry(eventImpl.Remote(), connectionStatus)
case transports.EventsEvent:
size := calcSize(eventImpl)
r.connectionLock.Lock()
connection := eventImpl.Context().Value(transports.ContextConnection)
receiver := eventImpl.Context().Value(transports.ContextReceiver).(transports.Receiver)
connectionStatus := r.connectionStatus[connection]
// Schedule partial ack if this is first set of events
if len(connectionStatus.progress) == 0 {
r.scheduler.Set(connection, 5*time.Second)
if r.spoolSize+int64(size) > r.receivers[receiver].config.MaxQueueSize {
receiver.ShutdownConnectionRead(eventImpl.Context(), fmt.Errorf("max queue size exceeded"))
r.connectionLock.Unlock()
break
}
connectionStatus.progress = append(connectionStatus.progress, &poolEventProgress{event: eventImpl, sequence: 0})
var acker event.Acknowledger
if receiver.SupportsAck() {
if len(r.connectionStatus[connection].progress)+1 > int(r.receivers[receiver].config.MaxPendingPayloads) {
receiver.ShutdownConnectionRead(eventImpl.Context(), fmt.Errorf("max pending payloads exceeded"))
r.connectionLock.Unlock()
break
}
// Schedule partial ack if this is first set of events
if len(connectionStatus.progress) == 0 {
r.scheduler.Set(connection, 5*time.Second)
}
connectionStatus.progress = append(connectionStatus.progress, &poolEventProgress{event: eventImpl, sequence: 0})
acker = r
} else {
// Reset idle timeout
r.startIdleTimeout(eventImpl.Context(), receiver, connection)
}
connectionStatus.bytes += eventImpl.Size()
r.connectionLock.Unlock()
// Build the events with our acknowledger and submit the bundle
var events = make([]*event.Event, len(eventImpl.Events()))
var ctx context.Context
for idx, item := range eventImpl.Events() {
ctx := context.WithValue(eventImpl.Context(), poolContextEventPosition, &poolEventPosition{nonce: eventImpl.Nonce(), sequence: uint32(idx + 1)})
item := event.NewEvent(ctx, r, item)
if acker == nil {
ctx = eventImpl.Context()
} else {
ctx = context.WithValue(eventImpl.Context(), poolContextEventPosition, &poolEventPosition{nonce: eventImpl.Nonce(), sequence: uint32(idx + 1)})
}
item := event.NewEvent(ctx, acker, item)
item.MustResolve("@metadata[receiver]", connectionStatus.metadataReceiver)
events[idx] = item
}
spool = append(spool, events)
spoolEntry := &spoolEntry{events, size}
r.spool = append(r.spool, spoolEntry)
r.spoolSize += int64(spoolEntry.size)
spoolChan = r.output
// Stop reading events if this client breached our limit
if len(r.connectionStatus[connection].progress) > int(r.receivers[receiver].config.MaxPendingPayloads) {
receiver.ShutdownConnectionRead(eventImpl.Context(), fmt.Errorf("max pending payloads exceeded"))
}
case *transports.EndEvent:
// Connection EOF
r.connectionLock.Lock()
@@ -263,10 +291,11 @@ ReceiverLoop:
r.startIdleTimeout(eventImpl.Context(), receiver, connection)
}
}
case spoolChan <- nextSpool:
copy(spool, spool[1:])
spool = spool[:len(spool)-1]
if len(spool) == 0 {
case spoolChan <- nextSpool.events:
copy(r.spool, r.spool[1:])
r.spool = r.spool[:len(r.spool)-1]
r.spoolSize -= int64(nextSpool.size)
if len(r.spool) == 0 {
spoolChan = nil
}
}
@@ -385,6 +414,7 @@ func (r *Pool) updateReceivers(newConfig *config.Config) {
receiverApi := &api.KeyValue{}
receiverApi.SetEntry("listen", api.String(listen))
receiverApi.SetEntry("maxPendingPayloads", api.Number(cfgEntry.MaxPendingPayloads))
receiverApi.SetEntry("maxQueueSize", api.Number(cfgEntry.MaxQueueSize))
r.apiListeners.AddEntry(listen, receiverApi)
}
}
@@ -414,3 +444,7 @@ func (r *Pool) shutdown() {
receiver.Shutdown()
}
}

func calcSize(eventImpl transports.EventsEvent) int {
return eventImpl.Size()
}
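To make the new accounting easier to follow, here is a hypothetical, much-simplified
sketch (not the repository's code) of what the pool now does: each accepted bundle
adds its byte size to a running total, a bundle that would push the total past
`max queue size` is rejected so the connection can be shut down, and forwarding a
bundle releases its bytes.
```go
package main

import "fmt"

type spoolEntry struct {
	size int
}

type pool struct {
	maxQueueSize int64
	spool        []*spoolEntry
	spoolSize    int64
}

// offer returns false when accepting the bundle would exceed the limit
func (p *pool) offer(size int) bool {
	if p.spoolSize+int64(size) > p.maxQueueSize {
		return false
	}
	p.spool = append(p.spool, &spoolEntry{size: size})
	p.spoolSize += int64(size)
	return true
}

// release removes the oldest bundle once it has been forwarded
func (p *pool) release() {
	entry := p.spool[0]
	copy(p.spool, p.spool[1:])
	p.spool = p.spool[:len(p.spool)-1]
	p.spoolSize -= int64(entry.size)
}

func main() {
	p := &pool{maxQueueSize: 100}
	fmt.Println(p.offer(60)) // true
	fmt.Println(p.offer(60)) // false - would exceed the 100 byte limit
	p.release()
	fmt.Println(p.offer(60)) // true - capacity freed by forwarding
}
```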
2 changes: 2 additions & 0 deletions lc-lib/receiver/status.go
@@ -40,6 +40,7 @@ type poolConnectionStatus struct {
metadataReceiver MetadataReceiver
progress []*poolEventProgress
lines int64
bytes int

api.KeyValue
}
@@ -69,6 +70,7 @@ func (p *poolConnectionStatus) Update() error {
p.SetEntry("listener", api.String(p.listener))
p.SetEntry("description", api.String(p.desc))
p.SetEntry("completedLines", api.Number(p.lines))
p.SetEntry("completedBytes", api.Bytes(p.bytes))
p.SetEntry("pendingPayloads", api.Number(len(p.progress)))
return nil
}
22 changes: 21 additions & 1 deletion lc-lib/transports/common.go
@@ -24,10 +24,23 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/driskell/log-courier/lc-lib/config"
)

const (
defaultNetworkBackoff time.Duration = 5 * time.Second
defaultNetworkBackoffMax time.Duration = 300 * time.Second
defaultNetworkMaxPendingPayloads int64 = 10
defaultNetworkMaxQueueSize int64 = 128 * 1024 * 1024 // 128 MiB
defaultNetworkMethod string = "random"
defaultNetworkRfc2782Service string = "courier"
defaultNetworkRfc2782Srv bool = true
defaultNetworkTimeout time.Duration = 15 * time.Second
defaultNetworkTransport string = "tls"
)

var (
// ErrCongestion represents temporary congestion, rather than failure
ErrCongestion error = errors.New("Congestion")
@@ -77,6 +90,7 @@ type EventsEvent interface {
Events() []map[string]interface{}
Nonce() *string
Count() uint32
Size() int
}

// StatusEvent contains information about a status change for a transport
@@ -273,16 +287,18 @@ type eventsEvent struct {
context context.Context
nonce *string
events []map[string]interface{}
size int
}

var _ EventsEvent = (*eventsEvent)(nil)

// NewEventsEvent generates a new EventsEvent for the given bundle of events
func NewEventsEvent(context context.Context, nonce *string, events []map[string]interface{}) EventsEvent {
func NewEventsEvent(context context.Context, nonce *string, events []map[string]interface{}, size int) EventsEvent {
return &eventsEvent{
context: context,
nonce: nonce,
events: events,
size: size,
}
}

@@ -306,6 +322,10 @@ func (e *eventsEvent) Count() uint32 {
return uint32(len(e.events))
}

func (e *eventsEvent) Size() int {
return e.size
}

// ParseTLSVersion parses a TLS version string into the tls library value for min/max config
// We explicitly refuse SSLv3 to mitigate POODLE vulnerability
func ParseTLSVersion(version string, fallback uint16) (uint16, error) {
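A hypothetical sketch (not from the commit) of how a transport might construct
an `EventsEvent` with the new size argument; `wireSize` is an assumed value
standing in for the byte size of the frame the transport decoded, so the
receiver pool can account for it against `max queue size`.
```go
package main

import (
	"context"
	"fmt"

	"github.com/driskell/log-courier/lc-lib/transports"
)

func main() {
	nonce := "abc123"
	events := []map[string]interface{}{{"message": "example line"}}
	// wireSize is an assumption for this sketch: the decoded frame's byte size
	wireSize := 42
	evt := transports.NewEventsEvent(context.Background(), &nonce, events, wireSize)
	fmt.Println(evt.Count(), evt.Size()) // 1 42
}
```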