import static org.ops4j.pax.exam.CoreOptions.systemPackages;\r
import static org.ops4j.pax.exam.CoreOptions.systemProperty;\r
\r
+import java.util.List;\r
+import java.util.concurrent.CopyOnWriteArrayList;\r
+import java.util.concurrent.TimeUnit;\r
import java.net.InetAddress;\r
+import java.util.Dictionary;\r
import java.util.HashSet;\r
+import java.util.Hashtable;\r
+import java.util.Set;\r
import java.util.List;\r
import java.util.concurrent.ConcurrentMap;\r
\r
import org.opendaylight.controller.clustering.services.CacheConfigException;\r
import org.opendaylight.controller.clustering.services.CacheExistException;\r
import org.opendaylight.controller.clustering.services.CacheListenerAddException;\r
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;\r
import org.opendaylight.controller.clustering.services.IClusterServices;\r
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;\r
import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode;\r
import org.opendaylight.controller.clustering.services.IGetUpdates;\r
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;\r
+import org.opendaylight.controller.sal.utils.ServiceHelper;\r
+import org.opendaylight.controller.sal.core.UpdateType;\r
import org.ops4j.pax.exam.Option;\r
import org.ops4j.pax.exam.junit.Configuration;\r
import org.ops4j.pax.exam.junit.PaxExam;\r
import org.osgi.framework.Bundle;\r
import org.osgi.framework.BundleContext;\r
import org.osgi.framework.ServiceReference;\r
+import org.osgi.framework.ServiceRegistration;\r
import org.slf4j.Logger;\r
import org.slf4j.LoggerFactory;\r
+import java.util.concurrent.CountDownLatch;\r
\r
@RunWith(PaxExam.class)\r
public class ClusteringServicesIT {\r
private Logger log = LoggerFactory\r
- .getLogger(ClusteringServicesIT.class);\r
+ .getLogger(ClusteringServicesIT.class);\r
// get the OSGI bundle context\r
@Inject\r
private BundleContext bc;\r
-\r
private IClusterServices clusterServices = null;\r
+ private IClusterContainerServices clusterDefaultServices = null;\r
+ private IClusterGlobalServices clusterGlobalServices = null;\r
\r
// Configure the OSGi container\r
@Configuration\r
public Option[] config() {\r
return options(\r
- //\r
- systemProperty("logback.configurationFile").value(\r
- "file:" + PathUtils.getBaseDir()\r
- + "/src/test/resources/logback.xml"),\r
- // To start OSGi console for inspection remotely\r
- systemProperty("osgi.console").value("2401"),\r
- // Set the systemPackages (used by clustering)\r
- systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),\r
- // List framework bundles\r
- mavenBundle("equinoxSDK381", "org.eclipse.equinox.console",\r
- "1.0.0.v20120522-1841"),\r
- mavenBundle("equinoxSDK381", "org.eclipse.equinox.util",\r
- "1.0.400.v20120522-2049"),\r
- mavenBundle("equinoxSDK381", "org.eclipse.osgi.services",\r
- "3.3.100.v20120522-1822"),\r
- mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds",\r
- "1.4.0.v20120522-1841"),\r
- mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command",\r
- "0.8.0.v201108120515"),\r
- mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime",\r
- "0.8.0.v201108120515"),\r
- mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell",\r
- "0.8.0.v201110170705"),\r
- // List logger bundles\r
- mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),\r
- mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),\r
- mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),\r
- mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),\r
- // List all the bundles on which the test case depends\r
- mavenBundle("org.opendaylight.controller",\r
+ //\r
+ systemProperty("logback.configurationFile").value(\r
+ "file:" + PathUtils.getBaseDir()\r
+ + "/src/test/resources/logback.xml"),\r
+ // To start OSGi console for inspection remotely\r
+ systemProperty("osgi.console").value("2401"),\r
+ // Set the systemPackages (used by clustering)\r
+ systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),\r
+ // List framework bundles\r
+ mavenBundle("equinoxSDK381",\r
+ "org.eclipse.equinox.console").versionAsInProject(),\r
+ mavenBundle("equinoxSDK381",\r
+ "org.eclipse.equinox.util").versionAsInProject(),\r
+ mavenBundle("equinoxSDK381",\r
+ "org.eclipse.osgi.services").versionAsInProject(),\r
+ mavenBundle("equinoxSDK381",\r
+ "org.eclipse.equinox.ds").versionAsInProject(),\r
+ mavenBundle("equinoxSDK381",\r
+ "org.apache.felix.gogo.command").versionAsInProject(),\r
+ mavenBundle("equinoxSDK381",\r
+ "org.apache.felix.gogo.runtime").versionAsInProject(),\r
+ mavenBundle("equinoxSDK381",\r
+ "org.apache.felix.gogo.shell").versionAsInProject(),\r
+ // List logger bundles\r
+ mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),\r
+ mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),\r
+ mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),\r
+ mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),\r
+ // List all the bundles on which the test case depends\r
+ mavenBundle("org.opendaylight.controller",\r
"clustering.services").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller",\r
+ mavenBundle("org.opendaylight.controller",\r
"clustering.services-implementation").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),\r
- mavenBundle("org.opendaylight.controller",\r
+ mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),\r
+ mavenBundle("org.opendaylight.controller",\r
"sal.implementation").versionAsInProject(),\r
- mavenBundle("org.jboss.spec.javax.transaction",\r
+ mavenBundle("org.opendaylight.controller", "containermanager").versionAsInProject(),\r
+ mavenBundle("org.opendaylight.controller",\r
+ "containermanager.implementation").versionAsInProject(),\r
+ mavenBundle("org.jboss.spec.javax.transaction",\r
"jboss-transaction-api_1.1_spec").versionAsInProject(),\r
- mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),\r
- mavenBundle("org.apache.felix",\r
+ mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),\r
+ mavenBundle("org.apache.felix",\r
"org.apache.felix.dependencymanager").versionAsInProject(),\r
- junitBundles());\r
+ mavenBundle("org.apache.felix",\r
+ "org.apache.felix.dependencymanager.shell").versionAsInProject(),\r
+ junitBundles());\r
}\r
\r
private String stateToString(int state) {\r
int state = b[i].getState();\r
if (state != Bundle.ACTIVE && state != Bundle.RESOLVED) {\r
log.debug("Bundle:" + b[i].getSymbolicName() + " state:"\r
- + stateToString(state));\r
+ + stateToString(state));\r
debugit = true;\r
}\r
}\r
if (debugit) {\r
log.debug("Do some debugging because some bundle is "\r
- + "unresolved");\r
+ + "unresolved");\r
}\r
\r
// Assert if true, if false we are good to go!\r
assertFalse(debugit);\r
\r
- ServiceReference r = bc.getServiceReference(IClusterServices.class\r
- .getName());\r
- if (r != null) {\r
- this.clusterServices = (IClusterServices) bc.getService(r);\r
- }\r
+ this.clusterServices = (IClusterServices)ServiceHelper\r
+ .getGlobalInstance(IClusterServices.class, this);\r
assertNotNull(this.clusterServices);\r
\r
+ this.clusterDefaultServices = (IClusterContainerServices)ServiceHelper\r
+ .getInstance(IClusterContainerServices.class, "default", this);\r
+ assertNotNull(this.clusterDefaultServices);\r
+\r
+ this.clusterGlobalServices = (IClusterGlobalServices)ServiceHelper\r
+ .getGlobalInstance(IClusterGlobalServices.class, this);\r
+ assertNotNull(this.clusterGlobalServices);\r
}\r
\r
@Test\r
public void clusterTest() throws CacheExistException, CacheConfigException,\r
- CacheListenerAddException {\r
+ CacheListenerAddException {\r
\r
String container1 = "Container1";\r
String container2 = "Container2";\r
}\r
\r
@Override\r
- public void entryUpdated(Integer key, String new_value,\r
+ public void entryUpdated(Integer key, String newValue,\r
String containerName, String cacheName, boolean originLocal) {\r
return;\r
}\r
return;\r
}\r
}\r
+\r
+ @Test\r
+ public void clusterContainerAndGlobalTest() throws CacheExistException, CacheConfigException,\r
+ CacheListenerAddException, InterruptedException {\r
+ String cache1 = "Cache1";\r
+ String cache2 = "Cache2";\r
+ // Lets test the case of caches with same name in different\r
+ // containers (actually global an container case)\r
+ String cache3 = "Cache2";\r
+\r
+ HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
+ cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
+ ConcurrentMap cm11 = this.clusterDefaultServices.createCache(cache1, cacheModeSet);\r
+ assertNotNull(cm11);\r
+\r
+ assertTrue(this.clusterDefaultServices.existCache(cache1));\r
+ assertEquals(cm11, this.clusterDefaultServices.getCache(cache1));\r
+\r
+ ConcurrentMap cm12 = this.clusterDefaultServices.createCache(cache2, cacheModeSet);\r
+ ConcurrentMap cm23 = this.clusterGlobalServices.createCache(cache3, cacheModeSet);\r
+\r
+ // Now given cahe2 and cache3 have same name lets make sure\r
+ // they don't return the same reference\r
+ assertNotNull(this.clusterGlobalServices.getCache(cache2));\r
+ // cm12 reference must be different than cm23\r
+ assertTrue(cm12 != cm23);\r
+\r
+ HashSet<String> cacheList = (HashSet<String>) this.clusterDefaultServices\r
+ .getCacheList();\r
+ assertEquals(2, cacheList.size());\r
+ assertTrue(cacheList.contains(cache1));\r
+ assertTrue(cacheList.contains(cache2));\r
+\r
+ assertNotNull(this.clusterDefaultServices.getCacheProperties(cache1));\r
+\r
+ {\r
+ /***********************************/\r
+ /* Testing cacheAware in Container */\r
+ /***********************************/\r
+ Dictionary<String, Object> props = new Hashtable<String, Object>();\r
+ Set<String> propSet = new HashSet<String>();\r
+ propSet.add(cache1);\r
+ propSet.add(cache2);\r
+ props.put("cachenames", propSet);\r
+ CacheAware listener = new CacheAware();\r
+ CacheAware listenerRepeated = new CacheAware();\r
+ ServiceRegistration updateServiceReg = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, "default",\r
+ listener, props);\r
+ assertNotNull(updateServiceReg);\r
+\r
+ // Register another service for the same caches, this\r
+ // should not get any update because we don't allow to\r
+ // override the existing unless before unregistered\r
+ ServiceRegistration updateServiceRegRepeated = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class,\r
+ "default",\r
+ listenerRepeated, props);\r
+ assertNotNull(updateServiceRegRepeated);\r
+ CountDownLatch res = null;\r
+ List<Update> ups = null;\r
+ Update up = null;\r
+ Integer k1 = new Integer(10);\r
+ Long k2 = new Long(100L);\r
+\r
+ /***********************/\r
+ /* CREATE NEW KEY CASE */\r
+ /***********************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(2);\r
+ // modify the cache\r
+ cm11.put(k1, "foo");\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 2);\r
+ // Validate that first we get an update (yes even in case of a\r
+ // new value added)\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.CHANGED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertTrue(up.value.equals("foo"));\r
+ assertTrue(up.cacheName.equals(cache1));\r
+ // Validate that we then get a create\r
+ up = ups.get(1);\r
+ assertTrue(up.t.equals(UpdateType.ADDED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertNull(up.value);\r
+ assertTrue(up.cacheName.equals(cache1));\r
+\r
+ /*******************************/\r
+ /* UPDATE AN EXISTING KEY CASE */\r
+ /*******************************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(1);\r
+ // modify the cache\r
+ cm11.put(k1, "baz");\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 1);\r
+ // Validate we get an update with expect fields\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.CHANGED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertTrue(up.value.equals("baz"));\r
+ assertTrue(up.cacheName.equals(cache1));\r
+\r
+ /********************************/\r
+ /* REMOVAL OF EXISTING KEY CASE */\r
+ /********************************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(1);\r
+ // modify the cache\r
+ cm11.remove(k1);\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 1);\r
+ // Validate we get a delete with expected fields\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.REMOVED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertNull(up.value);\r
+ assertTrue(up.cacheName.equals(cache1));\r
+\r
+ /***********************/\r
+ /* CREATE NEW KEY CASE */\r
+ /***********************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(2);\r
+ // modify the cache\r
+ cm12.put(k2, new Short((short)15));\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 2);\r
+ // Validate that first we get an update (yes even in case of a\r
+ // new value added)\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.CHANGED));\r
+ assertTrue(up.key.equals(k2));\r
+ assertTrue(up.value.equals(new Short((short)15)));\r
+ assertTrue(up.cacheName.equals(cache2));\r
+ // Validate that we then get a create\r
+ up = ups.get(1);\r
+ assertTrue(up.t.equals(UpdateType.ADDED));\r
+ assertTrue(up.key.equals(k2));\r
+ assertNull(up.value);\r
+ assertTrue(up.cacheName.equals(cache2));\r
+\r
+ /*******************************/\r
+ /* UPDATE AN EXISTING KEY CASE */\r
+ /*******************************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(1);\r
+ // modify the cache\r
+ cm12.put(k2, "BAZ");\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 1);\r
+ // Validate we get an update with expect fields\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.CHANGED));\r
+ assertTrue(up.key.equals(k2));\r
+ assertTrue(up.value.equals("BAZ"));\r
+ assertTrue(up.cacheName.equals(cache2));\r
+\r
+ /********************************/\r
+ /* REMOVAL OF EXISTING KEY CASE */\r
+ /********************************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(1);\r
+ // modify the cache\r
+ cm12.remove(k2);\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 1);\r
+ // Validate we get a delete with expected fields\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.REMOVED));\r
+ assertTrue(up.key.equals(k2));\r
+ assertNull(up.value);\r
+ assertTrue(up.cacheName.equals(cache2));\r
+\r
+ /******************************************************************/\r
+ /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
+ /******************************************************************/\r
+ updateServiceReg.unregister();\r
+ // Start monitoring the updates, noone should come in\r
+ res = listener.restart(1);\r
+\r
+ /***********************/\r
+ /* CREATE NEW KEY CASE */\r
+ /***********************/\r
+ // modify the cache\r
+ cm11.put(k1, "foo");\r
+\r
+ /*******************************/\r
+ /* UPDATE AN EXISTING KEY CASE */\r
+ /*******************************/\r
+ // modify the cache\r
+ cm11.put(k1, "baz");\r
+\r
+ /********************************/\r
+ /* REMOVAL OF EXISTING KEY CASE */\r
+ /********************************/\r
+ // modify the cache\r
+ cm11.remove(k1);\r
+\r
+ /***********************/\r
+ /* CREATE NEW KEY CASE */\r
+ /***********************/\r
+ // modify the cache\r
+ cm12.put(k2, new Short((short)15));\r
+\r
+ /*******************************/\r
+ /* UPDATE AN EXISTING KEY CASE */\r
+ /*******************************/\r
+ // modify the cache\r
+ cm12.put(k2, "BAZ");\r
+\r
+ /********************************/\r
+ /* REMOVAL OF EXISTING KEY CASE */\r
+ /********************************/\r
+ // modify the cache\r
+ cm12.remove(k2);\r
+\r
+\r
+ // Wait to make sure no updates came in, clearly this is\r
+ // error prone as logic, but cannot find a better way than\r
+ // this to make sure updates didn't get in\r
+ res.await(1L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 0);\r
+ }\r
+\r
+ {\r
+ /***********************************/\r
+ /* Testing cacheAware in Global */\r
+ /***********************************/\r
+ Dictionary<String, Object> props = new Hashtable<String, Object>();\r
+ Set<String> propSet = new HashSet<String>();\r
+ propSet.add(cache3);\r
+ props.put("cachenames", propSet);\r
+ CacheAware listener = new CacheAware();\r
+ ServiceRegistration updateServiceReg = ServiceHelper.registerGlobalServiceWReg(ICacheUpdateAware.class,\r
+ listener, props);\r
+ assertNotNull(updateServiceReg);\r
+\r
+ CountDownLatch res = null;\r
+ List<Update> ups = null;\r
+ Update up = null;\r
+ Integer k1 = new Integer(10);\r
+\r
+ /***********************/\r
+ /* CREATE NEW KEY CASE */\r
+ /***********************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(2);\r
+ // modify the cache\r
+ cm23.put(k1, "foo");\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 2);\r
+ // Validate that first we get an update (yes even in case of a\r
+ // new value added)\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.CHANGED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertTrue(up.value.equals("foo"));\r
+ assertTrue(up.cacheName.equals(cache3));\r
+ // Validate that we then get a create\r
+ up = ups.get(1);\r
+ assertTrue(up.t.equals(UpdateType.ADDED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertNull(up.value);\r
+ assertTrue(up.cacheName.equals(cache3));\r
+\r
+ /*******************************/\r
+ /* UPDATE AN EXISTING KEY CASE */\r
+ /*******************************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(1);\r
+ // modify the cache\r
+ cm23.put(k1, "baz");\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 1);\r
+ // Validate we get an update with expect fields\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.CHANGED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertTrue(up.value.equals("baz"));\r
+ assertTrue(up.cacheName.equals(cache3));\r
+\r
+ /********************************/\r
+ /* REMOVAL OF EXISTING KEY CASE */\r
+ /********************************/\r
+ // Start monitoring the updates\r
+ res = listener.restart(1);\r
+ // modify the cache\r
+ cm23.remove(k1);\r
+ // Wait\r
+ res.await(100L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 1);\r
+ // Validate we get a delete with expected fields\r
+ up = ups.get(0);\r
+ assertTrue(up.t.equals(UpdateType.REMOVED));\r
+ assertTrue(up.key.equals(k1));\r
+ assertNull(up.value);\r
+ assertTrue(up.cacheName.equals(cache3));\r
+\r
+ /******************************************************************/\r
+ /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
+ /******************************************************************/\r
+ updateServiceReg.unregister();\r
+ // Start monitoring the updates, noone should come in\r
+ res = listener.restart(1);\r
+\r
+ /***********************/\r
+ /* CREATE NEW KEY CASE */\r
+ /***********************/\r
+ // modify the cache\r
+ cm23.put(k1, "foo");\r
+\r
+ /*******************************/\r
+ /* UPDATE AN EXISTING KEY CASE */\r
+ /*******************************/\r
+ // modify the cache\r
+ cm23.put(k1, "baz");\r
+\r
+ /********************************/\r
+ /* REMOVAL OF EXISTING KEY CASE */\r
+ /********************************/\r
+ // modify the cache\r
+ cm23.remove(k1);\r
+\r
+ // Wait to make sure no updates came in, clearly this is\r
+ // error prone as logic, but cannot find a better way than\r
+ // this to make sure updates didn't get in\r
+ res.await(1L, TimeUnit.SECONDS);\r
+ // Analyze the updates\r
+ ups = listener.getUpdates();\r
+ assertTrue(ups.size() == 0);\r
+ }\r
+\r
+ InetAddress addr = this.clusterDefaultServices.getMyAddress();\r
+ assertNotNull(addr);\r
+\r
+ List<InetAddress> addrList = this.clusterDefaultServices\r
+ .getClusteredControllers();\r
+\r
+ this.clusterDefaultServices.destroyCache(cache1);\r
+ assertFalse(this.clusterDefaultServices.existCache(cache1));\r
+ }\r
+\r
+ private class Update {\r
+ Object key;\r
+ Object value;\r
+ String cacheName;\r
+ UpdateType t;\r
+\r
+ Update (UpdateType t, Object key, Object value, String cacheName) {\r
+ this.t = t;\r
+ this.key = key;\r
+ this.value = value;\r
+ this.cacheName = cacheName;\r
+ }\r
+ }\r
+\r
+ private class CacheAware implements ICacheUpdateAware {\r
+ private CopyOnWriteArrayList<Update> gotUpdates;\r
+ private CountDownLatch latch = null;\r
+\r
+ CacheAware() {\r
+ this.gotUpdates = new CopyOnWriteArrayList<Update>();\r
+ }\r
+\r
+\r
+ /**\r
+ * Restart the monitor of the updates on the CacheAware object\r
+ *\r
+ * @param expectedOperations Number of expected updates\r
+ *\r
+ * @return a countdown latch which will be used to wait till the updates are done\r
+ */\r
+ CountDownLatch restart(int expectedOperations) {\r
+ this.gotUpdates.clear();\r
+ this.latch = new CountDownLatch(expectedOperations);\r
+ return this.latch;\r
+ }\r
+\r
+ List<Update> getUpdates() {\r
+ return this.gotUpdates;\r
+ }\r
+\r
+ @Override\r
+ public void entryCreated(Object key, String cacheName, boolean originLocal) {\r
+ log.debug("CACHE[{}] Got an entry created for key:{}", cacheName, key);\r
+ Update u = new Update(UpdateType.ADDED, key, null, cacheName);\r
+ this.gotUpdates.add(u);\r
+ this.latch.countDown();\r
+ }\r
+\r
+ @Override\r
+ public void entryUpdated(Object key, Object newValue, String cacheName, boolean originLocal) {\r
+ log.debug("CACHE[{}] Got an entry updated for key:{} newValue:{}", cacheName, key, newValue);\r
+ Update u = new Update(UpdateType.CHANGED, key, newValue, cacheName);\r
+ this.gotUpdates.add(u);\r
+ this.latch.countDown();\r
+ }\r
+\r
+ @Override\r
+ public void entryDeleted(Object key, String cacheName, boolean originLocal) {\r
+ log.debug("CACHE[{}] Got an entry delete for key:{}", cacheName, key);\r
+ Update u = new Update(UpdateType.REMOVED, key, null, cacheName);\r
+ this.gotUpdates.add(u);\r
+ this.latch.countDown();\r
+ }\r
+ }\r
}\r