
Support for cleaning the distributed Redis crawler cache #892

Open
wants to merge 1 commit into base: master

Conversation


@qingmo qingmo commented Sep 25, 2019

Since webmagic is largely modeled on scrapy, I looked at scrapy's distributed Redis solution, scrapy-redis.
Comparing the current webmagic source with the scrapy-redis source shows that RedisScheduler has no way to clean up the queue.
Take the scrapy-redis source as an example:
scrapy-redis/src/scrapy_redis/scheduler.py

...
class Scheduler(object):
    ...
    def open(self, spider):
        ...
        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()
scrapy-redis/src/scrapy_redis/queue.py

class Base(object):
    ...
    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)
    ...
scrapy-redis/src/scrapy_redis/dupefilter.py

class RFPDupeFilter(BaseDupeFilter):
    ...
    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)
    ...

I'm not very familiar with Python, but my rough understanding is that flush_on_start can be enabled so that flush() is executed on startup to clean the current instance, or the user can call flush() explicitly whenever their situation requires it.
The cleanup here deletes the keys completely.

By contrast, on the webmagic side:
webmagic/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/RedisScheduler.java

public class RedisScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, DuplicateRemover {

    private static final String QUEUE_PREFIX = "queue_";

    private static final String SET_PREFIX = "set_";

    private static final String ITEM_PREFIX = "item_";

    ...
    @Override
    public void resetDuplicateCheck(Task task) {
        Jedis jedis = pool.getResource();
        try {
            jedis.del(getSetKey(task));
        } finally {
            pool.returnResource(jedis);
        }
    }
    ...
    protected String getSetKey(Task task) {
        return SET_PREFIX + task.getUUID();
    }

    protected String getQueueKey(Task task) {
        return QUEUE_PREFIX + task.getUUID();
    }

    protected String getItemKey(Task task) {
        return ITEM_PREFIX + task.getUUID();
    }
    ...

Here, only resetDuplicateCheck, defined in the DuplicateRemover interface, performs any cleanup, and it only deletes the set_ key.
Example scenario: there are several crawler machines sharing one Redis server. After the cluster finishes the first batch of data, the second batch is, by design, something we want to crawl regardless of whether it overlaps with the first batch, so the old queue needs to be cleared first.
In practice, though, all that can be done today is to call resetDuplicateCheck to reset duplicate detection; as the data volume keeps growing, the item_TASKID key is never cleaned up. As shown below:
Redis memory usage (image too old, no longer displays)
Actual keys in Redis (image too old, no longer displays)
Only the set_ key gets cleaned up.
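For illustration, here is a minimal sketch of the kind of flush this commit proposes. This is my own approximation, not the exact patch: the method name, its Task parameter, and the varargs jedis.del call are assumptions; the pool-based resource handling simply mirrors the existing resetDuplicateCheck.

// Illustrative sketch only, analogous to scrapy-redis's Scheduler.flush():
// delete every key RedisScheduler maintains for the task, not just the set_ one.
public void flush(Task task) {
    Jedis jedis = pool.getResource();
    try {
        // queue_<uuid>: pending URLs, set_<uuid>: duplicate-check set, item_<uuid>: serialized requests
        jedis.del(getQueueKey(task), getSetKey(task), getItemKey(task));
    } finally {
        pool.returnResource(jedis);
    }
}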

add flush function to clean redis data
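A hypothetical usage between two crawl batches, assuming a flush(Task) method like the sketch above; MyPageProcessor, jedisPool, and the URLs are placeholders, and re-running the same Spider instance is only meant to illustrate the idea.

RedisScheduler scheduler = new RedisScheduler(jedisPool);
Spider spider = Spider.create(new MyPageProcessor())   // MyPageProcessor is a placeholder
        .setScheduler(scheduler)
        .addUrl("https://example.com/batch1");
spider.run();                // first batch finishes

scheduler.flush(spider);     // Spider implements Task: clears queue_, set_ and item_ for this task
spider.addUrl("https://example.com/batch1");   // the same URL can now be queued again
spider.run();                // second batch crawls it regardless of the first batch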