X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fclustering%2Fservices_implementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fservices_implementation%2Finternal%2FClusterManager.java;h=83db4144007963a94e503a33345b1e641eadc2a0;hp=632f550ffe342d585cfaf1e27cba0dac344e7f6b;hb=01c2c9975f3a0c33c1c211783c228c55b4aa3a46;hpb=ed39d9e127dcea25cdd05e4f228a6d7b8b08fad5 diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java b/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java index 632f550ffe..83db414400 100644 --- a/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java +++ b/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManager.java @@ -9,8 +9,6 @@ package org.opendaylight.controller.clustering.services_implementation.internal; -import java.io.PrintWriter; -import java.io.StringWriter; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; @@ -24,6 +22,7 @@ import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; @@ -35,6 +34,10 @@ import javax.transaction.TransactionManager; import org.infinispan.Cache; import org.infinispan.configuration.cache.Configuration; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.configuration.parsing.ConfigurationBuilderHolder; +import org.infinispan.configuration.parsing.ParserRegistry; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.notifications.Listener; @@ -62,10 +65,13 @@ public class ClusterManager implements IClusterServices { .getLogger(ClusterManager.class); private DefaultCacheManager cm; GossipRouter gossiper; - private HashSet roleChangeListeners; + private HashSet roleChangeListeners; private ViewChangedListener cacheManagerListener; - private static String loopbackAddress = "127.0.0.1"; + private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress(); + private static final int gossipRouterPortDefault = 12001; + // defaultTransactionTimeout is 60 seconds + private static int DEFAULT_TRANSACTION_TIMEOUT = 60; /** * Start a JGroups GossipRouter if we are a supernode. The @@ -85,24 +91,23 @@ public class ClusterManager implements IClusterServices { */ private GossipRouter startGossiper() { boolean amIGossipRouter = false; - Integer gossipRouterPortDefault = 12001; Integer gossipRouterPort = gossipRouterPortDefault; InetAddress gossipRouterAddress = null; String supernodes_list = System.getProperty("supernodes", loopbackAddress); - StringBuffer sanitized_supernodes_list = new StringBuffer(); + StringBuilder sanitized_supernodes_list = new StringBuilder(); List myAddresses = new ArrayList(); StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":"); if (supernodes.hasMoreTokens()) { // Populate the list of my addresses try { - Enumeration e = NetworkInterface.getNetworkInterfaces(); + Enumeration e = NetworkInterface.getNetworkInterfaces(); while (e.hasMoreElements()) { - NetworkInterface n = (NetworkInterface) e.nextElement(); - Enumeration ee = n.getInetAddresses(); + NetworkInterface n = e.nextElement(); + Enumeration ee = n.getInetAddresses(); while (ee.hasMoreElements()) { - InetAddress i = (InetAddress) ee.nextElement(); + InetAddress i = ee.nextElement(); myAddresses.add(i); } } @@ -113,15 +118,15 @@ public class ClusterManager implements IClusterServices { } while (supernodes.hasMoreTokens()) { String curr_supernode = supernodes.nextToken(); - logger.debug("Examining supernode " + curr_supernode); + logger.debug("Examining supernode {}", curr_supernode); StringTokenizer host_port = new StringTokenizer(curr_supernode, "[]"); String host; String port; Integer port_num = gossipRouterPortDefault; if (host_port.countTokens() > 2) { - logger.error("Error parsing supernode " + curr_supernode - + " proceed to the next one"); + logger.error("Error parsing supernode {} proceed to the next one", + curr_supernode); continue; } host = host_port.nextToken(); @@ -129,7 +134,7 @@ public class ClusterManager implements IClusterServices { try { hostAddr = InetAddress.getByName(host); } catch (UnknownHostException ue) { - logger.error("Host not known"); + logger.error("Host {} is not known", host); continue; } if (host_port.hasMoreTokens()) { @@ -137,13 +142,12 @@ public class ClusterManager implements IClusterServices { try { port_num = Integer.valueOf(port); } catch (NumberFormatException ne) { - logger - .error("Supplied supernode gossiepr port is not recognized, using standard gossipport"); + logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}", + gossipRouterPortDefault); port_num = gossipRouterPortDefault; } if ((port_num > 65535) || (port_num < 0)) { - logger - .error("Supplied supernode gossip port is outside a valid TCP port range"); + logger.error("Supplied supernode gossip port is outside a valid TCP port range"); port_num = gossipRouterPortDefault; } } @@ -162,8 +166,7 @@ public class ClusterManager implements IClusterServices { if (!sanitized_supernodes_list.toString().equals("")) { sanitized_supernodes_list.append(","); } - sanitized_supernodes_list.append(hostAddr.getHostAddress() + "[" - + port_num + "]"); + sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]"); } if (amIGossipRouter) { @@ -184,13 +187,13 @@ public class ClusterManager implements IClusterServices { for (InetAddress myAddr : myAddresses) { if (myAddr.isLoopbackAddress() || myAddr.isLinkLocalAddress()) { - logger.debug("Skipping local address " - + myAddr.getHostAddress()); + logger.debug("Skipping local address {}", + myAddr.getHostAddress()); continue; } else { // First non-local address myBind = myAddr.getHostAddress(); - logger.debug("First non-local address " + myBind); + logger.debug("First non-local address {}", myBind); break; } } @@ -199,7 +202,7 @@ public class ClusterManager implements IClusterServices { .getProperty("jgroups.tcp.address"); if (jgroupAddress == null) { if (myBind != null) { - logger.debug("Set bind address to be " + myBind); + logger.debug("Set bind address to be {}", myBind); System.setProperty("jgroups.tcp.address", myBind); } else { logger @@ -207,8 +210,8 @@ public class ClusterManager implements IClusterServices { System.setProperty("jgroups.tcp.address", "127.0.0.1"); } } else { - logger.debug("jgroup.tcp.address already set to be " - + jgroupAddress); + logger.debug("jgroup.tcp.address already set to be {}", + jgroupAddress); } } catch (UnknownHostException uhe) { logger @@ -220,19 +223,31 @@ public class ClusterManager implements IClusterServices { // host list System.setProperty("jgroups.tcpgossip.initial_hosts", sanitized_supernodes_list.toString()); - logger.debug("jgroups.tcp.address set to " - + System.getProperty("jgroups.tcp.address")); - logger.debug("jgroups.tcpgossip.initial_hosts set to " - + System.getProperty("jgroups.tcpgossip.initial_hosts")); + logger.debug("jgroups.tcp.address set to {}", + System.getProperty("jgroups.tcp.address")); + logger.debug("jgroups.tcpgossip.initial_hosts set to {}", + System.getProperty("jgroups.tcpgossip.initial_hosts")); GossipRouter res = null; if (amIGossipRouter) { - logger.info("I'm a GossipRouter will listen on port " - + gossipRouterPort); - res = new GossipRouter(gossipRouterPort); + logger.info("I'm a GossipRouter will listen on port {}", + gossipRouterPort); + // Start a GossipRouter with JMX support + res = new GossipRouter(gossipRouterPort, null, true); } return res; } + private void exitOnSecurityException(Exception ioe) { + Throwable cause = ioe.getCause(); + while (cause != null) { + if (cause instanceof java.lang.SecurityException) { + logger.error("Failed Cluster authentication. Stopping Controller..."); + System.exit(0); + } + cause = cause.getCause(); + } + } + public void start() { this.gossiper = startGossiper(); if (this.gossiper != null) { @@ -241,18 +256,23 @@ public class ClusterManager implements IClusterServices { this.gossiper.start(); logger.info("Started GossipRouter"); } catch (Exception e) { - logger.error("GossipRouter didn't start exception " + e - + " met"); - StringWriter sw = new StringWriter(); - logger.error("Stack Trace that raised the exception"); - e.printStackTrace(new PrintWriter(sw)); - logger.error(sw.toString()); + logger.error("GossipRouter didn't start. Exception Stack Trace", + e); } } logger.info("Starting the ClusterManager"); try { - //FIXME keeps throwing FileNotFoundException - this.cm = new DefaultCacheManager("/config/infinispan-config.xml"); + ParserRegistry parser = new ParserRegistry(this.getClass() + .getClassLoader()); + String infinispanConfigFile = + System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml"); + logger.debug("Using configuration file:{}", infinispanConfigFile); + ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile); + GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder(); + globalBuilder.serialization() + .classResolver(new ClassResolver()) + .build(); + this.cm = new DefaultCacheManager(holder, false); logger.debug("Allocated ClusterManager"); if (this.cm != null) { this.cm.start(); @@ -260,15 +280,14 @@ public class ClusterManager implements IClusterServices { logger.debug("Started the ClusterManager"); } } catch (Exception ioe) { - StringWriter sw = new StringWriter(); logger.error("Cannot configure infinispan .. bailing out "); logger.error("Stack Trace that raised th exception"); - ioe.printStackTrace(new PrintWriter(sw)); - logger.error(sw.toString()); + logger.error("",ioe); this.cm = null; + exitOnSecurityException(ioe); this.stop(); } - logger.debug("Cache Manager has value " + this.cm); + logger.debug("Cache Manager has value {}", this.cm); } public void stop() { @@ -289,7 +308,7 @@ public class ClusterManager implements IClusterServices { String cacheName, Set cMode) throws CacheExistException, CacheConfigException { EmbeddedCacheManager manager = this.cm; - Cache c; + Cache c; String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; if (manager == null) { return null; @@ -299,30 +318,60 @@ public class ClusterManager implements IClusterServices { throw new CacheExistException(); } - // Sanity check to avoid contrasting parameters - if (cMode.containsAll(EnumSet.of( - IClusterServices.cacheMode.NON_TRANSACTIONAL, + // Sanity check to avoid contrasting parameters between transactional + // and not + if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.TRANSACTIONAL))) { throw new CacheConfigException(); } - if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) { - c = manager.getCache(realCacheName); - return c; - } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) { - Configuration rc = manager - .getCacheConfiguration("transactional-type"); - manager.defineConfiguration(realCacheName, rc); - c = manager.getCache(realCacheName); - return c; + // Sanity check to avoid contrasting parameters between sync and async + if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) { + throw new CacheConfigException(); } - return null; + + Configuration fromTemplateConfig = null; + /* + * Fetch transactional/non-transactional templates + */ + // Check if transactional + if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) { + fromTemplateConfig = manager.getCacheConfiguration("transactional-type"); + } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) { + fromTemplateConfig = manager.getDefaultCacheConfiguration(); + } + + // If none set the transactional property then just return null + if (fromTemplateConfig == null) { + return null; + } + + ConfigurationBuilder builder = new ConfigurationBuilder(); + builder.read(fromTemplateConfig); + /* + * Now evaluate async/sync + */ + if (cMode.contains(IClusterServices.cacheMode.ASYNC)) { + builder.clustering() + .cacheMode(fromTemplateConfig.clustering() + .cacheMode() + .toAsync()); + } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) { + builder.clustering() + .cacheMode(fromTemplateConfig.clustering() + .cacheMode() + .toSync()); + } + + manager.defineConfiguration(realCacheName, builder.build()); + c = manager.getCache(realCacheName); + return c; } @Override public ConcurrentMap getCache(String containerName, String cacheName) { EmbeddedCacheManager manager = this.cm; - Cache c; + Cache c; String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; if (manager == null) { return null; @@ -350,21 +399,24 @@ public class ClusterManager implements IClusterServices { @Override public boolean existCache(String containerName, String cacheName) { EmbeddedCacheManager manager = this.cm; - String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; + if (manager == null) { return false; } + + String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; return manager.cacheExists(realCacheName); } @Override public Set getCacheList(String containerName) { - Set perContainerCaches = new HashSet(); + Set perContainerCaches = new HashSet(); EmbeddedCacheManager manager = this.cm; if (manager == null) { return null; } for (String cacheName : manager.getCacheNames()) { + if (!manager.isRunning(cacheName)) continue; if (cacheName.startsWith("{" + containerName + "}_")) { String[] res = cacheName.split("[{}]"); if (res.length >= 4 && res[1].equals(containerName) @@ -403,7 +455,7 @@ public class ClusterManager implements IClusterServices { public void addListener(String containerName, String cacheName, IGetUpdates u) throws CacheListenerAddException { EmbeddedCacheManager manager = this.cm; - Cache c; + Cache c; String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; if (manager == null) { return; @@ -422,7 +474,7 @@ public class ClusterManager implements IClusterServices { public Set> getListeners(String containerName, String cacheName) { EmbeddedCacheManager manager = this.cm; - Cache c; + Cache c; String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; if (manager == null) { return null; @@ -433,7 +485,7 @@ public class ClusterManager implements IClusterServices { } c = manager.getCache(realCacheName); - Set> res = new HashSet(); + Set> res = new HashSet>(); Set listeners = c.getListeners(); for (Object listener : listeners) { if (listener instanceof CacheListenerContainer) { @@ -449,7 +501,7 @@ public class ClusterManager implements IClusterServices { public void removeListener(String containerName, String cacheName, IGetUpdates u) { EmbeddedCacheManager manager = this.cm; - Cache c; + Cache c; String realCacheName = "{" + containerName + "}_{" + cacheName + "}"; if (manager == null) { return; @@ -474,6 +526,12 @@ public class ClusterManager implements IClusterServices { @Override public void tbegin() throws NotSupportedException, SystemException { + // call tbegin with the default timeout + tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS); + } + + @Override + public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException { EmbeddedCacheManager manager = this.cm; if (manager == null) { throw new IllegalStateException(); @@ -483,6 +541,15 @@ public class ClusterManager implements IClusterServices { if (tm == null) { throw new IllegalStateException(); } + long timeoutSec = unit.toSeconds(timeout); + if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) { + // fall back to the default timeout + tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT); + } else { + // cast is ok here + // as here we are sure that timeoutSec < = Integer.MAX_VALUE. + tm.setTransactionTimeout((int) timeoutSec); + } tm.begin(); } @@ -536,7 +603,7 @@ public class ClusterManager implements IClusterServices { EmbeddedCacheManager manager = this.cm; if (manager == null) { // In case we cannot fetch the information, lets assume we - // are standby, so to have less responsability. + // are standby, so to have less responsibility. return true; } return (!manager.isCoordinator()); @@ -546,7 +613,7 @@ public class ClusterManager implements IClusterServices { EmbeddedCacheManager manager = this.cm; if ((manager == null) || (a == null)) { // In case we cannot fetch the information, lets assume we - // are standby, so to have less responsability. + // are standby, so to have less responsibility. return null; } Transport t = manager.getTransport(); @@ -568,25 +635,29 @@ public class ClusterManager implements IClusterServices { return null; } + @Override public List getClusteredControllers() { EmbeddedCacheManager manager = this.cm; if (manager == null) { return null; } List
controllers = manager.getMembers(); - if ((controllers == null) || controllers.size() == 0) + if ((controllers == null) || controllers.size() == 0) { return null; + } List clusteredControllers = new ArrayList(); for (Address a : controllers) { InetAddress inetAddress = addressToInetAddress(a); if (inetAddress != null - && !inetAddress.getHostAddress().equals(loopbackAddress)) + && !inetAddress.getHostAddress().equals(loopbackAddress)) { clusteredControllers.add(inetAddress); + } } return clusteredControllers; } + @Override public InetAddress getMyAddress() { EmbeddedCacheManager manager = this.cm; if (manager == null) { @@ -600,7 +671,7 @@ public class ClusterManager implements IClusterServices { EmbeddedCacheManager manager = this.cm; if (manager == null) { // In case we cannot fetch the information, lets assume we - // are standby, so to have less responsability. + // are standby, so to have less responsibility. return null; } @@ -613,12 +684,12 @@ public class ClusterManager implements IClusterServices { EmbeddedCacheManager manager = this.cm; if (manager == null) { // In case we cannot fetch the information, lets assume we - // are standby, so to have less responsability. + // are standby, so to have less responsibility. throw new ListenRoleChangeAddException(); } if (this.roleChangeListeners == null) { - this.roleChangeListeners = new HashSet(); + this.roleChangeListeners = new HashSet(); this.cacheManagerListener = new ViewChangedListener( this.roleChangeListeners); manager.addListener(this.cacheManagerListener); @@ -634,7 +705,7 @@ public class ClusterManager implements IClusterServices { EmbeddedCacheManager manager = this.cm; if (manager == null) { // In case we cannot fetch the information, lets assume we - // are standby, so to have less responsability. + // are standby, so to have less responsibility. return; }