diff --git a/src/main/java/org/sims/Main.java b/src/main/java/org/sims/Main.java index 6b14dea..b4f7820 100644 --- a/src/main/java/org/sims/Main.java +++ b/src/main/java/org/sims/Main.java @@ -30,6 +30,8 @@ public static void main(String[] args) { resourceManager, AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, true ); + resourceManager.postInit(); + DiscoveryManager manager = new DiscoveryManager(resourceManager, config.probeInterval); WsDiscovery.WsSettings wsSettings = new WsDiscovery.WsSettings(config.address); diff --git a/src/main/java/org/sims/discovery/manager/BasicResourceManager.java b/src/main/java/org/sims/discovery/manager/BasicResourceManager.java index b4b87a9..edd3dae 100644 --- a/src/main/java/org/sims/discovery/manager/BasicResourceManager.java +++ b/src/main/java/org/sims/discovery/manager/BasicResourceManager.java @@ -9,7 +9,9 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; public abstract class BasicResourceManager implements IResourceManager{ @@ -71,6 +73,7 @@ private void remove(String remoteId){ serviceMap.remove(localRef); reverseMap.remove(remoteId); System.out.println("Removed service from map " + remoteId + " " + localRef); + flush(); } } @@ -95,6 +98,9 @@ public void stopWatching(String id){ this.remove(id); } + public List getAllServiceIds(){ + return new ArrayList(this.serviceMap.values()); + } @Override public void dispose(){ diff --git a/src/main/java/org/sims/discovery/manager/HybernateResourceManager.java b/src/main/java/org/sims/discovery/manager/HybernateResourceManager.java index 032f41d..7dc2f06 100644 --- a/src/main/java/org/sims/discovery/manager/HybernateResourceManager.java +++ b/src/main/java/org/sims/discovery/manager/HybernateResourceManager.java @@ -14,6 +14,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; public class HybernateResourceManager extends BasicResourceManager{ @Autowired @@ -22,7 +23,20 @@ public class HybernateResourceManager extends BasicResourceManager{ public HybernateResourceManager(){ super(); + } + + public void postInit(){ + for(String id : getAllServiceIds()){ + Optional optional = serviceRepo.findOne(getExample(id)); + if(!optional.isPresent()){ + stopWatching(id); + }else{ + System.out.println("Service is present: " + id); + } + } + } + private Example getExample(String id){ Service exampleService = new Service(); @@ -59,7 +73,7 @@ public String getId() { } public Single getById(String id){ - return null; + return null;//serviceRepo.findOne(getExample(id)).get(); } public Single> getOwnedServices(){ diff --git a/src/main/java/org/sims/discovery/manager/IResourceManager.java b/src/main/java/org/sims/discovery/manager/IResourceManager.java index 666be40..cfe0b05 100644 --- a/src/main/java/org/sims/discovery/manager/IResourceManager.java +++ b/src/main/java/org/sims/discovery/manager/IResourceManager.java @@ -20,4 +20,6 @@ public interface IResourceManager{ // Finish anything important public void dispose(); + + public void postInit(); } \ No newline at end of file diff --git a/src/main/java/org/sims/discovery/mdns/DnsDiscovery.java b/src/main/java/org/sims/discovery/mdns/DnsDiscovery.java index d5348f5..befde8c 100644 --- a/src/main/java/org/sims/discovery/mdns/DnsDiscovery.java +++ b/src/main/java/org/sims/discovery/mdns/DnsDiscovery.java @@ -25,6 +25,7 @@ public class DnsDiscovery implements IDiscoveryService, ServiceListener{ private boolean alive = false; private boolean running = false; private Subject dnsSubject; + private Thread probeThread; private Subject serviceAddSubject = PublishSubject.create(); private Subject serviceUpdateSubject = PublishSubject.create(); private Subject serviceRemoveSubject = PublishSubject.create(); @@ -66,17 +67,18 @@ public Single> probeServices(){ if(!alive){ return Single.just(new ArrayList(0)); } - if(dnsSubject == null){ - dnsSubject = PublishSubject.create(); + if(dnsSubject == null && probeThread == null){ + final Subject dnssub = PublishSubject.create(); + dnsSubject = dnssub; if(!jmdns.isProbing()){ jmdns.startProber(); } - Thread t = new Thread(){ + final Thread t = new Thread(){ public void run(){ try{ while(jmdns.isProbing()){ - sleep(100); + sleep(500); } }catch(Exception e){ System.err.println(e); @@ -84,14 +86,16 @@ public void run(){ for(String type : settings.types){ for(ServiceInfo info : jmdns.list(type)){ DnsService service = new DnsService(info); - dnsSubject.onNext(service); + dnssub.onNext(service); } } - dnsSubject.onComplete(); + dnssub.onComplete(); dnsSubject = null; + probeThread = null; } }; t.start(); + probeThread = t; } diff --git a/src/main/java/org/sims/discovery/ws/WsDiscovery.java b/src/main/java/org/sims/discovery/ws/WsDiscovery.java index 8993de6..f644eb1 100644 --- a/src/main/java/org/sims/discovery/ws/WsDiscovery.java +++ b/src/main/java/org/sims/discovery/ws/WsDiscovery.java @@ -10,6 +10,8 @@ import io.reactivex.subjects.PublishSubject; import io.reactivex.subjects.ReplaySubject; import io.reactivex.subjects.Subject; + +import org.hibernate.cfg.annotations.Nullability; import org.sims.discovery.IDiscoveryService; import org.sims.discovery.models.IService; @@ -35,6 +37,7 @@ public class WsDiscovery implements IDiscoveryService{ private boolean run = false; private Thread notifyThread; + private Thread probeThread; private WsSettings settings; public WsDiscovery(DiscoverySettings settings){ WsDiscoveryConstants.loggerLevel = Level.OFF; @@ -137,8 +140,9 @@ public Single> probeServices(){ if(!alive){ return Single.just(new ArrayList(0)); } - if(wsSubject == null){ - wsSubject = ReplaySubject.create(); + if(wsSubject == null && probeThread == null){ + final Subject wssub = ReplaySubject.create(); + wsSubject = wssub; try{ //Clear out service inventory server.getServiceDirectory().clear(); @@ -148,6 +152,7 @@ public Single> probeServices(){ System.err.println(e); } /* Create thread that waits 2 seconds for services to accumelate */ + final Thread t = new Thread(){ public void run(){ try{ @@ -155,16 +160,18 @@ public void run(){ for(WsDiscoveryService service : server.getServiceDirectory().matchAll()){ IService iservice = new WsService(service); - wsSubject.onNext(iservice); + wssub.onNext(iservice); } } catch(Exception e){ System.err.println(e); } - wsSubject.onComplete(); + wssub.onComplete(); wsSubject = null; + probeThread = null; } }; t.start(); + probeThread = t; } return wsSubject.buffer(Integer.MAX_VALUE).first(new ArrayList(0));