Merge "Unitests: remove conditions that are always true/false."
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.clustering.services_implementation.internal;
11
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
22 import java.util.Set;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.transaction.HeuristicMixedException;
28 import javax.transaction.HeuristicRollbackException;
29 import javax.transaction.NotSupportedException;
30 import javax.transaction.RollbackException;
31 import javax.transaction.SystemException;
32 import javax.transaction.Transaction;
33 import javax.transaction.TransactionManager;
34
35 import org.infinispan.Cache;
36 import org.infinispan.configuration.cache.Configuration;
37 import org.infinispan.configuration.cache.ConfigurationBuilder;
38 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
39 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
40 import org.infinispan.configuration.parsing.ParserRegistry;
41 import org.infinispan.manager.DefaultCacheManager;
42 import org.infinispan.manager.EmbeddedCacheManager;
43 import org.infinispan.notifications.Listener;
44 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
45 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
46 import org.infinispan.remoting.transport.Address;
47 import org.infinispan.remoting.transport.Transport;
48 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
49 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
50 import org.jgroups.Channel;
51 import org.jgroups.Event;
52 import org.jgroups.stack.GossipRouter;
53 import org.opendaylight.controller.clustering.services.CacheConfigException;
54 import org.opendaylight.controller.clustering.services.CacheExistException;
55 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
56 import org.opendaylight.controller.clustering.services.IClusterServices;
57 import org.opendaylight.controller.clustering.services.IGetUpdates;
58 import org.opendaylight.controller.clustering.services.IListenRoleChange;
59 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class ClusterManager implements IClusterServices {
64     protected static final Logger logger = LoggerFactory
65             .getLogger(ClusterManager.class);
66     private DefaultCacheManager cm;
67     GossipRouter gossiper;
68     private HashSet<IListenRoleChange> roleChangeListeners;
69     private ViewChangedListener cacheManagerListener;
70
71     private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
72
73     // defaultTransactionTimeout is 60 seconds
74     private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
75
76     /**
77      * Start a JGroups GossipRouter if we are a supernode. The
78      * GosispRouter is nothing more than a simple
79      * rendevouz-pointer. All the nodes that wants to join the cluster
80      * will come to any of the rendevouz point and they introduce the
81      * nodes to all the others. Once the meet and greet phase if over,
82      * the nodes will open a full-mesh with the remaining n-1 nodes,
83      * so even if the GossipRouter goes down nothing is lost.
84      * NOTE: This function has the side effect to set some of the
85      * JGROUPS configurations, this because in this function already
86      * we try to retrieve some of the network capabilities of the
87      * device and so it's better not to do that again
88      *
89      *
90      * @return GossipRouter
91      */
92     private GossipRouter startGossiper() {
93         boolean amIGossipRouter = false;
94         Integer gossipRouterPortDefault = 12001;
95         Integer gossipRouterPort = gossipRouterPortDefault;
96         InetAddress gossipRouterAddress = null;
97         String supernodes_list = System.getProperty("supernodes",
98                 loopbackAddress);
99         StringBuffer sanitized_supernodes_list = new StringBuffer();
100         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
101
102         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
103         if (supernodes.hasMoreTokens()) {
104             // Populate the list of my addresses
105             try {
106                 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
107                 while (e.hasMoreElements()) {
108                     NetworkInterface n = e.nextElement();
109                     Enumeration<InetAddress> ee = n.getInetAddresses();
110                     while (ee.hasMoreElements()) {
111                         InetAddress i = ee.nextElement();
112                         myAddresses.add(i);
113                     }
114                 }
115             } catch (SocketException se) {
116                 logger.error("Cannot get the list of network interfaces");
117                 return null;
118             }
119         }
120         while (supernodes.hasMoreTokens()) {
121             String curr_supernode = supernodes.nextToken();
122             logger.debug("Examining supernode {}", curr_supernode);
123             StringTokenizer host_port = new StringTokenizer(curr_supernode,
124                     "[]");
125             String host;
126             String port;
127             Integer port_num = gossipRouterPortDefault;
128             if (host_port.countTokens() > 2) {
129                 logger.error("Error parsing supernode {} proceed to the next one",
130                         curr_supernode);
131                 continue;
132             }
133             host = host_port.nextToken();
134             InetAddress hostAddr;
135             try {
136                 hostAddr = InetAddress.getByName(host);
137             } catch (UnknownHostException ue) {
138                 logger.error("Host not known");
139                 continue;
140             }
141             if (host_port.hasMoreTokens()) {
142                 port = host_port.nextToken();
143                 try {
144                     port_num = Integer.valueOf(port);
145                 } catch (NumberFormatException ne) {
146                     logger
147                             .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
148                     port_num = gossipRouterPortDefault;
149                 }
150                 if ((port_num > 65535) || (port_num < 0)) {
151                     logger
152                             .error("Supplied supernode gossip port is outside a valid TCP port range");
153                     port_num = gossipRouterPortDefault;
154                 }
155             }
156             if (!amIGossipRouter) {
157                 if (host != null) {
158                     for (InetAddress myAddr : myAddresses) {
159                         if (myAddr.equals(hostAddr)) {
160                             amIGossipRouter = true;
161                             gossipRouterAddress = hostAddr;
162                             gossipRouterPort = port_num;
163                             break;
164                         }
165                     }
166                 }
167             }
168             if (!sanitized_supernodes_list.toString().equals("")) {
169                 sanitized_supernodes_list.append(",");
170             }
171             sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
172                     + port_num + "]");
173         }
174
175         if (amIGossipRouter) {
176             // Set the Jgroups binding interface to the one we got
177             // from the supernodes attribute
178             if (gossipRouterAddress != null) {
179                 System.setProperty("jgroups.tcp.address", gossipRouterAddress
180                         .getHostAddress());
181             }
182         } else {
183             // Set the Jgroup binding interface to the one we are well
184             // known outside or else to the first with non-local
185             // scope.
186             try {
187                 String myBind = InetAddress.getLocalHost().getHostAddress();
188                 if (myBind == null
189                         || InetAddress.getLocalHost().isLoopbackAddress()) {
190                     for (InetAddress myAddr : myAddresses) {
191                         if (myAddr.isLoopbackAddress()
192                                 || myAddr.isLinkLocalAddress()) {
193                             logger.debug("Skipping local address {}",
194                                          myAddr.getHostAddress());
195                             continue;
196                         } else {
197                             // First non-local address
198                             myBind = myAddr.getHostAddress();
199                             logger.debug("First non-local address {}", myBind);
200                             break;
201                         }
202                     }
203                 }
204                 String jgroupAddress = System
205                         .getProperty("jgroups.tcp.address");
206                 if (jgroupAddress == null) {
207                     if (myBind != null) {
208                         logger.debug("Set bind address to be {}", myBind);
209                         System.setProperty("jgroups.tcp.address", myBind);
210                     } else {
211                         logger
212                                 .debug("Set bind address to be LOCALHOST=127.0.0.1");
213                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
214                     }
215                 } else {
216                     logger.debug("jgroup.tcp.address already set to be {}",
217                             jgroupAddress);
218                 }
219             } catch (UnknownHostException uhe) {
220                 logger
221                         .error("Met UnknownHostException while trying to get binding address for jgroups");
222             }
223         }
224
225         // The supernodes list constitute also the tcpgossip initial
226         // host list
227         System.setProperty("jgroups.tcpgossip.initial_hosts",
228                 sanitized_supernodes_list.toString());
229         logger.debug("jgroups.tcp.address set to {}",
230                 System.getProperty("jgroups.tcp.address"));
231         logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
232                 System.getProperty("jgroups.tcpgossip.initial_hosts"));
233         GossipRouter res = null;
234         if (amIGossipRouter) {
235             logger.info("I'm a GossipRouter will listen on port {}",
236                     gossipRouterPort);
237             // Start a GossipRouter with JMX support
238             res = new GossipRouter(gossipRouterPort, null, true);
239         }
240         return res;
241     }
242
243     private void exitOnSecurityException(Exception ioe) {
244         Throwable cause = ioe.getCause();
245         while (cause != null) {
246             if (cause instanceof java.lang.SecurityException) {
247                 logger.error("Failed Cluster authentication. Stopping Controller...");
248                 System.exit(0);
249             }
250             cause = cause.getCause();
251         }
252     }
253
254     public void start() {
255         this.gossiper = startGossiper();
256         if (this.gossiper != null) {
257             logger.debug("Trying to start Gossiper");
258             try {
259                 this.gossiper.start();
260                 logger.info("Started GossipRouter");
261             } catch (Exception e) {
262                 logger.error("GossipRouter didn't start. Exception Stack Trace",
263                              e);
264             }
265         }
266         logger.info("Starting the ClusterManager");
267         try {
268             ParserRegistry parser = new ParserRegistry(this.getClass()
269                     .getClassLoader());
270             String infinispanConfigFile =
271                     System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
272             logger.debug("Using configuration file:{}", infinispanConfigFile);
273             ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
274             GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
275             globalBuilder.serialization()
276                     .classResolver(new ClassResolver())
277                     .build();
278             this.cm = new DefaultCacheManager(holder, false);
279             logger.debug("Allocated ClusterManager");
280             if (this.cm != null) {
281                 this.cm.start();
282                 this.cm.startCache();
283                 logger.debug("Started the ClusterManager");
284             }
285         } catch (Exception ioe) {
286             logger.error("Cannot configure infinispan .. bailing out ");
287             logger.error("Stack Trace that raised th exception");
288             logger.error("",ioe);
289             this.cm = null;
290             exitOnSecurityException(ioe);
291             this.stop();
292         }
293         logger.debug("Cache Manager has value {}", this.cm);
294     }
295
296     public void stop() {
297         logger.info("Stopping the ClusterManager");
298         if (this.cm != null) {
299             logger.info("Found a valid ClusterManager, now let it be stopped");
300             this.cm.stop();
301             this.cm = null;
302         }
303         if (this.gossiper != null) {
304             this.gossiper.stop();
305             this.gossiper = null;
306         }
307     }
308
309     @Override
310     public ConcurrentMap<?, ?> createCache(String containerName,
311             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
312             CacheConfigException {
313         EmbeddedCacheManager manager = this.cm;
314         Cache<Object,Object> c;
315         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
316         if (manager == null) {
317             return null;
318         }
319
320         if (manager.cacheExists(realCacheName)) {
321             throw new CacheExistException();
322         }
323
324         // Sanity check to avoid contrasting parameters between transactional
325         // and not
326         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
327                 IClusterServices.cacheMode.TRANSACTIONAL))) {
328             throw new CacheConfigException();
329         }
330
331         // Sanity check to avoid contrasting parameters between sync and async
332         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
333             throw new CacheConfigException();
334         }
335
336         Configuration fromTemplateConfig = null;
337         /*
338          * Fetch transactional/non-transactional templates
339          */
340         // Check if transactional
341         if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
342             fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
343         } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
344             fromTemplateConfig = manager.getDefaultCacheConfiguration();
345         }
346
347         // If none set the transactional property then just return null
348         if (fromTemplateConfig == null) {
349             return null;
350         }
351
352         ConfigurationBuilder builder = new ConfigurationBuilder();
353         builder.read(fromTemplateConfig);
354         /*
355          * Now evaluate async/sync
356          */
357         if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
358             builder.clustering()
359                     .cacheMode(fromTemplateConfig.clustering()
360                             .cacheMode()
361                             .toAsync());
362         } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
363             builder.clustering()
364                     .cacheMode(fromTemplateConfig.clustering()
365                             .cacheMode()
366                             .toSync());
367         }
368
369         manager.defineConfiguration(realCacheName, builder.build());
370         c = manager.getCache(realCacheName);
371         return c;
372     }
373
374     @Override
375     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
376         EmbeddedCacheManager manager = this.cm;
377         Cache<Object,Object> c;
378         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
379         if (manager == null) {
380             return null;
381         }
382
383         if (manager.cacheExists(realCacheName)) {
384             c = manager.getCache(realCacheName);
385             return c;
386         }
387         return null;
388     }
389
390     @Override
391     public void destroyCache(String containerName, String cacheName) {
392         EmbeddedCacheManager manager = this.cm;
393         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
394         if (manager == null) {
395             return;
396         }
397         if (manager.cacheExists(realCacheName)) {
398             manager.removeCache(realCacheName);
399         }
400     }
401
402     @Override
403     public boolean existCache(String containerName, String cacheName) {
404         EmbeddedCacheManager manager = this.cm;
405
406         if (manager == null) {
407             return false;
408         }
409
410         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
411         return manager.cacheExists(realCacheName);
412     }
413
414     @Override
415     public Set<String> getCacheList(String containerName) {
416         Set<String> perContainerCaches = new HashSet<String>();
417         EmbeddedCacheManager manager = this.cm;
418         if (manager == null) {
419             return null;
420         }
421         for (String cacheName : manager.getCacheNames()) {
422             if (!manager.isRunning(cacheName)) continue;
423             if (cacheName.startsWith("{" + containerName + "}_")) {
424                 String[] res = cacheName.split("[{}]");
425                 if (res.length >= 4 && res[1].equals(containerName)
426                         && res[2].equals("_")) {
427                     perContainerCaches.add(res[3]);
428                 }
429             }
430         }
431
432         return (perContainerCaches);
433     }
434
435     @Override
436     public Properties getCacheProperties(String containerName, String cacheName) {
437         EmbeddedCacheManager manager = this.cm;
438         if (manager == null) {
439             return null;
440         }
441         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
442         if (!manager.cacheExists(realCacheName)) {
443             return null;
444         }
445         Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
446                 .getCacheConfiguration();
447         Properties p = new Properties();
448         p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
449                 conf.transaction().toString());
450         p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
451                 conf.clustering().toString());
452         p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
453                 .locking().toString());
454         return p;
455     }
456
457     @Override
458     public void addListener(String containerName, String cacheName,
459             IGetUpdates<?, ?> u) throws CacheListenerAddException {
460         EmbeddedCacheManager manager = this.cm;
461         Cache<Object,Object> c;
462         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
463         if (manager == null) {
464             return;
465         }
466
467         if (!manager.cacheExists(realCacheName)) {
468             throw new CacheListenerAddException();
469         }
470         c = manager.getCache(realCacheName);
471         CacheListenerContainer cl = new CacheListenerContainer(u,
472                 containerName, cacheName);
473         c.addListener(cl);
474     }
475
476     @Override
477     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
478             String cacheName) {
479         EmbeddedCacheManager manager = this.cm;
480         Cache<Object,Object> c;
481         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
482         if (manager == null) {
483             return null;
484         }
485
486         if (!manager.cacheExists(realCacheName)) {
487             return null;
488         }
489         c = manager.getCache(realCacheName);
490
491         Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
492         Set<Object> listeners = c.getListeners();
493         for (Object listener : listeners) {
494             if (listener instanceof CacheListenerContainer) {
495                 CacheListenerContainer cl = (CacheListenerContainer) listener;
496                 res.add(cl.whichListener());
497             }
498         }
499
500         return res;
501     }
502
503     @Override
504     public void removeListener(String containerName, String cacheName,
505             IGetUpdates<?, ?> u) {
506         EmbeddedCacheManager manager = this.cm;
507         Cache<Object,Object> c;
508         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
509         if (manager == null) {
510             return;
511         }
512
513         if (!manager.cacheExists(realCacheName)) {
514             return;
515         }
516         c = manager.getCache(realCacheName);
517
518         Set<Object> listeners = c.getListeners();
519         for (Object listener : listeners) {
520             if (listener instanceof CacheListenerContainer) {
521                 CacheListenerContainer cl = (CacheListenerContainer) listener;
522                 if (cl.whichListener() == u) {
523                     c.removeListener(listener);
524                     return;
525                 }
526             }
527         }
528     }
529
530     @Override
531     public void tbegin() throws NotSupportedException, SystemException {
532         // call tbegin with the default timeout
533         tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
534     }
535
536     @Override
537     public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
538         EmbeddedCacheManager manager = this.cm;
539         if (manager == null) {
540             throw new IllegalStateException();
541         }
542         TransactionManager tm = manager.getCache("transactional-type")
543                 .getAdvancedCache().getTransactionManager();
544         if (tm == null) {
545             throw new IllegalStateException();
546         }
547         long timeoutSec = unit.toSeconds(timeout);
548         if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
549             // fall back to the default timeout
550             tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
551         } else {
552             // cast is ok here
553             // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
554             tm.setTransactionTimeout((int) timeoutSec);
555         }
556         tm.begin();
557     }
558
559     @Override
560     public void tcommit() throws RollbackException, HeuristicMixedException,
561             HeuristicRollbackException, java.lang.SecurityException,
562             java.lang.IllegalStateException, SystemException {
563         EmbeddedCacheManager manager = this.cm;
564         if (manager == null) {
565             throw new IllegalStateException();
566         }
567         TransactionManager tm = manager.getCache("transactional-type")
568                 .getAdvancedCache().getTransactionManager();
569         if (tm == null) {
570             throw new IllegalStateException();
571         }
572         tm.commit();
573     }
574
575     @Override
576     public void trollback() throws java.lang.IllegalStateException,
577             java.lang.SecurityException, SystemException {
578         EmbeddedCacheManager manager = this.cm;
579         if (manager == null) {
580             throw new IllegalStateException();
581         }
582         TransactionManager tm = manager.getCache("transactional-type")
583                 .getAdvancedCache().getTransactionManager();
584         if (tm == null) {
585             throw new IllegalStateException();
586         }
587         tm.rollback();
588     }
589
590     @Override
591     public Transaction tgetTransaction() throws SystemException {
592         EmbeddedCacheManager manager = this.cm;
593         if (manager == null) {
594             throw new IllegalStateException();
595         }
596         TransactionManager tm = manager.getCache("transactional-type")
597                 .getAdvancedCache().getTransactionManager();
598         if (tm == null) {
599             return null;
600         }
601         return tm.getTransaction();
602     }
603
604     @Override
605     public boolean amIStandby() {
606         EmbeddedCacheManager manager = this.cm;
607         if (manager == null) {
608             // In case we cannot fetch the information, lets assume we
609             // are standby, so to have less responsibility.
610             return true;
611         }
612         return (!manager.isCoordinator());
613     }
614
615     private InetAddress addressToInetAddress(Address a) {
616         EmbeddedCacheManager manager = this.cm;
617         if ((manager == null) || (a == null)) {
618             // In case we cannot fetch the information, lets assume we
619             // are standby, so to have less responsibility.
620             return null;
621         }
622         Transport t = manager.getTransport();
623         if (t instanceof JGroupsTransport) {
624             JGroupsTransport jt = (JGroupsTransport) t;
625             Channel c = jt.getChannel();
626             if (a instanceof JGroupsAddress) {
627                 JGroupsAddress ja = (JGroupsAddress) a;
628                 org.jgroups.Address phys = (org.jgroups.Address) c
629                         .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
630                                 .getJGroupsAddress()));
631                 if (phys instanceof org.jgroups.stack.IpAddress) {
632                     InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
633                             .getIpAddress();
634                     return bindAddress;
635                 }
636             }
637         }
638         return null;
639     }
640
641     @Override
642     public List<InetAddress> getClusteredControllers() {
643         EmbeddedCacheManager manager = this.cm;
644         if (manager == null) {
645             return null;
646         }
647         List<Address> controllers = manager.getMembers();
648         if ((controllers == null) || controllers.size() == 0) {
649             return null;
650         }
651
652         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
653         for (Address a : controllers) {
654             InetAddress inetAddress = addressToInetAddress(a);
655             if (inetAddress != null
656                     && !inetAddress.getHostAddress().equals(loopbackAddress)) {
657                 clusteredControllers.add(inetAddress);
658             }
659         }
660         return clusteredControllers;
661     }
662
663     @Override
664     public InetAddress getMyAddress() {
665         EmbeddedCacheManager manager = this.cm;
666         if (manager == null) {
667             return null;
668         }
669         return addressToInetAddress(manager.getAddress());
670     }
671
672     @Override
673     public InetAddress getActiveAddress() {
674         EmbeddedCacheManager manager = this.cm;
675         if (manager == null) {
676             // In case we cannot fetch the information, lets assume we
677             // are standby, so to have less responsibility.
678             return null;
679         }
680
681         return addressToInetAddress(manager.getCoordinator());
682     }
683
684     @Override
685     public void listenRoleChange(IListenRoleChange i)
686             throws ListenRoleChangeAddException {
687         EmbeddedCacheManager manager = this.cm;
688         if (manager == null) {
689             // In case we cannot fetch the information, lets assume we
690             // are standby, so to have less responsibility.
691             throw new ListenRoleChangeAddException();
692         }
693
694         if (this.roleChangeListeners == null) {
695             this.roleChangeListeners = new HashSet<IListenRoleChange>();
696             this.cacheManagerListener = new ViewChangedListener(
697                     this.roleChangeListeners);
698             manager.addListener(this.cacheManagerListener);
699         }
700
701         if (this.roleChangeListeners != null) {
702             this.roleChangeListeners.add(i);
703         }
704     }
705
706     @Override
707     public void unlistenRoleChange(IListenRoleChange i) {
708         EmbeddedCacheManager manager = this.cm;
709         if (manager == null) {
710             // In case we cannot fetch the information, lets assume we
711             // are standby, so to have less responsibility.
712             return;
713         }
714
715         if (this.roleChangeListeners != null) {
716             this.roleChangeListeners.remove(i);
717         }
718
719         if ((this.roleChangeListeners != null && this.roleChangeListeners
720                 .isEmpty())
721                 && (this.cacheManagerListener != null)) {
722             manager.removeListener(this.cacheManagerListener);
723             this.cacheManagerListener = null;
724             this.roleChangeListeners = null;
725         }
726     }
727
728     @Listener
729     public class ViewChangedListener {
730         Set<IListenRoleChange> roleListeners;
731
732         public ViewChangedListener(Set<IListenRoleChange> s) {
733             this.roleListeners = s;
734         }
735
736         @ViewChanged
737         public void viewChanged(ViewChangedEvent e) {
738             for (IListenRoleChange i : this.roleListeners) {
739                 i.newActiveAvailable();
740             }
741         }
742     }
743 }