X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fclustering%2Fservices_implementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fservices_implementation%2Finternal%2FClusterManager.java;h=e34eb329330d2e1b30ee72aac6950f2084894777;hb=fba140bf09ffbf8694aa41f544caaa331c2ec29a;hp=c3fd30ae9b65629c8e83bb7f789229b0b6bc5870;hpb=336327522d9b1bd81958feeac03bbd8332e938e4;p=controller.git diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java b/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java index c3fd30ae9b..e34eb32933 100644 --- a/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java +++ b/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java @@ -13,6 +13,8 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.net.UnknownHostException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.EnumSet; import java.util.Enumeration; @@ -22,6 +24,7 @@ import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; @@ -33,6 +36,10 @@ import javax.transaction.TransactionManager; import org.infinispan.Cache; import org.infinispan.configuration.cache.Configuration; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.configuration.parsing.ConfigurationBuilderHolder; +import org.infinispan.configuration.parsing.ParserRegistry; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.notifications.Listener; @@ -52,11 +59,10 @@ import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.clustering.services.IGetUpdates; import org.opendaylight.controller.clustering.services.IListenRoleChange; import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException; -import org.opendaylight.controller.sal.core.IContainerAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ClusterManager implements IClusterServices, IContainerAware { +public class ClusterManager implements IClusterServices { protected static final Logger logger = LoggerFactory .getLogger(ClusterManager.class); private DefaultCacheManager cm; @@ -64,7 +70,10 @@ public class ClusterManager implements IClusterServices, IContainerAware { private HashSet roleChangeListeners; private ViewChangedListener cacheManagerListener; - private static String loopbackAddress = "127.0.0.1"; + private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress(); + private static final int gossipRouterPortDefault = 12001; + // defaultTransactionTimeout is 60 seconds + private static int DEFAULT_TRANSACTION_TIMEOUT = 60; /** * Start a JGroups GossipRouter if we are a supernode. The @@ -84,14 +93,24 @@ public class ClusterManager implements IClusterServices, IContainerAware { */ private GossipRouter startGossiper() { boolean amIGossipRouter = false; - Integer gossipRouterPortDefault = 12001; Integer gossipRouterPort = gossipRouterPortDefault; InetAddress gossipRouterAddress = null; String supernodes_list = System.getProperty("supernodes", loopbackAddress); + /* + * Check the environment for the "container" variable, if this is set + * and is equal to "lxc", then ODL is running inside an lxc + * container, and address resolution of supernodes will be modified + * accordingly. + */ + boolean inContainer = "lxc".equals(System.getenv("container")); StringBuffer sanitized_supernodes_list = new StringBuffer(); List myAddresses = new ArrayList(); + if (inContainer) { + logger.trace("DOCKER: Resolving supernode host names using docker container semantics"); + } + StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":"); if (supernodes.hasMoreTokens()) { // Populate the list of my addresses @@ -125,10 +144,38 @@ public class ClusterManager implements IClusterServices, IContainerAware { } host = host_port.nextToken(); InetAddress hostAddr; + /* + * If we are in a container and the hostname begins with a '+', this is + * an indication that we should resolve this host name in the context + * of a docker container. + * + * Specifically this means: + * '+self' : self reference and the host will be mapped to the value of + * HOSTNAME in the environment + * '+' : references another container by its name. The docker established + * environment variables will be used to resolve the host to an + * IP address. + */ + if (inContainer && host != null && host.charAt(0) == '+') { + if ("+self".equals(host)) { + host = System.getenv("HOSTNAME"); + } else { + String link = System.getenv(host.substring(1).toUpperCase() + "_PORT"); + if (link != null) { + try { + host = new URI(link).getHost(); + } catch (URISyntaxException e) { + logger.error("DOCKER: Unable to translate container reference ({}) to host IP Address, will attempt using normal host name", + host.substring(1)); + } + } + } + } + try { hostAddr = InetAddress.getByName(host); } catch (UnknownHostException ue) { - logger.error("Host not known"); + logger.error("Host {} is not known", host); continue; } if (host_port.hasMoreTokens()) { @@ -136,13 +183,12 @@ public class ClusterManager implements IClusterServices, IContainerAware { try { port_num = Integer.valueOf(port); } catch (NumberFormatException ne) { - logger - .error("Supplied supernode gossiepr port is not recognized, using standard gossipport"); + logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}", + gossipRouterPortDefault); port_num = gossipRouterPortDefault; } if ((port_num > 65535) || (port_num < 0)) { - logger - .error("Supplied supernode gossip port is outside a valid TCP port range"); + logger.error("Supplied supernode gossip port is outside a valid TCP port range"); port_num = gossipRouterPortDefault; } } @@ -161,8 +207,7 @@ public class ClusterManager implements IClusterServices, IContainerAware { if (!sanitized_supernodes_list.toString().equals("")) { sanitized_supernodes_list.append(","); } - sanitized_supernodes_list.append(hostAddr.getHostAddress() + "[" - + port_num + "]"); + sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]"); } if (amIGossipRouter) { @@ -227,11 +272,23 @@ public class ClusterManager implements IClusterServices, IContainerAware { if (amIGossipRouter) { logger.info("I'm a GossipRouter will listen on port {}", gossipRouterPort); - res = new GossipRouter(gossipRouterPort); + // Start a GossipRouter with JMX support + res = new GossipRouter(gossipRouterPort, null, true); } return res; } + private void exitOnSecurityException(Exception ioe) { + Throwable cause = ioe.getCause(); + while (cause != null) { + if (cause instanceof java.lang.SecurityException) { + logger.error("Failed Cluster authentication. Stopping Controller..."); + System.exit(0); + } + cause = cause.getCause(); + } + } + public void start() { this.gossiper = startGossiper(); if (this.gossiper != null) { @@ -246,8 +303,17 @@ public class ClusterManager implements IClusterServices, IContainerAware { } logger.info("Starting the ClusterManager"); try { - //FIXME keeps throwing FileNotFoundException - this.cm = new DefaultCacheManager("config/infinispan-config.xml"); + ParserRegistry parser = new ParserRegistry(this.getClass() + .getClassLoader()); + String infinispanConfigFile = + System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml"); + logger.debug("Using configuration file:{}", infinispanConfigFile); + ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile); + GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder(); + globalBuilder.serialization() + .classResolver(new ClassResolver()) + .build(); + this.cm = new DefaultCacheManager(holder, false); logger.debug("Allocated ClusterManager"); if (this.cm != null) { this.cm.start(); @@ -259,6 +325,7 @@ public class ClusterManager implements IClusterServices, IContainerAware { logger.error("Stack Trace that raised th exception"); logger.error("",ioe); this.cm = null; + exitOnSecurityException(ioe); this.stop(); } logger.debug("Cache Manager has value {}", this.cm); @@ -292,24 +359,54 @@ public class ClusterManager implements IClusterServices, IContainerAware { throw new CacheExistException(); } - // Sanity check to avoid contrasting parameters - if (cMode.containsAll(EnumSet.of( - IClusterServices.cacheMode.NON_TRANSACTIONAL, + // Sanity check to avoid contrasting parameters between transactional + // and not + if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.TRANSACTIONAL))) { throw new CacheConfigException(); } - if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) { - c = manager.getCache(realCacheName); - return c; - } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) { - Configuration rc = manager - .getCacheConfiguration("transactional-type"); - manager.defineConfiguration(realCacheName, rc); - c = manager.getCache(realCacheName); - return c; + // Sanity check to avoid contrasting parameters between sync and async + if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) { + throw new CacheConfigException(); } - return null; + + Configuration fromTemplateConfig = null; + /* + * Fetch transactional/non-transactional templates + */ + // Check if transactional + if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) { + fromTemplateConfig = manager.getCacheConfiguration("transactional-type"); + } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) { + fromTemplateConfig = manager.getDefaultCacheConfiguration(); + } + + // If none set the transactional property then just return null + if (fromTemplateConfig == null) { + return null; + } + + ConfigurationBuilder builder = new ConfigurationBuilder(); + builder.read(fromTemplateConfig); + /* + * Now evaluate async/sync + */ + if (cMode.contains(IClusterServices.cacheMode.ASYNC)) { + builder.clustering() + .cacheMode(fromTemplateConfig.clustering() + .cacheMode() + .toAsync()); + } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) { + builder.clustering() + .cacheMode(fromTemplateConfig.clustering() + .cacheMode() + .toSync()); + } + + manager.defineConfiguration(realCacheName, builder.build()); + c = manager.getCache(realCacheName); + return c; } @Override @@ -343,10 +440,12 @@ public class ClusterManager implements IClusterServices, IContainerAware { @Override public boolean existCache(String containerName, String cacheName) { EmbeddedCacheManager manager = this.cm; - String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; + if (manager == null) { return false; } + + String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; return manager.cacheExists(realCacheName); } @@ -358,6 +457,7 @@ public class ClusterManager implements IClusterServices, IContainerAware { return null; } for (String cacheName : manager.getCacheNames()) { + if (!manager.isRunning(cacheName)) continue; if (cacheName.startsWith("{" + containerName + "}_")) { String[] res = cacheName.split("[{}]"); if (res.length >= 4 && res[1].equals(containerName) @@ -467,6 +567,12 @@ public class ClusterManager implements IClusterServices, IContainerAware { @Override public void tbegin() throws NotSupportedException, SystemException { + // call tbegin with the default timeout + tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS); + } + + @Override + public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException { EmbeddedCacheManager manager = this.cm; if (manager == null) { throw new IllegalStateException(); @@ -476,6 +582,15 @@ public class ClusterManager implements IClusterServices, IContainerAware { if (tm == null) { throw new IllegalStateException(); } + long timeoutSec = unit.toSeconds(timeout); + if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) { + // fall back to the default timeout + tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT); + } else { + // cast is ok here + // as here we are sure that timeoutSec < = Integer.MAX_VALUE. + tm.setTransactionTimeout((int) timeoutSec); + } tm.begin(); } @@ -663,21 +778,4 @@ public class ClusterManager implements IClusterServices, IContainerAware { } } } - - private void removeContainerCaches(String containerName) { - logger.info("Destroying caches for container {}", containerName); - for (String cacheName : this.getCacheList(containerName)) { - this.destroyCache(containerName, cacheName); - } - } - - @Override - public void containerCreate(String arg0) { - // no op - } - - @Override - public void containerDestroy(String container) { - removeContainerCaches(container); - } }