83db4144007963a94e503a33345b1e641eadc2a0
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.clustering.services_implementation.internal;
11
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
22 import java.util.Set;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.transaction.HeuristicMixedException;
28 import javax.transaction.HeuristicRollbackException;
29 import javax.transaction.NotSupportedException;
30 import javax.transaction.RollbackException;
31 import javax.transaction.SystemException;
32 import javax.transaction.Transaction;
33 import javax.transaction.TransactionManager;
34
35 import org.infinispan.Cache;
36 import org.infinispan.configuration.cache.Configuration;
37 import org.infinispan.configuration.cache.ConfigurationBuilder;
38 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
39 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
40 import org.infinispan.configuration.parsing.ParserRegistry;
41 import org.infinispan.manager.DefaultCacheManager;
42 import org.infinispan.manager.EmbeddedCacheManager;
43 import org.infinispan.notifications.Listener;
44 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
45 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
46 import org.infinispan.remoting.transport.Address;
47 import org.infinispan.remoting.transport.Transport;
48 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
49 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
50 import org.jgroups.Channel;
51 import org.jgroups.Event;
52 import org.jgroups.stack.GossipRouter;
53 import org.opendaylight.controller.clustering.services.CacheConfigException;
54 import org.opendaylight.controller.clustering.services.CacheExistException;
55 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
56 import org.opendaylight.controller.clustering.services.IClusterServices;
57 import org.opendaylight.controller.clustering.services.IGetUpdates;
58 import org.opendaylight.controller.clustering.services.IListenRoleChange;
59 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class ClusterManager implements IClusterServices {
64     protected static final Logger logger = LoggerFactory
65             .getLogger(ClusterManager.class);
66     private DefaultCacheManager cm;
67     GossipRouter gossiper;
68     private HashSet<IListenRoleChange> roleChangeListeners;
69     private ViewChangedListener cacheManagerListener;
70
71     private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
72     private static final int gossipRouterPortDefault = 12001;
73     // defaultTransactionTimeout is 60 seconds
74     private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
75
76     /**
77      * Start a JGroups GossipRouter if we are a supernode. The
78      * GosispRouter is nothing more than a simple
79      * rendevouz-pointer. All the nodes that wants to join the cluster
80      * will come to any of the rendevouz point and they introduce the
81      * nodes to all the others. Once the meet and greet phase if over,
82      * the nodes will open a full-mesh with the remaining n-1 nodes,
83      * so even if the GossipRouter goes down nothing is lost.
84      * NOTE: This function has the side effect to set some of the
85      * JGROUPS configurations, this because in this function already
86      * we try to retrieve some of the network capabilities of the
87      * device and so it's better not to do that again
88      *
89      *
90      * @return GossipRouter
91      */
92     private GossipRouter startGossiper() {
93         boolean amIGossipRouter = false;
94         Integer gossipRouterPort = gossipRouterPortDefault;
95         InetAddress gossipRouterAddress = null;
96         String supernodes_list = System.getProperty("supernodes",
97                 loopbackAddress);
98         StringBuilder sanitized_supernodes_list = new StringBuilder();
99         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
100
101         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
102         if (supernodes.hasMoreTokens()) {
103             // Populate the list of my addresses
104             try {
105                 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
106                 while (e.hasMoreElements()) {
107                     NetworkInterface n = e.nextElement();
108                     Enumeration<InetAddress> ee = n.getInetAddresses();
109                     while (ee.hasMoreElements()) {
110                         InetAddress i = ee.nextElement();
111                         myAddresses.add(i);
112                     }
113                 }
114             } catch (SocketException se) {
115                 logger.error("Cannot get the list of network interfaces");
116                 return null;
117             }
118         }
119         while (supernodes.hasMoreTokens()) {
120             String curr_supernode = supernodes.nextToken();
121             logger.debug("Examining supernode {}", curr_supernode);
122             StringTokenizer host_port = new StringTokenizer(curr_supernode,
123                     "[]");
124             String host;
125             String port;
126             Integer port_num = gossipRouterPortDefault;
127             if (host_port.countTokens() > 2) {
128                 logger.error("Error parsing supernode {} proceed to the next one",
129                         curr_supernode);
130                 continue;
131             }
132             host = host_port.nextToken();
133             InetAddress hostAddr;
134             try {
135                 hostAddr = InetAddress.getByName(host);
136             } catch (UnknownHostException ue) {
137                 logger.error("Host {} is not known", host);
138                 continue;
139             }
140             if (host_port.hasMoreTokens()) {
141                 port = host_port.nextToken();
142                 try {
143                     port_num = Integer.valueOf(port);
144                 } catch (NumberFormatException ne) {
145                     logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}",
146                                  gossipRouterPortDefault);
147                     port_num = gossipRouterPortDefault;
148                 }
149                 if ((port_num > 65535) || (port_num < 0)) {
150                     logger.error("Supplied supernode gossip port is outside a valid TCP port range");
151                     port_num = gossipRouterPortDefault;
152                 }
153             }
154             if (!amIGossipRouter) {
155                 if (host != null) {
156                     for (InetAddress myAddr : myAddresses) {
157                         if (myAddr.equals(hostAddr)) {
158                             amIGossipRouter = true;
159                             gossipRouterAddress = hostAddr;
160                             gossipRouterPort = port_num;
161                             break;
162                         }
163                     }
164                 }
165             }
166             if (!sanitized_supernodes_list.toString().equals("")) {
167                 sanitized_supernodes_list.append(",");
168             }
169             sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]");
170         }
171
172         if (amIGossipRouter) {
173             // Set the Jgroups binding interface to the one we got
174             // from the supernodes attribute
175             if (gossipRouterAddress != null) {
176                 System.setProperty("jgroups.tcp.address", gossipRouterAddress
177                         .getHostAddress());
178             }
179         } else {
180             // Set the Jgroup binding interface to the one we are well
181             // known outside or else to the first with non-local
182             // scope.
183             try {
184                 String myBind = InetAddress.getLocalHost().getHostAddress();
185                 if (myBind == null
186                         || InetAddress.getLocalHost().isLoopbackAddress()) {
187                     for (InetAddress myAddr : myAddresses) {
188                         if (myAddr.isLoopbackAddress()
189                                 || myAddr.isLinkLocalAddress()) {
190                             logger.debug("Skipping local address {}",
191                                          myAddr.getHostAddress());
192                             continue;
193                         } else {
194                             // First non-local address
195                             myBind = myAddr.getHostAddress();
196                             logger.debug("First non-local address {}", myBind);
197                             break;
198                         }
199                     }
200                 }
201                 String jgroupAddress = System
202                         .getProperty("jgroups.tcp.address");
203                 if (jgroupAddress == null) {
204                     if (myBind != null) {
205                         logger.debug("Set bind address to be {}", myBind);
206                         System.setProperty("jgroups.tcp.address", myBind);
207                     } else {
208                         logger
209                                 .debug("Set bind address to be LOCALHOST=127.0.0.1");
210                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
211                     }
212                 } else {
213                     logger.debug("jgroup.tcp.address already set to be {}",
214                             jgroupAddress);
215                 }
216             } catch (UnknownHostException uhe) {
217                 logger
218                         .error("Met UnknownHostException while trying to get binding address for jgroups");
219             }
220         }
221
222         // The supernodes list constitute also the tcpgossip initial
223         // host list
224         System.setProperty("jgroups.tcpgossip.initial_hosts",
225                 sanitized_supernodes_list.toString());
226         logger.debug("jgroups.tcp.address set to {}",
227                 System.getProperty("jgroups.tcp.address"));
228         logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
229                 System.getProperty("jgroups.tcpgossip.initial_hosts"));
230         GossipRouter res = null;
231         if (amIGossipRouter) {
232             logger.info("I'm a GossipRouter will listen on port {}",
233                     gossipRouterPort);
234             // Start a GossipRouter with JMX support
235             res = new GossipRouter(gossipRouterPort, null, true);
236         }
237         return res;
238     }
239
240     private void exitOnSecurityException(Exception ioe) {
241         Throwable cause = ioe.getCause();
242         while (cause != null) {
243             if (cause instanceof java.lang.SecurityException) {
244                 logger.error("Failed Cluster authentication. Stopping Controller...");
245                 System.exit(0);
246             }
247             cause = cause.getCause();
248         }
249     }
250
251     public void start() {
252         this.gossiper = startGossiper();
253         if (this.gossiper != null) {
254             logger.debug("Trying to start Gossiper");
255             try {
256                 this.gossiper.start();
257                 logger.info("Started GossipRouter");
258             } catch (Exception e) {
259                 logger.error("GossipRouter didn't start. Exception Stack Trace",
260                              e);
261             }
262         }
263         logger.info("Starting the ClusterManager");
264         try {
265             ParserRegistry parser = new ParserRegistry(this.getClass()
266                     .getClassLoader());
267             String infinispanConfigFile =
268                     System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
269             logger.debug("Using configuration file:{}", infinispanConfigFile);
270             ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
271             GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
272             globalBuilder.serialization()
273                     .classResolver(new ClassResolver())
274                     .build();
275             this.cm = new DefaultCacheManager(holder, false);
276             logger.debug("Allocated ClusterManager");
277             if (this.cm != null) {
278                 this.cm.start();
279                 this.cm.startCache();
280                 logger.debug("Started the ClusterManager");
281             }
282         } catch (Exception ioe) {
283             logger.error("Cannot configure infinispan .. bailing out ");
284             logger.error("Stack Trace that raised th exception");
285             logger.error("",ioe);
286             this.cm = null;
287             exitOnSecurityException(ioe);
288             this.stop();
289         }
290         logger.debug("Cache Manager has value {}", this.cm);
291     }
292
293     public void stop() {
294         logger.info("Stopping the ClusterManager");
295         if (this.cm != null) {
296             logger.info("Found a valid ClusterManager, now let it be stopped");
297             this.cm.stop();
298             this.cm = null;
299         }
300         if (this.gossiper != null) {
301             this.gossiper.stop();
302             this.gossiper = null;
303         }
304     }
305
306     @Override
307     public ConcurrentMap<?, ?> createCache(String containerName,
308             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
309             CacheConfigException {
310         EmbeddedCacheManager manager = this.cm;
311         Cache<Object,Object> c;
312         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
313         if (manager == null) {
314             return null;
315         }
316
317         if (manager.cacheExists(realCacheName)) {
318             throw new CacheExistException();
319         }
320
321         // Sanity check to avoid contrasting parameters between transactional
322         // and not
323         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
324                 IClusterServices.cacheMode.TRANSACTIONAL))) {
325             throw new CacheConfigException();
326         }
327
328         // Sanity check to avoid contrasting parameters between sync and async
329         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
330             throw new CacheConfigException();
331         }
332
333         Configuration fromTemplateConfig = null;
334         /*
335          * Fetch transactional/non-transactional templates
336          */
337         // Check if transactional
338         if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
339             fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
340         } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
341             fromTemplateConfig = manager.getDefaultCacheConfiguration();
342         }
343
344         // If none set the transactional property then just return null
345         if (fromTemplateConfig == null) {
346             return null;
347         }
348
349         ConfigurationBuilder builder = new ConfigurationBuilder();
350         builder.read(fromTemplateConfig);
351         /*
352          * Now evaluate async/sync
353          */
354         if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
355             builder.clustering()
356                     .cacheMode(fromTemplateConfig.clustering()
357                             .cacheMode()
358                             .toAsync());
359         } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
360             builder.clustering()
361                     .cacheMode(fromTemplateConfig.clustering()
362                             .cacheMode()
363                             .toSync());
364         }
365
366         manager.defineConfiguration(realCacheName, builder.build());
367         c = manager.getCache(realCacheName);
368         return c;
369     }
370
371     @Override
372     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
373         EmbeddedCacheManager manager = this.cm;
374         Cache<Object,Object> c;
375         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
376         if (manager == null) {
377             return null;
378         }
379
380         if (manager.cacheExists(realCacheName)) {
381             c = manager.getCache(realCacheName);
382             return c;
383         }
384         return null;
385     }
386
387     @Override
388     public void destroyCache(String containerName, String cacheName) {
389         EmbeddedCacheManager manager = this.cm;
390         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
391         if (manager == null) {
392             return;
393         }
394         if (manager.cacheExists(realCacheName)) {
395             manager.removeCache(realCacheName);
396         }
397     }
398
399     @Override
400     public boolean existCache(String containerName, String cacheName) {
401         EmbeddedCacheManager manager = this.cm;
402
403         if (manager == null) {
404             return false;
405         }
406
407         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
408         return manager.cacheExists(realCacheName);
409     }
410
411     @Override
412     public Set<String> getCacheList(String containerName) {
413         Set<String> perContainerCaches = new HashSet<String>();
414         EmbeddedCacheManager manager = this.cm;
415         if (manager == null) {
416             return null;
417         }
418         for (String cacheName : manager.getCacheNames()) {
419             if (!manager.isRunning(cacheName)) continue;
420             if (cacheName.startsWith("{" + containerName + "}_")) {
421                 String[] res = cacheName.split("[{}]");
422                 if (res.length >= 4 && res[1].equals(containerName)
423                         && res[2].equals("_")) {
424                     perContainerCaches.add(res[3]);
425                 }
426             }
427         }
428
429         return (perContainerCaches);
430     }
431
432     @Override
433     public Properties getCacheProperties(String containerName, String cacheName) {
434         EmbeddedCacheManager manager = this.cm;
435         if (manager == null) {
436             return null;
437         }
438         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
439         if (!manager.cacheExists(realCacheName)) {
440             return null;
441         }
442         Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
443                 .getCacheConfiguration();
444         Properties p = new Properties();
445         p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
446                 conf.transaction().toString());
447         p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
448                 conf.clustering().toString());
449         p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
450                 .locking().toString());
451         return p;
452     }
453
454     @Override
455     public void addListener(String containerName, String cacheName,
456             IGetUpdates<?, ?> u) throws CacheListenerAddException {
457         EmbeddedCacheManager manager = this.cm;
458         Cache<Object,Object> c;
459         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
460         if (manager == null) {
461             return;
462         }
463
464         if (!manager.cacheExists(realCacheName)) {
465             throw new CacheListenerAddException();
466         }
467         c = manager.getCache(realCacheName);
468         CacheListenerContainer cl = new CacheListenerContainer(u,
469                 containerName, cacheName);
470         c.addListener(cl);
471     }
472
473     @Override
474     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
475             String cacheName) {
476         EmbeddedCacheManager manager = this.cm;
477         Cache<Object,Object> c;
478         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
479         if (manager == null) {
480             return null;
481         }
482
483         if (!manager.cacheExists(realCacheName)) {
484             return null;
485         }
486         c = manager.getCache(realCacheName);
487
488         Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
489         Set<Object> listeners = c.getListeners();
490         for (Object listener : listeners) {
491             if (listener instanceof CacheListenerContainer) {
492                 CacheListenerContainer cl = (CacheListenerContainer) listener;
493                 res.add(cl.whichListener());
494             }
495         }
496
497         return res;
498     }
499
500     @Override
501     public void removeListener(String containerName, String cacheName,
502             IGetUpdates<?, ?> u) {
503         EmbeddedCacheManager manager = this.cm;
504         Cache<Object,Object> c;
505         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
506         if (manager == null) {
507             return;
508         }
509
510         if (!manager.cacheExists(realCacheName)) {
511             return;
512         }
513         c = manager.getCache(realCacheName);
514
515         Set<Object> listeners = c.getListeners();
516         for (Object listener : listeners) {
517             if (listener instanceof CacheListenerContainer) {
518                 CacheListenerContainer cl = (CacheListenerContainer) listener;
519                 if (cl.whichListener() == u) {
520                     c.removeListener(listener);
521                     return;
522                 }
523             }
524         }
525     }
526
527     @Override
528     public void tbegin() throws NotSupportedException, SystemException {
529         // call tbegin with the default timeout
530         tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
531     }
532
533     @Override
534     public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
535         EmbeddedCacheManager manager = this.cm;
536         if (manager == null) {
537             throw new IllegalStateException();
538         }
539         TransactionManager tm = manager.getCache("transactional-type")
540                 .getAdvancedCache().getTransactionManager();
541         if (tm == null) {
542             throw new IllegalStateException();
543         }
544         long timeoutSec = unit.toSeconds(timeout);
545         if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
546             // fall back to the default timeout
547             tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
548         } else {
549             // cast is ok here
550             // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
551             tm.setTransactionTimeout((int) timeoutSec);
552         }
553         tm.begin();
554     }
555
556     @Override
557     public void tcommit() throws RollbackException, HeuristicMixedException,
558             HeuristicRollbackException, java.lang.SecurityException,
559             java.lang.IllegalStateException, SystemException {
560         EmbeddedCacheManager manager = this.cm;
561         if (manager == null) {
562             throw new IllegalStateException();
563         }
564         TransactionManager tm = manager.getCache("transactional-type")
565                 .getAdvancedCache().getTransactionManager();
566         if (tm == null) {
567             throw new IllegalStateException();
568         }
569         tm.commit();
570     }
571
572     @Override
573     public void trollback() throws java.lang.IllegalStateException,
574             java.lang.SecurityException, SystemException {
575         EmbeddedCacheManager manager = this.cm;
576         if (manager == null) {
577             throw new IllegalStateException();
578         }
579         TransactionManager tm = manager.getCache("transactional-type")
580                 .getAdvancedCache().getTransactionManager();
581         if (tm == null) {
582             throw new IllegalStateException();
583         }
584         tm.rollback();
585     }
586
587     @Override
588     public Transaction tgetTransaction() throws SystemException {
589         EmbeddedCacheManager manager = this.cm;
590         if (manager == null) {
591             throw new IllegalStateException();
592         }
593         TransactionManager tm = manager.getCache("transactional-type")
594                 .getAdvancedCache().getTransactionManager();
595         if (tm == null) {
596             return null;
597         }
598         return tm.getTransaction();
599     }
600
601     @Override
602     public boolean amIStandby() {
603         EmbeddedCacheManager manager = this.cm;
604         if (manager == null) {
605             // In case we cannot fetch the information, lets assume we
606             // are standby, so to have less responsibility.
607             return true;
608         }
609         return (!manager.isCoordinator());
610     }
611
612     private InetAddress addressToInetAddress(Address a) {
613         EmbeddedCacheManager manager = this.cm;
614         if ((manager == null) || (a == null)) {
615             // In case we cannot fetch the information, lets assume we
616             // are standby, so to have less responsibility.
617             return null;
618         }
619         Transport t = manager.getTransport();
620         if (t instanceof JGroupsTransport) {
621             JGroupsTransport jt = (JGroupsTransport) t;
622             Channel c = jt.getChannel();
623             if (a instanceof JGroupsAddress) {
624                 JGroupsAddress ja = (JGroupsAddress) a;
625                 org.jgroups.Address phys = (org.jgroups.Address) c
626                         .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
627                                 .getJGroupsAddress()));
628                 if (phys instanceof org.jgroups.stack.IpAddress) {
629                     InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
630                             .getIpAddress();
631                     return bindAddress;
632                 }
633             }
634         }
635         return null;
636     }
637
638     @Override
639     public List<InetAddress> getClusteredControllers() {
640         EmbeddedCacheManager manager = this.cm;
641         if (manager == null) {
642             return null;
643         }
644         List<Address> controllers = manager.getMembers();
645         if ((controllers == null) || controllers.size() == 0) {
646             return null;
647         }
648
649         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
650         for (Address a : controllers) {
651             InetAddress inetAddress = addressToInetAddress(a);
652             if (inetAddress != null
653                     && !inetAddress.getHostAddress().equals(loopbackAddress)) {
654                 clusteredControllers.add(inetAddress);
655             }
656         }
657         return clusteredControllers;
658     }
659
660     @Override
661     public InetAddress getMyAddress() {
662         EmbeddedCacheManager manager = this.cm;
663         if (manager == null) {
664             return null;
665         }
666         return addressToInetAddress(manager.getAddress());
667     }
668
669     @Override
670     public InetAddress getActiveAddress() {
671         EmbeddedCacheManager manager = this.cm;
672         if (manager == null) {
673             // In case we cannot fetch the information, lets assume we
674             // are standby, so to have less responsibility.
675             return null;
676         }
677
678         return addressToInetAddress(manager.getCoordinator());
679     }
680
681     @Override
682     public void listenRoleChange(IListenRoleChange i)
683             throws ListenRoleChangeAddException {
684         EmbeddedCacheManager manager = this.cm;
685         if (manager == null) {
686             // In case we cannot fetch the information, lets assume we
687             // are standby, so to have less responsibility.
688             throw new ListenRoleChangeAddException();
689         }
690
691         if (this.roleChangeListeners == null) {
692             this.roleChangeListeners = new HashSet<IListenRoleChange>();
693             this.cacheManagerListener = new ViewChangedListener(
694                     this.roleChangeListeners);
695             manager.addListener(this.cacheManagerListener);
696         }
697
698         if (this.roleChangeListeners != null) {
699             this.roleChangeListeners.add(i);
700         }
701     }
702
703     @Override
704     public void unlistenRoleChange(IListenRoleChange i) {
705         EmbeddedCacheManager manager = this.cm;
706         if (manager == null) {
707             // In case we cannot fetch the information, lets assume we
708             // are standby, so to have less responsibility.
709             return;
710         }
711
712         if (this.roleChangeListeners != null) {
713             this.roleChangeListeners.remove(i);
714         }
715
716         if ((this.roleChangeListeners != null && this.roleChangeListeners
717                 .isEmpty())
718                 && (this.cacheManagerListener != null)) {
719             manager.removeListener(this.cacheManagerListener);
720             this.cacheManagerListener = null;
721             this.roleChangeListeners = null;
722         }
723     }
724
725     @Listener
726     public class ViewChangedListener {
727         Set<IListenRoleChange> roleListeners;
728
729         public ViewChangedListener(Set<IListenRoleChange> s) {
730             this.roleListeners = s;
731         }
732
733         @ViewChanged
734         public void viewChanged(ViewChangedEvent e) {
735             for (IListenRoleChange i : this.roleListeners) {
736                 i.newActiveAvailable();
737             }
738         }
739     }
740 }