diff --git a/internal/certificatetransparency/ct-watcher.go b/internal/certificatetransparency/ct-watcher.go index 5880b88..7157308 100644 --- a/internal/certificatetransparency/ct-watcher.go +++ b/internal/certificatetransparency/ct-watcher.go @@ -32,6 +32,8 @@ var ( // Watcher describes a component that watches for new certificates in a CT log. type Watcher struct { workers []*worker + wg sync.WaitGroup + context context.Context certChan chan certstream.Entry cancelFunc context.CancelFunc } @@ -45,6 +47,48 @@ func NewWatcher(certChan chan certstream.Entry) *Watcher { // Start starts the watcher. This method is blocking. func (w *Watcher) Start() { + w.context, w.cancelFunc = context.WithCancel(context.Background()) + + // Create new certChan if it doesn't exist yet + if w.certChan == nil { + w.certChan = make(chan certstream.Entry, 5000) + } + + // initialize the watcher with currently available logs + w.addNewlyAvailableLogs() + + log.Println("Started CT watcher") + go certHandler(w.certChan) + go w.watchNewLogs() + + w.wg.Wait() + close(w.certChan) +} + +// watchNewLogs monitors the ct log list for new logs and starts a worker for each new log found. +// This method is blocking. It can be stopped by cancelling the context. +func (w *Watcher) watchNewLogs() { + // Add all available logs to the watcher + w.addNewlyAvailableLogs() + + // Check for new logs every 6 hours + ticker := time.NewTicker(6 * time.Hour) + for { + select { + case <-ticker.C: + w.addNewlyAvailableLogs() + case <-w.context.Done(): + ticker.Stop() + return + } + } +} + +// The transparency log list is constantly updated with new Log servers. +// This function checks for new ct logs and adds them to the watcher. +func (w *Watcher) addNewlyAvailableLogs() { + log.Println("Checking for new ct logs...") + // Get a list of urls of all CT logs logList, err := getAllLogs() if err != nil { @@ -52,41 +96,50 @@ func (w *Watcher) Start() { return } - ctx, cancel := context.WithCancel(context.Background()) - w.cancelFunc = cancel - - // Create new certChan if it doesn't exist yet - if w.certChan == nil { - w.certChan = make(chan certstream.Entry, 5000) - } - - var wg sync.WaitGroup + newCTs := 0 + // Check the ct log list for new, unwatched logs // For each CT log, create a worker and start downloading certs for _, operator := range logList.Operators { + // Iterate over each log of the operator for _, transparencyLog := range operator.Logs { - wg.Add(1) - ctWorker := worker{ - name: transparencyLog.Description, - operatorName: operator.Name, - ctURL: transparencyLog.URL, - entryChan: w.certChan, + // Check if the log is already being watched + newURL := normalizeCtlogURL(transparencyLog.URL) + + alreadyWatched := false + for _, ctWorker := range w.workers { + workerURL := normalizeCtlogURL(ctWorker.ctURL) + if workerURL == newURL { + alreadyWatched = true + break + } } - w.workers = append(w.workers, &ctWorker) - // Start a goroutine for each worker - go func() { - defer wg.Done() - ctWorker.startDownloadingCerts(ctx) - }() + // TODO maybe add a check for logs that are still watched but no longer on the logList and remove them? See also issue #41 and #42 + + // If the log is not being watched, create a new worker + if !alreadyWatched { + w.wg.Add(1) + newCTs++ + + ctWorker := worker{ + name: transparencyLog.Description, + operatorName: operator.Name, + ctURL: transparencyLog.URL, + entryChan: w.certChan, + } + w.workers = append(w.workers, &ctWorker) + + // Start a goroutine for each worker + go func() { + defer w.wg.Done() + ctWorker.startDownloadingCerts(w.context) + }() + } } } - log.Println("Started CT watcher") - go certHandler(w.certChan) - - wg.Wait() - close(w.certChan) + log.Printf("Found %d new ct logs\n", newCTs) } // Stop stops the watcher. @@ -150,7 +203,7 @@ func (w *worker) startDownloadingCerts(ctx context.Context) { log.Printf("Context was cancelled; Stopping worker for '%s'\n", w.ctURL) return default: - log.Println("Sleeping for 5 seconds") + log.Printf("Worker for '%s' sleeping for 5 seconds due to error\n", w.ctURL) time.Sleep(5 * time.Second) log.Printf("Restarting worker for '%s'\n", w.ctURL) continue @@ -274,11 +327,11 @@ func getAllLogs() (loglist3.LogList, error) { return loglist3.LogList{}, parseErr } - // Initial setup of the urlMetricsMap + // Add new ct logs to metrics for _, operator := range allLogs.Operators { for _, ctlog := range operator.Logs { url := normalizeCtlogURL(ctlog.URL) - metrics.Set(operator.Name, url, 0) + metrics.Init(operator.Name, url) } } diff --git a/internal/certificatetransparency/logmetrics.go b/internal/certificatetransparency/logmetrics.go index bfe627e..89d9327 100644 --- a/internal/certificatetransparency/logmetrics.go +++ b/internal/certificatetransparency/logmetrics.go @@ -62,6 +62,22 @@ func (m *LogMetrics) OperatorLogMapping() OperatorLogs { return logOperators } +// Init initializes the internal metrics map with the given operator names and CT log urls if it doesn't exist yet. +func (m *LogMetrics) Init(operator, url string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + // if the operator does not exist, create a new entry + if _, ok := m.metrics[operator]; !ok { + m.metrics[operator] = make(OperatorMetric) + } + + // if the operator exists but the url does not, create a new entry + if _, ok := m.metrics[operator][url]; !ok { + m.metrics[operator][url] = 0 + } +} + // Get the metric for a given operator and ct url. func (m *LogMetrics) Get(operator, url string) int64 { m.mutex.RLock()