3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.clustering.services_implementation.internal;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
17 import java.net.URISyntaxException;
18 import java.util.ArrayList;
19 import java.util.EnumSet;
20 import java.util.Enumeration;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Properties;
25 import java.util.StringTokenizer;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.TimeUnit;
29 import javax.transaction.HeuristicMixedException;
30 import javax.transaction.HeuristicRollbackException;
31 import javax.transaction.NotSupportedException;
32 import javax.transaction.RollbackException;
33 import javax.transaction.SystemException;
34 import javax.transaction.Transaction;
35 import javax.transaction.TransactionManager;
37 import org.infinispan.Cache;
38 import org.infinispan.configuration.cache.Configuration;
39 import org.infinispan.configuration.cache.ConfigurationBuilder;
40 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
41 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
42 import org.infinispan.configuration.parsing.ParserRegistry;
43 import org.infinispan.manager.DefaultCacheManager;
44 import org.infinispan.manager.EmbeddedCacheManager;
45 import org.infinispan.notifications.Listener;
46 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
47 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
48 import org.infinispan.remoting.transport.Address;
49 import org.infinispan.remoting.transport.Transport;
50 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
51 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
52 import org.jgroups.Channel;
53 import org.jgroups.Event;
54 import org.jgroups.stack.GossipRouter;
55 import org.opendaylight.controller.clustering.services.CacheConfigException;
56 import org.opendaylight.controller.clustering.services.CacheExistException;
57 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
58 import org.opendaylight.controller.clustering.services.IClusterServices;
59 import org.opendaylight.controller.clustering.services.IGetUpdates;
60 import org.opendaylight.controller.clustering.services.IListenRoleChange;
61 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
65 public class ClusterManager implements IClusterServices {
66 protected static final Logger logger = LoggerFactory
67 .getLogger(ClusterManager.class);
68 private DefaultCacheManager cm;
69 GossipRouter gossiper;
70 private HashSet<IListenRoleChange> roleChangeListeners;
71 private ViewChangedListener cacheManagerListener;
73 private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
74 private static final int gossipRouterPortDefault = 12001;
75 // defaultTransactionTimeout is 60 seconds
76 private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
79 * Start a JGroups GossipRouter if we are a supernode. The
80 * GosispRouter is nothing more than a simple
81 * rendevouz-pointer. All the nodes that wants to join the cluster
82 * will come to any of the rendevouz point and they introduce the
83 * nodes to all the others. Once the meet and greet phase if over,
84 * the nodes will open a full-mesh with the remaining n-1 nodes,
85 * so even if the GossipRouter goes down nothing is lost.
86 * NOTE: This function has the side effect to set some of the
87 * JGROUPS configurations, this because in this function already
88 * we try to retrieve some of the network capabilities of the
89 * device and so it's better not to do that again
92 * @return GossipRouter
94 private GossipRouter startGossiper() {
95 boolean amIGossipRouter = false;
96 Integer gossipRouterPort = gossipRouterPortDefault;
97 InetAddress gossipRouterAddress = null;
98 String supernodes_list = System.getProperty("supernodes",
101 * Check the environment for the "container" variable, if this is set
102 * and is equal to "lxc", then ODL is running inside an lxc
103 * container, and address resolution of supernodes will be modified
106 boolean inContainer = "lxc".equals(System.getenv("container"));
107 StringBuffer sanitized_supernodes_list = new StringBuffer();
108 List<InetAddress> myAddresses = new ArrayList<InetAddress>();
111 logger.trace("DOCKER: Resolving supernode host names using docker container semantics");
114 StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
115 if (supernodes.hasMoreTokens()) {
116 // Populate the list of my addresses
118 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
119 while (e.hasMoreElements()) {
120 NetworkInterface n = e.nextElement();
121 Enumeration<InetAddress> ee = n.getInetAddresses();
122 while (ee.hasMoreElements()) {
123 InetAddress i = ee.nextElement();
127 } catch (SocketException se) {
128 logger.error("Cannot get the list of network interfaces");
132 while (supernodes.hasMoreTokens()) {
133 String curr_supernode = supernodes.nextToken();
134 logger.debug("Examining supernode {}", curr_supernode);
135 StringTokenizer host_port = new StringTokenizer(curr_supernode,
139 Integer port_num = gossipRouterPortDefault;
140 if (host_port.countTokens() > 2) {
141 logger.error("Error parsing supernode {} proceed to the next one",
145 host = host_port.nextToken();
146 InetAddress hostAddr;
148 * If we are in a container and the hostname begins with a '+', this is
149 * an indication that we should resolve this host name in the context
150 * of a docker container.
152 * Specifically this means:
153 * '+self' : self reference and the host will be mapped to the value of
154 * HOSTNAME in the environment
155 * '+<name>' : references another container by its name. The docker established
156 * environment variables will be used to resolve the host to an
159 if (inContainer && host != null && host.charAt(0) == '+') {
160 if ("+self".equals(host)) {
161 host = System.getenv("HOSTNAME");
163 String link = System.getenv(host.substring(1).toUpperCase() + "_PORT");
166 host = new URI(link).getHost();
167 } catch (URISyntaxException e) {
168 logger.error("DOCKER: Unable to translate container reference ({}) to host IP Address, will attempt using normal host name",
176 hostAddr = InetAddress.getByName(host);
177 } catch (UnknownHostException ue) {
178 logger.error("Host {} is not known", host);
181 if (host_port.hasMoreTokens()) {
182 port = host_port.nextToken();
184 port_num = Integer.valueOf(port);
185 } catch (NumberFormatException ne) {
186 logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}",
187 gossipRouterPortDefault);
188 port_num = gossipRouterPortDefault;
190 if ((port_num > 65535) || (port_num < 0)) {
191 logger.error("Supplied supernode gossip port is outside a valid TCP port range");
192 port_num = gossipRouterPortDefault;
195 if (!amIGossipRouter) {
197 for (InetAddress myAddr : myAddresses) {
198 if (myAddr.equals(hostAddr)) {
199 amIGossipRouter = true;
200 gossipRouterAddress = hostAddr;
201 gossipRouterPort = port_num;
207 if (!sanitized_supernodes_list.toString().equals("")) {
208 sanitized_supernodes_list.append(",");
210 sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]");
213 if (amIGossipRouter) {
214 // Set the Jgroups binding interface to the one we got
215 // from the supernodes attribute
216 if (gossipRouterAddress != null) {
217 System.setProperty("jgroups.tcp.address", gossipRouterAddress
221 // Set the Jgroup binding interface to the one we are well
222 // known outside or else to the first with non-local
225 String myBind = InetAddress.getLocalHost().getHostAddress();
227 || InetAddress.getLocalHost().isLoopbackAddress()) {
228 for (InetAddress myAddr : myAddresses) {
229 if (myAddr.isLoopbackAddress()
230 || myAddr.isLinkLocalAddress()) {
231 logger.debug("Skipping local address {}",
232 myAddr.getHostAddress());
235 // First non-local address
236 myBind = myAddr.getHostAddress();
237 logger.debug("First non-local address {}", myBind);
242 String jgroupAddress = System
243 .getProperty("jgroups.tcp.address");
244 if (jgroupAddress == null) {
245 if (myBind != null) {
246 logger.debug("Set bind address to be {}", myBind);
247 System.setProperty("jgroups.tcp.address", myBind);
250 .debug("Set bind address to be LOCALHOST=127.0.0.1");
251 System.setProperty("jgroups.tcp.address", "127.0.0.1");
254 logger.debug("jgroup.tcp.address already set to be {}",
257 } catch (UnknownHostException uhe) {
259 .error("Met UnknownHostException while trying to get binding address for jgroups");
263 // The supernodes list constitute also the tcpgossip initial
265 System.setProperty("jgroups.tcpgossip.initial_hosts",
266 sanitized_supernodes_list.toString());
267 logger.debug("jgroups.tcp.address set to {}",
268 System.getProperty("jgroups.tcp.address"));
269 logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
270 System.getProperty("jgroups.tcpgossip.initial_hosts"));
271 GossipRouter res = null;
272 if (amIGossipRouter) {
273 logger.info("I'm a GossipRouter will listen on port {}",
275 // Start a GossipRouter with JMX support
276 res = new GossipRouter(gossipRouterPort, null, true);
281 private void exitOnSecurityException(Exception ioe) {
282 Throwable cause = ioe.getCause();
283 while (cause != null) {
284 if (cause instanceof java.lang.SecurityException) {
285 logger.error("Failed Cluster authentication. Stopping Controller...");
288 cause = cause.getCause();
292 public void start() {
293 this.gossiper = startGossiper();
294 if (this.gossiper != null) {
295 logger.debug("Trying to start Gossiper");
297 this.gossiper.start();
298 logger.info("Started GossipRouter");
299 } catch (Exception e) {
300 logger.error("GossipRouter didn't start. Exception Stack Trace",
304 logger.info("Starting the ClusterManager");
306 ParserRegistry parser = new ParserRegistry(this.getClass()
308 String infinispanConfigFile =
309 System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
310 logger.debug("Using configuration file:{}", infinispanConfigFile);
311 ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
312 GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
313 globalBuilder.serialization()
314 .classResolver(new ClassResolver())
316 this.cm = new DefaultCacheManager(holder, false);
317 logger.debug("Allocated ClusterManager");
318 if (this.cm != null) {
320 this.cm.startCache();
321 logger.debug("Started the ClusterManager");
323 } catch (Exception ioe) {
324 logger.error("Cannot configure infinispan .. bailing out ");
325 logger.error("Stack Trace that raised th exception");
326 logger.error("",ioe);
328 exitOnSecurityException(ioe);
331 logger.debug("Cache Manager has value {}", this.cm);
335 logger.info("Stopping the ClusterManager");
336 if (this.cm != null) {
337 logger.info("Found a valid ClusterManager, now let it be stopped");
341 if (this.gossiper != null) {
342 this.gossiper.stop();
343 this.gossiper = null;
348 public ConcurrentMap<?, ?> createCache(String containerName,
349 String cacheName, Set<cacheMode> cMode) throws CacheExistException,
350 CacheConfigException {
351 EmbeddedCacheManager manager = this.cm;
352 Cache<Object,Object> c;
353 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
354 if (manager == null) {
358 if (manager.cacheExists(realCacheName)) {
359 throw new CacheExistException();
362 // Sanity check to avoid contrasting parameters between transactional
364 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
365 IClusterServices.cacheMode.TRANSACTIONAL))) {
366 throw new CacheConfigException();
369 // Sanity check to avoid contrasting parameters between sync and async
370 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
371 throw new CacheConfigException();
374 Configuration fromTemplateConfig = null;
376 * Fetch transactional/non-transactional templates
378 // Check if transactional
379 if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
380 fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
381 } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
382 fromTemplateConfig = manager.getDefaultCacheConfiguration();
385 // If none set the transactional property then just return null
386 if (fromTemplateConfig == null) {
390 ConfigurationBuilder builder = new ConfigurationBuilder();
391 builder.read(fromTemplateConfig);
393 * Now evaluate async/sync
395 if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
397 .cacheMode(fromTemplateConfig.clustering()
400 } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
402 .cacheMode(fromTemplateConfig.clustering()
407 manager.defineConfiguration(realCacheName, builder.build());
408 c = manager.getCache(realCacheName);
413 public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
414 EmbeddedCacheManager manager = this.cm;
415 Cache<Object,Object> c;
416 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
417 if (manager == null) {
421 if (manager.cacheExists(realCacheName)) {
422 c = manager.getCache(realCacheName);
429 public void destroyCache(String containerName, String cacheName) {
430 EmbeddedCacheManager manager = this.cm;
431 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
432 if (manager == null) {
435 if (manager.cacheExists(realCacheName)) {
436 manager.removeCache(realCacheName);
441 public boolean existCache(String containerName, String cacheName) {
442 EmbeddedCacheManager manager = this.cm;
444 if (manager == null) {
448 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
449 return manager.cacheExists(realCacheName);
453 public Set<String> getCacheList(String containerName) {
454 Set<String> perContainerCaches = new HashSet<String>();
455 EmbeddedCacheManager manager = this.cm;
456 if (manager == null) {
459 for (String cacheName : manager.getCacheNames()) {
460 if (!manager.isRunning(cacheName)) {
463 if (cacheName.startsWith("{" + containerName + "}_")) {
464 String[] res = cacheName.split("[{}]");
465 if (res.length >= 4 && res[1].equals(containerName)
466 && res[2].equals("_")) {
467 perContainerCaches.add(res[3]);
472 return (perContainerCaches);
476 public Properties getCacheProperties(String containerName, String cacheName) {
477 EmbeddedCacheManager manager = this.cm;
478 if (manager == null) {
481 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
482 if (!manager.cacheExists(realCacheName)) {
485 Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
486 .getCacheConfiguration();
487 Properties p = new Properties();
488 p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
489 conf.transaction().toString());
490 p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
491 conf.clustering().toString());
492 p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
493 .locking().toString());
498 public void addListener(String containerName, String cacheName,
499 IGetUpdates<?, ?> u) throws CacheListenerAddException {
500 EmbeddedCacheManager manager = this.cm;
501 Cache<Object,Object> c;
502 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
503 if (manager == null) {
507 if (!manager.cacheExists(realCacheName)) {
508 throw new CacheListenerAddException();
510 c = manager.getCache(realCacheName);
511 CacheListenerContainer cl = new CacheListenerContainer(u,
512 containerName, cacheName);
517 public Set<IGetUpdates<?, ?>> getListeners(String containerName,
519 EmbeddedCacheManager manager = this.cm;
520 Cache<Object,Object> c;
521 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
522 if (manager == null) {
526 if (!manager.cacheExists(realCacheName)) {
529 c = manager.getCache(realCacheName);
531 Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
532 Set<Object> listeners = c.getListeners();
533 for (Object listener : listeners) {
534 if (listener instanceof CacheListenerContainer) {
535 CacheListenerContainer cl = (CacheListenerContainer) listener;
536 res.add(cl.whichListener());
544 public void removeListener(String containerName, String cacheName,
545 IGetUpdates<?, ?> u) {
546 EmbeddedCacheManager manager = this.cm;
547 Cache<Object,Object> c;
548 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
549 if (manager == null) {
553 if (!manager.cacheExists(realCacheName)) {
556 c = manager.getCache(realCacheName);
558 Set<Object> listeners = c.getListeners();
559 for (Object listener : listeners) {
560 if (listener instanceof CacheListenerContainer) {
561 CacheListenerContainer cl = (CacheListenerContainer) listener;
562 if (cl.whichListener() == u) {
563 c.removeListener(listener);
571 public void tbegin() throws NotSupportedException, SystemException {
572 // call tbegin with the default timeout
573 tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
577 public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
578 EmbeddedCacheManager manager = this.cm;
579 if (manager == null) {
580 throw new IllegalStateException();
582 TransactionManager tm = manager.getCache("transactional-type")
583 .getAdvancedCache().getTransactionManager();
585 throw new IllegalStateException();
587 long timeoutSec = unit.toSeconds(timeout);
588 if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
589 // fall back to the default timeout
590 tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
593 // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
594 tm.setTransactionTimeout((int) timeoutSec);
600 public void tcommit() throws RollbackException, HeuristicMixedException,
601 HeuristicRollbackException, java.lang.SecurityException,
602 java.lang.IllegalStateException, SystemException {
603 EmbeddedCacheManager manager = this.cm;
604 if (manager == null) {
605 throw new IllegalStateException();
607 TransactionManager tm = manager.getCache("transactional-type")
608 .getAdvancedCache().getTransactionManager();
610 throw new IllegalStateException();
616 public void trollback() throws java.lang.IllegalStateException,
617 java.lang.SecurityException, SystemException {
618 EmbeddedCacheManager manager = this.cm;
619 if (manager == null) {
620 throw new IllegalStateException();
622 TransactionManager tm = manager.getCache("transactional-type")
623 .getAdvancedCache().getTransactionManager();
625 throw new IllegalStateException();
631 public Transaction tgetTransaction() throws SystemException {
632 EmbeddedCacheManager manager = this.cm;
633 if (manager == null) {
634 throw new IllegalStateException();
636 TransactionManager tm = manager.getCache("transactional-type")
637 .getAdvancedCache().getTransactionManager();
641 return tm.getTransaction();
645 public boolean amIStandby() {
646 EmbeddedCacheManager manager = this.cm;
647 if (manager == null) {
648 // In case we cannot fetch the information, lets assume we
649 // are standby, so to have less responsibility.
652 return (!manager.isCoordinator());
655 private InetAddress addressToInetAddress(Address a) {
656 EmbeddedCacheManager manager = this.cm;
657 if ((manager == null) || (a == null)) {
658 // In case we cannot fetch the information, lets assume we
659 // are standby, so to have less responsibility.
662 Transport t = manager.getTransport();
663 if (t instanceof JGroupsTransport) {
664 JGroupsTransport jt = (JGroupsTransport) t;
665 Channel c = jt.getChannel();
666 if (a instanceof JGroupsAddress) {
667 JGroupsAddress ja = (JGroupsAddress) a;
668 org.jgroups.Address phys = (org.jgroups.Address) c
669 .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
670 .getJGroupsAddress()));
671 if (phys instanceof org.jgroups.stack.IpAddress) {
672 InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
682 public List<InetAddress> getClusteredControllers() {
683 EmbeddedCacheManager manager = this.cm;
684 if (manager == null) {
687 List<Address> controllers = manager.getMembers();
688 if ((controllers == null) || controllers.size() == 0) {
692 List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
693 for (Address a : controllers) {
694 InetAddress inetAddress = addressToInetAddress(a);
695 if (inetAddress != null
696 && !inetAddress.getHostAddress().equals(loopbackAddress)) {
697 clusteredControllers.add(inetAddress);
700 return clusteredControllers;
704 public InetAddress getMyAddress() {
705 EmbeddedCacheManager manager = this.cm;
706 if (manager == null) {
709 return addressToInetAddress(manager.getAddress());
713 public InetAddress getActiveAddress() {
714 EmbeddedCacheManager manager = this.cm;
715 if (manager == null) {
716 // In case we cannot fetch the information, lets assume we
717 // are standby, so to have less responsibility.
721 return addressToInetAddress(manager.getCoordinator());
725 public void listenRoleChange(IListenRoleChange i)
726 throws ListenRoleChangeAddException {
727 EmbeddedCacheManager manager = this.cm;
728 if (manager == null) {
729 // In case we cannot fetch the information, lets assume we
730 // are standby, so to have less responsibility.
731 throw new ListenRoleChangeAddException();
734 if (this.roleChangeListeners == null) {
735 this.roleChangeListeners = new HashSet<IListenRoleChange>();
736 this.cacheManagerListener = new ViewChangedListener(
737 this.roleChangeListeners);
738 manager.addListener(this.cacheManagerListener);
741 if (this.roleChangeListeners != null) {
742 this.roleChangeListeners.add(i);
747 public void unlistenRoleChange(IListenRoleChange i) {
748 EmbeddedCacheManager manager = this.cm;
749 if (manager == null) {
750 // In case we cannot fetch the information, lets assume we
751 // are standby, so to have less responsibility.
755 if (this.roleChangeListeners != null) {
756 this.roleChangeListeners.remove(i);
759 if ((this.roleChangeListeners != null && this.roleChangeListeners
761 && (this.cacheManagerListener != null)) {
762 manager.removeListener(this.cacheManagerListener);
763 this.cacheManagerListener = null;
764 this.roleChangeListeners = null;
769 public class ViewChangedListener {
770 Set<IListenRoleChange> roleListeners;
772 public ViewChangedListener(Set<IListenRoleChange> s) {
773 this.roleListeners = s;
777 public void viewChanged(ViewChangedEvent e) {
778 for (IListenRoleChange i : this.roleListeners) {
779 i.newActiveAvailable();