From 6d1ff8798b953b35e0469f22edbdc161f09153c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BB=B6=E4=B9=8B?= Date: Tue, 7 Aug 2018 11:17:54 +0800 Subject: [PATCH 1/3] add a new proxy provider named effectiveProxyProvider --- .../proxy/EffectiveProxyProvider.java | 98 +++++++++++++++++++ .../webmagic/proxy/ProxyPageValidator.java | 13 +++ .../proxy/EffectiveProxyProviderTest.java | 58 +++++++++++ 3 files changed, 169 insertions(+) create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java create mode 100644 webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPageValidator.java create mode 100644 webmagic-core/src/test/java/us/codecraft/webmagic/proxy/EffectiveProxyProviderTest.java diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java new file mode 100644 index 000000000..d4bc9e120 --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java @@ -0,0 +1,98 @@ +package us.codecraft.webmagic.proxy; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.collections.CollectionUtils; +import us.codecraft.webmagic.Page; +import us.codecraft.webmagic.Task; + +/** + * A ProxyProvider that removes invalid proxies and adds new proxies dynamically

+ * New features:

+ * 1. remove invalid proxy + * 2. async add proxies when proxy is less than threshold + * + * @author evan + */ +public abstract class EffectiveProxyProvider implements ProxyProvider { + + public static final int DEFAULT_EXPAND_POOL_SIZE = 1; + + private final ConcurrentLinkedQueue validProxyQueue = new ConcurrentLinkedQueue(); + + private final ExecutorService addProxyPool = Executors.newFixedThreadPool(1);; + + private final ReentrantLock addProxyLock = new ReentrantLock(); + + private int expandPoolSize = DEFAULT_EXPAND_POOL_SIZE; + + private ProxyPageValidator proxyPageValidator; + + public EffectiveProxyProvider() { + } + + public EffectiveProxyProvider(ProxyPageValidator proxyPageValidator) { + addProxies(); + this.proxyPageValidator = proxyPageValidator; + } + + public EffectiveProxyProvider(ProxyPageValidator proxyPageValidator, int expandPoolSize) { + addProxies(); + this.proxyPageValidator = proxyPageValidator; + this.expandPoolSize = expandPoolSize; + } + + public EffectiveProxyProvider(ProxyPageValidator pageValidator, List proxies) { + this.validProxyQueue.addAll(proxies); + this.proxyPageValidator = pageValidator; + } + + public EffectiveProxyProvider(ProxyPageValidator pageValidator, List proxies, int expandPoolSize) { + this.validProxyQueue.addAll(proxies); + this.proxyPageValidator = pageValidator; + this.expandPoolSize = expandPoolSize; + } + + @Override + public Proxy getProxy(Task task) { + + //async addProxy and avoid invoke extra times + if (validProxyQueue.size() <= expandPoolSize) { + addProxyPool.submit(new Runnable() { + @Override public void run() { + if (addProxyLock.tryLock()) { + try { + List newProxies = addProxies(); + if (CollectionUtils.isNotEmpty(newProxies)) { + validProxyQueue.addAll(newProxies); + } + }finally { + addProxyLock.unlock(); + } + } + } + }); + } + + Proxy proxy = validProxyQueue.poll(); + if (proxy == null) { + return null; + } + //put tail realize loop + validProxyQueue.offer(proxy); + + return proxy; + } + + 
@Override public void returnProxy(Proxy proxy, Page page, Task task) { + //remove it when proxy is invalid + if (proxyPageValidator != null && !proxyPageValidator.proxyValid(proxy, page, task)) { + validProxyQueue.remove(proxy); + } + } + + public abstract List addProxies(); +} diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPageValidator.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPageValidator.java new file mode 100644 index 000000000..ac815bb1d --- /dev/null +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/ProxyPageValidator.java @@ -0,0 +1,13 @@ +package us.codecraft.webmagic.proxy; + +import us.codecraft.webmagic.Page; +import us.codecraft.webmagic.Task; + +/** + * Created by evan on 2018/8/6. + */ +public interface ProxyPageValidator { + + boolean proxyValid(Proxy proxy, Page page, Task task); + +} diff --git a/webmagic-core/src/test/java/us/codecraft/webmagic/proxy/EffectiveProxyProviderTest.java b/webmagic-core/src/test/java/us/codecraft/webmagic/proxy/EffectiveProxyProviderTest.java new file mode 100644 index 000000000..b8b006f79 --- /dev/null +++ b/webmagic-core/src/test/java/us/codecraft/webmagic/proxy/EffectiveProxyProviderTest.java @@ -0,0 +1,58 @@ +package us.codecraft.webmagic.proxy; + +import com.google.common.collect.Lists; +import java.util.List; +import java.util.Random; +import org.junit.Test; +import us.codecraft.webmagic.Page; +import us.codecraft.webmagic.Site; +import us.codecraft.webmagic.Task; + +/** + * Created by evan on 2018/8/6. 
+ */ +public class EffectiveProxyProviderTest { + + public static final Task TASK = Site.me().toTask(); + + public static final Page page = new Page(); + + @Test + public void test_get_proxy_process() throws Exception { + while (true) { + Proxy proxy = effectiveProxyProvider.getProxy(TASK); + System.out.println("get proxy :"+proxy.getHost()); + effectiveProxyProvider.returnProxy(proxy, page, TASK); + Thread.sleep(2000); + } + } + + private ProxyPageValidator proxyPageValidator = new ProxyPageValidator() { + @Override public boolean proxyValid(Proxy proxy, Page page, Task task) { + + Random random = new Random(); + if (random.nextInt(10) < 5) { + System.out.println("===remove===" + proxy.getHost()); + return false; + } + return true; + } + }; + + private EffectiveProxyProvider effectiveProxyProvider = new EffectiveProxyProvider(proxyPageValidator, Lists.newArrayList(new Proxy("127.0.0.1", 1121))) { + @Override public List addProxies() { + System.out.println("===Expand==="); + return Lists.newArrayList(getRandomProxy(3)); + } + }; + + private List getRandomProxy(int count){ + Random random = new Random(); + List temp = Lists.newArrayList(); + for (int i = 0;i Date: Thu, 9 Aug 2018 18:08:35 +0800 Subject: [PATCH 2/3] fix constract method --- .../us/codecraft/webmagic/proxy/EffectiveProxyProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java index d4bc9e120..0b1ba5cb1 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java @@ -35,12 +35,12 @@ public EffectiveProxyProvider() { } public EffectiveProxyProvider(ProxyPageValidator proxyPageValidator) { - addProxies(); + validProxyQueue.addAll(addProxies()); this.proxyPageValidator = 
proxyPageValidator; } public EffectiveProxyProvider(ProxyPageValidator proxyPageValidator, int expandPoolSize) { - addProxies(); + validProxyQueue.addAll(addProxies()); this.proxyPageValidator = proxyPageValidator; this.expandPoolSize = expandPoolSize; } From 5e863d28d4e770ecb45131489e8cf50576a18883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BB=B6=E4=B9=8B?= Date: Thu, 9 Aug 2018 18:11:12 +0800 Subject: [PATCH 3/3] fix getProxy to make poll and offer atomic --- .../proxy/EffectiveProxyProvider.java | 59 +++++++++++-------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java index 0b1ba5cb1..78ef8236b 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/proxy/EffectiveProxyProvider.java @@ -27,6 +27,8 @@ public abstract class EffectiveProxyProvider implements ProxyProvider { private final ReentrantLock addProxyLock = new ReentrantLock(); + private final ReentrantLock pollAndOfferLock = new ReentrantLock(); + private int expandPoolSize = DEFAULT_EXPAND_POOL_SIZE; private ProxyPageValidator proxyPageValidator; @@ -59,32 +61,43 @@ public EffectiveProxyProvider(ProxyPageValidator pageValidator, List prox @Override public Proxy getProxy(Task task) { - //async addProxy and avoid invoke extra times - if (validProxyQueue.size() <= expandPoolSize) { - addProxyPool.submit(new Runnable() { - @Override public void run() { - if (addProxyLock.tryLock()) { - try { - List newProxies = addProxies(); - if (CollectionUtils.isNotEmpty(newProxies)) { - validProxyQueue.addAll(newProxies); - } - }finally { - addProxyLock.unlock(); - } - } - } - }); + //make atomic poll and offer + pollAndOfferLock.lock(); + try { + Proxy proxy = validProxyQueue.poll(); + if (proxy != null) { + //put tail realize loop + 
validProxyQueue.offer(proxy); + } + + //get more proxies when queue capacity less than expect + if (validProxyQueue.size() <= expandPoolSize) { + expand(); + } + return proxy; + + }finally { + pollAndOfferLock.unlock(); } + } - Proxy proxy = validProxyQueue.poll(); - if (proxy == null) { - return null; - } - //put tail realize loop - validProxyQueue.offer(proxy); + //async addProxy and avoid invoke extra times + public void expand(){ - return proxy; + if (addProxyLock.tryLock()) { + try { + addProxyPool.submit(new Runnable() { + @Override public void run() { + List newProxies = addProxies(); + if (CollectionUtils.isNotEmpty(newProxies)) { + validProxyQueue.addAll(newProxies); + } + } + }); + } finally { + addProxyLock.unlock(); + } + } } @Override public void returnProxy(Proxy proxy, Page page, Task task) {