Fix CacheUpdateAware mechanism in cluster.services-implementation 93/493/1
authorGiovanni Meo <gmeo@cisco.com>
Mon, 17 Jun 2013 14:57:43 +0000 (16:57 +0200)
committerGiovanni Meo <gmeo@cisco.com>
Mon, 17 Jun 2013 15:41:40 +0000 (17:41 +0200)
bundle

- Add the logic for clustering.services_implementation bundle to track
all the ICacheUpdateAware services and to relate them to a container
or a global context. Make sure to be able to handle the service
registering and unregistering case. In case of
registration/deregistration of an ICacheUpdateAware check for the
"cacheNames" property and then use it for learning for what caches the
updates will be generated. Also make sure in phase of registration if
there is an attempt to override a registration for an existing cache
to avoid it and to raise an error message.
- Given the LDAP filter that allows to track for the global
ICacheUpdateAware services match also the per-container ones, make
sure that ClusterGlobalManager filter them out before reaching the
ClusterManagerCommon, in fact that would prevent to handle seamlessly
the case of cache name being the same in global context as well
per-container.
- Removed stale OSGi declarative service file in
clustering.services-implementation bundle.
- Created extensive integration test case for clustering.services to
make sure the logic is correct and automatically tested.
- Added two new ServiceHelper methods that allow to register and OSGi
service and to get back a ServiceRegistration, extremely useful in
integration tests to test difference between when a service is present
and when it's gone.
- Added logback.xml for the unit tests of
clustering.services-implementation bundle to avoid to print out infos
while executing the unit test that would be useless
- Modified ClusteringServicesIT to use the versionAsInProject on all
the configured bundles, so to avoid too much manual sync process
between the versions in the POM and in the configuration

Change-Id: Id1ae2e22e2896df686a377fd3ae9f232f3545bca
Signed-off-by: Giovanni Meo <gmeo@cisco.com>
opendaylight/clustering/integrationtest/pom.xml
opendaylight/clustering/integrationtest/src/test/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusteringServicesIT.java
opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/Activator.java
opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterGlobalManager.java
opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManagerCommon.java
opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/GetUpdatesContainer.java [new file with mode: 0644]
opendaylight/clustering/services_implementation/src/main/resources/OSGI-INF/component-cachemanager.xml [deleted file]
opendaylight/clustering/services_implementation/src/test/resources/logback.xml [new file with mode: 0644]
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/utils/ServiceHelper.java

index 52c568203be917dd42c347afbb616b9f907c5b9c..e2e27031ca43f7b150059a1910089a48c1e57020 100644 (file)
   <version>0.4.0-SNAPSHOT</version>
 
   <dependencies>
-    <dependency>
-      <groupId>org.infinispan</groupId>
-      <artifactId>infinispan-core</artifactId>
-      <version>5.2.3.Final</version>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>clustering.services</artifactId>
       <artifactId>clustering.services-implementation</artifactId>
       <version>0.4.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>containermanager</artifactId>
+      <version>0.4.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>containermanager.implementation</artifactId>
+      <version>0.4.0-SNAPSHOT</version>
+    </dependency>
   </dependencies>
   <properties>
     <!-- Sonar jacoco plugin to get integration test coverage info -->
index 71a452b4cf57688e57b1ce7802129c9765b6594d..80f5558bcf853d91e03cf35ed95ceac3bb1419b0 100644 (file)
@@ -11,8 +11,14 @@ import static org.ops4j.pax.exam.CoreOptions.options;
 import static org.ops4j.pax.exam.CoreOptions.systemPackages;\r
 import static org.ops4j.pax.exam.CoreOptions.systemProperty;\r
 \r
+import java.util.List;\r
+import java.util.concurrent.CopyOnWriteArrayList;\r
+import java.util.concurrent.TimeUnit;\r
 import java.net.InetAddress;\r
+import java.util.Dictionary;\r
 import java.util.HashSet;\r
+import java.util.Hashtable;\r
+import java.util.Set;\r
 import java.util.List;\r
 import java.util.concurrent.ConcurrentMap;\r
 \r
@@ -24,9 +30,14 @@ import org.junit.runner.RunWith;
 import org.opendaylight.controller.clustering.services.CacheConfigException;\r
 import org.opendaylight.controller.clustering.services.CacheExistException;\r
 import org.opendaylight.controller.clustering.services.CacheListenerAddException;\r
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;\r
 import org.opendaylight.controller.clustering.services.IClusterServices;\r
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;\r
 import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode;\r
 import org.opendaylight.controller.clustering.services.IGetUpdates;\r
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;\r
+import org.opendaylight.controller.sal.utils.ServiceHelper;\r
+import org.opendaylight.controller.sal.core.UpdateType;\r
 import org.ops4j.pax.exam.Option;\r
 import org.ops4j.pax.exam.junit.Configuration;\r
 import org.ops4j.pax.exam.junit.PaxExam;\r
@@ -34,65 +45,73 @@ import org.ops4j.pax.exam.util.PathUtils;
 import org.osgi.framework.Bundle;\r
 import org.osgi.framework.BundleContext;\r
 import org.osgi.framework.ServiceReference;\r
+import org.osgi.framework.ServiceRegistration;\r
 import org.slf4j.Logger;\r
 import org.slf4j.LoggerFactory;\r
+import java.util.concurrent.CountDownLatch;\r
 \r
 @RunWith(PaxExam.class)\r
 public class ClusteringServicesIT {\r
     private Logger log = LoggerFactory\r
-            .getLogger(ClusteringServicesIT.class);\r
+        .getLogger(ClusteringServicesIT.class);\r
     // get the OSGI bundle context\r
     @Inject\r
     private BundleContext bc;\r
-\r
     private IClusterServices clusterServices = null;\r
+    private IClusterContainerServices clusterDefaultServices = null;\r
+    private IClusterGlobalServices clusterGlobalServices = null;\r
 \r
     // Configure the OSGi container\r
     @Configuration\r
     public Option[] config() {\r
         return options(\r
-                //\r
-                systemProperty("logback.configurationFile").value(\r
-                        "file:" + PathUtils.getBaseDir()\r
-                                + "/src/test/resources/logback.xml"),\r
-                // To start OSGi console for inspection remotely\r
-                systemProperty("osgi.console").value("2401"),\r
-                // Set the systemPackages (used by clustering)\r
-                systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),\r
-                // List framework bundles\r
-                mavenBundle("equinoxSDK381", "org.eclipse.equinox.console",\r
-                        "1.0.0.v20120522-1841"),\r
-                mavenBundle("equinoxSDK381", "org.eclipse.equinox.util",\r
-                        "1.0.400.v20120522-2049"),\r
-                mavenBundle("equinoxSDK381", "org.eclipse.osgi.services",\r
-                        "3.3.100.v20120522-1822"),\r
-                mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds",\r
-                        "1.4.0.v20120522-1841"),\r
-                mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command",\r
-                        "0.8.0.v201108120515"),\r
-                mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime",\r
-                        "0.8.0.v201108120515"),\r
-                mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell",\r
-                        "0.8.0.v201110170705"),\r
-                // List logger bundles\r
-                mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),\r
-                mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),\r
-                mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),\r
-                mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),\r
-                // List all the bundles on which the test case depends\r
-                mavenBundle("org.opendaylight.controller",\r
+            //\r
+            systemProperty("logback.configurationFile").value(\r
+                "file:" + PathUtils.getBaseDir()\r
+                + "/src/test/resources/logback.xml"),\r
+            // To start OSGi console for inspection remotely\r
+            systemProperty("osgi.console").value("2401"),\r
+            // Set the systemPackages (used by clustering)\r
+            systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),\r
+            // List framework bundles\r
+            mavenBundle("equinoxSDK381",\r
+                        "org.eclipse.equinox.console").versionAsInProject(),\r
+            mavenBundle("equinoxSDK381",\r
+                        "org.eclipse.equinox.util").versionAsInProject(),\r
+            mavenBundle("equinoxSDK381",\r
+                        "org.eclipse.osgi.services").versionAsInProject(),\r
+            mavenBundle("equinoxSDK381",\r
+                        "org.eclipse.equinox.ds").versionAsInProject(),\r
+            mavenBundle("equinoxSDK381",\r
+                        "org.apache.felix.gogo.command").versionAsInProject(),\r
+            mavenBundle("equinoxSDK381",\r
+                        "org.apache.felix.gogo.runtime").versionAsInProject(),\r
+            mavenBundle("equinoxSDK381",\r
+                        "org.apache.felix.gogo.shell").versionAsInProject(),\r
+            // List logger bundles\r
+            mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),\r
+            mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),\r
+            mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),\r
+            mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),\r
+            // List all the bundles on which the test case depends\r
+            mavenBundle("org.opendaylight.controller",\r
                         "clustering.services").versionAsInProject(),\r
-                mavenBundle("org.opendaylight.controller",\r
+            mavenBundle("org.opendaylight.controller",\r
                         "clustering.services-implementation").versionAsInProject(),\r
-                mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),\r
-                mavenBundle("org.opendaylight.controller",\r
+            mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),\r
+            mavenBundle("org.opendaylight.controller",\r
                         "sal.implementation").versionAsInProject(),\r
-                mavenBundle("org.jboss.spec.javax.transaction",\r
+            mavenBundle("org.opendaylight.controller", "containermanager").versionAsInProject(),\r
+            mavenBundle("org.opendaylight.controller",\r
+                        "containermanager.implementation").versionAsInProject(),\r
+            mavenBundle("org.jboss.spec.javax.transaction",\r
                         "jboss-transaction-api_1.1_spec").versionAsInProject(),\r
-                mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),\r
-                mavenBundle("org.apache.felix",\r
+            mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),\r
+            mavenBundle("org.apache.felix",\r
                         "org.apache.felix.dependencymanager").versionAsInProject(),\r
-                junitBundles());\r
+            mavenBundle("org.apache.felix",\r
+                        "org.apache.felix.dependencymanager.shell").versionAsInProject(),\r
+            junitBundles());\r
     }\r
 \r
     private String stateToString(int state) {\r
@@ -119,30 +138,34 @@ public class ClusteringServicesIT {
             int state = b[i].getState();\r
             if (state != Bundle.ACTIVE && state != Bundle.RESOLVED) {\r
                 log.debug("Bundle:" + b[i].getSymbolicName() + " state:"\r
-                        + stateToString(state));\r
+                          + stateToString(state));\r
                 debugit = true;\r
             }\r
         }\r
         if (debugit) {\r
             log.debug("Do some debugging because some bundle is "\r
-                    + "unresolved");\r
+                      + "unresolved");\r
         }\r
 \r
         // Assert if true, if false we are good to go!\r
         assertFalse(debugit);\r
 \r
-        ServiceReference r = bc.getServiceReference(IClusterServices.class\r
-                .getName());\r
-        if (r != null) {\r
-            this.clusterServices = (IClusterServices) bc.getService(r);\r
-        }\r
+        this.clusterServices = (IClusterServices)ServiceHelper\r
+            .getGlobalInstance(IClusterServices.class, this);\r
         assertNotNull(this.clusterServices);\r
 \r
+        this.clusterDefaultServices = (IClusterContainerServices)ServiceHelper\r
+            .getInstance(IClusterContainerServices.class, "default", this);\r
+        assertNotNull(this.clusterDefaultServices);\r
+\r
+        this.clusterGlobalServices = (IClusterGlobalServices)ServiceHelper\r
+            .getGlobalInstance(IClusterGlobalServices.class, this);\r
+        assertNotNull(this.clusterGlobalServices);\r
     }\r
 \r
     @Test\r
     public void clusterTest() throws CacheExistException, CacheConfigException,\r
-            CacheListenerAddException {\r
+        CacheListenerAddException {\r
 \r
         String container1 = "Container1";\r
         String container2 = "Container2";\r
@@ -220,7 +243,7 @@ public class ClusteringServicesIT {
         }\r
 \r
         @Override\r
-        public void entryUpdated(Integer key, String new_value,\r
+        public void entryUpdated(Integer key, String newValue,\r
                 String containerName, String cacheName, boolean originLocal) {\r
             return;\r
         }\r
@@ -231,4 +254,438 @@ public class ClusteringServicesIT {
             return;\r
         }\r
     }\r
+\r
+    @Test\r
+    public void clusterContainerAndGlobalTest() throws CacheExistException, CacheConfigException,\r
+        CacheListenerAddException, InterruptedException {\r
+        String cache1 = "Cache1";\r
+        String cache2 = "Cache2";\r
+        // Lets test the case of caches with same name in different\r
+        // containers (actually global an container case)\r
+        String cache3 = "Cache2";\r
+\r
+        HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
+        cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
+        ConcurrentMap cm11 = this.clusterDefaultServices.createCache(cache1, cacheModeSet);\r
+        assertNotNull(cm11);\r
+\r
+        assertTrue(this.clusterDefaultServices.existCache(cache1));\r
+        assertEquals(cm11, this.clusterDefaultServices.getCache(cache1));\r
+\r
+        ConcurrentMap cm12 = this.clusterDefaultServices.createCache(cache2, cacheModeSet);\r
+        ConcurrentMap cm23 = this.clusterGlobalServices.createCache(cache3, cacheModeSet);\r
+\r
+        // Now given cahe2 and cache3 have same name lets make sure\r
+        // they don't return the same reference\r
+        assertNotNull(this.clusterGlobalServices.getCache(cache2));\r
+        // cm12 reference must be different than cm23\r
+        assertTrue(cm12 != cm23);\r
+\r
+        HashSet<String> cacheList = (HashSet<String>) this.clusterDefaultServices\r
+            .getCacheList();\r
+        assertEquals(2, cacheList.size());\r
+        assertTrue(cacheList.contains(cache1));\r
+        assertTrue(cacheList.contains(cache2));\r
+\r
+        assertNotNull(this.clusterDefaultServices.getCacheProperties(cache1));\r
+\r
+        {\r
+            /***********************************/\r
+            /* Testing cacheAware in Container */\r
+            /***********************************/\r
+            Dictionary<String, Object> props = new Hashtable<String, Object>();\r
+            Set<String> propSet = new HashSet<String>();\r
+            propSet.add(cache1);\r
+            propSet.add(cache2);\r
+            props.put("cachenames", propSet);\r
+            CacheAware listener = new CacheAware();\r
+            CacheAware listenerRepeated = new CacheAware();\r
+            ServiceRegistration updateServiceReg = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, "default",\r
+                                                                                     listener, props);\r
+            assertNotNull(updateServiceReg);\r
+\r
+            // Register another service for the same caches, this\r
+            // should not get any update because we don't allow to\r
+            // override the existing unless before unregistered\r
+            ServiceRegistration updateServiceRegRepeated = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class,\r
+                                                                                             "default",\r
+                                                                                             listenerRepeated, props);\r
+            assertNotNull(updateServiceRegRepeated);\r
+            CountDownLatch res = null;\r
+            List<Update> ups = null;\r
+            Update up = null;\r
+            Integer k1 = new Integer(10);\r
+            Long k2 = new Long(100L);\r
+\r
+            /***********************/\r
+            /* CREATE NEW KEY CASE */\r
+            /***********************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(2);\r
+            // modify the cache\r
+            cm11.put(k1, "foo");\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 2);\r
+            // Validate that first we get an update (yes even in case of a\r
+            // new value added)\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.CHANGED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertTrue(up.value.equals("foo"));\r
+            assertTrue(up.cacheName.equals(cache1));\r
+            // Validate that we then get a create\r
+            up = ups.get(1);\r
+            assertTrue(up.t.equals(UpdateType.ADDED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertNull(up.value);\r
+            assertTrue(up.cacheName.equals(cache1));\r
+\r
+            /*******************************/\r
+            /* UPDATE AN EXISTING KEY CASE */\r
+            /*******************************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(1);\r
+            // modify the cache\r
+            cm11.put(k1, "baz");\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 1);\r
+            // Validate we get an update with expect fields\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.CHANGED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertTrue(up.value.equals("baz"));\r
+            assertTrue(up.cacheName.equals(cache1));\r
+\r
+            /********************************/\r
+            /* REMOVAL OF EXISTING KEY CASE */\r
+            /********************************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(1);\r
+            // modify the cache\r
+            cm11.remove(k1);\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 1);\r
+            // Validate we get a delete with expected fields\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.REMOVED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertNull(up.value);\r
+            assertTrue(up.cacheName.equals(cache1));\r
+\r
+            /***********************/\r
+            /* CREATE NEW KEY CASE */\r
+            /***********************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(2);\r
+            // modify the cache\r
+            cm12.put(k2, new Short((short)15));\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 2);\r
+            // Validate that first we get an update (yes even in case of a\r
+            // new value added)\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.CHANGED));\r
+            assertTrue(up.key.equals(k2));\r
+            assertTrue(up.value.equals(new Short((short)15)));\r
+            assertTrue(up.cacheName.equals(cache2));\r
+            // Validate that we then get a create\r
+            up = ups.get(1);\r
+            assertTrue(up.t.equals(UpdateType.ADDED));\r
+            assertTrue(up.key.equals(k2));\r
+            assertNull(up.value);\r
+            assertTrue(up.cacheName.equals(cache2));\r
+\r
+            /*******************************/\r
+            /* UPDATE AN EXISTING KEY CASE */\r
+            /*******************************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(1);\r
+            // modify the cache\r
+            cm12.put(k2, "BAZ");\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 1);\r
+            // Validate we get an update with expect fields\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.CHANGED));\r
+            assertTrue(up.key.equals(k2));\r
+            assertTrue(up.value.equals("BAZ"));\r
+            assertTrue(up.cacheName.equals(cache2));\r
+\r
+            /********************************/\r
+            /* REMOVAL OF EXISTING KEY CASE */\r
+            /********************************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(1);\r
+            // modify the cache\r
+            cm12.remove(k2);\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 1);\r
+            // Validate we get a delete with expected fields\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.REMOVED));\r
+            assertTrue(up.key.equals(k2));\r
+            assertNull(up.value);\r
+            assertTrue(up.cacheName.equals(cache2));\r
+\r
+            /******************************************************************/\r
+            /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
+            /******************************************************************/\r
+            updateServiceReg.unregister();\r
+            // Start monitoring the updates, noone should come in\r
+            res = listener.restart(1);\r
+\r
+            /***********************/\r
+            /* CREATE NEW KEY CASE */\r
+            /***********************/\r
+            // modify the cache\r
+            cm11.put(k1, "foo");\r
+\r
+            /*******************************/\r
+            /* UPDATE AN EXISTING KEY CASE */\r
+            /*******************************/\r
+            // modify the cache\r
+            cm11.put(k1, "baz");\r
+\r
+            /********************************/\r
+            /* REMOVAL OF EXISTING KEY CASE */\r
+            /********************************/\r
+            // modify the cache\r
+            cm11.remove(k1);\r
+\r
+            /***********************/\r
+            /* CREATE NEW KEY CASE */\r
+            /***********************/\r
+            // modify the cache\r
+            cm12.put(k2, new Short((short)15));\r
+\r
+            /*******************************/\r
+            /* UPDATE AN EXISTING KEY CASE */\r
+            /*******************************/\r
+            // modify the cache\r
+            cm12.put(k2, "BAZ");\r
+\r
+            /********************************/\r
+            /* REMOVAL OF EXISTING KEY CASE */\r
+            /********************************/\r
+            // modify the cache\r
+            cm12.remove(k2);\r
+\r
+\r
+            // Wait to make sure no updates came in, clearly this is\r
+            // error prone as logic, but cannot find a better way than\r
+            // this to make sure updates didn't get in\r
+            res.await(1L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 0);\r
+        }\r
+\r
+        {\r
+            /***********************************/\r
+            /* Testing cacheAware in Global */\r
+            /***********************************/\r
+            Dictionary<String, Object> props = new Hashtable<String, Object>();\r
+            Set<String> propSet = new HashSet<String>();\r
+            propSet.add(cache3);\r
+            props.put("cachenames", propSet);\r
+            CacheAware listener = new CacheAware();\r
+            ServiceRegistration updateServiceReg = ServiceHelper.registerGlobalServiceWReg(ICacheUpdateAware.class,\r
+                                                                                           listener, props);\r
+            assertNotNull(updateServiceReg);\r
+\r
+            CountDownLatch res = null;\r
+            List<Update> ups = null;\r
+            Update up = null;\r
+            Integer k1 = new Integer(10);\r
+\r
+            /***********************/\r
+            /* CREATE NEW KEY CASE */\r
+            /***********************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(2);\r
+            // modify the cache\r
+            cm23.put(k1, "foo");\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 2);\r
+            // Validate that first we get an update (yes even in case of a\r
+            // new value added)\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.CHANGED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertTrue(up.value.equals("foo"));\r
+            assertTrue(up.cacheName.equals(cache3));\r
+            // Validate that we then get a create\r
+            up = ups.get(1);\r
+            assertTrue(up.t.equals(UpdateType.ADDED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertNull(up.value);\r
+            assertTrue(up.cacheName.equals(cache3));\r
+\r
+            /*******************************/\r
+            /* UPDATE AN EXISTING KEY CASE */\r
+            /*******************************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(1);\r
+            // modify the cache\r
+            cm23.put(k1, "baz");\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 1);\r
+            // Validate we get an update with expect fields\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.CHANGED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertTrue(up.value.equals("baz"));\r
+            assertTrue(up.cacheName.equals(cache3));\r
+\r
+            /********************************/\r
+            /* REMOVAL OF EXISTING KEY CASE */\r
+            /********************************/\r
+            // Start monitoring the updates\r
+            res = listener.restart(1);\r
+            // modify the cache\r
+            cm23.remove(k1);\r
+            // Wait\r
+            res.await(100L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 1);\r
+            // Validate we get a delete with expected fields\r
+            up = ups.get(0);\r
+            assertTrue(up.t.equals(UpdateType.REMOVED));\r
+            assertTrue(up.key.equals(k1));\r
+            assertNull(up.value);\r
+            assertTrue(up.cacheName.equals(cache3));\r
+\r
+            /******************************************************************/\r
+            /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
+            /******************************************************************/\r
+            updateServiceReg.unregister();\r
+            // Start monitoring the updates, noone should come in\r
+            res = listener.restart(1);\r
+\r
+            /***********************/\r
+            /* CREATE NEW KEY CASE */\r
+            /***********************/\r
+            // modify the cache\r
+            cm23.put(k1, "foo");\r
+\r
+            /*******************************/\r
+            /* UPDATE AN EXISTING KEY CASE */\r
+            /*******************************/\r
+            // modify the cache\r
+            cm23.put(k1, "baz");\r
+\r
+            /********************************/\r
+            /* REMOVAL OF EXISTING KEY CASE */\r
+            /********************************/\r
+            // modify the cache\r
+            cm23.remove(k1);\r
+\r
+            // Wait to make sure no updates came in, clearly this is\r
+            // error prone as logic, but cannot find a better way than\r
+            // this to make sure updates didn't get in\r
+            res.await(1L, TimeUnit.SECONDS);\r
+            // Analyze the updates\r
+            ups = listener.getUpdates();\r
+            assertTrue(ups.size() == 0);\r
+        }\r
+\r
+        InetAddress addr = this.clusterDefaultServices.getMyAddress();\r
+        assertNotNull(addr);\r
+\r
+        List<InetAddress> addrList = this.clusterDefaultServices\r
+            .getClusteredControllers();\r
+\r
+        this.clusterDefaultServices.destroyCache(cache1);\r
+        assertFalse(this.clusterDefaultServices.existCache(cache1));\r
+    }\r
+\r
+    private class Update {\r
+        Object key;\r
+        Object value;\r
+        String cacheName;\r
+        UpdateType t;\r
+\r
+        Update (UpdateType t, Object key, Object value, String cacheName) {\r
+            this.t = t;\r
+            this.key = key;\r
+            this.value = value;\r
+            this.cacheName = cacheName;\r
+        }\r
+    }\r
+\r
+    private class CacheAware implements ICacheUpdateAware {\r
+        private CopyOnWriteArrayList<Update> gotUpdates;\r
+        private CountDownLatch latch = null;\r
+\r
+        CacheAware() {\r
+            this.gotUpdates = new CopyOnWriteArrayList<Update>();\r
+        }\r
+\r
+\r
+        /**\r
+         * Restart the monitor of the updates on the CacheAware object\r
+         *\r
+         * @param expectedOperations Number of expected updates\r
+         *\r
+         * @return a countdown latch which will be used to wait till the updates are done\r
+         */\r
+        CountDownLatch restart(int expectedOperations) {\r
+            this.gotUpdates.clear();\r
+            this.latch = new CountDownLatch(expectedOperations);\r
+            return this.latch;\r
+        }\r
+\r
+        List<Update> getUpdates() {\r
+            return this.gotUpdates;\r
+        }\r
+\r
+        @Override\r
+        public void entryCreated(Object key, String cacheName, boolean originLocal) {\r
+            log.debug("CACHE[{}] Got an entry created for key:{}", cacheName, key);\r
+            Update u = new Update(UpdateType.ADDED, key, null, cacheName);\r
+            this.gotUpdates.add(u);\r
+            this.latch.countDown();\r
+        }\r
+\r
+        @Override\r
+        public void entryUpdated(Object key, Object newValue, String cacheName, boolean originLocal) {\r
+            log.debug("CACHE[{}] Got an entry updated for key:{} newValue:{}", cacheName, key, newValue);\r
+            Update u = new Update(UpdateType.CHANGED, key, newValue, cacheName);\r
+            this.gotUpdates.add(u);\r
+            this.latch.countDown();\r
+        }\r
+\r
+        @Override\r
+        public void entryDeleted(Object key, String cacheName, boolean originLocal) {\r
+            log.debug("CACHE[{}] Got an entry delete for key:{}", cacheName, key);\r
+            Update u = new Update(UpdateType.REMOVED, key, null, cacheName);\r
+            this.gotUpdates.add(u);\r
+            this.latch.countDown();\r
+        }\r
+    }\r
 }\r
index 79af2cf3989f46a9d1296692f44eff4cdfb59d66..cb96ad5c8ece76fab65b518f9ddea8eb4b67c374 100644 (file)
@@ -84,26 +84,27 @@ public class Activator extends ComponentActivatorAbstractBase {
      */
     public void configureInstance(Component c, Object imp, String containerName) {
         if (imp.equals(ClusterContainerManager.class)) {
-            c.setInterface(new String[] { IClusterContainerServices.class
-                    .getName() }, null);
+            c.setInterface(new String[] { IClusterContainerServices.class.getName() },
+                           null);
 
-            c.add(createServiceDependency().setService(IClusterServices.class)
-                    .setCallbacks("setClusterService", "unsetClusterService")
-                    .setRequired(true));
+            c.add(createServiceDependency()
+                  .setService(IClusterServices.class)
+                  .setCallbacks("setClusterService", "unsetClusterService")
+                  .setRequired(true));
 
             // CacheUpdate services will be none or many so the
             // dependency is optional
-            c.add(createContainerServiceDependency(containerName).setService(
-                    ICacheUpdateAware.class).setCallbacks(
-                    "setCacheUpdateAware", "unsetCacheUpdateAware")
-                    .setRequired(false));
+            c.add(createContainerServiceDependency(containerName)
+                  .setService(ICacheUpdateAware.class)
+                  .setCallbacks("setCacheUpdateAware", "unsetCacheUpdateAware")
+                  .setRequired(false));
 
             // Coordinator change event can be one or many so
             // dependency is optional
-            c.add(createContainerServiceDependency(containerName).setService(
-                    ICoordinatorChangeAware.class).setCallbacks(
-                    "setCoordinatorChangeAware", "unsetCoordinatorChangeAware")
-                    .setRequired(false));
+            c.add(createContainerServiceDependency(containerName)
+                  .setService(ICoordinatorChangeAware.class)
+                  .setCallbacks("setCoordinatorChangeAware", "unsetCoordinatorChangeAware")
+                  .setRequired(false));
         }
     }
 
@@ -120,30 +121,30 @@ public class Activator extends ComponentActivatorAbstractBase {
     public void configureGlobalInstance(Component c, Object imp) {
         if (imp.equals(ClusterManager.class)) {
             // export the service for Apps and Plugins
-            c.setInterface(new String[] { IClusterServices.class.getName() },
-                    null);
+            c.setInterface(new String[] { IClusterServices.class.getName() }, null);
         }
 
         if (imp.equals(ClusterGlobalManager.class)) {
-            c.setInterface(new String[] { IClusterGlobalServices.class
-                    .getName() }, null);
+            c.setInterface(new String[] { IClusterGlobalServices.class.getName() }, null);
 
-            c.add(createServiceDependency().setService(IClusterServices.class)
-                    .setCallbacks("setClusterService", "unsetClusterService")
-                    .setRequired(true));
+            c.add(createServiceDependency()
+                  .setService(IClusterServices.class)
+                  .setCallbacks("setClusterService", "unsetClusterService")
+                  .setRequired(true));
 
             // CacheUpdate services will be none or many so the
             // dependency is optional
-            c.add(createServiceDependency().setService(ICacheUpdateAware.class)
-                    .setCallbacks("setCacheUpdateAware",
-                            "unsetCacheUpdateAware").setRequired(false));
+            c.add(createServiceDependency()
+                  .setService(ICacheUpdateAware.class)
+                  .setCallbacks("setCacheUpdateAware", "unsetCacheUpdateAware")
+                  .setRequired(false));
 
             // Coordinator change event can be one or many so
             // dependency is optional
-            c.add(createServiceDependency().setService(
-                    ICoordinatorChangeAware.class).setCallbacks(
-                    "setCoordinatorChangeAware", "unsetCoordinatorChangeAware")
-                    .setRequired(false));
+            c.add(createServiceDependency()
+                  .setService(ICoordinatorChangeAware.class)
+                  .setCallbacks("setCoordinatorChangeAware", "unsetCoordinatorChangeAware")
+                  .setRequired(false));
         }
     }
 }
index 8211846dd65dccc23126b58f22b930c21ce6d0c4..ce33ac8639cf89977c1c1f7f79d19e15a2da3dae 100644 (file)
@@ -9,8 +9,36 @@
 
 package org.opendaylight.controller.clustering.services_implementation.internal;
 
+import java.util.Map;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class ClusterGlobalManager extends ClusterManagerCommon implements
-        IClusterGlobalServices {
+public class ClusterGlobalManager
+    extends ClusterManagerCommon
+    implements IClusterGlobalServices {
+    protected static final Logger logger = LoggerFactory.getLogger(ClusterGlobalManager.class);
+
+    @Override
+    void setCacheUpdateAware(Map props, ICacheUpdateAware s) {
+        logger.trace("setCacheUpdateAware");
+        if (props.get("containerName") != null) {
+            // If we got a reference with the containerName property
+            // that is not what we are looking for, so filter it out.
+            return;
+        }
+        super.setCacheUpdateAware(props, s);
+    }
+
+    @Override
+    void unsetCacheUpdateAware(Map props, ICacheUpdateAware s) {
+        logger.trace("unsetCacheUpdateAware");
+        if (props.get("containerName") != null) {
+            // If we got a reference with the containerName property
+            // that is not what we are looking for, so filter it out.
+            return;
+        }
+        super.unsetCacheUpdateAware(props, s);
+    }
 }
index 7bf495426f61cfb4d177583b157beafa873d6edf..fabf3e9f1d704413346d97b98c561e8d2adc8a16 100644 (file)
 package org.opendaylight.controller.clustering.services_implementation.internal;
 
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
 import javax.transaction.NotSupportedException;
@@ -22,7 +26,7 @@ import javax.transaction.RollbackException;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-
+import org.apache.felix.dm.Component;
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
@@ -35,20 +39,13 @@ import org.opendaylight.controller.clustering.services.ListenRoleChangeAddExcept
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Dictionary;
-import java.util.Collections;
-import java.util.HashSet;
-import org.apache.felix.dm.Component;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 abstract public class ClusterManagerCommon implements IClusterServicesCommon {
     protected String containerName = null;
     private IClusterServices clusterService = null;
     protected static final Logger logger = LoggerFactory
             .getLogger(ClusterManagerCommon.class);
-    private Set<ICacheUpdateAware> cacheUpdateAware = Collections
-            .synchronizedSet(new HashSet<ICacheUpdateAware>());
+    private ConcurrentMap<String, GetUpdatesContainer> cacheUpdateAware =
+        new ConcurrentHashMap<String, GetUpdatesContainer>();
     private Set<ICoordinatorChangeAware> coordinatorChangeAware = Collections
             .synchronizedSet(new HashSet<ICoordinatorChangeAware>());
     private ListenCoordinatorChange coordinatorChangeListener = null;
@@ -85,15 +82,60 @@ abstract public class ClusterManagerCommon implements IClusterServicesCommon {
         }
     }
 
-    void setCacheUpdateAware(ICacheUpdateAware s) {
+    void setCacheUpdateAware(Map props, ICacheUpdateAware s) {
+        logger.trace("CacheUpdateAware being set on container:{}",
+                     this.containerName);
         if (this.cacheUpdateAware != null) {
-            this.cacheUpdateAware.add(s);
+            Set<String> caches = (Set<String>)props.get("cachenames");
+            if (caches != null) {
+                logger.trace("cachenames provided below:");
+                for (String cache : caches) {
+                    if (this.cacheUpdateAware.get(cache) != null) {
+                        logger.error("cachename:{} on container:{} has " +
+                                     "already a listener", cache,
+                                     this.containerName);
+                    } else {
+                        GetUpdatesContainer<?, ?> up =
+                            new GetUpdatesContainer(s, this.containerName,
+                                                    cache);
+                        if (up != null) {
+                            try {
+                                this.clusterService.addListener(this.containerName,
+                                                                cache, up);
+                                this.cacheUpdateAware.put(cache, up);
+                                logger.trace("cachename:{} on container:{} has " +
+                                             "been registered", cache,
+                                             this.containerName);
+                            } catch (CacheListenerAddException exc) {
+                                // Do nothing, the important is that
+                                // we don't register the listener in
+                                // the shadow, and we are not doing
+                                // that.
+                            }
+                        }
+                    }
+                }
+            }
         }
     }
 
-    void unsetCacheUpdateAware(ICacheUpdateAware s) {
+    void unsetCacheUpdateAware(Map props, ICacheUpdateAware s) {
+        logger.trace("CacheUpdateAware being unset on container:{}",
+                     this.containerName);
         if (this.cacheUpdateAware != null) {
-            this.cacheUpdateAware.remove(s);
+            Set<String> caches = (Set<String>)props.get("cachenames");
+            if (caches != null) {
+                logger.trace("cachenames provided below:");
+                GetUpdatesContainer<?, ?> up = null;
+                for (String cache : caches) {
+                    up = this.cacheUpdateAware.get(cache);
+                    if (up != null) {
+                        this.cacheUpdateAware.remove(cache);
+                        this.clusterService.removeListener(this.containerName,
+                                                           cache, up);
+                    }
+                }
+            }
         }
     }
 
diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/GetUpdatesContainer.java b/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/GetUpdatesContainer.java
new file mode 100644 (file)
index 0000000..3444a17
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.clustering.services_implementation.internal;
+
+import org.opendaylight.controller.clustering.services.IGetUpdates;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+
+public class GetUpdatesContainer<K,V> implements IGetUpdates<K,V> {
+    private ICacheUpdateAware<K,V> toBeUpdated;
+    private String containerName;
+    private String cacheName;
+
+    public GetUpdatesContainer(ICacheUpdateAware<K,V> i, String containerName,
+                               String cacheName) {
+        this.toBeUpdated = i;
+        this.containerName = containerName;
+        this.cacheName = cacheName;
+    }
+
+    public ICacheUpdateAware<K,V> whichListener() {
+        return this.toBeUpdated;
+    }
+
+    @Override
+    public void entryCreated(K key, String containerName, String cacheName,
+                             boolean local) {
+        if (this.toBeUpdated != null) {
+            this.toBeUpdated.entryCreated(key, cacheName, local);
+        }
+    }
+
+    @Override
+    public void entryUpdated(K key, V new_value, String containerName,
+                             String cacheName,
+                             boolean local) {
+        if (this.toBeUpdated != null) {
+            this.toBeUpdated.entryUpdated(key, new_value, cacheName, local);
+        }
+    }
+
+    @Override
+    public void entryDeleted(K key, String containerName, String cacheName,
+                             boolean local) {
+        if (this.toBeUpdated != null) {
+            this.toBeUpdated.entryDeleted(key, cacheName, local);
+        }
+    }
+}
diff --git a/opendaylight/clustering/services_implementation/src/main/resources/OSGI-INF/component-cachemanager.xml b/opendaylight/clustering/services_implementation/src/main/resources/OSGI-INF/component-cachemanager.xml
deleted file mode 100644 (file)
index f3baf79..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0"
-               activate="start"
-               deactivate="stop"
-               immediate="true"
-               name="org.opendaylight.controller.clustering.services_implementation.internal.ClusterManager">
-  <implementation class="org.opendaylight.controller.clustering.services_implementation.internal.ClusterManager"/>
-  <service>
-    <provide interface="org.opendaylight.controller.clustering.services.IClusterServices"/>
-  </service>
-</scr:component>
diff --git a/opendaylight/clustering/services_implementation/src/test/resources/logback.xml b/opendaylight/clustering/services_implementation/src/test/resources/logback.xml
new file mode 100644 (file)
index 0000000..6d9dfda
--- /dev/null
@@ -0,0 +1,12 @@
+<configuration scan="true">
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+      </pattern>
+    </encoder>
+  </appender>
+
+  <root level="error">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
index ef9f2f4bada62350d979e4ee0842e45018a9885e..0237b9c499a599ffb646e7252aa723a8e18cd462 100644 (file)
@@ -65,27 +65,65 @@ public class ServiceHelper {
      * @return true if registration happened, false otherwise
      */
     public static boolean registerGlobalService(Class<?> clazz,
-            Object instance, Dictionary<String, Object> properties) {
+                                                Object instance,
+                                                Dictionary<String, Object> properties) {
+        ServiceRegistration registration = registerGlobalServiceWReg(clazz, instance, properties);
+        if (registration == null) {
+            logger.error("Failed to register {} for instance {}", clazz, instance);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Register a Service in the OSGi service registry and return the ServiceRegistration
+     *
+     * @param clazz The target class
+     * @param containerName The container name
+     * @param instance of the object exporting the service, be careful
+     * the object must implement/extend clazz else the registration
+     * will fail unless a ServiceFactory is passed as parameter
+     * @param properties The properties to be attached to the service
+     * registration
+     * @return the ServiceRegistration if registration happened, null otherwise
+     */
+    public static ServiceRegistration registerServiceWReg(Class<?> clazz, String containerName,
+                                                          Object instance, Dictionary<String, Object> properties) {
+        if (properties == null) {
+            properties = (Dictionary<String, Object>) new Hashtable<String, Object>();
+        }
+        properties.put("containerName", containerName);
+        return registerGlobalServiceWReg(clazz, instance, properties);
+    }
+
+    /**
+     * Register a Global Service in the OSGi service registry
+     *
+     * @param clazz The target class
+     * @param instance of the object exporting the service, be careful
+     * the object must implement/extend clazz else the registration
+     * will fail unless a ServiceFactory is passed as parameter
+     * @param properties The properties to be attached to the service
+     * registration
+     * @return the ServiceRegistration if registration happened, null otherwise
+     */
+    public static ServiceRegistration registerGlobalServiceWReg(Class<?> clazz,
+                                                                Object instance,
+                                                                Dictionary<String, Object> properties) {
         try {
-            BundleContext bCtx = FrameworkUtil.getBundle(instance.getClass())
-                    .getBundleContext();
+            BundleContext bCtx = FrameworkUtil.getBundle(instance.getClass()).getBundleContext();
             if (bCtx == null) {
                 logger.error("Could not retrieve the BundleContext");
-                return false;
+                return null;
             }
 
-            ServiceRegistration registration = bCtx.registerService(clazz
-                    .getName(), instance, properties);
-            if (registration == null) {
-                logger.error("Failed to register {} for instance {}", clazz,
-                        instance);
-            }
-            return true;
+            ServiceRegistration registration = bCtx.registerService(clazz.getName(), instance, properties);
+            return registration;
         } catch (Exception e) {
             logger.error("Exception {} while registering the service {}",
-                    e.getMessage(), instance.toString());
+                         e.getMessage(), instance.toString());
         }
-        return false;
+        return null;
     }
 
     /**