Container Management and associated Northbound APIs.
[controller.git] / opendaylight / clustering / integrationtest / src / test / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusteringServicesIT.java
1 package org.opendaylight.controller.clustering.services_implementation.internal;\r
2 \r
3 import static org.junit.Assert.assertEquals;\r
4 import static org.junit.Assert.assertFalse;\r
5 import static org.junit.Assert.assertNotNull;\r
6 import static org.junit.Assert.assertNull;\r
7 import static org.junit.Assert.assertTrue;\r
8 import static org.ops4j.pax.exam.CoreOptions.junitBundles;\r
9 import static org.ops4j.pax.exam.CoreOptions.mavenBundle;\r
10 import static org.ops4j.pax.exam.CoreOptions.options;\r
11 import static org.ops4j.pax.exam.CoreOptions.systemPackages;\r
12 import static org.ops4j.pax.exam.CoreOptions.systemProperty;\r
13 \r
14 import java.util.List;\r
15 import java.util.concurrent.CopyOnWriteArrayList;\r
16 import java.util.concurrent.TimeUnit;\r
17 import java.net.InetAddress;\r
18 import java.util.Dictionary;\r
19 import java.util.HashSet;\r
20 import java.util.Hashtable;\r
21 import java.util.Set;\r
22 import java.util.List;\r
23 import java.util.concurrent.ConcurrentMap;\r
24 \r
25 import javax.inject.Inject;\r
26 \r
27 import org.junit.Before;\r
28 import org.junit.Test;\r
29 import org.junit.runner.RunWith;\r
30 import org.opendaylight.controller.clustering.services.CacheConfigException;\r
31 import org.opendaylight.controller.clustering.services.CacheExistException;\r
32 import org.opendaylight.controller.clustering.services.CacheListenerAddException;\r
33 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;\r
34 import org.opendaylight.controller.clustering.services.IClusterServices;\r
35 import org.opendaylight.controller.clustering.services.IClusterContainerServices;\r
36 import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode;\r
37 import org.opendaylight.controller.clustering.services.IGetUpdates;\r
38 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;\r
39 import org.opendaylight.controller.sal.utils.ServiceHelper;\r
40 import org.opendaylight.controller.sal.core.UpdateType;\r
41 import org.ops4j.pax.exam.Option;\r
42 import org.ops4j.pax.exam.junit.Configuration;\r
43 import org.ops4j.pax.exam.junit.PaxExam;\r
44 import org.ops4j.pax.exam.util.PathUtils;\r
45 import org.osgi.framework.Bundle;\r
46 import org.osgi.framework.BundleContext;\r
47 import org.osgi.framework.ServiceReference;\r
48 import org.osgi.framework.ServiceRegistration;\r
49 import org.slf4j.Logger;\r
50 import org.slf4j.LoggerFactory;\r
51 import java.util.concurrent.CountDownLatch;\r
52 \r
53 @RunWith(PaxExam.class)\r
54 public class ClusteringServicesIT {\r
55     private Logger log = LoggerFactory\r
56         .getLogger(ClusteringServicesIT.class);\r
57     // get the OSGI bundle context\r
58     @Inject\r
59     private BundleContext bc;\r
60     private IClusterServices clusterServices = null;\r
61     private IClusterContainerServices clusterDefaultServices = null;\r
62     private IClusterGlobalServices clusterGlobalServices = null;\r
63 \r
64     // Configure the OSGi container\r
65     @Configuration\r
66     public Option[] config() {\r
67         return options(\r
68             //\r
69             systemProperty("logback.configurationFile").value(\r
70                 "file:" + PathUtils.getBaseDir()\r
71                 + "/src/test/resources/logback.xml"),\r
72             // To start OSGi console for inspection remotely\r
73             systemProperty("osgi.console").value("2401"),\r
74             // Set the systemPackages (used by clustering)\r
75             systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),\r
76             // List framework bundles\r
77             mavenBundle("equinoxSDK381",\r
78                         "org.eclipse.equinox.console").versionAsInProject(),\r
79             mavenBundle("equinoxSDK381",\r
80                         "org.eclipse.equinox.util").versionAsInProject(),\r
81             mavenBundle("equinoxSDK381",\r
82                         "org.eclipse.osgi.services").versionAsInProject(),\r
83             mavenBundle("equinoxSDK381",\r
84                         "org.eclipse.equinox.ds").versionAsInProject(),\r
85             mavenBundle("equinoxSDK381",\r
86                         "org.apache.felix.gogo.command").versionAsInProject(),\r
87             mavenBundle("equinoxSDK381",\r
88                         "org.apache.felix.gogo.runtime").versionAsInProject(),\r
89             mavenBundle("equinoxSDK381",\r
90                         "org.apache.felix.gogo.shell").versionAsInProject(),\r
91             // List logger bundles\r
92             mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),\r
93             mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),\r
94             mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),\r
95             mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),\r
96             // List all the bundles on which the test case depends\r
97             mavenBundle("org.opendaylight.controller",\r
98                         "clustering.services").versionAsInProject(),\r
99             mavenBundle("org.opendaylight.controller",\r
100                         "clustering.services-implementation").versionAsInProject(),\r
101             mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),\r
102             mavenBundle("org.opendaylight.controller",\r
103                         "sal.implementation").versionAsInProject(),\r
104             mavenBundle("org.opendaylight.controller", "containermanager").versionAsInProject(),\r
105             mavenBundle("org.opendaylight.controller",\r
106                         "containermanager.it.implementation").versionAsInProject(),\r
107             mavenBundle("org.jboss.spec.javax.transaction",\r
108                         "jboss-transaction-api_1.1_spec").versionAsInProject(),\r
109             mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),\r
110             mavenBundle("org.apache.felix",\r
111                         "org.apache.felix.dependencymanager").versionAsInProject(),\r
112             mavenBundle("org.apache.felix",\r
113                         "org.apache.felix.dependencymanager.shell").versionAsInProject(),\r
114             mavenBundle("eclipselink", "javax.resource").versionAsInProject(),\r
115             junitBundles());\r
116     }\r
117 \r
118     private String stateToString(int state) {\r
119         switch (state) {\r
120         case Bundle.ACTIVE:\r
121             return "ACTIVE";\r
122         case Bundle.INSTALLED:\r
123             return "INSTALLED";\r
124         case Bundle.RESOLVED:\r
125             return "RESOLVED";\r
126         case Bundle.UNINSTALLED:\r
127             return "UNINSTALLED";\r
128         default:\r
129             return "Not CONVERTED";\r
130         }\r
131     }\r
132 \r
133     @Before\r
134     public void areWeReady() {\r
135         assertNotNull(bc);\r
136         boolean debugit = false;\r
137         Bundle b[] = bc.getBundles();\r
138         for (int i = 0; i < b.length; i++) {\r
139             int state = b[i].getState();\r
140             if (state != Bundle.ACTIVE && state != Bundle.RESOLVED) {\r
141                 log.debug("Bundle:" + b[i].getSymbolicName() + " state:"\r
142                           + stateToString(state));\r
143                 debugit = true;\r
144             }\r
145         }\r
146         if (debugit) {\r
147             log.debug("Do some debugging because some bundle is "\r
148                       + "unresolved");\r
149         }\r
150 \r
151         // Assert if true, if false we are good to go!\r
152         assertFalse(debugit);\r
153 \r
154         this.clusterServices = (IClusterServices)ServiceHelper\r
155             .getGlobalInstance(IClusterServices.class, this);\r
156         assertNotNull(this.clusterServices);\r
157 \r
158         this.clusterDefaultServices = (IClusterContainerServices)ServiceHelper\r
159             .getInstance(IClusterContainerServices.class, "default", this);\r
160         assertNotNull(this.clusterDefaultServices);\r
161 \r
162         this.clusterGlobalServices = (IClusterGlobalServices)ServiceHelper\r
163             .getGlobalInstance(IClusterGlobalServices.class, this);\r
164         assertNotNull(this.clusterGlobalServices);\r
165     }\r
166 \r
167     @Test\r
168     public void clusterTest() throws CacheExistException, CacheConfigException,\r
169         CacheListenerAddException {\r
170 \r
171         String container1 = "Container1";\r
172         String container2 = "Container2";\r
173         String cache1 = "Cache1";\r
174         String cache2 = "Cache2";\r
175         String cache3 = "Cache3";\r
176 \r
177         HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
178         cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
179         ConcurrentMap cm11 = this.clusterServices.createCache(container1,\r
180                 cache1, cacheModeSet);\r
181         assertNotNull(cm11);\r
182 \r
183         assertNull(this.clusterServices.getCache(container2, cache2));\r
184         assertEquals(cm11, this.clusterServices.getCache(container1, cache1));\r
185 \r
186         assertFalse(this.clusterServices.existCache(container2, cache2));\r
187         assertTrue(this.clusterServices.existCache(container1, cache1));\r
188 \r
189         ConcurrentMap cm12 = this.clusterServices.createCache(container1,\r
190                 cache2, cacheModeSet);\r
191         ConcurrentMap cm23 = this.clusterServices.createCache(container2,\r
192                 cache3, cacheModeSet);\r
193 \r
194         HashSet<String> cacheList = (HashSet<String>) this.clusterServices\r
195                 .getCacheList(container1);\r
196         assertEquals(2, cacheList.size());\r
197         assertTrue(cacheList.contains(cache1));\r
198         assertTrue(cacheList.contains(cache2));\r
199         assertFalse(cacheList.contains(cache3));\r
200 \r
201         assertNotNull(this.clusterServices.getCacheProperties(container1,\r
202                 cache1));\r
203 \r
204         HashSet<IGetUpdates<?, ?>> listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
205                 .getListeners(container1, cache1);\r
206         assertEquals(0, listeners.size());\r
207 \r
208         IGetUpdates<?, ?> getUpdate1 = new GetUpdates();\r
209         this.clusterServices.addListener(container1, cache1, getUpdate1);\r
210         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
211                 .getListeners(container1, cache1);\r
212         assertEquals(1, listeners.size());\r
213         this.clusterServices.addListener(container1, cache1, new GetUpdates());\r
214         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
215                 .getListeners(container1, cache1);\r
216         assertEquals(2, listeners.size());\r
217 \r
218         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
219                 .getListeners(container2, cache3);\r
220         assertEquals(0, listeners.size());\r
221 \r
222         this.clusterServices.removeListener(container1, cache1, getUpdate1);\r
223         listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices\r
224                 .getListeners(container1, cache1);\r
225         assertEquals(1, listeners.size());\r
226 \r
227         InetAddress addr = this.clusterServices.getMyAddress();\r
228         assertNotNull(addr);\r
229 \r
230         List<InetAddress> addrList = this.clusterServices\r
231                 .getClusteredControllers();\r
232 \r
233         this.clusterServices.destroyCache(container1, cache1);\r
234         assertFalse(this.clusterServices.existCache(container1, cache1));\r
235 \r
236     }\r
237 \r
238     private class GetUpdates implements IGetUpdates<Integer, String> {\r
239 \r
240         @Override\r
241         public void entryCreated(Integer key, String containerName,\r
242                 String cacheName, boolean originLocal) {\r
243             return;\r
244         }\r
245 \r
246         @Override\r
247         public void entryUpdated(Integer key, String newValue,\r
248                 String containerName, String cacheName, boolean originLocal) {\r
249             return;\r
250         }\r
251 \r
252         @Override\r
253         public void entryDeleted(Integer key, String containerName,\r
254                 String cacheName, boolean originLocal) {\r
255             return;\r
256         }\r
257     }\r
258 \r
259     @Test\r
260     public void clusterContainerAndGlobalTest() throws CacheExistException, CacheConfigException,\r
261         CacheListenerAddException, InterruptedException {\r
262         String cache1 = "Cache1";\r
263         String cache2 = "Cache2";\r
264         // Lets test the case of caches with same name in different\r
265         // containers (actually global an container case)\r
266         String cache3 = "Cache2";\r
267 \r
268         HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();\r
269         cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);\r
270         ConcurrentMap cm11 = this.clusterDefaultServices.createCache(cache1, cacheModeSet);\r
271         assertNotNull(cm11);\r
272 \r
273         assertTrue(this.clusterDefaultServices.existCache(cache1));\r
274         assertEquals(cm11, this.clusterDefaultServices.getCache(cache1));\r
275 \r
276         ConcurrentMap cm12 = this.clusterDefaultServices.createCache(cache2, cacheModeSet);\r
277         ConcurrentMap cm23 = this.clusterGlobalServices.createCache(cache3, cacheModeSet);\r
278 \r
279         // Now given cahe2 and cache3 have same name lets make sure\r
280         // they don't return the same reference\r
281         assertNotNull(this.clusterGlobalServices.getCache(cache2));\r
282         // cm12 reference must be different than cm23\r
283         assertTrue(cm12 != cm23);\r
284 \r
285         HashSet<String> cacheList = (HashSet<String>) this.clusterDefaultServices\r
286             .getCacheList();\r
287         assertEquals(2, cacheList.size());\r
288         assertTrue(cacheList.contains(cache1));\r
289         assertTrue(cacheList.contains(cache2));\r
290 \r
291         assertNotNull(this.clusterDefaultServices.getCacheProperties(cache1));\r
292 \r
293         {\r
294             /***********************************/\r
295             /* Testing cacheAware in Container */\r
296             /***********************************/\r
297             Dictionary<String, Object> props = new Hashtable<String, Object>();\r
298             Set<String> propSet = new HashSet<String>();\r
299             propSet.add(cache1);\r
300             propSet.add(cache2);\r
301             props.put("cachenames", propSet);\r
302             CacheAware listener = new CacheAware();\r
303             CacheAware listenerRepeated = new CacheAware();\r
304             ServiceRegistration updateServiceReg = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, "default",\r
305                                                                                      listener, props);\r
306             assertNotNull(updateServiceReg);\r
307 \r
308             // Register another service for the same caches, this\r
309             // should not get any update because we don't allow to\r
310             // override the existing unless before unregistered\r
311             ServiceRegistration updateServiceRegRepeated = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class,\r
312                                                                                              "default",\r
313                                                                                              listenerRepeated, props);\r
314             assertNotNull(updateServiceRegRepeated);\r
315             CountDownLatch res = null;\r
316             List<Update> ups = null;\r
317             Update up = null;\r
318             Integer k1 = new Integer(10);\r
319             Long k2 = new Long(100L);\r
320 \r
321             /***********************/\r
322             /* CREATE NEW KEY CASE */\r
323             /***********************/\r
324             // Start monitoring the updates\r
325             res = listener.restart(2);\r
326             // modify the cache\r
327             cm11.put(k1, "foo");\r
328             // Wait\r
329             res.await(100L, TimeUnit.SECONDS);\r
330             // Analyze the updates\r
331             ups = listener.getUpdates();\r
332             assertTrue(ups.size() == 2);\r
333             // Validate that first we get an update (yes even in case of a\r
334             // new value added)\r
335             up = ups.get(0);\r
336             assertTrue(up.t.equals(UpdateType.CHANGED));\r
337             assertTrue(up.key.equals(k1));\r
338             assertTrue(up.value.equals("foo"));\r
339             assertTrue(up.cacheName.equals(cache1));\r
340             // Validate that we then get a create\r
341             up = ups.get(1);\r
342             assertTrue(up.t.equals(UpdateType.ADDED));\r
343             assertTrue(up.key.equals(k1));\r
344             assertNull(up.value);\r
345             assertTrue(up.cacheName.equals(cache1));\r
346 \r
347             /*******************************/\r
348             /* UPDATE AN EXISTING KEY CASE */\r
349             /*******************************/\r
350             // Start monitoring the updates\r
351             res = listener.restart(1);\r
352             // modify the cache\r
353             cm11.put(k1, "baz");\r
354             // Wait\r
355             res.await(100L, TimeUnit.SECONDS);\r
356             // Analyze the updates\r
357             ups = listener.getUpdates();\r
358             assertTrue(ups.size() == 1);\r
359             // Validate we get an update with expect fields\r
360             up = ups.get(0);\r
361             assertTrue(up.t.equals(UpdateType.CHANGED));\r
362             assertTrue(up.key.equals(k1));\r
363             assertTrue(up.value.equals("baz"));\r
364             assertTrue(up.cacheName.equals(cache1));\r
365 \r
366             /**********************************/\r
367             /* RE-UPDATE AN EXISTING KEY CASE */\r
368             /**********************************/\r
369             // Start monitoring the updates\r
370             res = listener.restart(1);\r
371             // modify the cache\r
372             cm11.put(k1, "baz");\r
373             // Wait\r
374             res.await(100L, TimeUnit.SECONDS);\r
375             // Analyze the updates\r
376             ups = listener.getUpdates();\r
377             assertTrue(ups.size() == 1);\r
378             // Validate we get an update with expect fields\r
379             up = ups.get(0);\r
380             assertTrue(up.t.equals(UpdateType.CHANGED));\r
381             assertTrue(up.key.equals(k1));\r
382             assertTrue(up.value.equals("baz"));\r
383             assertTrue(up.cacheName.equals(cache1));\r
384 \r
385             /********************************/\r
386             /* REMOVAL OF EXISTING KEY CASE */\r
387             /********************************/\r
388             // Start monitoring the updates\r
389             res = listener.restart(1);\r
390             // modify the cache\r
391             cm11.remove(k1);\r
392             // Wait\r
393             res.await(100L, TimeUnit.SECONDS);\r
394             // Analyze the updates\r
395             ups = listener.getUpdates();\r
396             assertTrue(ups.size() == 1);\r
397             // Validate we get a delete with expected fields\r
398             up = ups.get(0);\r
399             assertTrue(up.t.equals(UpdateType.REMOVED));\r
400             assertTrue(up.key.equals(k1));\r
401             assertNull(up.value);\r
402             assertTrue(up.cacheName.equals(cache1));\r
403 \r
404             /***********************/\r
405             /* CREATE NEW KEY CASE */\r
406             /***********************/\r
407             // Start monitoring the updates\r
408             res = listener.restart(2);\r
409             // modify the cache\r
410             cm12.put(k2, new Short((short)15));\r
411             // Wait\r
412             res.await(100L, TimeUnit.SECONDS);\r
413             // Analyze the updates\r
414             ups = listener.getUpdates();\r
415             assertTrue(ups.size() == 2);\r
416             // Validate that first we get an update (yes even in case of a\r
417             // new value added)\r
418             up = ups.get(0);\r
419             assertTrue(up.t.equals(UpdateType.CHANGED));\r
420             assertTrue(up.key.equals(k2));\r
421             assertTrue(up.value.equals(new Short((short)15)));\r
422             assertTrue(up.cacheName.equals(cache2));\r
423             // Validate that we then get a create\r
424             up = ups.get(1);\r
425             assertTrue(up.t.equals(UpdateType.ADDED));\r
426             assertTrue(up.key.equals(k2));\r
427             assertNull(up.value);\r
428             assertTrue(up.cacheName.equals(cache2));\r
429 \r
430             /*******************************/\r
431             /* UPDATE AN EXISTING KEY CASE */\r
432             /*******************************/\r
433             // Start monitoring the updates\r
434             res = listener.restart(1);\r
435             // modify the cache\r
436             cm12.put(k2, "BAZ");\r
437             // Wait\r
438             res.await(100L, TimeUnit.SECONDS);\r
439             // Analyze the updates\r
440             ups = listener.getUpdates();\r
441             assertTrue(ups.size() == 1);\r
442             // Validate we get an update with expect fields\r
443             up = ups.get(0);\r
444             assertTrue(up.t.equals(UpdateType.CHANGED));\r
445             assertTrue(up.key.equals(k2));\r
446             assertTrue(up.value.equals("BAZ"));\r
447             assertTrue(up.cacheName.equals(cache2));\r
448 \r
449             /********************************/\r
450             /* REMOVAL OF EXISTING KEY CASE */\r
451             /********************************/\r
452             // Start monitoring the updates\r
453             res = listener.restart(1);\r
454             // modify the cache\r
455             cm12.remove(k2);\r
456             // Wait\r
457             res.await(100L, TimeUnit.SECONDS);\r
458             // Analyze the updates\r
459             ups = listener.getUpdates();\r
460             assertTrue(ups.size() == 1);\r
461             // Validate we get a delete with expected fields\r
462             up = ups.get(0);\r
463             assertTrue(up.t.equals(UpdateType.REMOVED));\r
464             assertTrue(up.key.equals(k2));\r
465             assertNull(up.value);\r
466             assertTrue(up.cacheName.equals(cache2));\r
467 \r
468             /******************************************************************/\r
469             /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
470             /******************************************************************/\r
471             updateServiceReg.unregister();\r
472             // Start monitoring the updates, noone should come in\r
473             res = listener.restart(1);\r
474 \r
475             /***********************/\r
476             /* CREATE NEW KEY CASE */\r
477             /***********************/\r
478             // modify the cache\r
479             cm11.put(k1, "foo");\r
480 \r
481             /*******************************/\r
482             /* UPDATE AN EXISTING KEY CASE */\r
483             /*******************************/\r
484             // modify the cache\r
485             cm11.put(k1, "baz");\r
486 \r
487             /********************************/\r
488             /* REMOVAL OF EXISTING KEY CASE */\r
489             /********************************/\r
490             // modify the cache\r
491             cm11.remove(k1);\r
492 \r
493             /***********************/\r
494             /* CREATE NEW KEY CASE */\r
495             /***********************/\r
496             // modify the cache\r
497             cm12.put(k2, new Short((short)15));\r
498 \r
499             /*******************************/\r
500             /* UPDATE AN EXISTING KEY CASE */\r
501             /*******************************/\r
502             // modify the cache\r
503             cm12.put(k2, "BAZ");\r
504 \r
505             /********************************/\r
506             /* REMOVAL OF EXISTING KEY CASE */\r
507             /********************************/\r
508             // modify the cache\r
509             cm12.remove(k2);\r
510 \r
511 \r
512             // Wait to make sure no updates came in, clearly this is\r
513             // error prone as logic, but cannot find a better way than\r
514             // this to make sure updates didn't get in\r
515             res.await(1L, TimeUnit.SECONDS);\r
516             // Analyze the updates\r
517             ups = listener.getUpdates();\r
518             assertTrue(ups.size() == 0);\r
519         }\r
520 \r
521         {\r
522             /***********************************/\r
523             /* Testing cacheAware in Global */\r
524             /***********************************/\r
525             Dictionary<String, Object> props = new Hashtable<String, Object>();\r
526             Set<String> propSet = new HashSet<String>();\r
527             propSet.add(cache3);\r
528             props.put("cachenames", propSet);\r
529             CacheAware listener = new CacheAware();\r
530             ServiceRegistration updateServiceReg = ServiceHelper.registerGlobalServiceWReg(ICacheUpdateAware.class,\r
531                                                                                            listener, props);\r
532             assertNotNull(updateServiceReg);\r
533 \r
534             CountDownLatch res = null;\r
535             List<Update> ups = null;\r
536             Update up = null;\r
537             Integer k1 = new Integer(10);\r
538 \r
539             /***********************/\r
540             /* CREATE NEW KEY CASE */\r
541             /***********************/\r
542             // Start monitoring the updates\r
543             res = listener.restart(2);\r
544             // modify the cache\r
545             cm23.put(k1, "foo");\r
546             // Wait\r
547             res.await(100L, TimeUnit.SECONDS);\r
548             // Analyze the updates\r
549             ups = listener.getUpdates();\r
550             assertTrue(ups.size() == 2);\r
551             // Validate that first we get an update (yes even in case of a\r
552             // new value added)\r
553             up = ups.get(0);\r
554             assertTrue(up.t.equals(UpdateType.CHANGED));\r
555             assertTrue(up.key.equals(k1));\r
556             assertTrue(up.value.equals("foo"));\r
557             assertTrue(up.cacheName.equals(cache3));\r
558             // Validate that we then get a create\r
559             up = ups.get(1);\r
560             assertTrue(up.t.equals(UpdateType.ADDED));\r
561             assertTrue(up.key.equals(k1));\r
562             assertNull(up.value);\r
563             assertTrue(up.cacheName.equals(cache3));\r
564 \r
565             /*******************************/\r
566             /* UPDATE AN EXISTING KEY CASE */\r
567             /*******************************/\r
568             // Start monitoring the updates\r
569             res = listener.restart(1);\r
570             // modify the cache\r
571             cm23.put(k1, "baz");\r
572             // Wait\r
573             res.await(100L, TimeUnit.SECONDS);\r
574             // Analyze the updates\r
575             ups = listener.getUpdates();\r
576             assertTrue(ups.size() == 1);\r
577             // Validate we get an update with expect fields\r
578             up = ups.get(0);\r
579             assertTrue(up.t.equals(UpdateType.CHANGED));\r
580             assertTrue(up.key.equals(k1));\r
581             assertTrue(up.value.equals("baz"));\r
582             assertTrue(up.cacheName.equals(cache3));\r
583 \r
584             /********************************/\r
585             /* REMOVAL OF EXISTING KEY CASE */\r
586             /********************************/\r
587             // Start monitoring the updates\r
588             res = listener.restart(1);\r
589             // modify the cache\r
590             cm23.remove(k1);\r
591             // Wait\r
592             res.await(100L, TimeUnit.SECONDS);\r
593             // Analyze the updates\r
594             ups = listener.getUpdates();\r
595             assertTrue(ups.size() == 1);\r
596             // Validate we get a delete with expected fields\r
597             up = ups.get(0);\r
598             assertTrue(up.t.equals(UpdateType.REMOVED));\r
599             assertTrue(up.key.equals(k1));\r
600             assertNull(up.value);\r
601             assertTrue(up.cacheName.equals(cache3));\r
602 \r
603             /******************************************************************/\r
604             /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */\r
605             /******************************************************************/\r
606             updateServiceReg.unregister();\r
607             // Start monitoring the updates, noone should come in\r
608             res = listener.restart(1);\r
609 \r
610             /***********************/\r
611             /* CREATE NEW KEY CASE */\r
612             /***********************/\r
613             // modify the cache\r
614             cm23.put(k1, "foo");\r
615 \r
616             /*******************************/\r
617             /* UPDATE AN EXISTING KEY CASE */\r
618             /*******************************/\r
619             // modify the cache\r
620             cm23.put(k1, "baz");\r
621 \r
622             /********************************/\r
623             /* REMOVAL OF EXISTING KEY CASE */\r
624             /********************************/\r
625             // modify the cache\r
626             cm23.remove(k1);\r
627 \r
628             // Wait to make sure no updates came in, clearly this is\r
629             // error prone as logic, but cannot find a better way than\r
630             // this to make sure updates didn't get in\r
631             res.await(1L, TimeUnit.SECONDS);\r
632             // Analyze the updates\r
633             ups = listener.getUpdates();\r
634             assertTrue(ups.size() == 0);\r
635         }\r
636 \r
637         InetAddress addr = this.clusterDefaultServices.getMyAddress();\r
638         assertNotNull(addr);\r
639 \r
640         List<InetAddress> addrList = this.clusterDefaultServices\r
641             .getClusteredControllers();\r
642 \r
643         this.clusterDefaultServices.destroyCache(cache1);\r
644         assertFalse(this.clusterDefaultServices.existCache(cache1));\r
645     }\r
646 \r
647     private class Update {\r
648         Object key;\r
649         Object value;\r
650         String cacheName;\r
651         UpdateType t;\r
652 \r
653         Update (UpdateType t, Object key, Object value, String cacheName) {\r
654             this.t = t;\r
655             this.key = key;\r
656             this.value = value;\r
657             this.cacheName = cacheName;\r
658         }\r
659     }\r
660 \r
661     private class CacheAware implements ICacheUpdateAware {\r
662         private CopyOnWriteArrayList<Update> gotUpdates;\r
663         private CountDownLatch latch = null;\r
664 \r
665         CacheAware() {\r
666             this.gotUpdates = new CopyOnWriteArrayList<Update>();\r
667         }\r
668 \r
669 \r
670         /**\r
671          * Restart the monitor of the updates on the CacheAware object\r
672          *\r
673          * @param expectedOperations Number of expected updates\r
674          *\r
675          * @return a countdown latch which will be used to wait till the updates are done\r
676          */\r
677         CountDownLatch restart(int expectedOperations) {\r
678             this.gotUpdates.clear();\r
679             this.latch = new CountDownLatch(expectedOperations);\r
680             return this.latch;\r
681         }\r
682 \r
683         List<Update> getUpdates() {\r
684             return this.gotUpdates;\r
685         }\r
686 \r
687         @Override\r
688         public void entryCreated(Object key, String cacheName, boolean originLocal) {\r
689             log.debug("CACHE[{}] Got an entry created for key:{}", cacheName, key);\r
690             Update u = new Update(UpdateType.ADDED, key, null, cacheName);\r
691             this.gotUpdates.add(u);\r
692             this.latch.countDown();\r
693         }\r
694 \r
695         @Override\r
696         public void entryUpdated(Object key, Object newValue, String cacheName, boolean originLocal) {\r
697             log.debug("CACHE[{}] Got an entry updated for key:{} newValue:{}", cacheName, key, newValue);\r
698             Update u = new Update(UpdateType.CHANGED, key, newValue, cacheName);\r
699             this.gotUpdates.add(u);\r
700             this.latch.countDown();\r
701         }\r
702 \r
703         @Override\r
704         public void entryDeleted(Object key, String cacheName, boolean originLocal) {\r
705             log.debug("CACHE[{}] Got an entry delete for key:{}", cacheName, key);\r
706             Update u = new Update(UpdateType.REMOVED, key, null, cacheName);\r
707             this.gotUpdates.add(u);\r
708             this.latch.countDown();\r
709         }\r
710     }\r
711 }\r