Upgrading & configuring Infinispan 5.2.3 -> 5.3.0 for better 2-phase commit support. 69/869/1
authorMadhu Venugopal <vmadhu@cisco.com>
Wed, 14 Aug 2013 04:21:45 +0000 (21:21 -0700)
committerMadhu Venugopal <vmadhu@cisco.com>
Wed, 14 Aug 2013 04:21:45 +0000 (21:21 -0700)
This follows the recent change from DummyTransactionManager to JBossTransactionManager.

ConnectionManager depends on the Consistency guarantee honored by the Clustering Services in order to
provide consistent Connection Management Services. Existing Infinispan 5.2.3 carries a few issues
(Refer : ISPN-3366 and ISPN-3357) which makes putIfAbsent not atomic even for TRANSACTIONAL caches.
Also, Since we are using NON-TRANSACTIONAL Caches, Infinispan doesnt provide any Atomicity guarantee
for the Cache Syncs. This is the first commit in the controller repo to convert an existing cache
from NON-TRASACTIONAL to TRANSACTIONAL. Few more caches that needs Atomicity guarantee will be
converted soon.
In addition, Infinispan configurations must be tweaked to make the transaction commits synchronous
(Currently it is inadvertently configured as Async).
With these changes, the Connection manager NodeConnections cache is made TRANSACTIONAL and the
cache syncs are guaranteed to be atomic.

NOTE: There are still few issues with Infinispan cache sync (ISPN-2719)

Change-Id: I97037ac7d6413f5b988496a03fb05bd88bbbce3f
Signed-off-by: Madhu Venugopal <vmadhu@cisco.com>
opendaylight/clustering/services_implementation/pom.xml
opendaylight/clustering/services_implementation/src/main/resources/config/infinispan-config.xml
opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/Activator.java
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java

index 1d52c4b..025fbdf 100644 (file)
@@ -39,6 +39,7 @@
         <instructions>
           <Import-Package>
             org.slf4j,
+            !org.apache.logging.log4j.*,
             !bsh*,
             !net.jcip.*,
             !javax.swing,
     <dependency>
       <groupId>org.infinispan</groupId>
       <artifactId>infinispan-core</artifactId>
-      <version>5.2.3.Final</version>
+      <version>5.3.0.Final</version>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
index 16dd579..e917eea 100644 (file)
   <namedCache name="transactional-type">
     <transaction
         transactionManagerLookupClass="org.infinispan.transaction.lookup.JBossStandaloneJTAManagerLookup"
-        syncRollbackPhase="false"
-        syncCommitPhase="false"
+        syncRollbackPhase="true"
+        syncCommitPhase="true"
         cacheStopTimeout="30000"
         use1PcForAutoCommitTransactions="false"
         autoCommit="true"
         lockingMode="OPTIMISTIC"
-        useSynchronization="false"
+        useSynchronization="true"
         transactionMode="TRANSACTIONAL"
         />
   </namedCache>
index f681c35..60be87b 100644 (file)
@@ -9,6 +9,7 @@
 
 package org.opendaylight.controller.clustering.test.internal;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -268,6 +269,24 @@ public class SimpleClient implements CommandProvider {
         ci.println(cacheName + " is no longer being monitored for updates");
     }
 
+    public void _myController(CommandInterpreter ci) {
+        if (this.icluster == null) {
+            ci.println("\nNo Clustering services available");
+            return;
+        }
+        ci.println("This Controller : " +icluster.getMyAddress().getHostAddress());
+    }
+
+    public void _getClusterNodes(CommandInterpreter ci) {
+        if (this.icluster == null) {
+            ci.println("\nNo Clustering services available");
+            return;
+        }
+        for (InetAddress address : icluster.getClusteredControllers()) {
+            ci.println("\t"+address.getHostAddress());
+        }
+    }
+
     public void _listcaches(CommandInterpreter ci) {
         if (this.icluster == null) {
             ci.println("\nNo Clustering services available");
@@ -441,29 +460,6 @@ public class SimpleClient implements CommandProvider {
         }
     }
 
-    @SuppressWarnings("deprecation") //TODO: remove call to deprecated amIStandby
-    public void _getRole(CommandInterpreter ci) {
-        if (this.icluster == null) {
-            ci.println("\nNo Clustering services available");
-            return;
-        }
-        String role = "Active";
-        if (this.icluster.amIStandby()) {
-            role = "Standby";
-        }
-        ci.println("My role is: " + role);
-    }
-
-    @SuppressWarnings("deprecation") //TODO: remove call to deprecated getActiveAddres
-    public void _getActive(CommandInterpreter ci) {
-        if (this.icluster == null) {
-            ci.println("\nNo Clustering services available");
-            return;
-        }
-        ci.println("Current active address is "
-                + this.icluster.getActiveAddress());
-    }
-
     @SuppressWarnings("deprecation") //TODO: remove use of deprecated listenRoleChange
     public void _listenActive(CommandInterpreter ci) {
         if (this.icluster == null) {
@@ -640,16 +636,12 @@ public class SimpleClient implements CommandProvider {
         help.append("\tunlistenActive   - UNListen to Active updates\n");
         help.append("\tdestroy          - Destroy a cache\n");
         help.append("\tcreate           - Create a cache\n");
-        help.append("\tgetRole          - Tell if active or standby\n");
-        help.append("\tgetActive        - Report the IP address of Active\n");
-        help
-                .append("\tputComplex       - Fill a more complex data structure\n");
-        help
-                .append("\tupdateComplex    - Update the value of a more complex data structure\n");
-        help
-                .append("\tgetLogLevel      - Get the loglevel for the logger specified\n");
-        help
-                .append("\tsetLogLevel      - Set the loglevel for the logger specified\n");
+        help.append("\tmyController     - Print this controller's Cluster identifier\n");
+        help.append("\tgetClusterNodes  - Print all the controllers that make this cluster\n");
+        help.append("\tputComplex       - Fill a more complex data structure\n");
+        help.append("\tupdateComplex    - Update the value of a more complex data structure\n");
+        help.append("\tgetLogLevel      - Get the loglevel for the logger specified\n");
+        help.append("\tsetLogLevel      - Set the loglevel for the logger specified\n");
         return help.toString();
     }
 }
index 5ebbfe2..4bee253 100644 (file)
@@ -17,6 +17,7 @@ import java.util.Set;
 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
 import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
 import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.sal.connection.IConnectionListener;
 import org.opendaylight.controller.sal.connection.IConnectionService;
@@ -77,7 +78,9 @@ public class Activator extends ComponentActivatorAbstractBase {
         if (imp.equals(ConnectionManager.class)) {
             Dictionary<String, Object> props = new Hashtable<String, Object>();
             Set<String> propSet = new HashSet<String>();
-            propSet.add("connectionmanager.nodeconnections");
+            for (ConnectionMgmtScheme scheme:ConnectionMgmtScheme.values()) {
+                propSet.add("connectionmanager."+scheme.name()+".nodeconnections");
+            }
             props.put("cachenames", propSet);
             props.put("scope", "Global");
 
index c4b7d4f..fdba533 100644 (file)
@@ -176,9 +176,6 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
 
     @Override
     public void coordinatorChanged() {
-        AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return;
-        scheme.handleClusterViewChanged();
         notifyClusterViewChanged();
     }
 
@@ -202,8 +199,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
 
     @Override
     public void entryCreated(Node key, String cacheName, boolean originLocal) {
-        AbstractScheme scheme = schemes.get(activeScheme);
-        logger.debug("Created : {} cache : {} existingValue : {}", key, cacheName, scheme.getNodeConnections().get(key));
+        if (originLocal) return;
     }
 
     /*
@@ -277,6 +273,9 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
                         connectionService.notifyNodeDisconnectFromMaster(node);
                         break;
                     case CLUSTER_VIEW_CHANGED:
+                        AbstractScheme scheme = schemes.get(activeScheme);
+                        if (scheme == null) return;
+                        scheme.handleClusterViewChanged();
                         connectionService.notifyClusterViewChanged();
                         break;
                     default:
index a490916..6b20909 100644 (file)
@@ -10,6 +10,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.transaction.SystemException;
+
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
@@ -82,6 +84,7 @@ public abstract class AbstractScheme {
             }
         }
 
+        boolean retry = false;
         for (InetAddress c : toRemove) {
             log.debug("Removing Controller : {} from the Connections table", c);
             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
@@ -89,23 +92,35 @@ public abstract class AbstractScheme {
                 Set <InetAddress> oldControllers = nodeConnections.get(node);
                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
                 if (newControllers.remove(c)) {
-                    boolean replaced = false;
                     try {
-                        replaced = nodeConnections.replace(node, oldControllers, newControllers);
+                        clusterServices.tbegin();
+                        if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                            log.debug("Replace Failed for {} ", node.toString());
+                            retry = true;
+                            clusterServices.trollback();
+                            break;
+                        } else {
+                            clusterServices.tcommit();
+                        }
                     } catch (Exception e) {
-                        log.debug("Replace exception : ", e);
-                        replaced = false;
-                    }
-                    if (!replaced) {
+                        log.error("Exception in replacing nodeConnections ", e);
+                        retry = false;
                         try {
-                            Thread.sleep(10);
-                        } catch (InterruptedException e) {
-                        }
-                        handleClusterViewChanged();
+                            clusterServices.trollback();
+                        } catch (Exception e1) {}
+                        break;
                     }
                 }
             }
         }
+        if (retry) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            handleClusterViewChanged();
+        }
     }
 
     public Set<Node> getNodes(InetAddress controller) {
@@ -155,24 +170,30 @@ public abstract class AbstractScheme {
         if (oldControllers != null && oldControllers.contains(controller)) {
             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
             if (newControllers.remove(controller)) {
-                if (newControllers.size() > 0) {
-                    boolean replaced = false;
-                    try {
-                    replaced = nodeConnections.replace(node, oldControllers, newControllers);
-                    } catch (Exception e) {
-                        log.debug("Replace exception : ", e);
-                        replaced = false;
-                    }
-                    if (!replaced) {
-                        try {
-                            Thread.sleep(10);
-                        } catch (InterruptedException e) {
+                try {
+                    clusterServices.tbegin();
+                    if (newControllers.size() > 0) {
+                        if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                            clusterServices.trollback();
+                            try {
+                                Thread.sleep(100);
+                            } catch ( InterruptedException e) {}
+                            return removeNodeFromController(node, controller);
                         }
-                        return removeNodeFromController(node, controller);
+                    } else {
+                        nodeConnections.remove(node);
                     }
-                } else {
-                    nodeConnections.remove(node);
+                    clusterServices.tcommit();
+                } catch (Exception e) {
+                    log.error("Excepion in removing Controller from a Node", e);
+                    try {
+                        clusterServices.trollback();
+                    } catch (Exception e1) {
+                        log.error("Error Rolling back the node Connections Changes ", e);
+                    }
+                    return new Status(StatusCode.INTERNALERROR);
                 }
+
             }
         }
         return new Status(StatusCode.SUCCESS);
@@ -206,34 +227,45 @@ public abstract class AbstractScheme {
         }
         newControllers.add(controller);
 
-        if (nodeConnections.putIfAbsent(node, newControllers) != null) {
-            log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
-            /*
-             * This check is needed again to take care of the case where some schemes
-             * would not allow nodes to be connected to multiple controllers.
-             * Hence, if putIfAbsent fails, that means, some other controller is competing
-             * with this controller to take hold of a Node.
-             */
-            if (isConnectionAllowed(node)) {
-                log.debug("Trying to replace old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
-                        controller.getHostAddress(), node.toString());
-                if (!nodeConnections.replace(node, oldControllers, newControllers)) {
-                    try {
-                        Thread.sleep(10);
-                    } catch (InterruptedException e) {
+        try {
+            clusterServices.tbegin();
+            if (nodeConnections.putIfAbsent(node, newControllers) != null) {
+                log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
+                /*
+                 * This check is needed again to take care of the case where some schemes
+                 * would not allow nodes to be connected to multiple controllers.
+                 * Hence, if putIfAbsent fails, that means, some other controller is competing
+                 * with this controller to take hold of a Node.
+                 */
+                if (isConnectionAllowed(node)) {
+                    if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                        clusterServices.trollback();
+                        try {
+                            Thread.sleep(100);
+                        } catch ( InterruptedException e) {}
+                        log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                                controller.getHostAddress(), node.toString());
+                        return putNodeToController(node, controller);
+                    } else {
+                        log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                                controller.getHostAddress(), node.toString());
                     }
-                    log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
-                            controller.getHostAddress(), node.toString());
-                    return putNodeToController(node, controller);
                 } else {
-                    log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
-                            controller.getHostAddress(), node.toString());
+                    clusterServices.trollback();
+                    return new Status(StatusCode.CONFLICT);
                 }
             } else {
-                return new Status(StatusCode.CONFLICT);
+                log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
             }
-        } else {
-            log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
+            clusterServices.tcommit();
+        } catch (Exception e) {
+            log.error("Excepion in adding Controller to a Node", e);
+            try {
+                clusterServices.trollback();
+            } catch (Exception e1) {
+                log.error("Error Rolling back the node Connections Changes ", e);
+            }
+            return new Status(StatusCode.INTERNALERROR);
         }
         return new Status(StatusCode.SUCCESS);
     }
@@ -277,7 +309,7 @@ public abstract class AbstractScheme {
         }
 
         try {
-            clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
         } catch (CacheExistException cee) {
             log.error("\nCache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {