X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fclustering%2Fintegrationtest%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fservices_implementation%2Finternal%2FClusteringServicesIT.java;h=80f5558bcf853d91e03cf35ed95ceac3bb1419b0;hp=71a452b4cf57688e57b1ce7802129c9765b6594d;hb=1d125367e70817a6c6fdcb253a00a0cfaa7d7e3e;hpb=fad07e38e57f1c0d8d687e3bae01532196d6e99d diff --git a/opendaylight/clustering/integrationtest/src/test/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusteringServicesIT.java b/opendaylight/clustering/integrationtest/src/test/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusteringServicesIT.java index 71a452b4cf..80f5558bcf 100644 --- a/opendaylight/clustering/integrationtest/src/test/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusteringServicesIT.java +++ b/opendaylight/clustering/integrationtest/src/test/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusteringServicesIT.java @@ -11,8 +11,14 @@ import static org.ops4j.pax.exam.CoreOptions.options; import static org.ops4j.pax.exam.CoreOptions.systemPackages; import static org.ops4j.pax.exam.CoreOptions.systemProperty; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.net.InetAddress; +import java.util.Dictionary; import java.util.HashSet; +import java.util.Hashtable; +import java.util.Set; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -24,9 +30,14 @@ import org.junit.runner.RunWith; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.CacheListenerAddException; +import org.opendaylight.controller.clustering.services.IClusterGlobalServices; import org.opendaylight.controller.clustering.services.IClusterServices; +import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode; import org.opendaylight.controller.clustering.services.IGetUpdates; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; +import org.opendaylight.controller.sal.utils.ServiceHelper; +import org.opendaylight.controller.sal.core.UpdateType; import org.ops4j.pax.exam.Option; import org.ops4j.pax.exam.junit.Configuration; import org.ops4j.pax.exam.junit.PaxExam; @@ -34,65 +45,73 @@ import org.ops4j.pax.exam.util.PathUtils; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; @RunWith(PaxExam.class) public class ClusteringServicesIT { private Logger log = LoggerFactory - .getLogger(ClusteringServicesIT.class); + .getLogger(ClusteringServicesIT.class); // get the OSGI bundle context @Inject private BundleContext bc; - private IClusterServices clusterServices = null; + private IClusterContainerServices clusterDefaultServices = null; + private IClusterGlobalServices clusterGlobalServices = null; // Configure the OSGi container @Configuration public Option[] config() { return options( - // - systemProperty("logback.configurationFile").value( - "file:" + PathUtils.getBaseDir() - + "/src/test/resources/logback.xml"), - // To start OSGi console for inspection remotely - systemProperty("osgi.console").value("2401"), - // Set the systemPackages (used by clustering) - systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"), - // List framework bundles - mavenBundle("equinoxSDK381", "org.eclipse.equinox.console", - "1.0.0.v20120522-1841"), - mavenBundle("equinoxSDK381", "org.eclipse.equinox.util", - "1.0.400.v20120522-2049"), - mavenBundle("equinoxSDK381", "org.eclipse.osgi.services", - "3.3.100.v20120522-1822"), - mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds", - "1.4.0.v20120522-1841"), - mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command", - "0.8.0.v201108120515"), - mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime", - "0.8.0.v201108120515"), - mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell", - "0.8.0.v201110170705"), - // List logger bundles - mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), - mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), - mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), - mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), - // List all the bundles on which the test case depends - mavenBundle("org.opendaylight.controller", + // + systemProperty("logback.configurationFile").value( + "file:" + PathUtils.getBaseDir() + + "/src/test/resources/logback.xml"), + // To start OSGi console for inspection remotely + systemProperty("osgi.console").value("2401"), + // Set the systemPackages (used by clustering) + systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"), + // List framework bundles + mavenBundle("equinoxSDK381", + "org.eclipse.equinox.console").versionAsInProject(), + mavenBundle("equinoxSDK381", + "org.eclipse.equinox.util").versionAsInProject(), + mavenBundle("equinoxSDK381", + "org.eclipse.osgi.services").versionAsInProject(), + mavenBundle("equinoxSDK381", + "org.eclipse.equinox.ds").versionAsInProject(), + mavenBundle("equinoxSDK381", + "org.apache.felix.gogo.command").versionAsInProject(), + mavenBundle("equinoxSDK381", + "org.apache.felix.gogo.runtime").versionAsInProject(), + mavenBundle("equinoxSDK381", + "org.apache.felix.gogo.shell").versionAsInProject(), + // List logger bundles + mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), + mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), + mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), + mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), + // List all the bundles on which the test case depends + mavenBundle("org.opendaylight.controller", "clustering.services").versionAsInProject(), - mavenBundle("org.opendaylight.controller", + mavenBundle("org.opendaylight.controller", "clustering.services-implementation").versionAsInProject(), - mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(), - mavenBundle("org.opendaylight.controller", + mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(), + mavenBundle("org.opendaylight.controller", "sal.implementation").versionAsInProject(), - mavenBundle("org.jboss.spec.javax.transaction", + mavenBundle("org.opendaylight.controller", "containermanager").versionAsInProject(), + mavenBundle("org.opendaylight.controller", + "containermanager.implementation").versionAsInProject(), + mavenBundle("org.jboss.spec.javax.transaction", "jboss-transaction-api_1.1_spec").versionAsInProject(), - mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(), - mavenBundle("org.apache.felix", + mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(), + mavenBundle("org.apache.felix", "org.apache.felix.dependencymanager").versionAsInProject(), - junitBundles()); + mavenBundle("org.apache.felix", + "org.apache.felix.dependencymanager.shell").versionAsInProject(), + junitBundles()); } private String stateToString(int state) { @@ -119,30 +138,34 @@ public class ClusteringServicesIT { int state = b[i].getState(); if (state != Bundle.ACTIVE && state != Bundle.RESOLVED) { log.debug("Bundle:" + b[i].getSymbolicName() + " state:" - + stateToString(state)); + + stateToString(state)); debugit = true; } } if (debugit) { log.debug("Do some debugging because some bundle is " - + "unresolved"); + + "unresolved"); } // Assert if true, if false we are good to go! assertFalse(debugit); - ServiceReference r = bc.getServiceReference(IClusterServices.class - .getName()); - if (r != null) { - this.clusterServices = (IClusterServices) bc.getService(r); - } + this.clusterServices = (IClusterServices)ServiceHelper + .getGlobalInstance(IClusterServices.class, this); assertNotNull(this.clusterServices); + this.clusterDefaultServices = (IClusterContainerServices)ServiceHelper + .getInstance(IClusterContainerServices.class, "default", this); + assertNotNull(this.clusterDefaultServices); + + this.clusterGlobalServices = (IClusterGlobalServices)ServiceHelper + .getGlobalInstance(IClusterGlobalServices.class, this); + assertNotNull(this.clusterGlobalServices); } @Test public void clusterTest() throws CacheExistException, CacheConfigException, - CacheListenerAddException { + CacheListenerAddException { String container1 = "Container1"; String container2 = "Container2"; @@ -220,7 +243,7 @@ public class ClusteringServicesIT { } @Override - public void entryUpdated(Integer key, String new_value, + public void entryUpdated(Integer key, String newValue, String containerName, String cacheName, boolean originLocal) { return; } @@ -231,4 +254,438 @@ public class ClusteringServicesIT { return; } } + + @Test + public void clusterContainerAndGlobalTest() throws CacheExistException, CacheConfigException, + CacheListenerAddException, InterruptedException { + String cache1 = "Cache1"; + String cache2 = "Cache2"; + // Lets test the case of caches with same name in different + // containers (actually global an container case) + String cache3 = "Cache2"; + + HashSet cacheModeSet = new HashSet(); + cacheModeSet.add(cacheMode.NON_TRANSACTIONAL); + ConcurrentMap cm11 = this.clusterDefaultServices.createCache(cache1, cacheModeSet); + assertNotNull(cm11); + + assertTrue(this.clusterDefaultServices.existCache(cache1)); + assertEquals(cm11, this.clusterDefaultServices.getCache(cache1)); + + ConcurrentMap cm12 = this.clusterDefaultServices.createCache(cache2, cacheModeSet); + ConcurrentMap cm23 = this.clusterGlobalServices.createCache(cache3, cacheModeSet); + + // Now given cahe2 and cache3 have same name lets make sure + // they don't return the same reference + assertNotNull(this.clusterGlobalServices.getCache(cache2)); + // cm12 reference must be different than cm23 + assertTrue(cm12 != cm23); + + HashSet cacheList = (HashSet) this.clusterDefaultServices + .getCacheList(); + assertEquals(2, cacheList.size()); + assertTrue(cacheList.contains(cache1)); + assertTrue(cacheList.contains(cache2)); + + assertNotNull(this.clusterDefaultServices.getCacheProperties(cache1)); + + { + /***********************************/ + /* Testing cacheAware in Container */ + /***********************************/ + Dictionary props = new Hashtable(); + Set propSet = new HashSet(); + propSet.add(cache1); + propSet.add(cache2); + props.put("cachenames", propSet); + CacheAware listener = new CacheAware(); + CacheAware listenerRepeated = new CacheAware(); + ServiceRegistration updateServiceReg = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, "default", + listener, props); + assertNotNull(updateServiceReg); + + // Register another service for the same caches, this + // should not get any update because we don't allow to + // override the existing unless before unregistered + ServiceRegistration updateServiceRegRepeated = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, + "default", + listenerRepeated, props); + assertNotNull(updateServiceRegRepeated); + CountDownLatch res = null; + List ups = null; + Update up = null; + Integer k1 = new Integer(10); + Long k2 = new Long(100L); + + /***********************/ + /* CREATE NEW KEY CASE */ + /***********************/ + // Start monitoring the updates + res = listener.restart(2); + // modify the cache + cm11.put(k1, "foo"); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 2); + // Validate that first we get an update (yes even in case of a + // new value added) + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.CHANGED)); + assertTrue(up.key.equals(k1)); + assertTrue(up.value.equals("foo")); + assertTrue(up.cacheName.equals(cache1)); + // Validate that we then get a create + up = ups.get(1); + assertTrue(up.t.equals(UpdateType.ADDED)); + assertTrue(up.key.equals(k1)); + assertNull(up.value); + assertTrue(up.cacheName.equals(cache1)); + + /*******************************/ + /* UPDATE AN EXISTING KEY CASE */ + /*******************************/ + // Start monitoring the updates + res = listener.restart(1); + // modify the cache + cm11.put(k1, "baz"); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 1); + // Validate we get an update with expect fields + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.CHANGED)); + assertTrue(up.key.equals(k1)); + assertTrue(up.value.equals("baz")); + assertTrue(up.cacheName.equals(cache1)); + + /********************************/ + /* REMOVAL OF EXISTING KEY CASE */ + /********************************/ + // Start monitoring the updates + res = listener.restart(1); + // modify the cache + cm11.remove(k1); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 1); + // Validate we get a delete with expected fields + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.REMOVED)); + assertTrue(up.key.equals(k1)); + assertNull(up.value); + assertTrue(up.cacheName.equals(cache1)); + + /***********************/ + /* CREATE NEW KEY CASE */ + /***********************/ + // Start monitoring the updates + res = listener.restart(2); + // modify the cache + cm12.put(k2, new Short((short)15)); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 2); + // Validate that first we get an update (yes even in case of a + // new value added) + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.CHANGED)); + assertTrue(up.key.equals(k2)); + assertTrue(up.value.equals(new Short((short)15))); + assertTrue(up.cacheName.equals(cache2)); + // Validate that we then get a create + up = ups.get(1); + assertTrue(up.t.equals(UpdateType.ADDED)); + assertTrue(up.key.equals(k2)); + assertNull(up.value); + assertTrue(up.cacheName.equals(cache2)); + + /*******************************/ + /* UPDATE AN EXISTING KEY CASE */ + /*******************************/ + // Start monitoring the updates + res = listener.restart(1); + // modify the cache + cm12.put(k2, "BAZ"); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 1); + // Validate we get an update with expect fields + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.CHANGED)); + assertTrue(up.key.equals(k2)); + assertTrue(up.value.equals("BAZ")); + assertTrue(up.cacheName.equals(cache2)); + + /********************************/ + /* REMOVAL OF EXISTING KEY CASE */ + /********************************/ + // Start monitoring the updates + res = listener.restart(1); + // modify the cache + cm12.remove(k2); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 1); + // Validate we get a delete with expected fields + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.REMOVED)); + assertTrue(up.key.equals(k2)); + assertNull(up.value); + assertTrue(up.cacheName.equals(cache2)); + + /******************************************************************/ + /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */ + /******************************************************************/ + updateServiceReg.unregister(); + // Start monitoring the updates, noone should come in + res = listener.restart(1); + + /***********************/ + /* CREATE NEW KEY CASE */ + /***********************/ + // modify the cache + cm11.put(k1, "foo"); + + /*******************************/ + /* UPDATE AN EXISTING KEY CASE */ + /*******************************/ + // modify the cache + cm11.put(k1, "baz"); + + /********************************/ + /* REMOVAL OF EXISTING KEY CASE */ + /********************************/ + // modify the cache + cm11.remove(k1); + + /***********************/ + /* CREATE NEW KEY CASE */ + /***********************/ + // modify the cache + cm12.put(k2, new Short((short)15)); + + /*******************************/ + /* UPDATE AN EXISTING KEY CASE */ + /*******************************/ + // modify the cache + cm12.put(k2, "BAZ"); + + /********************************/ + /* REMOVAL OF EXISTING KEY CASE */ + /********************************/ + // modify the cache + cm12.remove(k2); + + + // Wait to make sure no updates came in, clearly this is + // error prone as logic, but cannot find a better way than + // this to make sure updates didn't get in + res.await(1L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 0); + } + + { + /***********************************/ + /* Testing cacheAware in Global */ + /***********************************/ + Dictionary props = new Hashtable(); + Set propSet = new HashSet(); + propSet.add(cache3); + props.put("cachenames", propSet); + CacheAware listener = new CacheAware(); + ServiceRegistration updateServiceReg = ServiceHelper.registerGlobalServiceWReg(ICacheUpdateAware.class, + listener, props); + assertNotNull(updateServiceReg); + + CountDownLatch res = null; + List ups = null; + Update up = null; + Integer k1 = new Integer(10); + + /***********************/ + /* CREATE NEW KEY CASE */ + /***********************/ + // Start monitoring the updates + res = listener.restart(2); + // modify the cache + cm23.put(k1, "foo"); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 2); + // Validate that first we get an update (yes even in case of a + // new value added) + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.CHANGED)); + assertTrue(up.key.equals(k1)); + assertTrue(up.value.equals("foo")); + assertTrue(up.cacheName.equals(cache3)); + // Validate that we then get a create + up = ups.get(1); + assertTrue(up.t.equals(UpdateType.ADDED)); + assertTrue(up.key.equals(k1)); + assertNull(up.value); + assertTrue(up.cacheName.equals(cache3)); + + /*******************************/ + /* UPDATE AN EXISTING KEY CASE */ + /*******************************/ + // Start monitoring the updates + res = listener.restart(1); + // modify the cache + cm23.put(k1, "baz"); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 1); + // Validate we get an update with expect fields + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.CHANGED)); + assertTrue(up.key.equals(k1)); + assertTrue(up.value.equals("baz")); + assertTrue(up.cacheName.equals(cache3)); + + /********************************/ + /* REMOVAL OF EXISTING KEY CASE */ + /********************************/ + // Start monitoring the updates + res = listener.restart(1); + // modify the cache + cm23.remove(k1); + // Wait + res.await(100L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 1); + // Validate we get a delete with expected fields + up = ups.get(0); + assertTrue(up.t.equals(UpdateType.REMOVED)); + assertTrue(up.key.equals(k1)); + assertNull(up.value); + assertTrue(up.cacheName.equals(cache3)); + + /******************************************************************/ + /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */ + /******************************************************************/ + updateServiceReg.unregister(); + // Start monitoring the updates, noone should come in + res = listener.restart(1); + + /***********************/ + /* CREATE NEW KEY CASE */ + /***********************/ + // modify the cache + cm23.put(k1, "foo"); + + /*******************************/ + /* UPDATE AN EXISTING KEY CASE */ + /*******************************/ + // modify the cache + cm23.put(k1, "baz"); + + /********************************/ + /* REMOVAL OF EXISTING KEY CASE */ + /********************************/ + // modify the cache + cm23.remove(k1); + + // Wait to make sure no updates came in, clearly this is + // error prone as logic, but cannot find a better way than + // this to make sure updates didn't get in + res.await(1L, TimeUnit.SECONDS); + // Analyze the updates + ups = listener.getUpdates(); + assertTrue(ups.size() == 0); + } + + InetAddress addr = this.clusterDefaultServices.getMyAddress(); + assertNotNull(addr); + + List addrList = this.clusterDefaultServices + .getClusteredControllers(); + + this.clusterDefaultServices.destroyCache(cache1); + assertFalse(this.clusterDefaultServices.existCache(cache1)); + } + + private class Update { + Object key; + Object value; + String cacheName; + UpdateType t; + + Update (UpdateType t, Object key, Object value, String cacheName) { + this.t = t; + this.key = key; + this.value = value; + this.cacheName = cacheName; + } + } + + private class CacheAware implements ICacheUpdateAware { + private CopyOnWriteArrayList gotUpdates; + private CountDownLatch latch = null; + + CacheAware() { + this.gotUpdates = new CopyOnWriteArrayList(); + } + + + /** + * Restart the monitor of the updates on the CacheAware object + * + * @param expectedOperations Number of expected updates + * + * @return a countdown latch which will be used to wait till the updates are done + */ + CountDownLatch restart(int expectedOperations) { + this.gotUpdates.clear(); + this.latch = new CountDownLatch(expectedOperations); + return this.latch; + } + + List getUpdates() { + return this.gotUpdates; + } + + @Override + public void entryCreated(Object key, String cacheName, boolean originLocal) { + log.debug("CACHE[{}] Got an entry created for key:{}", cacheName, key); + Update u = new Update(UpdateType.ADDED, key, null, cacheName); + this.gotUpdates.add(u); + this.latch.countDown(); + } + + @Override + public void entryUpdated(Object key, Object newValue, String cacheName, boolean originLocal) { + log.debug("CACHE[{}] Got an entry updated for key:{} newValue:{}", cacheName, key, newValue); + Update u = new Update(UpdateType.CHANGED, key, newValue, cacheName); + this.gotUpdates.add(u); + this.latch.countDown(); + } + + @Override + public void entryDeleted(Object key, String cacheName, boolean originLocal) { + log.debug("CACHE[{}] Got an entry delete for key:{}", cacheName, key); + Update u = new Update(UpdateType.REMOVED, key, null, cacheName); + this.gotUpdates.add(u); + this.latch.countDown(); + } + } }