Merge "Added YANG models for base concepts in the controller"
[controller.git] / opendaylight / clustering / integrationtest / src / test / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusteringServicesIT.java
1 package org.opendaylight.controller.clustering.services_implementation.internal;\r
2 \r
3 import static org.junit.Assert.assertEquals;\r
4 import static org.junit.Assert.assertFalse;\r
5 import static org.junit.Assert.assertNotNull;\r
6 import static org.junit.Assert.assertNull;\r
7 import static org.junit.Assert.assertTrue;\r
8 import static org.ops4j.pax.exam.CoreOptions.junitBundles;\r
9 import static org.ops4j.pax.exam.CoreOptions.mavenBundle;\r
10 import static org.ops4j.pax.exam.CoreOptions.options;\r
11 import static org.ops4j.pax.exam.CoreOptions.systemPackages;\r
12 import static org.ops4j.pax.exam.CoreOptions.systemProperty;\r
13 \r
14 import java.util.List;\r
15 import java.util.concurrent.CopyOnWriteArrayList;\r
16 import java.util.concurrent.TimeUnit;\r
17 import java.net.InetAddress;\r
18 import java.util.Dictionary;\r
19 import java.util.HashSet;\r
20 import java.util.Hashtable;\r
21 import java.util.Set;\r
22 import java.util.List;\r
23 import java.util.concurrent.ConcurrentMap;\r
24 \r
25 import javax.inject.Inject;\r
26 \r
27 import org.junit.Before;\r
28 import org.junit.Test;\r
29 import org.junit.runner.RunWith;\r
30 import org.opendaylight.controller.clustering.services.CacheConfigException;\r
31 import org.opendaylight.controller.clustering.services.CacheExistException;\r
32 import org.opendaylight.controller.clustering.services.CacheListenerAddException;\r
33 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;\r
34 import org.opendaylight.controller.clustering.services.IClusterServices;\r
35 import org.opendaylight.controller.clustering.services.IClusterContainerServices;\r
36 import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode;\r
37 import org.opendaylight.controller.clustering.services.IGetUpdates;\r
38 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;\r
39 import org.opendaylight.controller.sal.utils.ServiceHelper;\r
40 import org.opendaylight.controller.sal.core.UpdateType;\r
41 import org.ops4j.pax.exam.Option;\r
42 import org.ops4j.pax.exam.junit.Configuration;\r
43 import org.ops4j.pax.exam.junit.PaxExam;\r
44 import org.ops4j.pax.exam.util.PathUtils;\r
45 import org.osgi.framework.Bundle;\r
46 import org.osgi.framework.BundleContext;\r
47 import org.osgi.framework.ServiceReference;\r
48 import org.osgi.framework.ServiceRegistration;\r
49 import org.slf4j.Logger;\r
50 import org.slf4j.LoggerFactory;\r
51 import java.util.concurrent.CountDownLatch;\r
52 \r
53 @RunWith(PaxExam.class)\r
54 public class ClusteringServicesIT {\r
55     private Logger log = LoggerFactory\r
56         .getLogger(ClusteringServicesIT.class);\r
57     // get the OSGI bundle context\r
58     @Inject\r
59     private BundleContext bc;\r
60     private IClusterServices clusterServices = null;\r
61     private IClusterContainerServices clusterDefaultServices = null;\r
62     private IClusterGlobalServices clusterGlobalServices = null;\r
63 \r
64     // Configure the OSGi container\r
65     @Configuration\r
66     public Option[] config() {\r
67         return options(\r
68             //\r
69             systemProperty("logback.configurationFile").value(\r
70                 "file:" + PathUtils.getBaseDir()\r
71                 + "/src/test/resources/logback.xml"),\r
72             // To start OSGi console for inspection remotely\r
73             systemProperty("osgi.console").value("2401"),\r
74             // Set the systemPackages (used by clustering)\r
75             systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),\r
76             // List framework bundles\r
77             mavenBundle("equinoxSDK381",\r
78                         "org.eclipse.equinox.console").versionAsInProject(),\r
79             mavenBundle("equinoxSDK381",\r
80                         "org.eclipse.equinox.util").versionAsInProject(),\r
81             mavenBundle("equinoxSDK381",\r
82                         "org.eclipse.osgi.services").versionAsInProject(),\r
83             mavenBundle("equinoxSDK381",\r
84                         "org.eclipse.equinox.ds").versionAsInProject(),\r
85             mavenBundle("equinoxSDK381",\r
86                         "org.apache.felix.gogo.command").versionAsInProject(),\r
87             mavenBundle("equinoxSDK381",\r
88                         "org.apache.felix.gogo.runtime").versionAsInProject(),\r
89             mavenBundle("equinoxSDK381",\r
90                         "org.apache.felix.gogo.shell").versionAsInProject(),\r
91             // List logger bundles\r
92             mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),\r
93             mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),\r
94             mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),\r
95             mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),\r
96             // List all the bundles on which the test case depends\r
97             mavenBundle("org.opendaylight.controller",\r
98                         "clustering.services").versionAsInProject(),\r
99             mavenBundle("org.opendaylight.controller",\r
100                         "clustering.services-implementation").versionAsInProject(),\r
101             mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),\r
102             mavenBundle("org.opendaylight.controller",\r
103                         "sal.implementation").versionAsInProject(),\r
104             mavenBundle("org.opendaylight.controller", "containermanager").versionAsInProject(),\r
105             mavenBundle("org.opendaylight.controller",\r
106                         "containermanager.implementation").versionAsInProject(),\r
107             mavenBundle("org.jboss.spec.javax.transaction",\r
108                         "jboss-transaction-api_1.1_spec").versionAsInProject(),\r
109             mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),\r
110             mavenBundle("org.apache.felix",\r
111                         "org.apache.felix.dependencymanager").versionAsInProject(),\r
112             mavenBundle("org.apache.felix",\r
113                         "org.apache.felix.dependencymanager.shell").versionAsInProject(),\r
114             junitBundles());\r
115     }\r
116 \r
117     private String stateToString(int state) {\r
118         switch (state) {\r
119         case Bundle.ACTIVE:\r
120             return "ACTIVE";\r
121         case Bundle.INSTALLED:\r
122             return "INSTALLED";\r
123         case Bundle.RESOLVED:\r
124             return "RESOLVED";\r
125         case Bundle.UNINSTALLED:\r
126             return "UNINSTALLED";\r
127         default:\r
128             return "Not CONVERTED";\r
129         }\r
130     }\r
131 \r
132     @Before\r
133     public void areWeReady() {\r
134         assertNotNull(bc);\r
135         boolean debugit = false;\r
136         Bundle b[] = bc.getBundles();\r
137         for (int i = 0; i < b.length; i++) {\r
138             int state = b[i].getState();\r
139             if (state != Bundle.ACTIVE && state != Bundle.RESOLVED) {\r
140                 log.debug("Bundle:" + b[i].getSymbolicName() + " state:"\r
141                           + stateToString(state));\r
142                 debugit = true;\r
143             }\r
144         }\r
145         if (debugit) {\r
146             log.debug("Do some debugging because some bundle is "\r
147                       + "unresolved");\r
148         }\r
149 \r
150         // Assert if true, if false we are good to go!\r
151         assertFalse(debugit);\r
152 \r
153         this.clusterServices = (IClusterServices)ServiceHelper\r
154             .getGlobalInstance(IClusterServices.class, this);\r
155         assertNotNull(this.clusterServices);\r
156 \r
157         this.clusterDefaultServices = (IClusterContainerServices)ServiceHelper\r
158             .getInstance(IClusterContainerServices.class, "default", this);\r
159         assertNotNull(this.clusterDefaultServices);\r
160 \r
161         this.clusterGlobalServices = (IClusterGlobalServices)ServiceHelper\r
162             .getGlobalInstance(IClusterGlobalServices.class, this);\r
163         assertNotNull(this.clusterGlobalServices);\r
164     }\r
165 \r
166     @Test\r
167     public void clusterTest() throws CacheExistException, CacheConfigException,\r
168         CacheListenerAddException {\r
169 \r
170         String container1 = "Container1";\r
171         String container2 = "Container2";\r
172         String cache1 = "Cache1";\r
173         String cache2 = "Cache2";\r
174         String cache3 = "Cache3";\r
175 \r
176         HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
177         cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
178         ConcurrentMap cm11 = this.clusterServices.createCache(container1,\r
179                 cache1, cacheModeSet);\r
180         assertNotNull(cm11);\r
181 \r
182         assertNull(this.clusterServices.getCache(container2, cache2));\r
183         assertEquals(cm11, this.clusterServices.getCache(container1, cache1));\r
184 \r
185         assertFalse(this.clusterServices.existCache(container2, cache2));\r
186         assertTrue(this.clusterServices.existCache(container1, cache1));\r
187 \r
188         ConcurrentMap cm12 = this.clusterServices.createCache(container1,\r
189                 cache2, cacheModeSet);\r
190         ConcurrentMap cm23 = this.clusterServices.createCache(container2,\r
191                 cache3, cacheModeSet);\r
192 \r
193         HashSet<String> cacheList = (HashSet<String>) this.clusterServices\r
194                 .getCacheList(container1);\r
195         assertEquals(2, cacheList.size());\r
196         assertTrue(cacheList.contains(cache1));\r
197         assertTrue(cacheList.contains(cache2));\r
198         assertFalse(cacheList.contains(cache3));\r
199 \r
200         assertNotNull(this.clusterServices.getCacheProperties(container1,\r
201                 cache1));\r
202 \r
203         HashSet<IGetUpdates<?, ?>> listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
204                 .getListeners(container1, cache1);\r
205         assertEquals(0, listeners.size());\r
206 \r
207         IGetUpdates<?, ?> getUpdate1 = new GetUpdates();\r
208         this.clusterServices.addListener(container1, cache1, getUpdate1);\r
209         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
210                 .getListeners(container1, cache1);\r
211         assertEquals(1, listeners.size());\r
212         this.clusterServices.addListener(container1, cache1, new GetUpdates());\r
213         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
214                 .getListeners(container1, cache1);\r
215         assertEquals(2, listeners.size());\r
216 \r
217         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
218                 .getListeners(container2, cache3);\r
219         assertEquals(0, listeners.size());\r
220 \r
221         this.clusterServices.removeListener(container1, cache1, getUpdate1);\r
222         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
223                 .getListeners(container1, cache1);\r
224         assertEquals(1, listeners.size());\r
225 \r
226         InetAddress addr = this.clusterServices.getMyAddress();\r
227         assertNotNull(addr);\r
228 \r
229         List<InetAddress> addrList = this.clusterServices\r
230                 .getClusteredControllers();\r
231 \r
232         this.clusterServices.destroyCache(container1, cache1);\r
233         assertFalse(this.clusterServices.existCache(container1, cache1));\r
234 \r
235     }\r
236 \r
237     private class GetUpdates implements IGetUpdates<Integer, String> {\r
238 \r
239         @Override\r
240         public void entryCreated(Integer key, String containerName,\r
241                 String cacheName, boolean originLocal) {\r
242             return;\r
243         }\r
244 \r
245         @Override\r
246         public void entryUpdated(Integer key, String newValue,\r
247                 String containerName, String cacheName, boolean originLocal) {\r
248             return;\r
249         }\r
250 \r
251         @Override\r
252         public void entryDeleted(Integer key, String containerName,\r
253                 String cacheName, boolean originLocal) {\r
254             return;\r
255         }\r
256     }\r
257 \r
258     @Test\r
259     public void clusterContainerAndGlobalTest() throws CacheExistException, CacheConfigException,\r
260         CacheListenerAddException, InterruptedException {\r
261         String cache1 = "Cache1";\r
262         String cache2 = "Cache2";\r
263         // Lets test the case of caches with same name in different\r
264         // containers (actually global an container case)\r
265         String cache3 = "Cache2";\r
266 \r
267         HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
268         cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
269         ConcurrentMap cm11 = this.clusterDefaultServices.createCache(cache1, cacheModeSet);\r
270         assertNotNull(cm11);\r
271 \r
272         assertTrue(this.clusterDefaultServices.existCache(cache1));\r
273         assertEquals(cm11, this.clusterDefaultServices.getCache(cache1));\r
274 \r
275         ConcurrentMap cm12 = this.clusterDefaultServices.createCache(cache2, cacheModeSet);\r
276         ConcurrentMap cm23 = this.clusterGlobalServices.createCache(cache3, cacheModeSet);\r
277 \r
278         // Now given cahe2 and cache3 have same name lets make sure\r
279         // they don't return the same reference\r
280         assertNotNull(this.clusterGlobalServices.getCache(cache2));\r
281         // cm12 reference must be different than cm23\r
282         assertTrue(cm12 != cm23);\r
283 \r
284         HashSet<String> cacheList = (HashSet<String>) this.clusterDefaultServices\r
285             .getCacheList();\r
286         assertEquals(2, cacheList.size());\r
287         assertTrue(cacheList.contains(cache1));\r
288         assertTrue(cacheList.contains(cache2));\r
289 \r
290         assertNotNull(this.clusterDefaultServices.getCacheProperties(cache1));\r
291 \r
292         {\r
293             /***********************************/\r
294             /* Testing cacheAware in Container */\r
295             /***********************************/\r
296             Dictionary<String, Object> props = new Hashtable<String, Object>();\r
297             Set<String> propSet = new HashSet<String>();\r
298             propSet.add(cache1);\r
299             propSet.add(cache2);\r
300             props.put("cachenames", propSet);\r
301             CacheAware listener = new CacheAware();\r
302             CacheAware listenerRepeated = new CacheAware();\r
303             ServiceRegistration updateServiceReg = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, "default",\r
304                                                                                      listener, props);\r
305             assertNotNull(updateServiceReg);\r
306 \r
307             // Register another service for the same caches, this\r
308             // should not get any update because we don't allow to\r
309             // override the existing unless before unregistered\r
310             ServiceRegistration updateServiceRegRepeated = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class,\r
311                                                                                              "default",\r
312                                                                                              listenerRepeated, props);\r
313             assertNotNull(updateServiceRegRepeated);\r
314             CountDownLatch res = null;\r
315             List<Update> ups = null;\r
316             Update up = null;\r
317             Integer k1 = new Integer(10);\r
318             Long k2 = new Long(100L);\r
319 \r
320             /***********************/\r
321             /* CREATE NEW KEY CASE */\r
322             /***********************/\r
323             // Start monitoring the updates\r
324             res = listener.restart(2);\r
325             // modify the cache\r
326             cm11.put(k1, "foo");\r
327             // Wait\r
328             res.await(100L, TimeUnit.SECONDS);\r
329             // Analyze the updates\r
330             ups = listener.getUpdates();\r
331             assertTrue(ups.size() == 2);\r
332             // Validate that first we get an update (yes even in case of a\r
333             // new value added)\r
334             up = ups.get(0);\r
335             assertTrue(up.t.equals(UpdateType.CHANGED));\r
336             assertTrue(up.key.equals(k1));\r
337             assertTrue(up.value.equals("foo"));\r
338             assertTrue(up.cacheName.equals(cache1));\r
339             // Validate that we then get a create\r
340             up = ups.get(1);\r
341             assertTrue(up.t.equals(UpdateType.ADDED));\r
342             assertTrue(up.key.equals(k1));\r
343             assertNull(up.value);\r
344             assertTrue(up.cacheName.equals(cache1));\r
345 \r
346             /*******************************/\r
347             /* UPDATE AN EXISTING KEY CASE */\r
348             /*******************************/\r
349             // Start monitoring the updates\r
350             res = listener.restart(1);\r
351             // modify the cache\r
352             cm11.put(k1, "baz");\r
353             // Wait\r
354             res.await(100L, TimeUnit.SECONDS);\r
355             // Analyze the updates\r
356             ups = listener.getUpdates();\r
357             assertTrue(ups.size() == 1);\r
358             // Validate we get an update with expect fields\r
359             up = ups.get(0);\r
360             assertTrue(up.t.equals(UpdateType.CHANGED));\r
361             assertTrue(up.key.equals(k1));\r
362             assertTrue(up.value.equals("baz"));\r
363             assertTrue(up.cacheName.equals(cache1));\r
364 \r
365             /********************************/\r
366             /* REMOVAL OF EXISTING KEY CASE */\r
367             /********************************/\r
368             // Start monitoring the updates\r
369             res = listener.restart(1);\r
370             // modify the cache\r
371             cm11.remove(k1);\r
372             // Wait\r
373             res.await(100L, TimeUnit.SECONDS);\r
374             // Analyze the updates\r
375             ups = listener.getUpdates();\r
376             assertTrue(ups.size() == 1);\r
377             // Validate we get a delete with expected fields\r
378             up = ups.get(0);\r
379             assertTrue(up.t.equals(UpdateType.REMOVED));\r
380             assertTrue(up.key.equals(k1));\r
381             assertNull(up.value);\r
382             assertTrue(up.cacheName.equals(cache1));\r
383 \r
384             /***********************/\r
385             /* CREATE NEW KEY CASE */\r
386             /***********************/\r
387             // Start monitoring the updates\r
388             res = listener.restart(2);\r
389             // modify the cache\r
390             cm12.put(k2, new Short((short)15));\r
391             // Wait\r
392             res.await(100L, TimeUnit.SECONDS);\r
393             // Analyze the updates\r
394             ups = listener.getUpdates();\r
395             assertTrue(ups.size() == 2);\r
396             // Validate that first we get an update (yes even in case of a\r
397             // new value added)\r
398             up = ups.get(0);\r
399             assertTrue(up.t.equals(UpdateType.CHANGED));\r
400             assertTrue(up.key.equals(k2));\r
401             assertTrue(up.value.equals(new Short((short)15)));\r
402             assertTrue(up.cacheName.equals(cache2));\r
403             // Validate that we then get a create\r
404             up = ups.get(1);\r
405             assertTrue(up.t.equals(UpdateType.ADDED));\r
406             assertTrue(up.key.equals(k2));\r
407             assertNull(up.value);\r
408             assertTrue(up.cacheName.equals(cache2));\r
409 \r
410             /*******************************/\r
411             /* UPDATE AN EXISTING KEY CASE */\r
412             /*******************************/\r
413             // Start monitoring the updates\r
414             res = listener.restart(1);\r
415             // modify the cache\r
416             cm12.put(k2, "BAZ");\r
417             // Wait\r
418             res.await(100L, TimeUnit.SECONDS);\r
419             // Analyze the updates\r
420             ups = listener.getUpdates();\r
421             assertTrue(ups.size() == 1);\r
422             // Validate we get an update with expect fields\r
423             up = ups.get(0);\r
424             assertTrue(up.t.equals(UpdateType.CHANGED));\r
425             assertTrue(up.key.equals(k2));\r
426             assertTrue(up.value.equals("BAZ"));\r
427             assertTrue(up.cacheName.equals(cache2));\r
428 \r
429             /********************************/\r
430             /* REMOVAL OF EXISTING KEY CASE */\r
431             /********************************/\r
432             // Start monitoring the updates\r
433             res = listener.restart(1);\r
434             // modify the cache\r
435             cm12.remove(k2);\r
436             // Wait\r
437             res.await(100L, TimeUnit.SECONDS);\r
438             // Analyze the updates\r
439             ups = listener.getUpdates();\r
440             assertTrue(ups.size() == 1);\r
441             // Validate we get a delete with expected fields\r
442             up = ups.get(0);\r
443             assertTrue(up.t.equals(UpdateType.REMOVED));\r
444             assertTrue(up.key.equals(k2));\r
445             assertNull(up.value);\r
446             assertTrue(up.cacheName.equals(cache2));\r
447 \r
448             /******************************************************************/\r
449             /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
450             /******************************************************************/\r
451             updateServiceReg.unregister();\r
452             // Start monitoring the updates, noone should come in\r
453             res = listener.restart(1);\r
454 \r
455             /***********************/\r
456             /* CREATE NEW KEY CASE */\r
457             /***********************/\r
458             // modify the cache\r
459             cm11.put(k1, "foo");\r
460 \r
461             /*******************************/\r
462             /* UPDATE AN EXISTING KEY CASE */\r
463             /*******************************/\r
464             // modify the cache\r
465             cm11.put(k1, "baz");\r
466 \r
467             /********************************/\r
468             /* REMOVAL OF EXISTING KEY CASE */\r
469             /********************************/\r
470             // modify the cache\r
471             cm11.remove(k1);\r
472 \r
473             /***********************/\r
474             /* CREATE NEW KEY CASE */\r
475             /***********************/\r
476             // modify the cache\r
477             cm12.put(k2, new Short((short)15));\r
478 \r
479             /*******************************/\r
480             /* UPDATE AN EXISTING KEY CASE */\r
481             /*******************************/\r
482             // modify the cache\r
483             cm12.put(k2, "BAZ");\r
484 \r
485             /********************************/\r
486             /* REMOVAL OF EXISTING KEY CASE */\r
487             /********************************/\r
488             // modify the cache\r
489             cm12.remove(k2);\r
490 \r
491 \r
492             // Wait to make sure no updates came in, clearly this is\r
493             // error prone as logic, but cannot find a better way than\r
494             // this to make sure updates didn't get in\r
495             res.await(1L, TimeUnit.SECONDS);\r
496             // Analyze the updates\r
497             ups = listener.getUpdates();\r
498             assertTrue(ups.size() == 0);\r
499         }\r
500 \r
501         {\r
502             /***********************************/\r
503             /* Testing cacheAware in Global */\r
504             /***********************************/\r
505             Dictionary<String, Object> props = new Hashtable<String, Object>();\r
506             Set<String> propSet = new HashSet<String>();\r
507             propSet.add(cache3);\r
508             props.put("cachenames", propSet);\r
509             CacheAware listener = new CacheAware();\r
510             ServiceRegistration updateServiceReg = ServiceHelper.registerGlobalServiceWReg(ICacheUpdateAware.class,\r
511                                                                                            listener, props);\r
512             assertNotNull(updateServiceReg);\r
513 \r
514             CountDownLatch res = null;\r
515             List<Update> ups = null;\r
516             Update up = null;\r
517             Integer k1 = new Integer(10);\r
518 \r
519             /***********************/\r
520             /* CREATE NEW KEY CASE */\r
521             /***********************/\r
522             // Start monitoring the updates\r
523             res = listener.restart(2);\r
524             // modify the cache\r
525             cm23.put(k1, "foo");\r
526             // Wait\r
527             res.await(100L, TimeUnit.SECONDS);\r
528             // Analyze the updates\r
529             ups = listener.getUpdates();\r
530             assertTrue(ups.size() == 2);\r
531             // Validate that first we get an update (yes even in case of a\r
532             // new value added)\r
533             up = ups.get(0);\r
534             assertTrue(up.t.equals(UpdateType.CHANGED));\r
535             assertTrue(up.key.equals(k1));\r
536             assertTrue(up.value.equals("foo"));\r
537             assertTrue(up.cacheName.equals(cache3));\r
538             // Validate that we then get a create\r
539             up = ups.get(1);\r
540             assertTrue(up.t.equals(UpdateType.ADDED));\r
541             assertTrue(up.key.equals(k1));\r
542             assertNull(up.value);\r
543             assertTrue(up.cacheName.equals(cache3));\r
544 \r
545             /*******************************/\r
546             /* UPDATE AN EXISTING KEY CASE */\r
547             /*******************************/\r
548             // Start monitoring the updates\r
549             res = listener.restart(1);\r
550             // modify the cache\r
551             cm23.put(k1, "baz");\r
552             // Wait\r
553             res.await(100L, TimeUnit.SECONDS);\r
554             // Analyze the updates\r
555             ups = listener.getUpdates();\r
556             assertTrue(ups.size() == 1);\r
557             // Validate we get an update with expect fields\r
558             up = ups.get(0);\r
559             assertTrue(up.t.equals(UpdateType.CHANGED));\r
560             assertTrue(up.key.equals(k1));\r
561             assertTrue(up.value.equals("baz"));\r
562             assertTrue(up.cacheName.equals(cache3));\r
563 \r
564             /********************************/\r
565             /* REMOVAL OF EXISTING KEY CASE */\r
566             /********************************/\r
567             // Start monitoring the updates\r
568             res = listener.restart(1);\r
569             // modify the cache\r
570             cm23.remove(k1);\r
571             // Wait\r
572             res.await(100L, TimeUnit.SECONDS);\r
573             // Analyze the updates\r
574             ups = listener.getUpdates();\r
575             assertTrue(ups.size() == 1);\r
576             // Validate we get a delete with expected fields\r
577             up = ups.get(0);\r
578             assertTrue(up.t.equals(UpdateType.REMOVED));\r
579             assertTrue(up.key.equals(k1));\r
580             assertNull(up.value);\r
581             assertTrue(up.cacheName.equals(cache3));\r
582 \r
583             /******************************************************************/\r
584             /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
585             /******************************************************************/\r
586             updateServiceReg.unregister();\r
587             // Start monitoring the updates, noone should come in\r
588             res = listener.restart(1);\r
589 \r
590             /***********************/\r
591             /* CREATE NEW KEY CASE */\r
592             /***********************/\r
593             // modify the cache\r
594             cm23.put(k1, "foo");\r
595 \r
596             /*******************************/\r
597             /* UPDATE AN EXISTING KEY CASE */\r
598             /*******************************/\r
599             // modify the cache\r
600             cm23.put(k1, "baz");\r
601 \r
602             /********************************/\r
603             /* REMOVAL OF EXISTING KEY CASE */\r
604             /********************************/\r
605             // modify the cache\r
606             cm23.remove(k1);\r
607 \r
608             // Wait to make sure no updates came in, clearly this is\r
609             // error prone as logic, but cannot find a better way than\r
610             // this to make sure updates didn't get in\r
611             res.await(1L, TimeUnit.SECONDS);\r
612             // Analyze the updates\r
613             ups = listener.getUpdates();\r
614             assertTrue(ups.size() == 0);\r
615         }\r
616 \r
617         InetAddress addr = this.clusterDefaultServices.getMyAddress();\r
618         assertNotNull(addr);\r
619 \r
620         List<InetAddress> addrList = this.clusterDefaultServices\r
621             .getClusteredControllers();\r
622 \r
623         this.clusterDefaultServices.destroyCache(cache1);\r
624         assertFalse(this.clusterDefaultServices.existCache(cache1));\r
625     }\r
626 \r
627     private class Update {\r
628         Object key;\r
629         Object value;\r
630         String cacheName;\r
631         UpdateType t;\r
632 \r
633         Update (UpdateType t, Object key, Object value, String cacheName) {\r
634             this.t = t;\r
635             this.key = key;\r
636             this.value = value;\r
637             this.cacheName = cacheName;\r
638         }\r
639     }\r
640 \r
641     private class CacheAware implements ICacheUpdateAware {\r
642         private CopyOnWriteArrayList<Update> gotUpdates;\r
643         private CountDownLatch latch = null;\r
644 \r
645         CacheAware() {\r
646             this.gotUpdates = new CopyOnWriteArrayList<Update>();\r
647         }\r
648 \r
649 \r
650         /**\r
651          * Restart the monitor of the updates on the CacheAware object\r
652          *\r
653          * @param expectedOperations Number of expected updates\r
654          *\r
655          * @return a countdown latch which will be used to wait till the updates are done\r
656          */\r
657         CountDownLatch restart(int expectedOperations) {\r
658             this.gotUpdates.clear();\r
659             this.latch = new CountDownLatch(expectedOperations);\r
660             return this.latch;\r
661         }\r
662 \r
663         List<Update> getUpdates() {\r
664             return this.gotUpdates;\r
665         }\r
666 \r
667         @Override\r
668         public void entryCreated(Object key, String cacheName, boolean originLocal) {\r
669             log.debug("CACHE[{}] Got an entry created for key:{}", cacheName, key);\r
670             Update u = new Update(UpdateType.ADDED, key, null, cacheName);\r
671             this.gotUpdates.add(u);\r
672             this.latch.countDown();\r
673         }\r
674 \r
675         @Override\r
676         public void entryUpdated(Object key, Object newValue, String cacheName, boolean originLocal) {\r
677             log.debug("CACHE[{}] Got an entry updated for key:{} newValue:{}", cacheName, key, newValue);\r
678             Update u = new Update(UpdateType.CHANGED, key, newValue, cacheName);\r
679             this.gotUpdates.add(u);\r
680             this.latch.countDown();\r
681         }\r
682 \r
683         @Override\r
684         public void entryDeleted(Object key, String cacheName, boolean originLocal) {\r
685             log.debug("CACHE[{}] Got an entry delete for key:{}", cacheName, key);\r
686             Update u = new Update(UpdateType.REMOVED, key, null, cacheName);\r
687             this.gotUpdates.add(u);\r
688             this.latch.countDown();\r
689         }\r
690     }\r
691 }\r