Merge "Decouple IContainerListener to avoid parallel computation in cluster"
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.clustering.services_implementation.internal;
11
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
22 import java.util.Set;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
25
26 import javax.transaction.HeuristicMixedException;
27 import javax.transaction.HeuristicRollbackException;
28 import javax.transaction.NotSupportedException;
29 import javax.transaction.RollbackException;
30 import javax.transaction.SystemException;
31 import javax.transaction.Transaction;
32 import javax.transaction.TransactionManager;
33
34 import org.infinispan.Cache;
35 import org.infinispan.configuration.cache.Configuration;
36 import org.infinispan.configuration.cache.ConfigurationBuilder;
37 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
38 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
39 import org.infinispan.configuration.parsing.ParserRegistry;
40 import org.infinispan.manager.DefaultCacheManager;
41 import org.infinispan.manager.EmbeddedCacheManager;
42 import org.infinispan.notifications.Listener;
43 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
44 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
45 import org.infinispan.remoting.transport.Address;
46 import org.infinispan.remoting.transport.Transport;
47 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
48 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
49 import org.jgroups.Channel;
50 import org.jgroups.Event;
51 import org.jgroups.stack.GossipRouter;
52 import org.opendaylight.controller.clustering.services.CacheConfigException;
53 import org.opendaylight.controller.clustering.services.CacheExistException;
54 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
55 import org.opendaylight.controller.clustering.services.IClusterServices;
56 import org.opendaylight.controller.clustering.services.IGetUpdates;
57 import org.opendaylight.controller.clustering.services.IListenRoleChange;
58 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 public class ClusterManager implements IClusterServices {
63     protected static final Logger logger = LoggerFactory
64             .getLogger(ClusterManager.class);
65     private DefaultCacheManager cm;
66     GossipRouter gossiper;
67     private HashSet<IListenRoleChange> roleChangeListeners;
68     private ViewChangedListener cacheManagerListener;
69
70     private static String loopbackAddress = "127.0.0.1";
71
72     /**
73      * Start a JGroups GossipRouter if we are a supernode. The
74      * GosispRouter is nothing more than a simple
75      * rendevouz-pointer. All the nodes that wants to join the cluster
76      * will come to any of the rendevouz point and they introduce the
77      * nodes to all the others. Once the meet and greet phase if over,
78      * the nodes will open a full-mesh with the remaining n-1 nodes,
79      * so even if the GossipRouter goes down nothing is lost.
80      * NOTE: This function has the side effect to set some of the
81      * JGROUPS configurations, this because in this function already
82      * we try to retrieve some of the network capabilities of the
83      * device and so it's better not to do that again
84      *
85      *
86      * @return GossipRouter
87      */
88     private GossipRouter startGossiper() {
89         boolean amIGossipRouter = false;
90         Integer gossipRouterPortDefault = 12001;
91         Integer gossipRouterPort = gossipRouterPortDefault;
92         InetAddress gossipRouterAddress = null;
93         String supernodes_list = System.getProperty("supernodes",
94                 loopbackAddress);
95         StringBuffer sanitized_supernodes_list = new StringBuffer();
96         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
97
98         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
99         if (supernodes.hasMoreTokens()) {
100             // Populate the list of my addresses
101             try {
102                 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
103                 while (e.hasMoreElements()) {
104                     NetworkInterface n = e.nextElement();
105                     Enumeration<InetAddress> ee = n.getInetAddresses();
106                     while (ee.hasMoreElements()) {
107                         InetAddress i = ee.nextElement();
108                         myAddresses.add(i);
109                     }
110                 }
111             } catch (SocketException se) {
112                 logger.error("Cannot get the list of network interfaces");
113                 return null;
114             }
115         }
116         while (supernodes.hasMoreTokens()) {
117             String curr_supernode = supernodes.nextToken();
118             logger.debug("Examining supernode {}", curr_supernode);
119             StringTokenizer host_port = new StringTokenizer(curr_supernode,
120                     "[]");
121             String host;
122             String port;
123             Integer port_num = gossipRouterPortDefault;
124             if (host_port.countTokens() > 2) {
125                 logger.error("Error parsing supernode {} proceed to the next one",
126                         curr_supernode);
127                 continue;
128             }
129             host = host_port.nextToken();
130             InetAddress hostAddr;
131             try {
132                 hostAddr = InetAddress.getByName(host);
133             } catch (UnknownHostException ue) {
134                 logger.error("Host not known");
135                 continue;
136             }
137             if (host_port.hasMoreTokens()) {
138                 port = host_port.nextToken();
139                 try {
140                     port_num = Integer.valueOf(port);
141                 } catch (NumberFormatException ne) {
142                     logger
143                             .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
144                     port_num = gossipRouterPortDefault;
145                 }
146                 if ((port_num > 65535) || (port_num < 0)) {
147                     logger
148                             .error("Supplied supernode gossip port is outside a valid TCP port range");
149                     port_num = gossipRouterPortDefault;
150                 }
151             }
152             if (!amIGossipRouter) {
153                 if (host != null) {
154                     for (InetAddress myAddr : myAddresses) {
155                         if (myAddr.equals(hostAddr)) {
156                             amIGossipRouter = true;
157                             gossipRouterAddress = hostAddr;
158                             gossipRouterPort = port_num;
159                             break;
160                         }
161                     }
162                 }
163             }
164             if (!sanitized_supernodes_list.toString().equals("")) {
165                 sanitized_supernodes_list.append(",");
166             }
167             sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
168                     + port_num + "]");
169         }
170
171         if (amIGossipRouter) {
172             // Set the Jgroups binding interface to the one we got
173             // from the supernodes attribute
174             if (gossipRouterAddress != null) {
175                 System.setProperty("jgroups.tcp.address", gossipRouterAddress
176                         .getHostAddress());
177             }
178         } else {
179             // Set the Jgroup binding interface to the one we are well
180             // known outside or else to the first with non-local
181             // scope.
182             try {
183                 String myBind = InetAddress.getLocalHost().getHostAddress();
184                 if (myBind == null
185                         || InetAddress.getLocalHost().isLoopbackAddress()) {
186                     for (InetAddress myAddr : myAddresses) {
187                         if (myAddr.isLoopbackAddress()
188                                 || myAddr.isLinkLocalAddress()) {
189                             logger.debug("Skipping local address {}",
190                                          myAddr.getHostAddress());
191                             continue;
192                         } else {
193                             // First non-local address
194                             myBind = myAddr.getHostAddress();
195                             logger.debug("First non-local address {}", myBind);
196                             break;
197                         }
198                     }
199                 }
200                 String jgroupAddress = System
201                         .getProperty("jgroups.tcp.address");
202                 if (jgroupAddress == null) {
203                     if (myBind != null) {
204                         logger.debug("Set bind address to be {}", myBind);
205                         System.setProperty("jgroups.tcp.address", myBind);
206                     } else {
207                         logger
208                                 .debug("Set bind address to be LOCALHOST=127.0.0.1");
209                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
210                     }
211                 } else {
212                     logger.debug("jgroup.tcp.address already set to be {}",
213                             jgroupAddress);
214                 }
215             } catch (UnknownHostException uhe) {
216                 logger
217                         .error("Met UnknownHostException while trying to get binding address for jgroups");
218             }
219         }
220
221         // The supernodes list constitute also the tcpgossip initial
222         // host list
223         System.setProperty("jgroups.tcpgossip.initial_hosts",
224                 sanitized_supernodes_list.toString());
225         logger.debug("jgroups.tcp.address set to {}",
226                 System.getProperty("jgroups.tcp.address"));
227         logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
228                 System.getProperty("jgroups.tcpgossip.initial_hosts"));
229         GossipRouter res = null;
230         if (amIGossipRouter) {
231             logger.info("I'm a GossipRouter will listen on port {}",
232                     gossipRouterPort);
233             // Start a GossipRouter with JMX support
234             res = new GossipRouter(gossipRouterPort, null, true);
235         }
236         return res;
237     }
238
239     private void exitOnSecurityException(Exception ioe) {
240         Throwable cause = ioe.getCause();
241         while (cause != null) {
242             if (cause instanceof java.lang.SecurityException) {
243                 logger.error("Failed Cluster authentication. Stopping Controller...");
244                 System.exit(0);
245             }
246             cause = cause.getCause();
247         }
248     }
249
250     public void start() {
251         this.gossiper = startGossiper();
252         if (this.gossiper != null) {
253             logger.debug("Trying to start Gossiper");
254             try {
255                 this.gossiper.start();
256                 logger.info("Started GossipRouter");
257             } catch (Exception e) {
258                 logger.error("GossipRouter didn't start. Exception Stack Trace",
259                              e);
260             }
261         }
262         logger.info("Starting the ClusterManager");
263         try {
264             ParserRegistry parser = new ParserRegistry(this.getClass()
265                     .getClassLoader());
266             String infinispanConfigFile =
267                     System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
268             logger.debug("Using configuration file:{}", infinispanConfigFile);
269             ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
270             GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
271             globalBuilder.serialization()
272                     .classResolver(new ClassResolver())
273                     .build();
274             this.cm = new DefaultCacheManager(holder, false);
275             logger.debug("Allocated ClusterManager");
276             if (this.cm != null) {
277                 this.cm.start();
278                 this.cm.startCache();
279                 logger.debug("Started the ClusterManager");
280             }
281         } catch (Exception ioe) {
282             logger.error("Cannot configure infinispan .. bailing out ");
283             logger.error("Stack Trace that raised th exception");
284             logger.error("",ioe);
285             this.cm = null;
286             exitOnSecurityException(ioe);
287             this.stop();
288         }
289         logger.debug("Cache Manager has value {}", this.cm);
290     }
291
292     public void stop() {
293         logger.info("Stopping the ClusterManager");
294         if (this.cm != null) {
295             logger.info("Found a valid ClusterManager, now let it be stopped");
296             this.cm.stop();
297             this.cm = null;
298         }
299         if (this.gossiper != null) {
300             this.gossiper.stop();
301             this.gossiper = null;
302         }
303     }
304
305     @Override
306     public ConcurrentMap<?, ?> createCache(String containerName,
307             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
308             CacheConfigException {
309         EmbeddedCacheManager manager = this.cm;
310         Cache<Object,Object> c;
311         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
312         if (manager == null) {
313             return null;
314         }
315
316         if (manager.cacheExists(realCacheName)) {
317             throw new CacheExistException();
318         }
319
320         // Sanity check to avoid contrasting parameters between transactional
321         // and not
322         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
323                 IClusterServices.cacheMode.TRANSACTIONAL))) {
324             throw new CacheConfigException();
325         }
326
327         // Sanity check to avoid contrasting parameters between sync and async
328         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
329             throw new CacheConfigException();
330         }
331
332         Configuration fromTemplateConfig = null;
333         /*
334          * Fetch transactional/non-transactional templates
335          */
336         // Check if transactional
337         if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
338             fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
339         } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
340             fromTemplateConfig = manager.getDefaultCacheConfiguration();
341         }
342
343         // If none set the transactional property then just return null
344         if (fromTemplateConfig == null) {
345             return null;
346         }
347
348         ConfigurationBuilder builder = new ConfigurationBuilder();
349         builder.read(fromTemplateConfig);
350         /*
351          * Now evaluate async/sync
352          */
353         if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
354             builder.clustering()
355                     .cacheMode(fromTemplateConfig.clustering()
356                             .cacheMode()
357                             .toAsync());
358         } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
359             builder.clustering()
360                     .cacheMode(fromTemplateConfig.clustering()
361                             .cacheMode()
362                             .toSync());
363         }
364
365         manager.defineConfiguration(realCacheName, builder.build());
366         c = manager.getCache(realCacheName);
367         return c;
368     }
369
370     @Override
371     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
372         EmbeddedCacheManager manager = this.cm;
373         Cache<Object,Object> c;
374         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
375         if (manager == null) {
376             return null;
377         }
378
379         if (manager.cacheExists(realCacheName)) {
380             c = manager.getCache(realCacheName);
381             return c;
382         }
383         return null;
384     }
385
386     @Override
387     public void destroyCache(String containerName, String cacheName) {
388         EmbeddedCacheManager manager = this.cm;
389         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
390         if (manager == null) {
391             return;
392         }
393         if (manager.cacheExists(realCacheName)) {
394             manager.removeCache(realCacheName);
395         }
396     }
397
398     @Override
399     public boolean existCache(String containerName, String cacheName) {
400         EmbeddedCacheManager manager = this.cm;
401         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
402         if (manager == null) {
403             return false;
404         }
405         return manager.cacheExists(realCacheName);
406     }
407
408     @Override
409     public Set<String> getCacheList(String containerName) {
410         Set<String> perContainerCaches = new HashSet<String>();
411         EmbeddedCacheManager manager = this.cm;
412         if (manager == null) {
413             return null;
414         }
415         for (String cacheName : manager.getCacheNames()) {
416             if (!manager.isRunning(cacheName)) continue;
417             if (cacheName.startsWith("{" + containerName + "}_")) {
418                 String[] res = cacheName.split("[{}]");
419                 if (res.length >= 4 && res[1].equals(containerName)
420                         && res[2].equals("_")) {
421                     perContainerCaches.add(res[3]);
422                 }
423             }
424         }
425
426         return (perContainerCaches);
427     }
428
429     @Override
430     public Properties getCacheProperties(String containerName, String cacheName) {
431         EmbeddedCacheManager manager = this.cm;
432         if (manager == null) {
433             return null;
434         }
435         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
436         if (!manager.cacheExists(realCacheName)) {
437             return null;
438         }
439         Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
440                 .getCacheConfiguration();
441         Properties p = new Properties();
442         p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
443                 conf.transaction().toString());
444         p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
445                 conf.clustering().toString());
446         p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
447                 .locking().toString());
448         return p;
449     }
450
451     @Override
452     public void addListener(String containerName, String cacheName,
453             IGetUpdates<?, ?> u) throws CacheListenerAddException {
454         EmbeddedCacheManager manager = this.cm;
455         Cache<Object,Object> c;
456         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
457         if (manager == null) {
458             return;
459         }
460
461         if (!manager.cacheExists(realCacheName)) {
462             throw new CacheListenerAddException();
463         }
464         c = manager.getCache(realCacheName);
465         CacheListenerContainer cl = new CacheListenerContainer(u,
466                 containerName, cacheName);
467         c.addListener(cl);
468     }
469
470     @Override
471     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
472             String cacheName) {
473         EmbeddedCacheManager manager = this.cm;
474         Cache<Object,Object> c;
475         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
476         if (manager == null) {
477             return null;
478         }
479
480         if (!manager.cacheExists(realCacheName)) {
481             return null;
482         }
483         c = manager.getCache(realCacheName);
484
485         Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
486         Set<Object> listeners = c.getListeners();
487         for (Object listener : listeners) {
488             if (listener instanceof CacheListenerContainer) {
489                 CacheListenerContainer cl = (CacheListenerContainer) listener;
490                 res.add(cl.whichListener());
491             }
492         }
493
494         return res;
495     }
496
497     @Override
498     public void removeListener(String containerName, String cacheName,
499             IGetUpdates<?, ?> u) {
500         EmbeddedCacheManager manager = this.cm;
501         Cache<Object,Object> c;
502         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
503         if (manager == null) {
504             return;
505         }
506
507         if (!manager.cacheExists(realCacheName)) {
508             return;
509         }
510         c = manager.getCache(realCacheName);
511
512         Set<Object> listeners = c.getListeners();
513         for (Object listener : listeners) {
514             if (listener instanceof CacheListenerContainer) {
515                 CacheListenerContainer cl = (CacheListenerContainer) listener;
516                 if (cl.whichListener() == u) {
517                     c.removeListener(listener);
518                     return;
519                 }
520             }
521         }
522     }
523
524     @Override
525     public void tbegin() throws NotSupportedException, SystemException {
526         EmbeddedCacheManager manager = this.cm;
527         if (manager == null) {
528             throw new IllegalStateException();
529         }
530         TransactionManager tm = manager.getCache("transactional-type")
531                 .getAdvancedCache().getTransactionManager();
532         if (tm == null) {
533             throw new IllegalStateException();
534         }
535         tm.begin();
536     }
537
538     @Override
539     public void tcommit() throws RollbackException, HeuristicMixedException,
540             HeuristicRollbackException, java.lang.SecurityException,
541             java.lang.IllegalStateException, SystemException {
542         EmbeddedCacheManager manager = this.cm;
543         if (manager == null) {
544             throw new IllegalStateException();
545         }
546         TransactionManager tm = manager.getCache("transactional-type")
547                 .getAdvancedCache().getTransactionManager();
548         if (tm == null) {
549             throw new IllegalStateException();
550         }
551         tm.commit();
552     }
553
554     @Override
555     public void trollback() throws java.lang.IllegalStateException,
556             java.lang.SecurityException, SystemException {
557         EmbeddedCacheManager manager = this.cm;
558         if (manager == null) {
559             throw new IllegalStateException();
560         }
561         TransactionManager tm = manager.getCache("transactional-type")
562                 .getAdvancedCache().getTransactionManager();
563         if (tm == null) {
564             throw new IllegalStateException();
565         }
566         tm.rollback();
567     }
568
569     @Override
570     public Transaction tgetTransaction() throws SystemException {
571         EmbeddedCacheManager manager = this.cm;
572         if (manager == null) {
573             throw new IllegalStateException();
574         }
575         TransactionManager tm = manager.getCache("transactional-type")
576                 .getAdvancedCache().getTransactionManager();
577         if (tm == null) {
578             return null;
579         }
580         return tm.getTransaction();
581     }
582
583     @Override
584     public boolean amIStandby() {
585         EmbeddedCacheManager manager = this.cm;
586         if (manager == null) {
587             // In case we cannot fetch the information, lets assume we
588             // are standby, so to have less responsibility.
589             return true;
590         }
591         return (!manager.isCoordinator());
592     }
593
594     private InetAddress addressToInetAddress(Address a) {
595         EmbeddedCacheManager manager = this.cm;
596         if ((manager == null) || (a == null)) {
597             // In case we cannot fetch the information, lets assume we
598             // are standby, so to have less responsibility.
599             return null;
600         }
601         Transport t = manager.getTransport();
602         if (t instanceof JGroupsTransport) {
603             JGroupsTransport jt = (JGroupsTransport) t;
604             Channel c = jt.getChannel();
605             if (a instanceof JGroupsAddress) {
606                 JGroupsAddress ja = (JGroupsAddress) a;
607                 org.jgroups.Address phys = (org.jgroups.Address) c
608                         .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
609                                 .getJGroupsAddress()));
610                 if (phys instanceof org.jgroups.stack.IpAddress) {
611                     InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
612                             .getIpAddress();
613                     return bindAddress;
614                 }
615             }
616         }
617         return null;
618     }
619
620     @Override
621     public List<InetAddress> getClusteredControllers() {
622         EmbeddedCacheManager manager = this.cm;
623         if (manager == null) {
624             return null;
625         }
626         List<Address> controllers = manager.getMembers();
627         if ((controllers == null) || controllers.size() == 0) {
628             return null;
629         }
630
631         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
632         for (Address a : controllers) {
633             InetAddress inetAddress = addressToInetAddress(a);
634             if (inetAddress != null
635                     && !inetAddress.getHostAddress().equals(loopbackAddress)) {
636                 clusteredControllers.add(inetAddress);
637             }
638         }
639         return clusteredControllers;
640     }
641
642     @Override
643     public InetAddress getMyAddress() {
644         EmbeddedCacheManager manager = this.cm;
645         if (manager == null) {
646             return null;
647         }
648         return addressToInetAddress(manager.getAddress());
649     }
650
651     @Override
652     public InetAddress getActiveAddress() {
653         EmbeddedCacheManager manager = this.cm;
654         if (manager == null) {
655             // In case we cannot fetch the information, lets assume we
656             // are standby, so to have less responsibility.
657             return null;
658         }
659
660         return addressToInetAddress(manager.getCoordinator());
661     }
662
663     @Override
664     public void listenRoleChange(IListenRoleChange i)
665             throws ListenRoleChangeAddException {
666         EmbeddedCacheManager manager = this.cm;
667         if (manager == null) {
668             // In case we cannot fetch the information, lets assume we
669             // are standby, so to have less responsibility.
670             throw new ListenRoleChangeAddException();
671         }
672
673         if (this.roleChangeListeners == null) {
674             this.roleChangeListeners = new HashSet<IListenRoleChange>();
675             this.cacheManagerListener = new ViewChangedListener(
676                     this.roleChangeListeners);
677             manager.addListener(this.cacheManagerListener);
678         }
679
680         if (this.roleChangeListeners != null) {
681             this.roleChangeListeners.add(i);
682         }
683     }
684
685     @Override
686     public void unlistenRoleChange(IListenRoleChange i) {
687         EmbeddedCacheManager manager = this.cm;
688         if (manager == null) {
689             // In case we cannot fetch the information, lets assume we
690             // are standby, so to have less responsibility.
691             return;
692         }
693
694         if (this.roleChangeListeners != null) {
695             this.roleChangeListeners.remove(i);
696         }
697
698         if ((this.roleChangeListeners != null && this.roleChangeListeners
699                 .isEmpty())
700                 && (this.cacheManagerListener != null)) {
701             manager.removeListener(this.cacheManagerListener);
702             this.cacheManagerListener = null;
703             this.roleChangeListeners = null;
704         }
705     }
706
707     @Listener
708     public class ViewChangedListener {
709         Set<IListenRoleChange> roleListeners;
710
711         public ViewChangedListener(Set<IListenRoleChange> s) {
712             this.roleListeners = s;
713         }
714
715         @ViewChanged
716         public void viewChanged(ViewChangedEvent e) {
717             for (IListenRoleChange i : this.roleListeners) {
718                 i.newActiveAvailable();
719             }
720         }
721     }
722 }