Skip to content

Commit

Permalink
Reduce the number of fetches harvesting one component (clearlydefined…
Browse files Browse the repository at this point in the history
…#475)

* Cache in progress fetch promises, cached fetched results

Cache in progress fetch promises, cached fetched results for maven

Add a unit test for gitCloner

Cache fetch results from gitCloner

Add a unit test for pypiFetch

Cache fetch results from pypiFetch

Minor refactoring

Cache fetch results from npmjsFetch

Add unit tests for rubyGem

Cache fetch results from rubyGemFetch

Cache fetch results from packagistFetch

Cache fetch results from crateioFetch

Cache fetch results from debianFetch

Cache fetch results from goFetch

Deep clone cached result on copy

Cache fetch results from nugetFetch

Add unit tests for podFetch

Cache results from podFetch

Delay fetchResult construction until end of fetch.

Delay fetchResult construction and transfer the clean up of the download directory at the end of the fetch.
This is to ensure when error occurs, the cleanup of the download directory will still be tracked in request.

Minor refactoring

Minor refactoring

Remove todo to avoid merge conflict

Adapt tests after merge

* Add ScopedQueueSets

ScopedQueueSets contains local and global scoped queue sets.
local scoped queueset holds tasks to be performed on the fetched result (package) that is currently processed and cached locally on the crawler instance.  This avoids refetching and increases the cache hit rate.
global scoped queueset is the shared queues among crawler instances.
local queueset is popped prior to the global one.  This ensures that cache is utilized before expiration.

* Publish requests on local queues to global upon crawler shutdown

Fix and add tests

Allow graceful shutdown

* Minor refactor and add more tests

* Update docker file to relay the shutdown signal

* Add config for dispatcher.fetched cache

After the scopedQueueSets is introduced, the tool tasks on the same fetched result (in the local scoped queueset) are processed consecutively.
Therefore, cache ttl for the fetched result can now be reduced.

* Address review comments

* Removed --init option in docker run

In my previous changes:
-nodejs application is run as PID 1 in the docker container, and
-the application can handle termination signals.

Therefore, the --init option is no longer necessary and has hence been removed from the docker run command.
  • Loading branch information
qtomlinson authored Jul 22, 2022
1 parent cc4af2e commit 3fea16e
Show file tree
Hide file tree
Showing 64 changed files with 2,005 additions and 278 deletions.
2 changes: 1 addition & 1 deletion DevDockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ ENV NODE_ENV "localhost"

ENV PORT 5000
EXPOSE 5000
ENTRYPOINT ["npm", "start"]
ENTRYPOINT ["node", "index.js"]
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,4 @@ COPY . "${APPDIR}"

ENV PORT 5000
EXPOSE 5000
ENTRYPOINT ["npm", "start"]
ENTRYPOINT ["node", "index.js"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ See `local.env.list`, `dev.env.list` and `prod.env.list` template files.

With a debugger:

`docker run --rm -d --env-file ../dev.env.list -p 9229:9229 -p 5000:5000 --entrypoint npm cdcrawler:latest run local`
`docker run --rm -d --env-file ../dev.env.list -p 9229:9229 -p 5000:5000 --entrypoint node cdcrawler:latest --inspect-brk=0.0.0.0:9229 index.js`

At this point you can attach VS Code with the built in debugging profile (see .vscode/launch.json)

Expand Down
4 changes: 3 additions & 1 deletion config/cdConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ module.exports = {
},
fetch: {
dispatcher: 'cdDispatch',
cdDispatch: {},
cdDispatch: {
fetched: { defaultTtlSeconds: 60 * 60 * 8 }
},
cocoapods: { githubToken },
cratesio: {},
debian: { cdFileLocation: cd_file.location },
Expand Down
2 changes: 1 addition & 1 deletion dev-scripts/debug.bat
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CALL docker kill cdcrawler
CALL mkdir C:\temp\crawler-data
CALL docker run --rm --name cdcrawler --env-file %~dp0\..\..\env.list -p 5000:5000 -p 9229:9229 -v C:\temp\crawler-data:/tmp/cd --entrypoint npm cdcrawler:latest run local
CALL docker run --rm --name cdcrawler --env-file %~dp0\..\..\env.list -p 5000:5000 -p 9229:9229 -v C:\temp\crawler-data:/tmp/cd --entrypoint node cdcrawler:latest --inspect-brk=0.0.0.0:9229 index.js
27 changes: 27 additions & 0 deletions ghcrawler/bin/www.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ function run(service, logger) {

server.on('error', onError)
server.on('listening', onListening)
server.on('close', onClose)

process.on('SIGTERM', onShutdown)
process.on('SIGINT', onShutdown)
process.on('SIGHUP', onShutdown)

/**
* Normalize a port into a number, string, or false.
Expand Down Expand Up @@ -90,6 +95,28 @@ function run(service, logger) {
var bind = typeof addr === 'string' ? 'pipe ' + addr : 'port ' + addr.port
console.log(`Crawler service listening on ${bind}`)
}

/**
* Event listener for HTTP server 'close' event.
*/
async function onClose() {
  // Stop the crawler service once the HTTP server has closed, then exit
  // with a status reflecting whether shutdown succeeded.
  try {
    await service.stop()
    console.log('Server closed.')
    process.exit(0)
  } catch (error) {
    console.error(`Closing server: ${error}`)
    process.exit(1)
  }
}

/**
* Event listener for terminal signals
*/
function onShutdown(signal) {
console.log(`Received ${signal}`)
// Stop accepting connections; the server's 'close' event then fires and
// triggers onClose, which stops the crawler service and exits the process.
server.close()
}
}

module.exports = run
9 changes: 8 additions & 1 deletion ghcrawler/crawlerFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const Crawler = require('./lib/crawler')
const CrawlerService = require('./lib/crawlerService')
const QueueSet = require('./providers/queuing/queueSet')
const ScopedQueueSets = require('./providers/queuing/scopedQueueSets')
const RefreshingConfig = require('@microsoft/refreshing-config')

let logger = null
Expand Down Expand Up @@ -58,7 +59,7 @@ class CrawlerFactory {
} = {}
) {
logger.info('creating crawler')
queues = queues || CrawlerFactory.createQueues(options.queue)
queues = queues || CrawlerFactory.createScopedQueueSets(options.queue)
store = store || CrawlerFactory.createStore(options.store)
deadletters = deadletters || CrawlerFactory.createDeadLetterStore(options.deadletter)
locker = locker || CrawlerFactory.createLocker(options.lock)
Expand Down Expand Up @@ -216,6 +217,12 @@ class CrawlerFactory {
const later = manager.createQueueChain('later', options)
return new QueueSet([immediate, soon, normal, later], options)
}

static createScopedQueueSets(queueOptions) {
const globalQueues = CrawlerFactory.createQueues(queueOptions)
const localQueues = CrawlerFactory.createQueues(queueOptions, 'memory')
return new ScopedQueueSets(globalQueues, localQueues)
}
}

module.exports = CrawlerFactory
8 changes: 6 additions & 2 deletions ghcrawler/lib/crawler.js
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,8 @@ class Crawler {
return deadDocument
}

queue(requests, name = null) {
return this.queues.push(this._preFilter(requests), name || 'normal')
queue(requests, name = null, scope = null) {
return this.queues.push(this._preFilter(requests), name || 'normal', scope)
}

_preFilter(requests) {
Expand All @@ -672,6 +672,10 @@ class Crawler {
}
return false
}

// Publish whatever remains on the queues (e.g. locally scoped requests)
// as part of shutdown — invoked from CrawlerService.stop().
done() {
return this.queues.publish()
}
}

module.exports = Crawler
19 changes: 10 additions & 9 deletions ghcrawler/lib/crawlerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ class CrawlerService {
console.log(`Done loop ${loop.options.name}`)
}

async ensureLoops() {
async ensureLoops(targetCount = this.options.crawler.count) {
this.loops = this.loops.filter(loop => loop.running())
const running = this.status()
const delta = this.options.crawler.count - running
const delta = targetCount - running
if (delta < 0) {
for (let i = 0; i < Math.abs(delta); i++) {
const loop = this.loops.shift()
Expand All @@ -61,7 +61,8 @@ class CrawlerService {
}

stop() {
return this.ensureLoops()
return this.ensureLoops(0)
.then(() => this.crawler.done())
}

queues() {
Expand All @@ -72,24 +73,24 @@ class CrawlerService {
return this.crawler.queue(requests, name)
}

async flushQueue(name) {
const queue = this.crawler.queues.getQueue(name)
async flushQueue(name, scope = null) {
const queue = this.crawler.queues.getQueue(name, scope)
if (!queue) {
return null
}
return queue.flush()
}

getQueueInfo(name) {
const queue = this.crawler.queues.getQueue(name)
getQueueInfo(name, scope = null) {
const queue = this.crawler.queues.getQueue(name, scope)
if (!queue) {
return Promise.reject(`No queue found: ${name}`)
}
return queue.getInfo()
}

async getRequests(name, count, remove = false) {
const queue = this.crawler.queues.getQueue(name)
async getRequests(name, count, remove = false, scope = null) {
const queue = this.crawler.queues.getQueue(name, scope)
if (!queue) {
return null
}
Expand Down
17 changes: 13 additions & 4 deletions ghcrawler/lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ class Request {
return this
}

removeCleanup(cleanups) {
if (!cleanups || !this.cleanups) {
return this
}
const toRemove = Array.isArray(cleanups) ? cleanups : [cleanups]
this.cleanups = this.cleanups.filter(item => !toRemove.includes(item))
return this
}

addMeta(data) {
this.meta = Object.assign({}, this.meta, data)
return this
Expand Down Expand Up @@ -181,13 +190,13 @@ class Request {
return this.policy.getNextPolicy(name)
}

queueRequests(requests, name = null) {
queueRequests(requests, name = null, scope = null) {
requests = Array.isArray(requests) ? requests : [requests]
const toQueue = requests.filter(request => !this.hasSeen(request))
this.track(this.crawler.queue(toQueue, name))
this.track(this.crawler.queue(toQueue, name, scope))
}

queue(type, url, policy, context = null, pruneRelation = true) {
queue(type, url, policy, context = null, pruneRelation = true, scope = null) {
if (!policy) {
return
}
Expand All @@ -199,7 +208,7 @@ class Request {
if (pruneRelation) {
delete newRequest.context.relation
}
this.queueRequests(newRequest, _.get(this._originQueue, 'queue.name'))
this.queueRequests(newRequest, _.get(this._originQueue, 'queue.name'), scope)
}

markDead(outcome, message) {
Expand Down
16 changes: 15 additions & 1 deletion ghcrawler/providers/queuing/attenuatedQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ class AttenuatedQueue extends NestedQueue {
this.logger = options.logger
}

done(request) {
return super.done(request)
.then(() => {
const key = this._getCacheKey(request)
const deleted = memoryCache.del(key)
if (deleted) this.logger.verbose(`Deleted ${key}`)
})
}

push(requests) {
const self = this
requests = Array.isArray(requests) ? requests : [requests]
Expand All @@ -27,7 +36,7 @@ class AttenuatedQueue extends NestedQueue {
_pushOne(request) {
// Include the attempt count in the key. This allows for one concurrent requeue
const attemptCount = request.attemptCount || 0
const key = `t:${attemptCount}:${request.toUniqueString()}`
const key = this._getCacheKey(request)
let entry = memoryCache.get(key)
if (entry) {
// We've seen this request recently. The push is either in progress (and may fail) or is already done.
Expand All @@ -54,6 +63,11 @@ class AttenuatedQueue extends NestedQueue {
return entry.promise
}

_getCacheKey(request) {
const attemptCount = request.attemptCount || 0
return `t:${attemptCount}:${request.toUniqueString()}`
}

_log(message) {
return this.logger ? this.logger.silly(message) : null
}
Expand Down
21 changes: 0 additions & 21 deletions ghcrawler/providers/queuing/queueSet.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ class QueueSet {
return this.getQueue(name).push(requests)
}

repush(original, newRequest) {
const queue = original._retryQueue ? this.getQueue(original._retryQueue) : original._originQueue
return queue.push(newRequest)
}

subscribe() {
return Promise.all(
this.queues.map(queue => {
Expand Down Expand Up @@ -77,22 +72,6 @@ class QueueSet {
return result
}

done(request) {
const acked = request.acked
request.acked = true
return !acked && request._originQueue ? request._originQueue.done(request) : Promise.resolve()
}

defer(request) {
return request._originQueue ? request._originQueue.defer(request) : Promise.resolve()
}

abandon(request) {
const acked = request.acked
request.acked = true
return !acked && request._originQueue ? request._originQueue.abandon(request) : Promise.resolve()
}

getQueue(name) {
const result = this.queueTable[name]
if (!result) {
Expand Down
Loading

0 comments on commit 3fea16e

Please # to comment.