Skip to content

Commit

Permalink
feat: start new ct-watchers as new ct logs become available
Browse files Browse the repository at this point in the history
fixes #42

We also needed a way to prevent existing entries of the metrics map from being reset to 0. That's why I added the Init() method to LogMetrics. It only sets the value of an operator/url combination to 0 if that combination does not exist yet.

If it already exists, it will be left alone.
  • Loading branch information
d-Rickyy-b committed Aug 7, 2024
1 parent 6adc0e5 commit cd4aaef
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 29 deletions.
111 changes: 82 additions & 29 deletions internal/certificatetransparency/ct-watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
// Watcher describes a component that watches for new certificates in a CT log.
// It owns one worker per watched CT log and shares a single context and
// entry channel between all of them.
type Watcher struct {
// workers lists every worker started so far; it is consulted to decide whether a CT log URL is already being watched.
workers []*worker
// wg counts the running worker goroutines; Start waits on it before closing certChan.
wg sync.WaitGroup
// context is created in Start and handed to every worker and to the log-list refresh loop.
context context.Context
// certChan is the channel workers are given as their entry channel; Start creates it when it is nil.
certChan chan certstream.Entry
// cancelFunc cancels context; it is set in Start together with context.
cancelFunc context.CancelFunc
}
Expand All @@ -45,48 +47,99 @@ func NewWatcher(certChan chan certstream.Entry) *Watcher {

// Start starts the watcher. This method is blocking: it returns only after the
// watcher's context has been cancelled and all worker goroutines have finished.
//
// Start creates the watcher context, makes sure the entry channel exists,
// starts a worker for every currently known CT log, launches the certificate
// handler and then keeps checking the log list for newly added logs.
func (w *Watcher) Start() {
	w.context, w.cancelFunc = context.WithCancel(context.Background())

	// Create new certChan if it doesn't exist yet
	if w.certChan == nil {
		w.certChan = make(chan certstream.Entry, 5000)
	}

	// initialize the watcher with currently available logs
	w.addNewlyAvailableLogs()

	log.Println("Started CT watcher")
	go certHandler(w.certChan)

	// Run the refresh loop on this goroutine instead of detaching it.
	// watchNewLogs only returns once the context is cancelled, so after this
	// call no further wg.Add can happen. Previously the loop ran in its own
	// goroutine: if the initial log list fetch failed (no workers registered),
	// wg.Wait returned immediately, certChan was closed, and a later refresh
	// would both call wg.Add after Wait and start workers holding a closed
	// channel.
	w.watchNewLogs()

	// All producers are done once the WaitGroup drains; only then is it safe
	// to close the channel they send on.
	w.wg.Wait()
	close(w.certChan)
}

// watchNewLogs keeps the watcher in sync with the CT log list: it performs one
// check immediately and then repeats it every 6 hours, starting a worker for
// each log that is not watched yet.
// This method is blocking. It returns when the watcher's context is cancelled.
func (w *Watcher) watchNewLogs() {
	// Pick up everything that is on the log list right now.
	w.addNewlyAvailableLogs()

	// Re-check the log list every 6 hours.
	refresh := time.NewTicker(6 * time.Hour)
	defer refresh.Stop()

	for {
		select {
		case <-w.context.Done():
			return
		case <-refresh.C:
			w.addNewlyAvailableLogs()
		}
	}
}

// The transparency log list is constantly updated with new Log servers.
// This function checks for new ct logs and adds them to the watcher.
func (w *Watcher) addNewlyAvailableLogs() {
log.Println("Checking for new ct logs...")

// Get a list of urls of all CT logs
logList, err := getAllLogs()
if err != nil {
log.Println(err)
return
}

ctx, cancel := context.WithCancel(context.Background())
w.cancelFunc = cancel

// Create new certChan if it doesn't exist yet
if w.certChan == nil {
w.certChan = make(chan certstream.Entry, 5000)
}

var wg sync.WaitGroup
newCTs := 0

// Check the ct log list for new, unwatched logs
// For each CT log, create a worker and start downloading certs
for _, operator := range logList.Operators {
// Iterate over each log of the operator
for _, transparencyLog := range operator.Logs {
wg.Add(1)
ctWorker := worker{
name: transparencyLog.Description,
operatorName: operator.Name,
ctURL: transparencyLog.URL,
entryChan: w.certChan,
// Check if the log is already being watched
newURL := normalizeCtlogURL(transparencyLog.URL)

alreadyWatched := false
for _, ctWorker := range w.workers {
workerURL := normalizeCtlogURL(ctWorker.ctURL)
if workerURL == newURL {
alreadyWatched = true
break
}
}
w.workers = append(w.workers, &ctWorker)

// Start a goroutine for each worker
go func() {
defer wg.Done()
ctWorker.startDownloadingCerts(ctx)
}()
// TODO maybe add a check for logs that are still watched but no longer on the logList and remove them? See also issue #41 and #42

// If the log is not being watched, create a new worker
if !alreadyWatched {
w.wg.Add(1)
newCTs++

ctWorker := worker{
name: transparencyLog.Description,
operatorName: operator.Name,
ctURL: transparencyLog.URL,
entryChan: w.certChan,
}
w.workers = append(w.workers, &ctWorker)

// Start a goroutine for each worker
go func() {
defer w.wg.Done()
ctWorker.startDownloadingCerts(w.context)
}()
}
}
}

log.Println("Started CT watcher")
go certHandler(w.certChan)

wg.Wait()
close(w.certChan)
log.Printf("Found %d new ct logs\n", newCTs)
}

// Stop stops the watcher.
Expand Down Expand Up @@ -150,7 +203,7 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
log.Printf("Context was cancelled; Stopping worker for '%s'\n", w.ctURL)
return
default:
log.Println("Sleeping for 5 seconds")
log.Printf("Worker for '%s' sleeping for 5 seconds due to error\n", w.ctURL)
time.Sleep(5 * time.Second)
log.Printf("Restarting worker for '%s'\n", w.ctURL)
continue
Expand Down Expand Up @@ -274,11 +327,11 @@ func getAllLogs() (loglist3.LogList, error) {
return loglist3.LogList{}, parseErr
}

// Initial setup of the urlMetricsMap
// Add new ct logs to metrics
for _, operator := range allLogs.Operators {
for _, ctlog := range operator.Logs {
url := normalizeCtlogURL(ctlog.URL)
metrics.Set(operator.Name, url, 0)
metrics.Init(operator.Name, url)
}
}

Expand Down
16 changes: 16 additions & 0 deletions internal/certificatetransparency/logmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ func (m *LogMetrics) OperatorLogMapping() OperatorLogs {
return logOperators
}

// Init makes sure a metric entry exists for the given operator and CT log url.
// A missing operator gets a fresh per-operator map, and a missing url is added
// with the value 0. An entry that already exists keeps its current value.
func (m *LogMetrics) Init(operator, url string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Look the operator up once and create its map on first sight.
	perOperator, known := m.metrics[operator]
	if !known {
		perOperator = make(OperatorMetric)
		m.metrics[operator] = perOperator
	}

	// Never overwrite a url that is already tracked.
	if _, tracked := perOperator[url]; tracked {
		return
	}
	perOperator[url] = 0
}

// Get the metric for a given operator and ct url.
func (m *LogMetrics) Get(operator, url string) int64 {
m.mutex.RLock()
Expand Down

0 comments on commit cd4aaef

Please sign in to comment.