fix typos and some Java improvements in ClusterManager
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
index 632f550ffe342d585cfaf1e27cba0dac344e7f6b..83db4144007963a94e503a33345b1e641eadc2a0 100644 (file)
@@ -9,8 +9,6 @@
 
 package org.opendaylight.controller.clustering.services_implementation.internal;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
@@ -24,6 +22,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
 import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
@@ -35,6 +34,10 @@ import javax.transaction.TransactionManager;
 
 import org.infinispan.Cache;
 import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
+import org.infinispan.configuration.parsing.ParserRegistry;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.notifications.Listener;
@@ -62,10 +65,13 @@ public class ClusterManager implements IClusterServices {
             .getLogger(ClusterManager.class);
     private DefaultCacheManager cm;
     GossipRouter gossiper;
-    private HashSet roleChangeListeners;
+    private HashSet<IListenRoleChange> roleChangeListeners;
     private ViewChangedListener cacheManagerListener;
 
-    private static String loopbackAddress = "127.0.0.1";
+    private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
+    private static final int gossipRouterPortDefault = 12001;
+    // defaultTransactionTimeout is 60 seconds
+    private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
 
     /**
      * Start a JGroups GossipRouter if we are a supernode. The
@@ -85,24 +91,23 @@ public class ClusterManager implements IClusterServices {
      */
     private GossipRouter startGossiper() {
         boolean amIGossipRouter = false;
-        Integer gossipRouterPortDefault = 12001;
         Integer gossipRouterPort = gossipRouterPortDefault;
         InetAddress gossipRouterAddress = null;
         String supernodes_list = System.getProperty("supernodes",
                 loopbackAddress);
-        StringBuffer sanitized_supernodes_list = new StringBuffer();
+        StringBuilder sanitized_supernodes_list = new StringBuilder();
         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
 
         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
         if (supernodes.hasMoreTokens()) {
             // Populate the list of my addresses
             try {
-                Enumeration e = NetworkInterface.getNetworkInterfaces();
+                Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
                 while (e.hasMoreElements()) {
-                    NetworkInterface n = (NetworkInterface) e.nextElement();
-                    Enumeration ee = n.getInetAddresses();
+                    NetworkInterface n = e.nextElement();
+                    Enumeration<InetAddress> ee = n.getInetAddresses();
                     while (ee.hasMoreElements()) {
-                        InetAddress i = (InetAddress) ee.nextElement();
+                        InetAddress i = ee.nextElement();
                         myAddresses.add(i);
                     }
                 }
@@ -113,15 +118,15 @@ public class ClusterManager implements IClusterServices {
         }
         while (supernodes.hasMoreTokens()) {
             String curr_supernode = supernodes.nextToken();
-            logger.debug("Examining supernode " + curr_supernode);
+            logger.debug("Examining supernode {}", curr_supernode);
             StringTokenizer host_port = new StringTokenizer(curr_supernode,
                     "[]");
             String host;
             String port;
             Integer port_num = gossipRouterPortDefault;
             if (host_port.countTokens() > 2) {
-                logger.error("Error parsing supernode " + curr_supernode
-                        + " proceed to the next one");
+                logger.error("Error parsing supernode {} proceed to the next one",
+                        curr_supernode);
                 continue;
             }
             host = host_port.nextToken();
@@ -129,7 +134,7 @@ public class ClusterManager implements IClusterServices {
             try {
                 hostAddr = InetAddress.getByName(host);
             } catch (UnknownHostException ue) {
-                logger.error("Host not known");
+                logger.error("Host {} is not known", host);
                 continue;
             }
             if (host_port.hasMoreTokens()) {
@@ -137,13 +142,12 @@ public class ClusterManager implements IClusterServices {
                 try {
                     port_num = Integer.valueOf(port);
                 } catch (NumberFormatException ne) {
-                    logger
-                            .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
+                    logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}",
+                                 gossipRouterPortDefault);
                     port_num = gossipRouterPortDefault;
                 }
                 if ((port_num > 65535) || (port_num < 0)) {
-                    logger
-                            .error("Supplied supernode gossip port is outside a valid TCP port range");
+                    logger.error("Supplied supernode gossip port is outside a valid TCP port range");
                     port_num = gossipRouterPortDefault;
                 }
             }
@@ -162,8 +166,7 @@ public class ClusterManager implements IClusterServices {
             if (!sanitized_supernodes_list.toString().equals("")) {
                 sanitized_supernodes_list.append(",");
             }
-            sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
-                    + port_num + "]");
+            sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]");
         }
 
         if (amIGossipRouter) {
@@ -184,13 +187,13 @@ public class ClusterManager implements IClusterServices {
                     for (InetAddress myAddr : myAddresses) {
                         if (myAddr.isLoopbackAddress()
                                 || myAddr.isLinkLocalAddress()) {
-                            logger.debug("Skipping local address "
-                                    + myAddr.getHostAddress());
+                            logger.debug("Skipping local address {}",
+                                         myAddr.getHostAddress());
                             continue;
                         } else {
                             // First non-local address
                             myBind = myAddr.getHostAddress();
-                            logger.debug("First non-local address " + myBind);
+                            logger.debug("First non-local address {}", myBind);
                             break;
                         }
                     }
@@ -199,7 +202,7 @@ public class ClusterManager implements IClusterServices {
                         .getProperty("jgroups.tcp.address");
                 if (jgroupAddress == null) {
                     if (myBind != null) {
-                        logger.debug("Set bind address to be " + myBind);
+                        logger.debug("Set bind address to be {}", myBind);
                         System.setProperty("jgroups.tcp.address", myBind);
                     } else {
                         logger
@@ -207,8 +210,8 @@ public class ClusterManager implements IClusterServices {
                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
                     }
                 } else {
-                    logger.debug("jgroup.tcp.address already set to be "
-                            jgroupAddress);
+                    logger.debug("jgroup.tcp.address already set to be {}",
+                            jgroupAddress);
                 }
             } catch (UnknownHostException uhe) {
                 logger
@@ -220,19 +223,31 @@ public class ClusterManager implements IClusterServices {
         // host list
         System.setProperty("jgroups.tcpgossip.initial_hosts",
                 sanitized_supernodes_list.toString());
-        logger.debug("jgroups.tcp.address set to "
-                System.getProperty("jgroups.tcp.address"));
-        logger.debug("jgroups.tcpgossip.initial_hosts set to "
-                System.getProperty("jgroups.tcpgossip.initial_hosts"));
+        logger.debug("jgroups.tcp.address set to {}",
+                System.getProperty("jgroups.tcp.address"));
+        logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
+                System.getProperty("jgroups.tcpgossip.initial_hosts"));
         GossipRouter res = null;
         if (amIGossipRouter) {
-            logger.info("I'm a GossipRouter will listen on port "
-                    + gossipRouterPort);
-            res = new GossipRouter(gossipRouterPort);
+            logger.info("I'm a GossipRouter will listen on port {}",
+                    gossipRouterPort);
+            // Start a GossipRouter with JMX support
+            res = new GossipRouter(gossipRouterPort, null, true);
         }
         return res;
     }
 
+    private void exitOnSecurityException(Exception ioe) {
+        Throwable cause = ioe.getCause();
+        while (cause != null) {
+            if (cause instanceof java.lang.SecurityException) {
+                logger.error("Failed Cluster authentication. Stopping Controller...");
+                System.exit(0);
+            }
+            cause = cause.getCause();
+        }
+    }
+
     public void start() {
         this.gossiper = startGossiper();
         if (this.gossiper != null) {
@@ -241,18 +256,23 @@ public class ClusterManager implements IClusterServices {
                 this.gossiper.start();
                 logger.info("Started GossipRouter");
             } catch (Exception e) {
-                logger.error("GossipRouter didn't start exception " + e
-                        + " met");
-                StringWriter sw = new StringWriter();
-                logger.error("Stack Trace that raised the exception");
-                e.printStackTrace(new PrintWriter(sw));
-                logger.error(sw.toString());
+                logger.error("GossipRouter didn't start. Exception Stack Trace",
+                             e);
             }
         }
         logger.info("Starting the ClusterManager");
         try {
-            //FIXME keeps throwing FileNotFoundException
-            this.cm = new DefaultCacheManager("/config/infinispan-config.xml");
+            ParserRegistry parser = new ParserRegistry(this.getClass()
+                    .getClassLoader());
+            String infinispanConfigFile =
+                    System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
+            logger.debug("Using configuration file:{}", infinispanConfigFile);
+            ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
+            GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
+            globalBuilder.serialization()
+                    .classResolver(new ClassResolver())
+                    .build();
+            this.cm = new DefaultCacheManager(holder, false);
             logger.debug("Allocated ClusterManager");
             if (this.cm != null) {
                 this.cm.start();
@@ -260,15 +280,14 @@ public class ClusterManager implements IClusterServices {
                 logger.debug("Started the ClusterManager");
             }
         } catch (Exception ioe) {
-            StringWriter sw = new StringWriter();
             logger.error("Cannot configure infinispan .. bailing out ");
             logger.error("Stack Trace that raised th exception");
-            ioe.printStackTrace(new PrintWriter(sw));
-            logger.error(sw.toString());
+            logger.error("",ioe);
             this.cm = null;
+            exitOnSecurityException(ioe);
             this.stop();
         }
-        logger.debug("Cache Manager has value " + this.cm);
+        logger.debug("Cache Manager has value {}", this.cm);
     }
 
     public void stop() {
@@ -289,7 +308,7 @@ public class ClusterManager implements IClusterServices {
             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
             CacheConfigException {
         EmbeddedCacheManager manager = this.cm;
-        Cache c;
+        Cache<Object,Object> c;
         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
         if (manager == null) {
             return null;
@@ -299,30 +318,60 @@ public class ClusterManager implements IClusterServices {
             throw new CacheExistException();
         }
 
-        // Sanity check to avoid contrasting parameters
-        if (cMode.containsAll(EnumSet.of(
-                IClusterServices.cacheMode.NON_TRANSACTIONAL,
+        // Sanity check to avoid contrasting parameters between transactional
+        // and not
+        if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
                 IClusterServices.cacheMode.TRANSACTIONAL))) {
             throw new CacheConfigException();
         }
 
-        if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
-            c = manager.getCache(realCacheName);
-            return c;
-        } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
-            Configuration rc = manager
-                    .getCacheConfiguration("transactional-type");
-            manager.defineConfiguration(realCacheName, rc);
-            c = manager.getCache(realCacheName);
-            return c;
+        // Sanity check to avoid contrasting parameters between sync and async
+        if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
+            throw new CacheConfigException();
         }
-        return null;
+
+        Configuration fromTemplateConfig = null;
+        /*
+         * Fetch transactional/non-transactional templates
+         */
+        // Check if transactional
+        if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
+            fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
+        } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
+            fromTemplateConfig = manager.getDefaultCacheConfiguration();
+        }
+
+        // If none set the transactional property then just return null
+        if (fromTemplateConfig == null) {
+            return null;
+        }
+
+        ConfigurationBuilder builder = new ConfigurationBuilder();
+        builder.read(fromTemplateConfig);
+        /*
+         * Now evaluate async/sync
+         */
+        if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
+            builder.clustering()
+                    .cacheMode(fromTemplateConfig.clustering()
+                            .cacheMode()
+                            .toAsync());
+        } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
+            builder.clustering()
+                    .cacheMode(fromTemplateConfig.clustering()
+                            .cacheMode()
+                            .toSync());
+        }
+
+        manager.defineConfiguration(realCacheName, builder.build());
+        c = manager.getCache(realCacheName);
+        return c;
     }
 
     @Override
     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
         EmbeddedCacheManager manager = this.cm;
-        Cache c;
+        Cache<Object,Object> c;
         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
         if (manager == null) {
             return null;
@@ -350,21 +399,24 @@ public class ClusterManager implements IClusterServices {
     @Override
     public boolean existCache(String containerName, String cacheName) {
         EmbeddedCacheManager manager = this.cm;
-        String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
+
         if (manager == null) {
             return false;
         }
+
+        String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
         return manager.cacheExists(realCacheName);
     }
 
     @Override
     public Set<String> getCacheList(String containerName) {
-        Set<String> perContainerCaches = new HashSet();
+        Set<String> perContainerCaches = new HashSet<String>();
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
             return null;
         }
         for (String cacheName : manager.getCacheNames()) {
+            if (!manager.isRunning(cacheName)) continue;
             if (cacheName.startsWith("{" + containerName + "}_")) {
                 String[] res = cacheName.split("[{}]");
                 if (res.length >= 4 && res[1].equals(containerName)
@@ -403,7 +455,7 @@ public class ClusterManager implements IClusterServices {
     public void addListener(String containerName, String cacheName,
             IGetUpdates<?, ?> u) throws CacheListenerAddException {
         EmbeddedCacheManager manager = this.cm;
-        Cache c;
+        Cache<Object,Object> c;
         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
         if (manager == null) {
             return;
@@ -422,7 +474,7 @@ public class ClusterManager implements IClusterServices {
     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
             String cacheName) {
         EmbeddedCacheManager manager = this.cm;
-        Cache c;
+        Cache<Object,Object> c;
         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
         if (manager == null) {
             return null;
@@ -433,7 +485,7 @@ public class ClusterManager implements IClusterServices {
         }
         c = manager.getCache(realCacheName);
 
-        Set<IGetUpdates<?, ?>> res = new HashSet();
+        Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
         Set<Object> listeners = c.getListeners();
         for (Object listener : listeners) {
             if (listener instanceof CacheListenerContainer) {
@@ -449,7 +501,7 @@ public class ClusterManager implements IClusterServices {
     public void removeListener(String containerName, String cacheName,
             IGetUpdates<?, ?> u) {
         EmbeddedCacheManager manager = this.cm;
-        Cache c;
+        Cache<Object,Object> c;
         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
         if (manager == null) {
             return;
@@ -474,6 +526,12 @@ public class ClusterManager implements IClusterServices {
 
     @Override
     public void tbegin() throws NotSupportedException, SystemException {
+        // call tbegin with the default timeout
+        tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
             throw new IllegalStateException();
@@ -483,6 +541,15 @@ public class ClusterManager implements IClusterServices {
         if (tm == null) {
             throw new IllegalStateException();
         }
+        long timeoutSec = unit.toSeconds(timeout);
+        if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
+            // fall back to the default timeout
+            tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
+        } else {
+            // cast is ok here
+            // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
+            tm.setTransactionTimeout((int) timeoutSec);
+        }
         tm.begin();
     }
 
@@ -536,7 +603,7 @@ public class ClusterManager implements IClusterServices {
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
             // In case we cannot fetch the information, lets assume we
-            // are standby, so to have less responsability.
+            // are standby, so to have less responsibility.
             return true;
         }
         return (!manager.isCoordinator());
@@ -546,7 +613,7 @@ public class ClusterManager implements IClusterServices {
         EmbeddedCacheManager manager = this.cm;
         if ((manager == null) || (a == null)) {
             // In case we cannot fetch the information, lets assume we
-            // are standby, so to have less responsability.
+            // are standby, so to have less responsibility.
             return null;
         }
         Transport t = manager.getTransport();
@@ -568,25 +635,29 @@ public class ClusterManager implements IClusterServices {
         return null;
     }
 
+    @Override
     public List<InetAddress> getClusteredControllers() {
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
             return null;
         }
         List<Address> controllers = manager.getMembers();
-        if ((controllers == null) || controllers.size() == 0)
+        if ((controllers == null) || controllers.size() == 0) {
             return null;
+        }
 
         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
         for (Address a : controllers) {
             InetAddress inetAddress = addressToInetAddress(a);
             if (inetAddress != null
-                    && !inetAddress.getHostAddress().equals(loopbackAddress))
+                    && !inetAddress.getHostAddress().equals(loopbackAddress)) {
                 clusteredControllers.add(inetAddress);
+            }
         }
         return clusteredControllers;
     }
 
+    @Override
     public InetAddress getMyAddress() {
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
@@ -600,7 +671,7 @@ public class ClusterManager implements IClusterServices {
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
             // In case we cannot fetch the information, lets assume we
-            // are standby, so to have less responsability.
+            // are standby, so to have less responsibility.
             return null;
         }
 
@@ -613,12 +684,12 @@ public class ClusterManager implements IClusterServices {
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
             // In case we cannot fetch the information, lets assume we
-            // are standby, so to have less responsability.
+            // are standby, so to have less responsibility.
             throw new ListenRoleChangeAddException();
         }
 
         if (this.roleChangeListeners == null) {
-            this.roleChangeListeners = new HashSet();
+            this.roleChangeListeners = new HashSet<IListenRoleChange>();
             this.cacheManagerListener = new ViewChangedListener(
                     this.roleChangeListeners);
             manager.addListener(this.cacheManagerListener);
@@ -634,7 +705,7 @@ public class ClusterManager implements IClusterServices {
         EmbeddedCacheManager manager = this.cm;
         if (manager == null) {
             // In case we cannot fetch the information, lets assume we
-            // are standby, so to have less responsability.
+            // are standby, so to have less responsibility.
             return;
         }