2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.services_implementation.internal;
\r
10 import static org.junit.Assert.assertEquals;
\r
11 import static org.junit.Assert.assertFalse;
\r
12 import static org.junit.Assert.assertNotNull;
\r
13 import static org.junit.Assert.assertNull;
\r
14 import static org.junit.Assert.assertTrue;
\r
15 import static org.ops4j.pax.exam.CoreOptions.junitBundles;
\r
16 import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
\r
17 import static org.ops4j.pax.exam.CoreOptions.options;
\r
18 import static org.ops4j.pax.exam.CoreOptions.systemPackages;
\r
19 import static org.ops4j.pax.exam.CoreOptions.systemProperty;
\r
21 import java.util.List;
\r
22 import java.util.concurrent.CopyOnWriteArrayList;
\r
23 import java.util.concurrent.TimeUnit;
\r
24 import java.net.InetAddress;
\r
25 import java.util.Dictionary;
\r
26 import java.util.HashSet;
\r
27 import java.util.Hashtable;
\r
28 import java.util.Set;
\r
29 import java.util.concurrent.ConcurrentMap;
\r
31 import javax.inject.Inject;
\r
33 import org.junit.Before;
\r
34 import org.junit.Test;
\r
35 import org.junit.runner.RunWith;
\r
36 import org.opendaylight.controller.clustering.services.CacheConfigException;
\r
37 import org.opendaylight.controller.clustering.services.CacheExistException;
\r
38 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
\r
39 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
\r
40 import org.opendaylight.controller.clustering.services.IClusterServices;
\r
41 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
\r
42 import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode;
\r
43 import org.opendaylight.controller.clustering.services.IGetUpdates;
\r
44 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
\r
45 import org.opendaylight.controller.sal.utils.ServiceHelper;
\r
46 import org.opendaylight.controller.sal.core.UpdateType;
\r
47 import org.ops4j.pax.exam.Option;
\r
48 import org.ops4j.pax.exam.Configuration;
\r
49 import org.ops4j.pax.exam.junit.PaxExam;
\r
50 import org.ops4j.pax.exam.util.PathUtils;
\r
51 import org.osgi.framework.Bundle;
\r
52 import org.osgi.framework.BundleContext;
\r
53 import org.osgi.framework.ServiceRegistration;
\r
54 import org.slf4j.Logger;
\r
55 import org.slf4j.LoggerFactory;
\r
56 import java.util.concurrent.CountDownLatch;
\r
58 @RunWith(PaxExam.class)
\r
59 public class ClusteringServicesIT {
\r
60 private Logger log = LoggerFactory
\r
61 .getLogger(ClusteringServicesIT.class);
\r
62 // get the OSGI bundle context
\r
64 private BundleContext bc;
\r
65 private IClusterServices clusterServices = null;
\r
66 private IClusterContainerServices clusterDefaultServices = null;
\r
67 private IClusterGlobalServices clusterGlobalServices = null;
\r
69 // Configure the OSGi container
\r
71 public Option[] config() {
\r
74 systemProperty("logback.configurationFile").value(
\r
75 "file:" + PathUtils.getBaseDir()
\r
76 + "/src/test/resources/logback.xml"),
\r
77 // To start OSGi console for inspection remotely
\r
78 systemProperty("osgi.console").value("2401"),
\r
79 // Set the systemPackages (used by clustering)
\r
80 systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),
\r
81 // List framework bundles
\r
82 mavenBundle("equinoxSDK381",
\r
83 "org.eclipse.equinox.console").versionAsInProject(),
\r
84 mavenBundle("equinoxSDK381",
\r
85 "org.eclipse.equinox.util").versionAsInProject(),
\r
86 mavenBundle("equinoxSDK381",
\r
87 "org.eclipse.osgi.services").versionAsInProject(),
\r
88 mavenBundle("equinoxSDK381",
\r
89 "org.eclipse.equinox.ds").versionAsInProject(),
\r
90 mavenBundle("equinoxSDK381",
\r
91 "org.apache.felix.gogo.command").versionAsInProject(),
\r
92 mavenBundle("equinoxSDK381",
\r
93 "org.apache.felix.gogo.runtime").versionAsInProject(),
\r
94 mavenBundle("equinoxSDK381",
\r
95 "org.apache.felix.gogo.shell").versionAsInProject(),
\r
96 // List logger bundles
\r
97 mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(),
\r
98 mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(),
\r
99 mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(),
\r
100 mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(),
\r
101 // List all the bundles on which the test case depends
\r
102 mavenBundle("org.opendaylight.controller",
\r
103 "clustering.services").versionAsInProject(),
\r
104 mavenBundle("org.opendaylight.controller",
\r
105 "clustering.services-implementation").versionAsInProject(),
\r
106 mavenBundle("org.opendaylight.controller", "sal").versionAsInProject(),
\r
107 mavenBundle("org.opendaylight.controller",
\r
108 "sal.implementation").versionAsInProject(),
\r
109 mavenBundle("org.opendaylight.controller", "configuration").versionAsInProject(),
\r
110 mavenBundle("org.opendaylight.controller", "containermanager").versionAsInProject(),
\r
111 mavenBundle("org.opendaylight.controller",
\r
112 "containermanager.it.implementation").versionAsInProject(),
\r
113 mavenBundle("org.jboss.spec.javax.transaction",
\r
114 "jboss-transaction-api_1.1_spec").versionAsInProject(),
\r
115 mavenBundle("org.apache.commons", "commons-lang3").versionAsInProject(),
\r
116 mavenBundle("org.apache.felix",
\r
117 "org.apache.felix.dependencymanager").versionAsInProject(),
\r
118 mavenBundle("org.apache.felix",
\r
119 "org.apache.felix.dependencymanager.shell").versionAsInProject(),
\r
120 mavenBundle("eclipselink", "javax.resource").versionAsInProject(),
\r
124 private String stateToString(int state) {
\r
126 case Bundle.ACTIVE:
\r
128 case Bundle.INSTALLED:
\r
129 return "INSTALLED";
\r
130 case Bundle.RESOLVED:
\r
132 case Bundle.UNINSTALLED:
\r
133 return "UNINSTALLED";
\r
135 return "Not CONVERTED";
\r
140 public void areWeReady() {
\r
142 boolean debugit = false;
\r
143 Bundle b[] = bc.getBundles();
\r
144 for (Bundle element : b) {
\r
145 int state = element.getState();
\r
146 if (state != Bundle.ACTIVE && state != Bundle.RESOLVED) {
\r
147 log.debug("Bundle:" + element.getSymbolicName() + " state:"
\r
148 + stateToString(state));
\r
153 log.debug("Do some debugging because some bundle is "
\r
157 // Assert if true, if false we are good to go!
\r
158 assertFalse(debugit);
\r
160 this.clusterServices = (IClusterServices)ServiceHelper
\r
161 .getGlobalInstance(IClusterServices.class, this);
\r
162 assertNotNull(this.clusterServices);
\r
164 this.clusterDefaultServices = (IClusterContainerServices)ServiceHelper
\r
165 .getInstance(IClusterContainerServices.class, "default", this);
\r
166 assertNotNull(this.clusterDefaultServices);
\r
168 this.clusterGlobalServices = (IClusterGlobalServices)ServiceHelper
\r
169 .getGlobalInstance(IClusterGlobalServices.class, this);
\r
170 assertNotNull(this.clusterGlobalServices);
\r
174 public void clusterTest() throws CacheExistException, CacheConfigException,
\r
175 CacheListenerAddException {
\r
177 String container1 = "Container1";
\r
178 String container2 = "Container2";
\r
179 String cache1 = "Cache1";
\r
180 String cache2 = "Cache2";
\r
181 String cache3 = "Cache3";
\r
183 HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();
\r
184 cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);
\r
185 ConcurrentMap cm11 = this.clusterServices.createCache(container1,
\r
186 cache1, cacheModeSet);
\r
187 assertNotNull(cm11);
\r
189 assertNull(this.clusterServices.getCache(container2, cache2));
\r
190 assertEquals(cm11, this.clusterServices.getCache(container1, cache1));
\r
192 assertFalse(this.clusterServices.existCache(container2, cache2));
\r
193 assertTrue(this.clusterServices.existCache(container1, cache1));
\r
195 ConcurrentMap cm12 = this.clusterServices.createCache(container1,
\r
196 cache2, cacheModeSet);
\r
197 ConcurrentMap cm23 = this.clusterServices.createCache(container2,
\r
198 cache3, cacheModeSet);
\r
200 HashSet<String> cacheList = (HashSet<String>) this.clusterServices
\r
201 .getCacheList(container1);
\r
202 assertEquals(2, cacheList.size());
\r
203 assertTrue(cacheList.contains(cache1));
\r
204 assertTrue(cacheList.contains(cache2));
\r
205 assertFalse(cacheList.contains(cache3));
\r
207 assertNotNull(this.clusterServices.getCacheProperties(container1,
\r
210 HashSet<IGetUpdates<?, ?>> listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices
\r
211 .getListeners(container1, cache1);
\r
212 assertEquals(0, listeners.size());
\r
214 IGetUpdates<?, ?> getUpdate1 = new GetUpdates();
\r
215 this.clusterServices.addListener(container1, cache1, getUpdate1);
\r
216 listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices
\r
217 .getListeners(container1, cache1);
\r
218 assertEquals(1, listeners.size());
\r
219 this.clusterServices.addListener(container1, cache1, new GetUpdates());
\r
220 listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices
\r
221 .getListeners(container1, cache1);
\r
222 assertEquals(2, listeners.size());
\r
224 listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices
\r
225 .getListeners(container2, cache3);
\r
226 assertEquals(0, listeners.size());
\r
228 this.clusterServices.removeListener(container1, cache1, getUpdate1);
\r
229 listeners = (HashSet<IGetUpdates<?, ?>>) this.clusterServices
\r
230 .getListeners(container1, cache1);
\r
231 assertEquals(1, listeners.size());
\r
233 InetAddress addr = this.clusterServices.getMyAddress();
\r
234 assertNotNull(addr);
\r
236 List<InetAddress> addrList = this.clusterServices
\r
237 .getClusteredControllers();
\r
239 this.clusterServices.destroyCache(container1, cache1);
\r
240 assertFalse(this.clusterServices.existCache(container1, cache1));
\r
244 private class GetUpdates implements IGetUpdates<Integer, String> {
\r
247 public void entryCreated(Integer key, String containerName,
\r
248 String cacheName, boolean originLocal) {
\r
253 public void entryUpdated(Integer key, String newValue,
\r
254 String containerName, String cacheName, boolean originLocal) {
\r
259 public void entryDeleted(Integer key, String containerName,
\r
260 String cacheName, boolean originLocal) {
\r
266 public void clusterContainerAndGlobalTest() throws CacheExistException, CacheConfigException,
\r
267 CacheListenerAddException, InterruptedException {
\r
268 String cache1 = "Cache1";
\r
269 String cache2 = "Cache2";
\r
270 // Lets test the case of caches with same name in different
\r
271 // containers (actually global an container case)
\r
272 String cache3 = "Cache2";
\r
274 HashSet<cacheMode> cacheModeSet = new HashSet<cacheMode>();
\r
275 cacheModeSet.add(cacheMode.NON_TRANSACTIONAL);
\r
276 ConcurrentMap cm11 = this.clusterDefaultServices.createCache(cache1, cacheModeSet);
\r
277 assertNotNull(cm11);
\r
279 assertTrue(this.clusterDefaultServices.existCache(cache1));
\r
280 assertEquals(cm11, this.clusterDefaultServices.getCache(cache1));
\r
282 ConcurrentMap cm12 = this.clusterDefaultServices.createCache(cache2, cacheModeSet);
\r
283 ConcurrentMap cm23 = this.clusterGlobalServices.createCache(cache3, cacheModeSet);
\r
285 // Now given cahe2 and cache3 have same name lets make sure
\r
286 // they don't return the same reference
\r
287 assertNotNull(this.clusterGlobalServices.getCache(cache2));
\r
288 // cm12 reference must be different than cm23
\r
289 assertTrue(cm12 != cm23);
\r
291 HashSet<String> cacheList = (HashSet<String>) this.clusterDefaultServices
\r
293 assertEquals(2, cacheList.size());
\r
294 assertTrue(cacheList.contains(cache1));
\r
295 assertTrue(cacheList.contains(cache2));
\r
297 assertNotNull(this.clusterDefaultServices.getCacheProperties(cache1));
\r
300 /***********************************/
\r
301 /* Testing cacheAware in Container */
\r
302 /***********************************/
\r
303 Dictionary<String, Object> props = new Hashtable<String, Object>();
\r
304 Set<String> propSet = new HashSet<String>();
\r
305 propSet.add(cache1);
\r
306 propSet.add(cache2);
\r
307 props.put("cachenames", propSet);
\r
308 CacheAware listener = new CacheAware();
\r
309 CacheAware listenerRepeated = new CacheAware();
\r
310 ServiceRegistration updateServiceReg = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class, "default",
\r
312 assertNotNull(updateServiceReg);
\r
314 // Register another service for the same caches, this
\r
315 // should not get any update because we don't allow to
\r
316 // override the existing unless before unregistered
\r
317 ServiceRegistration updateServiceRegRepeated = ServiceHelper.registerServiceWReg(ICacheUpdateAware.class,
\r
319 listenerRepeated, props);
\r
320 assertNotNull(updateServiceRegRepeated);
\r
321 CountDownLatch res = null;
\r
322 List<Update> ups = null;
\r
324 Integer k1 = new Integer(10);
\r
325 Long k2 = new Long(100L);
\r
327 /***********************/
\r
328 /* CREATE NEW KEY CASE */
\r
329 /***********************/
\r
330 // Start monitoring the updates
\r
331 res = listener.restart(2);
\r
332 // modify the cache
\r
333 cm11.put(k1, "foo");
\r
335 res.await(100L, TimeUnit.SECONDS);
\r
336 // Analyze the updates
\r
337 ups = listener.getUpdates();
\r
338 assertTrue(ups.size() == 2);
\r
339 // Validate that first we get an update (yes even in case of a
\r
340 // new value added)
\r
342 assertTrue(up.t.equals(UpdateType.CHANGED));
\r
343 assertTrue(up.key.equals(k1));
\r
344 assertTrue(up.value.equals("foo"));
\r
345 assertTrue(up.cacheName.equals(cache1));
\r
346 // Validate that we then get a create
\r
348 assertTrue(up.t.equals(UpdateType.ADDED));
\r
349 assertTrue(up.key.equals(k1));
\r
350 assertNull(up.value);
\r
351 assertTrue(up.cacheName.equals(cache1));
\r
353 /*******************************/
\r
354 /* UPDATE AN EXISTING KEY CASE */
\r
355 /*******************************/
\r
356 // Start monitoring the updates
\r
357 res = listener.restart(1);
\r
358 // modify the cache
\r
359 cm11.put(k1, "baz");
\r
361 res.await(100L, TimeUnit.SECONDS);
\r
362 // Analyze the updates
\r
363 ups = listener.getUpdates();
\r
364 assertTrue(ups.size() == 1);
\r
365 // Validate we get an update with expect fields
\r
367 assertTrue(up.t.equals(UpdateType.CHANGED));
\r
368 assertTrue(up.key.equals(k1));
\r
369 assertTrue(up.value.equals("baz"));
\r
370 assertTrue(up.cacheName.equals(cache1));
\r
372 /**********************************/
\r
373 /* RE-UPDATE AN EXISTING KEY CASE */
\r
374 /**********************************/
\r
375 // Start monitoring the updates
\r
376 res = listener.restart(1);
\r
377 // modify the cache
\r
378 cm11.put(k1, "baz");
\r
380 res.await(100L, TimeUnit.SECONDS);
\r
381 // Analyze the updates
\r
382 ups = listener.getUpdates();
\r
383 assertTrue(ups.size() == 1);
\r
384 // Validate we get an update with expect fields
\r
386 assertTrue(up.t.equals(UpdateType.CHANGED));
\r
387 assertTrue(up.key.equals(k1));
\r
388 assertTrue(up.value.equals("baz"));
\r
389 assertTrue(up.cacheName.equals(cache1));
\r
391 /********************************/
\r
392 /* REMOVAL OF EXISTING KEY CASE */
\r
393 /********************************/
\r
394 // Start monitoring the updates
\r
395 res = listener.restart(1);
\r
396 // modify the cache
\r
399 res.await(100L, TimeUnit.SECONDS);
\r
400 // Analyze the updates
\r
401 ups = listener.getUpdates();
\r
402 assertTrue(ups.size() == 1);
\r
403 // Validate we get a delete with expected fields
\r
405 assertTrue(up.t.equals(UpdateType.REMOVED));
\r
406 assertTrue(up.key.equals(k1));
\r
407 assertNull(up.value);
\r
408 assertTrue(up.cacheName.equals(cache1));
\r
410 /***********************/
\r
411 /* CREATE NEW KEY CASE */
\r
412 /***********************/
\r
413 // Start monitoring the updates
\r
414 res = listener.restart(2);
\r
415 // modify the cache
\r
416 cm12.put(k2, new Short((short)15));
\r
418 res.await(100L, TimeUnit.SECONDS);
\r
419 // Analyze the updates
\r
420 ups = listener.getUpdates();
\r
421 assertTrue(ups.size() == 2);
\r
422 // Validate that first we get an update (yes even in case of a
\r
423 // new value added)
\r
425 assertTrue(up.t.equals(UpdateType.CHANGED));
\r
426 assertTrue(up.key.equals(k2));
\r
427 assertTrue(up.value.equals(new Short((short)15)));
\r
428 assertTrue(up.cacheName.equals(cache2));
\r
429 // Validate that we then get a create
\r
431 assertTrue(up.t.equals(UpdateType.ADDED));
\r
432 assertTrue(up.key.equals(k2));
\r
433 assertNull(up.value);
\r
434 assertTrue(up.cacheName.equals(cache2));
\r
436 /*******************************/
\r
437 /* UPDATE AN EXISTING KEY CASE */
\r
438 /*******************************/
\r
439 // Start monitoring the updates
\r
440 res = listener.restart(1);
\r
441 // modify the cache
\r
442 cm12.put(k2, "BAZ");
\r
444 res.await(100L, TimeUnit.SECONDS);
\r
445 // Analyze the updates
\r
446 ups = listener.getUpdates();
\r
447 assertTrue(ups.size() == 1);
\r
448 // Validate we get an update with expect fields
\r
450 assertTrue(up.t.equals(UpdateType.CHANGED));
\r
451 assertTrue(up.key.equals(k2));
\r
452 assertTrue(up.value.equals("BAZ"));
\r
453 assertTrue(up.cacheName.equals(cache2));
\r
455 /********************************/
\r
456 /* REMOVAL OF EXISTING KEY CASE */
\r
457 /********************************/
\r
458 // Start monitoring the updates
\r
459 res = listener.restart(1);
\r
460 // modify the cache
\r
463 res.await(100L, TimeUnit.SECONDS);
\r
464 // Analyze the updates
\r
465 ups = listener.getUpdates();
\r
466 assertTrue(ups.size() == 1);
\r
467 // Validate we get a delete with expected fields
\r
469 assertTrue(up.t.equals(UpdateType.REMOVED));
\r
470 assertTrue(up.key.equals(k2));
\r
471 assertNull(up.value);
\r
472 assertTrue(up.cacheName.equals(cache2));
\r
474 /******************************************************************/
\r
475 /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */
\r
476 /******************************************************************/
\r
477 updateServiceReg.unregister();
\r
478 // Start monitoring the updates, noone should come in
\r
479 res = listener.restart(1);
\r
481 /***********************/
\r
482 /* CREATE NEW KEY CASE */
\r
483 /***********************/
\r
484 // modify the cache
\r
485 cm11.put(k1, "foo");
\r
487 /*******************************/
\r
488 /* UPDATE AN EXISTING KEY CASE */
\r
489 /*******************************/
\r
490 // modify the cache
\r
491 cm11.put(k1, "baz");
\r
493 /********************************/
\r
494 /* REMOVAL OF EXISTING KEY CASE */
\r
495 /********************************/
\r
496 // modify the cache
\r
499 /***********************/
\r
500 /* CREATE NEW KEY CASE */
\r
501 /***********************/
\r
502 // modify the cache
\r
503 cm12.put(k2, new Short((short)15));
\r
505 /*******************************/
\r
506 /* UPDATE AN EXISTING KEY CASE */
\r
507 /*******************************/
\r
508 // modify the cache
\r
509 cm12.put(k2, "BAZ");
\r
511 /********************************/
\r
512 /* REMOVAL OF EXISTING KEY CASE */
\r
513 /********************************/
\r
514 // modify the cache
\r
518 // Wait to make sure no updates came in, clearly this is
\r
519 // error prone as logic, but cannot find a better way than
\r
520 // this to make sure updates didn't get in
\r
521 res.await(1L, TimeUnit.SECONDS);
\r
522 // Analyze the updates
\r
523 ups = listener.getUpdates();
\r
524 assertTrue(ups.size() == 0);
\r
528 /***********************************/
\r
529 /* Testing cacheAware in Global */
\r
530 /***********************************/
\r
531 Dictionary<String, Object> props = new Hashtable<String, Object>();
\r
532 Set<String> propSet = new HashSet<String>();
\r
533 propSet.add(cache3);
\r
534 props.put("cachenames", propSet);
\r
535 CacheAware listener = new CacheAware();
\r
536 ServiceRegistration updateServiceReg = ServiceHelper.registerGlobalServiceWReg(ICacheUpdateAware.class,
\r
538 assertNotNull(updateServiceReg);
\r
540 CountDownLatch res = null;
\r
541 List<Update> ups = null;
\r
543 Integer k1 = new Integer(10);
\r
545 /***********************/
\r
546 /* CREATE NEW KEY CASE */
\r
547 /***********************/
\r
548 // Start monitoring the updates
\r
549 res = listener.restart(2);
\r
550 // modify the cache
\r
551 cm23.put(k1, "foo");
\r
553 res.await(100L, TimeUnit.SECONDS);
\r
554 // Analyze the updates
\r
555 ups = listener.getUpdates();
\r
556 assertTrue(ups.size() == 2);
\r
557 // Validate that first we get an update (yes even in case of a
\r
558 // new value added)
\r
560 assertTrue(up.t.equals(UpdateType.CHANGED));
\r
561 assertTrue(up.key.equals(k1));
\r
562 assertTrue(up.value.equals("foo"));
\r
563 assertTrue(up.cacheName.equals(cache3));
\r
564 // Validate that we then get a create
\r
566 assertTrue(up.t.equals(UpdateType.ADDED));
\r
567 assertTrue(up.key.equals(k1));
\r
568 assertNull(up.value);
\r
569 assertTrue(up.cacheName.equals(cache3));
\r
571 /*******************************/
\r
572 /* UPDATE AN EXISTING KEY CASE */
\r
573 /*******************************/
\r
574 // Start monitoring the updates
\r
575 res = listener.restart(1);
\r
576 // modify the cache
\r
577 cm23.put(k1, "baz");
\r
579 res.await(100L, TimeUnit.SECONDS);
\r
580 // Analyze the updates
\r
581 ups = listener.getUpdates();
\r
582 assertTrue(ups.size() == 1);
\r
583 // Validate we get an update with expect fields
\r
585 assertTrue(up.t.equals(UpdateType.CHANGED));
\r
586 assertTrue(up.key.equals(k1));
\r
587 assertTrue(up.value.equals("baz"));
\r
588 assertTrue(up.cacheName.equals(cache3));
\r
590 /********************************/
\r
591 /* REMOVAL OF EXISTING KEY CASE */
\r
592 /********************************/
\r
593 // Start monitoring the updates
\r
594 res = listener.restart(1);
\r
595 // modify the cache
\r
598 res.await(100L, TimeUnit.SECONDS);
\r
599 // Analyze the updates
\r
600 ups = listener.getUpdates();
\r
601 assertTrue(ups.size() == 1);
\r
602 // Validate we get a delete with expected fields
\r
604 assertTrue(up.t.equals(UpdateType.REMOVED));
\r
605 assertTrue(up.key.equals(k1));
\r
606 assertNull(up.value);
\r
607 assertTrue(up.cacheName.equals(cache3));
\r
609 /******************************************************************/
\r
610 /* NOW LETS REMOVE THE REGISTRATION AND MAKE SURE NO UPDATS COMES */
\r
611 /******************************************************************/
\r
612 updateServiceReg.unregister();
\r
613 // Start monitoring the updates, noone should come in
\r
614 res = listener.restart(1);
\r
616 /***********************/
\r
617 /* CREATE NEW KEY CASE */
\r
618 /***********************/
\r
619 // modify the cache
\r
620 cm23.put(k1, "foo");
\r
622 /*******************************/
\r
623 /* UPDATE AN EXISTING KEY CASE */
\r
624 /*******************************/
\r
625 // modify the cache
\r
626 cm23.put(k1, "baz");
\r
628 /********************************/
\r
629 /* REMOVAL OF EXISTING KEY CASE */
\r
630 /********************************/
\r
631 // modify the cache
\r
634 // Wait to make sure no updates came in, clearly this is
\r
635 // error prone as logic, but cannot find a better way than
\r
636 // this to make sure updates didn't get in
\r
637 res.await(1L, TimeUnit.SECONDS);
\r
638 // Analyze the updates
\r
639 ups = listener.getUpdates();
\r
640 assertTrue(ups.size() == 0);
\r
643 InetAddress addr = this.clusterDefaultServices.getMyAddress();
\r
644 assertNotNull(addr);
\r
646 List<InetAddress> addrList = this.clusterDefaultServices
\r
647 .getClusteredControllers();
\r
649 this.clusterDefaultServices.destroyCache(cache1);
\r
650 assertFalse(this.clusterDefaultServices.existCache(cache1));
\r
653 private class Update {
\r
659 Update (UpdateType t, Object key, Object value, String cacheName) {
\r
662 this.value = value;
\r
663 this.cacheName = cacheName;
\r
667 private class CacheAware implements ICacheUpdateAware {
\r
668 private CopyOnWriteArrayList<Update> gotUpdates;
\r
669 private CountDownLatch latch = null;
\r
672 this.gotUpdates = new CopyOnWriteArrayList<Update>();
\r
677 * Restart the monitor of the updates on the CacheAware object
\r
679 * @param expectedOperations Number of expected updates
\r
681 * @return a countdown latch which will be used to wait till the updates are done
\r
683 CountDownLatch restart(int expectedOperations) {
\r
684 this.gotUpdates.clear();
\r
685 this.latch = new CountDownLatch(expectedOperations);
\r
689 List<Update> getUpdates() {
\r
690 return this.gotUpdates;
\r
694 public void entryCreated(Object key, String cacheName, boolean originLocal) {
\r
695 log.debug("CACHE[{}] Got an entry created for key:{}", cacheName, key);
\r
696 Update u = new Update(UpdateType.ADDED, key, null, cacheName);
\r
697 this.gotUpdates.add(u);
\r
698 this.latch.countDown();
\r
702 public void entryUpdated(Object key, Object newValue, String cacheName, boolean originLocal) {
\r
703 log.debug("CACHE[{}] Got an entry updated for key:{} newValue:{}", cacheName, key, newValue);
\r
704 Update u = new Update(UpdateType.CHANGED, key, newValue, cacheName);
\r
705 this.gotUpdates.add(u);
\r
706 this.latch.countDown();
\r
710 public void entryDeleted(Object key, String cacheName, boolean originLocal) {
\r
711 log.debug("CACHE[{}] Got an entry delete for key:{}", cacheName, key);
\r
712 Update u = new Update(UpdateType.REMOVED, key, null, cacheName);
\r
713 this.gotUpdates.add(u);
\r
714 this.latch.countDown();
\r