e34eb329330d2e1b30ee72aac6950f2084894777
[controller.git] / opendaylight / adsal / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.clustering.services_implementation.internal;
11
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.net.URI;
17 import java.net.URISyntaxException;
18 import java.util.ArrayList;
19 import java.util.EnumSet;
20 import java.util.Enumeration;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Properties;
24 import java.util.Set;
25 import java.util.StringTokenizer;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.TimeUnit;
28
29 import javax.transaction.HeuristicMixedException;
30 import javax.transaction.HeuristicRollbackException;
31 import javax.transaction.NotSupportedException;
32 import javax.transaction.RollbackException;
33 import javax.transaction.SystemException;
34 import javax.transaction.Transaction;
35 import javax.transaction.TransactionManager;
36
37 import org.infinispan.Cache;
38 import org.infinispan.configuration.cache.Configuration;
39 import org.infinispan.configuration.cache.ConfigurationBuilder;
40 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
41 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
42 import org.infinispan.configuration.parsing.ParserRegistry;
43 import org.infinispan.manager.DefaultCacheManager;
44 import org.infinispan.manager.EmbeddedCacheManager;
45 import org.infinispan.notifications.Listener;
46 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
47 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
48 import org.infinispan.remoting.transport.Address;
49 import org.infinispan.remoting.transport.Transport;
50 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
51 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
52 import org.jgroups.Channel;
53 import org.jgroups.Event;
54 import org.jgroups.stack.GossipRouter;
55 import org.opendaylight.controller.clustering.services.CacheConfigException;
56 import org.opendaylight.controller.clustering.services.CacheExistException;
57 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
58 import org.opendaylight.controller.clustering.services.IClusterServices;
59 import org.opendaylight.controller.clustering.services.IGetUpdates;
60 import org.opendaylight.controller.clustering.services.IListenRoleChange;
61 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 public class ClusterManager implements IClusterServices {
66     protected static final Logger logger = LoggerFactory
67             .getLogger(ClusterManager.class);
68     private DefaultCacheManager cm;
69     GossipRouter gossiper;
70     private HashSet<IListenRoleChange> roleChangeListeners;
71     private ViewChangedListener cacheManagerListener;
72
73     private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
74     private static final int gossipRouterPortDefault = 12001;
75     // defaultTransactionTimeout is 60 seconds
76     private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
77
78     /**
79      * Start a JGroups GossipRouter if we are a supernode. The
80      * GosispRouter is nothing more than a simple
81      * rendevouz-pointer. All the nodes that wants to join the cluster
82      * will come to any of the rendevouz point and they introduce the
83      * nodes to all the others. Once the meet and greet phase if over,
84      * the nodes will open a full-mesh with the remaining n-1 nodes,
85      * so even if the GossipRouter goes down nothing is lost.
86      * NOTE: This function has the side effect to set some of the
87      * JGROUPS configurations, this because in this function already
88      * we try to retrieve some of the network capabilities of the
89      * device and so it's better not to do that again
90      *
91      *
92      * @return GossipRouter
93      */
94     private GossipRouter startGossiper() {
95         boolean amIGossipRouter = false;
96         Integer gossipRouterPort = gossipRouterPortDefault;
97         InetAddress gossipRouterAddress = null;
98         String supernodes_list = System.getProperty("supernodes",
99                 loopbackAddress);
100         /*
101          * Check the environment for the "container" variable, if this is set
102          * and is equal to "lxc", then ODL is running inside an lxc
103          * container, and address resolution of supernodes will be modified
104          * accordingly.
105          */
106         boolean inContainer = "lxc".equals(System.getenv("container"));
107         StringBuffer sanitized_supernodes_list = new StringBuffer();
108         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
109
110         if (inContainer) {
111             logger.trace("DOCKER: Resolving supernode host names using docker container semantics");
112         }
113
114         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
115         if (supernodes.hasMoreTokens()) {
116             // Populate the list of my addresses
117             try {
118                 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
119                 while (e.hasMoreElements()) {
120                     NetworkInterface n = e.nextElement();
121                     Enumeration<InetAddress> ee = n.getInetAddresses();
122                     while (ee.hasMoreElements()) {
123                         InetAddress i = ee.nextElement();
124                         myAddresses.add(i);
125                     }
126                 }
127             } catch (SocketException se) {
128                 logger.error("Cannot get the list of network interfaces");
129                 return null;
130             }
131         }
132         while (supernodes.hasMoreTokens()) {
133             String curr_supernode = supernodes.nextToken();
134             logger.debug("Examining supernode {}", curr_supernode);
135             StringTokenizer host_port = new StringTokenizer(curr_supernode,
136                     "[]");
137             String host;
138             String port;
139             Integer port_num = gossipRouterPortDefault;
140             if (host_port.countTokens() > 2) {
141                 logger.error("Error parsing supernode {} proceed to the next one",
142                         curr_supernode);
143                 continue;
144             }
145             host = host_port.nextToken();
146             InetAddress hostAddr;
147             /*
148              * If we are in a container and the hostname begins with a '+', this is
149              * an indication that we should resolve this host name in the context
150              * of a docker container.
151              *
152              * Specifically this means:
153              * '+self'   : self reference and the host will be mapped to the value of
154              *             HOSTNAME in the environment
155              * '+<name>' : references another container by its name. The docker established
156              *             environment variables will be used to resolve the host to an
157              *             IP address.
158              */
159             if (inContainer && host != null && host.charAt(0) == '+') {
160                 if ("+self".equals(host)) {
161                     host = System.getenv("HOSTNAME");
162                 } else {
163                     String link = System.getenv(host.substring(1).toUpperCase() + "_PORT");
164                     if (link != null) {
165                         try {
166                             host = new URI(link).getHost();
167                         } catch (URISyntaxException e) {
168                             logger.error("DOCKER: Unable to translate container reference ({}) to host IP Address, will attempt using normal host name",
169                                 host.substring(1));
170                         }
171                     }
172                 }
173             }
174
175             try {
176                 hostAddr = InetAddress.getByName(host);
177             } catch (UnknownHostException ue) {
178                 logger.error("Host {} is not known", host);
179                 continue;
180             }
181             if (host_port.hasMoreTokens()) {
182                 port = host_port.nextToken();
183                 try {
184                     port_num = Integer.valueOf(port);
185                 } catch (NumberFormatException ne) {
186                     logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}",
187                                  gossipRouterPortDefault);
188                     port_num = gossipRouterPortDefault;
189                 }
190                 if ((port_num > 65535) || (port_num < 0)) {
191                     logger.error("Supplied supernode gossip port is outside a valid TCP port range");
192                     port_num = gossipRouterPortDefault;
193                 }
194             }
195             if (!amIGossipRouter) {
196                 if (host != null) {
197                     for (InetAddress myAddr : myAddresses) {
198                         if (myAddr.equals(hostAddr)) {
199                             amIGossipRouter = true;
200                             gossipRouterAddress = hostAddr;
201                             gossipRouterPort = port_num;
202                             break;
203                         }
204                     }
205                 }
206             }
207             if (!sanitized_supernodes_list.toString().equals("")) {
208                 sanitized_supernodes_list.append(",");
209             }
210             sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]");
211         }
212
213         if (amIGossipRouter) {
214             // Set the Jgroups binding interface to the one we got
215             // from the supernodes attribute
216             if (gossipRouterAddress != null) {
217                 System.setProperty("jgroups.tcp.address", gossipRouterAddress
218                         .getHostAddress());
219             }
220         } else {
221             // Set the Jgroup binding interface to the one we are well
222             // known outside or else to the first with non-local
223             // scope.
224             try {
225                 String myBind = InetAddress.getLocalHost().getHostAddress();
226                 if (myBind == null
227                         || InetAddress.getLocalHost().isLoopbackAddress()) {
228                     for (InetAddress myAddr : myAddresses) {
229                         if (myAddr.isLoopbackAddress()
230                                 || myAddr.isLinkLocalAddress()) {
231                             logger.debug("Skipping local address {}",
232                                          myAddr.getHostAddress());
233                             continue;
234                         } else {
235                             // First non-local address
236                             myBind = myAddr.getHostAddress();
237                             logger.debug("First non-local address {}", myBind);
238                             break;
239                         }
240                     }
241                 }
242                 String jgroupAddress = System
243                         .getProperty("jgroups.tcp.address");
244                 if (jgroupAddress == null) {
245                     if (myBind != null) {
246                         logger.debug("Set bind address to be {}", myBind);
247                         System.setProperty("jgroups.tcp.address", myBind);
248                     } else {
249                         logger
250                                 .debug("Set bind address to be LOCALHOST=127.0.0.1");
251                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
252                     }
253                 } else {
254                     logger.debug("jgroup.tcp.address already set to be {}",
255                             jgroupAddress);
256                 }
257             } catch (UnknownHostException uhe) {
258                 logger
259                         .error("Met UnknownHostException while trying to get binding address for jgroups");
260             }
261         }
262
263         // The supernodes list constitute also the tcpgossip initial
264         // host list
265         System.setProperty("jgroups.tcpgossip.initial_hosts",
266                 sanitized_supernodes_list.toString());
267         logger.debug("jgroups.tcp.address set to {}",
268                 System.getProperty("jgroups.tcp.address"));
269         logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
270                 System.getProperty("jgroups.tcpgossip.initial_hosts"));
271         GossipRouter res = null;
272         if (amIGossipRouter) {
273             logger.info("I'm a GossipRouter will listen on port {}",
274                     gossipRouterPort);
275             // Start a GossipRouter with JMX support
276             res = new GossipRouter(gossipRouterPort, null, true);
277         }
278         return res;
279     }
280
281     private void exitOnSecurityException(Exception ioe) {
282         Throwable cause = ioe.getCause();
283         while (cause != null) {
284             if (cause instanceof java.lang.SecurityException) {
285                 logger.error("Failed Cluster authentication. Stopping Controller...");
286                 System.exit(0);
287             }
288             cause = cause.getCause();
289         }
290     }
291
292     public void start() {
293         this.gossiper = startGossiper();
294         if (this.gossiper != null) {
295             logger.debug("Trying to start Gossiper");
296             try {
297                 this.gossiper.start();
298                 logger.info("Started GossipRouter");
299             } catch (Exception e) {
300                 logger.error("GossipRouter didn't start. Exception Stack Trace",
301                              e);
302             }
303         }
304         logger.info("Starting the ClusterManager");
305         try {
306             ParserRegistry parser = new ParserRegistry(this.getClass()
307                     .getClassLoader());
308             String infinispanConfigFile =
309                     System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
310             logger.debug("Using configuration file:{}", infinispanConfigFile);
311             ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
312             GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
313             globalBuilder.serialization()
314                     .classResolver(new ClassResolver())
315                     .build();
316             this.cm = new DefaultCacheManager(holder, false);
317             logger.debug("Allocated ClusterManager");
318             if (this.cm != null) {
319                 this.cm.start();
320                 this.cm.startCache();
321                 logger.debug("Started the ClusterManager");
322             }
323         } catch (Exception ioe) {
324             logger.error("Cannot configure infinispan .. bailing out ");
325             logger.error("Stack Trace that raised th exception");
326             logger.error("",ioe);
327             this.cm = null;
328             exitOnSecurityException(ioe);
329             this.stop();
330         }
331         logger.debug("Cache Manager has value {}", this.cm);
332     }
333
334     public void stop() {
335         logger.info("Stopping the ClusterManager");
336         if (this.cm != null) {
337             logger.info("Found a valid ClusterManager, now let it be stopped");
338             this.cm.stop();
339             this.cm = null;
340         }
341         if (this.gossiper != null) {
342             this.gossiper.stop();
343             this.gossiper = null;
344         }
345     }
346
347     @Override
348     public ConcurrentMap<?, ?> createCache(String containerName,
349             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
350             CacheConfigException {
351         EmbeddedCacheManager manager = this.cm;
352         Cache<Object,Object> c;
353         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
354         if (manager == null) {
355             return null;
356         }
357
358         if (manager.cacheExists(realCacheName)) {
359             throw new CacheExistException();
360         }
361
362         // Sanity check to avoid contrasting parameters between transactional
363         // and not
364         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
365                 IClusterServices.cacheMode.TRANSACTIONAL))) {
366             throw new CacheConfigException();
367         }
368
369         // Sanity check to avoid contrasting parameters between sync and async
370         if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
371             throw new CacheConfigException();
372         }
373
374         Configuration fromTemplateConfig = null;
375         /*
376          * Fetch transactional/non-transactional templates
377          */
378         // Check if transactional
379         if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
380             fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
381         } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
382             fromTemplateConfig = manager.getDefaultCacheConfiguration();
383         }
384
385         // If none set the transactional property then just return null
386         if (fromTemplateConfig == null) {
387             return null;
388         }
389
390         ConfigurationBuilder builder = new ConfigurationBuilder();
391         builder.read(fromTemplateConfig);
392         /*
393          * Now evaluate async/sync
394          */
395         if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
396             builder.clustering()
397                     .cacheMode(fromTemplateConfig.clustering()
398                             .cacheMode()
399                             .toAsync());
400         } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
401             builder.clustering()
402                     .cacheMode(fromTemplateConfig.clustering()
403                             .cacheMode()
404                             .toSync());
405         }
406
407         manager.defineConfiguration(realCacheName, builder.build());
408         c = manager.getCache(realCacheName);
409         return c;
410     }
411
412     @Override
413     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
414         EmbeddedCacheManager manager = this.cm;
415         Cache<Object,Object> c;
416         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
417         if (manager == null) {
418             return null;
419         }
420
421         if (manager.cacheExists(realCacheName)) {
422             c = manager.getCache(realCacheName);
423             return c;
424         }
425         return null;
426     }
427
428     @Override
429     public void destroyCache(String containerName, String cacheName) {
430         EmbeddedCacheManager manager = this.cm;
431         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
432         if (manager == null) {
433             return;
434         }
435         if (manager.cacheExists(realCacheName)) {
436             manager.removeCache(realCacheName);
437         }
438     }
439
440     @Override
441     public boolean existCache(String containerName, String cacheName) {
442         EmbeddedCacheManager manager = this.cm;
443
444         if (manager == null) {
445             return false;
446         }
447
448         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
449         return manager.cacheExists(realCacheName);
450     }
451
452     @Override
453     public Set<String> getCacheList(String containerName) {
454         Set<String> perContainerCaches = new HashSet<String>();
455         EmbeddedCacheManager manager = this.cm;
456         if (manager == null) {
457             return null;
458         }
459         for (String cacheName : manager.getCacheNames()) {
460             if (!manager.isRunning(cacheName)) continue;
461             if (cacheName.startsWith("{" + containerName + "}_")) {
462                 String[] res = cacheName.split("[{}]");
463                 if (res.length >= 4 && res[1].equals(containerName)
464                         && res[2].equals("_")) {
465                     perContainerCaches.add(res[3]);
466                 }
467             }
468         }
469
470         return (perContainerCaches);
471     }
472
473     @Override
474     public Properties getCacheProperties(String containerName, String cacheName) {
475         EmbeddedCacheManager manager = this.cm;
476         if (manager == null) {
477             return null;
478         }
479         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
480         if (!manager.cacheExists(realCacheName)) {
481             return null;
482         }
483         Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
484                 .getCacheConfiguration();
485         Properties p = new Properties();
486         p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
487                 conf.transaction().toString());
488         p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
489                 conf.clustering().toString());
490         p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
491                 .locking().toString());
492         return p;
493     }
494
495     @Override
496     public void addListener(String containerName, String cacheName,
497             IGetUpdates<?, ?> u) throws CacheListenerAddException {
498         EmbeddedCacheManager manager = this.cm;
499         Cache<Object,Object> c;
500         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
501         if (manager == null) {
502             return;
503         }
504
505         if (!manager.cacheExists(realCacheName)) {
506             throw new CacheListenerAddException();
507         }
508         c = manager.getCache(realCacheName);
509         CacheListenerContainer cl = new CacheListenerContainer(u,
510                 containerName, cacheName);
511         c.addListener(cl);
512     }
513
514     @Override
515     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
516             String cacheName) {
517         EmbeddedCacheManager manager = this.cm;
518         Cache<Object,Object> c;
519         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
520         if (manager == null) {
521             return null;
522         }
523
524         if (!manager.cacheExists(realCacheName)) {
525             return null;
526         }
527         c = manager.getCache(realCacheName);
528
529         Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
530         Set<Object> listeners = c.getListeners();
531         for (Object listener : listeners) {
532             if (listener instanceof CacheListenerContainer) {
533                 CacheListenerContainer cl = (CacheListenerContainer) listener;
534                 res.add(cl.whichListener());
535             }
536         }
537
538         return res;
539     }
540
541     @Override
542     public void removeListener(String containerName, String cacheName,
543             IGetUpdates<?, ?> u) {
544         EmbeddedCacheManager manager = this.cm;
545         Cache<Object,Object> c;
546         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
547         if (manager == null) {
548             return;
549         }
550
551         if (!manager.cacheExists(realCacheName)) {
552             return;
553         }
554         c = manager.getCache(realCacheName);
555
556         Set<Object> listeners = c.getListeners();
557         for (Object listener : listeners) {
558             if (listener instanceof CacheListenerContainer) {
559                 CacheListenerContainer cl = (CacheListenerContainer) listener;
560                 if (cl.whichListener() == u) {
561                     c.removeListener(listener);
562                     return;
563                 }
564             }
565         }
566     }
567
568     @Override
569     public void tbegin() throws NotSupportedException, SystemException {
570         // call tbegin with the default timeout
571         tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
572     }
573
574     @Override
575     public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
576         EmbeddedCacheManager manager = this.cm;
577         if (manager == null) {
578             throw new IllegalStateException();
579         }
580         TransactionManager tm = manager.getCache("transactional-type")
581                 .getAdvancedCache().getTransactionManager();
582         if (tm == null) {
583             throw new IllegalStateException();
584         }
585         long timeoutSec = unit.toSeconds(timeout);
586         if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
587             // fall back to the default timeout
588             tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
589         } else {
590             // cast is ok here
591             // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
592             tm.setTransactionTimeout((int) timeoutSec);
593         }
594         tm.begin();
595     }
596
597     @Override
598     public void tcommit() throws RollbackException, HeuristicMixedException,
599             HeuristicRollbackException, java.lang.SecurityException,
600             java.lang.IllegalStateException, SystemException {
601         EmbeddedCacheManager manager = this.cm;
602         if (manager == null) {
603             throw new IllegalStateException();
604         }
605         TransactionManager tm = manager.getCache("transactional-type")
606                 .getAdvancedCache().getTransactionManager();
607         if (tm == null) {
608             throw new IllegalStateException();
609         }
610         tm.commit();
611     }
612
613     @Override
614     public void trollback() throws java.lang.IllegalStateException,
615             java.lang.SecurityException, SystemException {
616         EmbeddedCacheManager manager = this.cm;
617         if (manager == null) {
618             throw new IllegalStateException();
619         }
620         TransactionManager tm = manager.getCache("transactional-type")
621                 .getAdvancedCache().getTransactionManager();
622         if (tm == null) {
623             throw new IllegalStateException();
624         }
625         tm.rollback();
626     }
627
628     @Override
629     public Transaction tgetTransaction() throws SystemException {
630         EmbeddedCacheManager manager = this.cm;
631         if (manager == null) {
632             throw new IllegalStateException();
633         }
634         TransactionManager tm = manager.getCache("transactional-type")
635                 .getAdvancedCache().getTransactionManager();
636         if (tm == null) {
637             return null;
638         }
639         return tm.getTransaction();
640     }
641
642     @Override
643     public boolean amIStandby() {
644         EmbeddedCacheManager manager = this.cm;
645         if (manager == null) {
646             // In case we cannot fetch the information, lets assume we
647             // are standby, so to have less responsibility.
648             return true;
649         }
650         return (!manager.isCoordinator());
651     }
652
653     private InetAddress addressToInetAddress(Address a) {
654         EmbeddedCacheManager manager = this.cm;
655         if ((manager == null) || (a == null)) {
656             // In case we cannot fetch the information, lets assume we
657             // are standby, so to have less responsibility.
658             return null;
659         }
660         Transport t = manager.getTransport();
661         if (t instanceof JGroupsTransport) {
662             JGroupsTransport jt = (JGroupsTransport) t;
663             Channel c = jt.getChannel();
664             if (a instanceof JGroupsAddress) {
665                 JGroupsAddress ja = (JGroupsAddress) a;
666                 org.jgroups.Address phys = (org.jgroups.Address) c
667                         .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
668                                 .getJGroupsAddress()));
669                 if (phys instanceof org.jgroups.stack.IpAddress) {
670                     InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
671                             .getIpAddress();
672                     return bindAddress;
673                 }
674             }
675         }
676         return null;
677     }
678
679     @Override
680     public List<InetAddress> getClusteredControllers() {
681         EmbeddedCacheManager manager = this.cm;
682         if (manager == null) {
683             return null;
684         }
685         List<Address> controllers = manager.getMembers();
686         if ((controllers == null) || controllers.size() == 0) {
687             return null;
688         }
689
690         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
691         for (Address a : controllers) {
692             InetAddress inetAddress = addressToInetAddress(a);
693             if (inetAddress != null
694                     && !inetAddress.getHostAddress().equals(loopbackAddress)) {
695                 clusteredControllers.add(inetAddress);
696             }
697         }
698         return clusteredControllers;
699     }
700
701     @Override
702     public InetAddress getMyAddress() {
703         EmbeddedCacheManager manager = this.cm;
704         if (manager == null) {
705             return null;
706         }
707         return addressToInetAddress(manager.getAddress());
708     }
709
710     @Override
711     public InetAddress getActiveAddress() {
712         EmbeddedCacheManager manager = this.cm;
713         if (manager == null) {
714             // In case we cannot fetch the information, lets assume we
715             // are standby, so to have less responsibility.
716             return null;
717         }
718
719         return addressToInetAddress(manager.getCoordinator());
720     }
721
722     @Override
723     public void listenRoleChange(IListenRoleChange i)
724             throws ListenRoleChangeAddException {
725         EmbeddedCacheManager manager = this.cm;
726         if (manager == null) {
727             // In case we cannot fetch the information, lets assume we
728             // are standby, so to have less responsibility.
729             throw new ListenRoleChangeAddException();
730         }
731
732         if (this.roleChangeListeners == null) {
733             this.roleChangeListeners = new HashSet<IListenRoleChange>();
734             this.cacheManagerListener = new ViewChangedListener(
735                     this.roleChangeListeners);
736             manager.addListener(this.cacheManagerListener);
737         }
738
739         if (this.roleChangeListeners != null) {
740             this.roleChangeListeners.add(i);
741         }
742     }
743
744     @Override
745     public void unlistenRoleChange(IListenRoleChange i) {
746         EmbeddedCacheManager manager = this.cm;
747         if (manager == null) {
748             // In case we cannot fetch the information, lets assume we
749             // are standby, so to have less responsibility.
750             return;
751         }
752
753         if (this.roleChangeListeners != null) {
754             this.roleChangeListeners.remove(i);
755         }
756
757         if ((this.roleChangeListeners != null && this.roleChangeListeners
758                 .isEmpty())
759                 && (this.cacheManagerListener != null)) {
760             manager.removeListener(this.cacheManagerListener);
761             this.cacheManagerListener = null;
762             this.roleChangeListeners = null;
763         }
764     }
765
766     @Listener
767     public class ViewChangedListener {
768         Set<IListenRoleChange> roleListeners;
769
770         public ViewChangedListener(Set<IListenRoleChange> s) {
771             this.roleListeners = s;
772         }
773
774         @ViewChanged
775         public void viewChanged(ViewChangedEvent e) {
776             for (IListenRoleChange i : this.roleListeners) {
777                 i.newActiveAvailable();
778             }
779         }
780     }
781 }