-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.clustering.services_implementation.internal;\r
-\r
-import static org.junit.Assert.assertEquals;\r
-import static org.junit.Assert.assertFalse;\r
-import static org.junit.Assert.assertNotNull;\r
-import static org.junit.Assert.assertNull;\r
-import static org.junit.Assert.assertTrue;\r
-import static org.ops4j.pax.exam.CoreOptions.junitBundles;\r
-import static org.ops4j.pax.exam.CoreOptions.mavenBundle;\r
-import static org.ops4j.pax.exam.CoreOptions.options;\r
-import static org.ops4j.pax.exam.CoreOptions.systemPackages;\r
-import static org.ops4j.pax.exam.CoreOptions.systemProperty;\r
-\r
-import java.util.List;\r
-import java.util.concurrent.CopyOnWriteArrayList;\r
-import java.util.concurrent.TimeUnit;\r
-import java.net.InetAddress;\r
-import java.util.Dictionary;\r
-import java.util.HashSet;\r
-import java.util.Hashtable;\r
-import java.util.Set;\r
-import java.util.concurrent.ConcurrentMap;\r
-\r
-import javax.inject.Inject;\r
-\r
-import org.junit.Before;\r
-import org.junit.Test;\r
-import org.junit.runner.RunWith;\r
-import org.opendaylight.controller.clustering.services.CacheConfigException;\r
-import org.opendaylight.controller.clustering.services.CacheExistException;\r
-import org.opendaylight.controller.clustering.services.CacheListenerAddException;\r
-import org.opendaylight.controller.clustering.services.IClusterGlobalServices;\r
-import org.opendaylight.controller.clustering.services.IClusterServices;\r
-import org.opendaylight.controller.clustering.services.IClusterContainerServices;\r
-import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode;\r
-import org.opendaylight.controller.clustering.services.IGetUpdates;\r
-import org.opendaylight.controller.clustering.services.ICacheUpdateAware;\r
-import org.opendaylight.controller.sal.utils.ServiceHelper;\r
-import org.opendaylight.controller.sal.core.UpdateType;\r
-import org.ops4j.pax.exam.Option;\r
-import org.ops4j.pax.exam.Configuration;\r
-import org.ops4j.pax.exam.junit.PaxExam;\r
-import org.ops4j.pax.exam.util.PathUtils;\r
-import org.osgi.framework.Bundle;\r
-import org.osgi.framework.BundleContext;\r
-import org.osgi.framework.ServiceRegistration;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-import java.util.concurrent.CountDownLatch;\r
-\r
-@RunWith(PaxExam.class)\r
-public class ClusteringServicesIT {\r
- private Logger log = LoggerFactory\r
- .getLogger(ClusteringServicesIT.class);\r
- // get the OSGI bundle context\r
- @Inject\r
- private BundleContext bc;\r
- private IClusterServices clusterServices = null;\r
- private IClusterContainerServices clusterDefaultServices = null;\r
- private IClusterGlobalServices clusterGlobalServices = null;\r
-\r
- // Configure the OSGi container\r
- @Configuration\r
- public Option[] config() {\r
- return options(\r
- //\r
- systemProperty("logback.configurationFile").value(\r
- "file:" + PathUtils.getBaseDir()\r
- + "/src/test/resources/logback.xml"),\r
- // To start OSGi console for inspection remotely\r
- systemProperty("osgi.console").value("2401"),\r
- // Set the systemPackages (used by clustering)\r
- systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),\r
- // List framework bundles\r
- mavenBundle("equinoxSDK381",\r
- "org.eclipse.equinox.console").versionAsInProject(),\r
- mavenBundle("equinoxSDK381",\r
- "org.eclipse.equinox.util").versionAsInProject(),\r
- mavenBundle("equinoxSDK381",\r
- "org.eclipse.osgi.services").versionAsInProject(),\r
- mavenBundle("equinoxSDK381",\r
- "org.eclipse.equinox.ds").versionAsInProject(),\r
- mavenBundle("equinoxSDK381",\r
- "org.apache.felix.gogo.command").versionAsInProject(),\r
- mavenBundle("equinoxSDK381",\r
- "org.apache.felix.gogo.runtime").versionAsInProject(),\r
- mavenBundle("equinoxSDK381",\r
- "org.apache.felix.gogo.shell").versionAsInProject(),\r
- // List logger bundles\r
- mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),\r
- mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),\r
- mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),\r
- mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),\r
- // List all the bundles on which the test case depends\r
- mavenBundle("org.opendaylight.controller",\r
- "clustering.services").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller",\r
- "clustering.services-implementation").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller",\r
- "sal.implementation").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller", "configuration").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller", "containermanager").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller",\r
- "containermanager.it.implementation").versionAsInProject(),\r
- mavenBundle("org.jboss.spec.javax.transaction",\r
- "jboss-transaction-api_1.1_spec").versionAsInProject(),\r
- mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),\r
- mavenBundle("org.apache.felix",\r
- "org.apache.felix.dependencymanager").versionAsInProject(),\r
- mavenBundle("org.apache.felix",\r
- "org.apache.felix.dependencymanager.shell").versionAsInProject(),\r
- mavenBundle("eclipselink", "javax.resource").versionAsInProject(),\r
- junitBundles());\r
- }\r
-\r
- private String stateToString(int state) {\r
- switch (state) {\r
- case Bundle.ACTIVE:\r
- return "ACTIVE";\r
- case Bundle.INSTALLED:\r
- return "INSTALLED";\r
- case Bundle.RESOLVED:\r
- return "RESOLVED";\r
- case Bundle.UNINSTALLED:\r
- return "UNINSTALLED";\r
- default:\r
- return "Not CONVERTED";\r
- }\r
- }\r
-\r
- @Before\r
- public void areWeReady() {\r
- assertNotNull(bc);\r
- boolean debugit = false;\r
- Bundle b[] = bc.getBundles();\r
- for (Bundle element : b) {\r
- int state = element.getState();\r
- if (state != Bundle.ACTIVE && state != Bundle.RESOLVED) {\r
- log.debug("Bundle:" + element.getSymbolicName() + " state:"\r
- + stateToString(state));\r
- debugit = true;\r
- }\r
- }\r
- if (debugit) {\r
- log.debug("Do some debugging because some bundle is "\r
- + "unresolved");\r
- }\r
-\r
- // Assert if true, if false we are good to go!\r
- assertFalse(debugit);\r
-\r
- this.clusterServices = (IClusterServices)ServiceHelper\r
- .getGlobalInstance(IClusterServices.class, this);\r
- assertNotNull(this.clusterServices);\r
-\r
- this.clusterDefaultServices = (IClusterContainerServices)ServiceHelper\r
- .getInstance(IClusterContainerServices.class, "default", this);\r
- assertNotNull(this.clusterDefaultServices);\r
-\r
- this.clusterGlobalServices = (IClusterGlobalServices)ServiceHelper\r
- .getGlobalInstance(IClusterGlobalServices.class, this);\r
- assertNotNull(this.clusterGlobalServices);\r
- }\r
-\r
- @Test\r
- public void clusterTest() throws CacheExistException, CacheConfigException,\r
- CacheListenerAddException {\r
-\r
- String container1 = "Container1";\r
- String container2 = "Container2";\r
- String cache1 = "Cache1";\r
- String cache2 = "Cache2";\r
- String cache3 = "Cache3";\r
-\r
- HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
- cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
- ConcurrentMap cm11 = this.clusterServices.createCache(container1,\r
- cache1, cacheModeSet);\r
- assertNotNull(cm11);\r
-\r
- assertNull(this.clusterServices.getCache(container2, cache2));\r
- assertEquals(cm11, this.clusterServices.getCache(container1, cache1));\r
-\r
- assertFalse(this.clusterServices.existCache(container2, cache2));\r
- assertTrue(this.clusterServices.existCache(container1, cache1));\r
-\r
- ConcurrentMap cm12 = this.clusterServices.createCache(container1,\r
- cache2, cacheModeSet);\r
- ConcurrentMap cm23 = this.clusterServices.createCache(container2,\r
- cache3, cacheModeSet);\r
-\r
- HashSet<String> cacheList = (HashSet<String>) this.clusterServices\r
- .getCacheList(container1);\r
- assertEquals(2, cacheList.size());\r
- assertTrue(cacheList.contains(cache1));\r
- assertTrue(cacheList.contains(cache2));\r
- assertFalse(cacheList.contains(cache3));\r
-\r
- assertNotNull(this.clusterServices.getCacheProperties(container1,\r
- cache1));\r
-\r
- HashSet<IGetUpdates<?, ?>> listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
- .getListeners(container1, cache1);\r
- assertEquals(0, listeners.size());\r
-\r
- IGetUpdates<?, ?> getUpdate1 = new GetUpdates();\r
- this.clusterServices.addListener(container1, cache1, getUpdate1);\r
- listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
- .getListeners(container1, cache1);\r
- assertEquals(1, listeners.size());\r
- this.clusterServices.addListener(container1, cache1, new GetUpdates());\r
- listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
- .getListeners(container1, cache1);\r
- assertEquals(2, listeners.size());\r
-\r
- listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
- .getListeners(container2, cache3);\r
- assertEquals(0, listeners.size());\r
-\r
- this.clusterServices.removeListener(container1, cache1, getUpdate1);\r
- listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
- .getListeners(container1, cache1);\r
- assertEquals(1, listeners.size());\r
-\r
- InetAddress addr = this.clusterServices.getMyAddress();\r
- assertNotNull(addr);\r
-\r
- List<InetAddress> addrList = this.clusterServices\r
- .getClusteredControllers();\r
-\r
- this.clusterServices.destroyCache(container1, cache1);\r
- assertFalse(this.clusterServices.existCache(container1, cache1));\r
-\r
- }\r
-\r
- private class GetUpdates implements IGetUpdates<Integer, String> {\r
-\r
- @Override\r
- public void entryCreated(Integer key, String containerName,\r
- String cacheName, boolean originLocal) {\r
- return;\r
- }\r
-\r
- @Override\r
- public void entryUpdated(Integer key, String newValue,\r
- String containerName, String cacheName, boolean originLocal) {\r
- return;\r
- }\r
-\r
- @Override\r
- public void entryDeleted(Integer key, String containerName,\r
- String cacheName, boolean originLocal) {\r
- return;\r
- }\r
- }\r
-\r
- @Test\r
- public void clusterContainerAndGlobalTest() throws CacheExistException, CacheConfigException,\r
- CacheListenerAddException, InterruptedException {\r
- String cache1 = "Cache1";\r
- String cache2 = "Cache2";\r
- // Lets test the case of caches with same name in different\r
- // containers (actually global an container case)\r
- String cache3 = "Cache2";\r
-\r
- HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
- cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
- ConcurrentMap cm11 = this.clusterDefaultServices.createCache(cache1, cacheModeSet);\r
- assertNotNull(cm11);\r
-\r
- assertTrue(this.clusterDefaultServices.existCache(cache1));\r
- assertEquals(cm11, this.clusterDefaultServices.getCache(cache1));\r
-\r
- ConcurrentMap cm12 = this.clusterDefaultServices.createCache(cache2, cacheModeSet);\r
- ConcurrentMap cm23 = this.clusterGlobalServices.createCache(cache3, cacheModeSet);\r
-\r
- // Now given cahe2 and cache3 have same name lets make sure\r
- // they don't return the same reference\r
- assertNotNull(this.clusterGlobalServices.getCache(cache2));\r
- // cm12 reference must be different than cm23\r
- assertTrue(cm12 != cm23);\r
-\r
- HashSet<String> cacheList = (HashSet<String>) this.clusterDefaultServices\r
- .getCacheList();\r
- assertEquals(2, cacheList.size());\r
- assertTrue(cacheList.contains(cache1));\r
- assertTrue(cacheList.contains(cache2));\r
-\r
- assertNotNull(this.clusterDefaultServices.getCacheProperties(cache1));\r
-\r
- {\r
- /***********************************/\r
- /* Testing cacheAware in Container */\r
- /***********************************/\r
- Dictionary<String, Object> props = new Hashtable<String, Object>();\r
- Set<String> propSet = new HashSet<String>();\r
- propSet.add(cache1);\r
- propSet.add(cache2);\r
- props.put("cachenames", propSet);\r
- CacheAware listener = new CacheAware();\r
- CacheAware listenerRepeated = new CacheAware();\r
- ServiceRegistration updateServiceReg = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, "default",\r
- listener, props);\r
- assertNotNull(updateServiceReg);\r
-\r
- // Register another service for the same caches, this\r
- // should not get any update because we don't allow to\r
- // override the existing unless before unregistered\r
- ServiceRegistration updateServiceRegRepeated = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class,\r
- "default",\r
- listenerRepeated, props);\r
- assertNotNull(updateServiceRegRepeated);\r
- CountDownLatch res = null;\r
- List<Update> ups = null;\r
- Update up = null;\r
- Integer k1 = new Integer(10);\r
- Long k2 = new Long(100L);\r
-\r
- /***********************/\r
- /* CREATE NEW KEY CASE */\r
- /***********************/\r
- // Start monitoring the updates\r
- res = listener.restart(2);\r
- // modify the cache\r
- cm11.put(k1, "foo");\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 2);\r
- // Validate that first we get an update (yes even in case of a\r
- // new value added)\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.CHANGED));\r
- assertTrue(up.key.equals(k1));\r
- assertTrue(up.value.equals("foo"));\r
- assertTrue(up.cacheName.equals(cache1));\r
- // Validate that we then get a create\r
- up = ups.get(1);\r
- assertTrue(up.t.equals(UpdateType.ADDED));\r
- assertTrue(up.key.equals(k1));\r
- assertNull(up.value);\r
- assertTrue(up.cacheName.equals(cache1));\r
-\r
- /*******************************/\r
- /* UPDATE AN EXISTING KEY CASE */\r
- /*******************************/\r
- // Start monitoring the updates\r
- res = listener.restart(1);\r
- // modify the cache\r
- cm11.put(k1, "baz");\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 1);\r
- // Validate we get an update with expect fields\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.CHANGED));\r
- assertTrue(up.key.equals(k1));\r
- assertTrue(up.value.equals("baz"));\r
- assertTrue(up.cacheName.equals(cache1));\r
-\r
- /**********************************/\r
- /* RE-UPDATE AN EXISTING KEY CASE */\r
- /**********************************/\r
- // Start monitoring the updates\r
- res = listener.restart(1);\r
- // modify the cache\r
- cm11.put(k1, "baz");\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 1);\r
- // Validate we get an update with expect fields\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.CHANGED));\r
- assertTrue(up.key.equals(k1));\r
- assertTrue(up.value.equals("baz"));\r
- assertTrue(up.cacheName.equals(cache1));\r
-\r
- /********************************/\r
- /* REMOVAL OF EXISTING KEY CASE */\r
- /********************************/\r
- // Start monitoring the updates\r
- res = listener.restart(1);\r
- // modify the cache\r
- cm11.remove(k1);\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 1);\r
- // Validate we get a delete with expected fields\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.REMOVED));\r
- assertTrue(up.key.equals(k1));\r
- assertNull(up.value);\r
- assertTrue(up.cacheName.equals(cache1));\r
-\r
- /***********************/\r
- /* CREATE NEW KEY CASE */\r
- /***********************/\r
- // Start monitoring the updates\r
- res = listener.restart(2);\r
- // modify the cache\r
- cm12.put(k2, new Short((short)15));\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 2);\r
- // Validate that first we get an update (yes even in case of a\r
- // new value added)\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.CHANGED));\r
- assertTrue(up.key.equals(k2));\r
- assertTrue(up.value.equals(new Short((short)15)));\r
- assertTrue(up.cacheName.equals(cache2));\r
- // Validate that we then get a create\r
- up = ups.get(1);\r
- assertTrue(up.t.equals(UpdateType.ADDED));\r
- assertTrue(up.key.equals(k2));\r
- assertNull(up.value);\r
- assertTrue(up.cacheName.equals(cache2));\r
-\r
- /*******************************/\r
- /* UPDATE AN EXISTING KEY CASE */\r
- /*******************************/\r
- // Start monitoring the updates\r
- res = listener.restart(1);\r
- // modify the cache\r
- cm12.put(k2, "BAZ");\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 1);\r
- // Validate we get an update with expect fields\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.CHANGED));\r
- assertTrue(up.key.equals(k2));\r
- assertTrue(up.value.equals("BAZ"));\r
- assertTrue(up.cacheName.equals(cache2));\r
-\r
- /********************************/\r
- /* REMOVAL OF EXISTING KEY CASE */\r
- /********************************/\r
- // Start monitoring the updates\r
- res = listener.restart(1);\r
- // modify the cache\r
- cm12.remove(k2);\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 1);\r
- // Validate we get a delete with expected fields\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.REMOVED));\r
- assertTrue(up.key.equals(k2));\r
- assertNull(up.value);\r
- assertTrue(up.cacheName.equals(cache2));\r
-\r
- /******************************************************************/\r
- /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
- /******************************************************************/\r
- updateServiceReg.unregister();\r
- // Start monitoring the updates, noone should come in\r
- res = listener.restart(1);\r
-\r
- /***********************/\r
- /* CREATE NEW KEY CASE */\r
- /***********************/\r
- // modify the cache\r
- cm11.put(k1, "foo");\r
-\r
- /*******************************/\r
- /* UPDATE AN EXISTING KEY CASE */\r
- /*******************************/\r
- // modify the cache\r
- cm11.put(k1, "baz");\r
-\r
- /********************************/\r
- /* REMOVAL OF EXISTING KEY CASE */\r
- /********************************/\r
- // modify the cache\r
- cm11.remove(k1);\r
-\r
- /***********************/\r
- /* CREATE NEW KEY CASE */\r
- /***********************/\r
- // modify the cache\r
- cm12.put(k2, new Short((short)15));\r
-\r
- /*******************************/\r
- /* UPDATE AN EXISTING KEY CASE */\r
- /*******************************/\r
- // modify the cache\r
- cm12.put(k2, "BAZ");\r
-\r
- /********************************/\r
- /* REMOVAL OF EXISTING KEY CASE */\r
- /********************************/\r
- // modify the cache\r
- cm12.remove(k2);\r
-\r
-\r
- // Wait to make sure no updates came in, clearly this is\r
- // error prone as logic, but cannot find a better way than\r
- // this to make sure updates didn't get in\r
- res.await(1L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 0);\r
- }\r
-\r
- {\r
- /***********************************/\r
- /* Testing cacheAware in Global */\r
- /***********************************/\r
- Dictionary<String, Object> props = new Hashtable<String, Object>();\r
- Set<String> propSet = new HashSet<String>();\r
- propSet.add(cache3);\r
- props.put("cachenames", propSet);\r
- CacheAware listener = new CacheAware();\r
- ServiceRegistration updateServiceReg = ServiceHelper.registerGlobalServiceWReg(ICacheUpdateAware.class,\r
- listener, props);\r
- assertNotNull(updateServiceReg);\r
-\r
- CountDownLatch res = null;\r
- List<Update> ups = null;\r
- Update up = null;\r
- Integer k1 = new Integer(10);\r
-\r
- /***********************/\r
- /* CREATE NEW KEY CASE */\r
- /***********************/\r
- // Start monitoring the updates\r
- res = listener.restart(2);\r
- // modify the cache\r
- cm23.put(k1, "foo");\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 2);\r
- // Validate that first we get an update (yes even in case of a\r
- // new value added)\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.CHANGED));\r
- assertTrue(up.key.equals(k1));\r
- assertTrue(up.value.equals("foo"));\r
- assertTrue(up.cacheName.equals(cache3));\r
- // Validate that we then get a create\r
- up = ups.get(1);\r
- assertTrue(up.t.equals(UpdateType.ADDED));\r
- assertTrue(up.key.equals(k1));\r
- assertNull(up.value);\r
- assertTrue(up.cacheName.equals(cache3));\r
-\r
- /*******************************/\r
- /* UPDATE AN EXISTING KEY CASE */\r
- /*******************************/\r
- // Start monitoring the updates\r
- res = listener.restart(1);\r
- // modify the cache\r
- cm23.put(k1, "baz");\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 1);\r
- // Validate we get an update with expect fields\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.CHANGED));\r
- assertTrue(up.key.equals(k1));\r
- assertTrue(up.value.equals("baz"));\r
- assertTrue(up.cacheName.equals(cache3));\r
-\r
- /********************************/\r
- /* REMOVAL OF EXISTING KEY CASE */\r
- /********************************/\r
- // Start monitoring the updates\r
- res = listener.restart(1);\r
- // modify the cache\r
- cm23.remove(k1);\r
- // Wait\r
- res.await(100L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 1);\r
- // Validate we get a delete with expected fields\r
- up = ups.get(0);\r
- assertTrue(up.t.equals(UpdateType.REMOVED));\r
- assertTrue(up.key.equals(k1));\r
- assertNull(up.value);\r
- assertTrue(up.cacheName.equals(cache3));\r
-\r
- /******************************************************************/\r
- /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
- /******************************************************************/\r
- updateServiceReg.unregister();\r
- // Start monitoring the updates, noone should come in\r
- res = listener.restart(1);\r
-\r
- /***********************/\r
- /* CREATE NEW KEY CASE */\r
- /***********************/\r
- // modify the cache\r
- cm23.put(k1, "foo");\r
-\r
- /*******************************/\r
- /* UPDATE AN EXISTING KEY CASE */\r
- /*******************************/\r
- // modify the cache\r
- cm23.put(k1, "baz");\r
-\r
- /********************************/\r
- /* REMOVAL OF EXISTING KEY CASE */\r
- /********************************/\r
- // modify the cache\r
- cm23.remove(k1);\r
-\r
- // Wait to make sure no updates came in, clearly this is\r
- // error prone as logic, but cannot find a better way than\r
- // this to make sure updates didn't get in\r
- res.await(1L, TimeUnit.SECONDS);\r
- // Analyze the updates\r
- ups = listener.getUpdates();\r
- assertTrue(ups.size() == 0);\r
- }\r
-\r
- InetAddress addr = this.clusterDefaultServices.getMyAddress();\r
- assertNotNull(addr);\r
-\r
- List<InetAddress> addrList = this.clusterDefaultServices\r
- .getClusteredControllers();\r
-\r
- this.clusterDefaultServices.destroyCache(cache1);\r
- assertFalse(this.clusterDefaultServices.existCache(cache1));\r
- }\r
-\r
- private class Update {\r
- Object key;\r
- Object value;\r
- String cacheName;\r
- UpdateType t;\r
-\r
- Update (UpdateType t, Object key, Object value, String cacheName) {\r
- this.t = t;\r
- this.key = key;\r
- this.value = value;\r
- this.cacheName = cacheName;\r
- }\r
- }\r
-\r
- private class CacheAware implements ICacheUpdateAware {\r
- private CopyOnWriteArrayList<Update> gotUpdates;\r
- private CountDownLatch latch = null;\r
-\r
- CacheAware() {\r
- this.gotUpdates = new CopyOnWriteArrayList<Update>();\r
- }\r
-\r
-\r
- /**\r
- * Restart the monitor of the updates on the CacheAware object\r
- *\r
- * @param expectedOperations Number of expected updates\r
- *\r
- * @return a countdown latch which will be used to wait till the updates are done\r
- */\r
- CountDownLatch restart(int expectedOperations) {\r
- this.gotUpdates.clear();\r
- this.latch = new CountDownLatch(expectedOperations);\r
- return this.latch;\r
- }\r
-\r
- List<Update> getUpdates() {\r
- return this.gotUpdates;\r
- }\r
-\r
- @Override\r
- public void entryCreated(Object key, String cacheName, boolean originLocal) {\r
- log.debug("CACHE[{}] Got an entry created for key:{}", cacheName, key);\r
- Update u = new Update(UpdateType.ADDED, key, null, cacheName);\r
- this.gotUpdates.add(u);\r
- this.latch.countDown();\r
- }\r
-\r
- @Override\r
- public void entryUpdated(Object key, Object newValue, String cacheName, boolean originLocal) {\r
- log.debug("CACHE[{}] Got an entry updated for key:{} newValue:{}", cacheName, key, newValue);\r
- Update u = new Update(UpdateType.CHANGED, key, newValue, cacheName);\r
- this.gotUpdates.add(u);\r
- this.latch.countDown();\r
- }\r
-\r
- @Override\r
- public void entryDeleted(Object key, String cacheName, boolean originLocal) {\r
- log.debug("CACHE[{}] Got an entry delete for key:{}", cacheName, key);\r
- Update u = new Update(UpdateType.REMOVED, key, null, cacheName);\r
- this.gotUpdates.add(u);\r
- this.latch.countDown();\r
- }\r
- }\r
-}\r