Upgrading & configuring Infinispan 5.2.3 -> 5.3.0 for better 2-phase commit support. 69/869/1
authorMadhu Venugopal <vmadhu@cisco.com>
Wed, 14 Aug 2013 04:21:45 +0000 (21:21 -0700)
committerMadhu Venugopal <vmadhu@cisco.com>
Wed, 14 Aug 2013 04:21:45 +0000 (21:21 -0700)
This follows the recent change from DummyTransactionManager to JBossTransactionManager.

ConnectionManager depends on the Consistency guarantee honored by the Clustering Services in order to
provide consistent Connection Management Services. Existing Infinispan 5.2.3 carries a few issues
(Refer : ISPN-3366 and ISPN-3357) which makes putIfAbsent not atomic even for TRANSACTIONAL caches.
Also, Since we are using NON-TRANSACTIONAL Caches, Infinispan doesnt provide any Atomicity guarantee
for the Cache Syncs. This is the first commit in the controller repo to convert an existing cache
from NON-TRASACTIONAL to TRANSACTIONAL. Few more caches that needs Atomicity guarantee will be
converted soon.
In addition, Infinispan configurations must be tweaked to make the transaction commits synchronous
(Currently it is inadvertently configured as Async).
With these changes, the Connection manager NodeConnections cache is made TRANSACTIONAL and the
cache syncs are guaranteed to be atomic.

NOTE: There are still few issues with Infinispan cache sync (ISPN-2719)

Change-Id: I97037ac7d6413f5b988496a03fb05bd88bbbce3f
Signed-off-by: Madhu Venugopal <vmadhu@cisco.com>
opendaylight/clustering/services_implementation/pom.xml
opendaylight/clustering/services_implementation/src/main/resources/config/infinispan-config.xml
opendaylight/clustering/test/src/main/java/org/opendaylight/controller/clustering/test/internal/SimpleClient.java
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/Activator.java
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java

index 1d52c4b0777589bff90468248fefa4e4234ddef7..025fbdfccb503fc5750e8d7d2b2c04364678610d 100644 (file)
@@ -39,6 +39,7 @@
         <instructions>
           <Import-Package>
             org.slf4j,
+            !org.apache.logging.log4j.*,
             !bsh*,
             !net.jcip.*,
             !javax.swing,
     <dependency>
       <groupId>org.infinispan</groupId>
       <artifactId>infinispan-core</artifactId>
-      <version>5.2.3.Final</version>
+      <version>5.3.0.Final</version>
     </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
index 16dd579abb83962c63f7c3efb3c21d856b1f5409..e917eea825bf7049e620ce8013bf0ce497bc095c 100644 (file)
   <namedCache name="transactional-type">
     <transaction
         transactionManagerLookupClass="org.infinispan.transaction.lookup.JBossStandaloneJTAManagerLookup"
-        syncRollbackPhase="false"
-        syncCommitPhase="false"
+        syncRollbackPhase="true"
+        syncCommitPhase="true"
         cacheStopTimeout="30000"
         use1PcForAutoCommitTransactions="false"
         autoCommit="true"
         lockingMode="OPTIMISTIC"
-        useSynchronization="false"
+        useSynchronization="true"
         transactionMode="TRANSACTIONAL"
         />
   </namedCache>
index f681c35c8c9ea376a3d0399269d798479a296689..60be87b86d4add14a4feaabae6a211c2d34f981b 100644 (file)
@@ -9,6 +9,7 @@
 
 package org.opendaylight.controller.clustering.test.internal;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -268,6 +269,24 @@ public class SimpleClient implements CommandProvider {
         ci.println(cacheName + " is no longer being monitored for updates");
     }
 
+    public void _myController(CommandInterpreter ci) {
+        if (this.icluster == null) {
+            ci.println("\nNo Clustering services available");
+            return;
+        }
+        ci.println("This Controller : " +icluster.getMyAddress().getHostAddress());
+    }
+
+    public void _getClusterNodes(CommandInterpreter ci) {
+        if (this.icluster == null) {
+            ci.println("\nNo Clustering services available");
+            return;
+        }
+        for (InetAddress address : icluster.getClusteredControllers()) {
+            ci.println("\t"+address.getHostAddress());
+        }
+    }
+
     public void _listcaches(CommandInterpreter ci) {
         if (this.icluster == null) {
             ci.println("\nNo Clustering services available");
@@ -441,29 +460,6 @@ public class SimpleClient implements CommandProvider {
         }
     }
 
-    @SuppressWarnings("deprecation") //TODO: remove call to deprecated amIStandby
-    public void _getRole(CommandInterpreter ci) {
-        if (this.icluster == null) {
-            ci.println("\nNo Clustering services available");
-            return;
-        }
-        String role = "Active";
-        if (this.icluster.amIStandby()) {
-            role = "Standby";
-        }
-        ci.println("My role is: " + role);
-    }
-
-    @SuppressWarnings("deprecation") //TODO: remove call to deprecated getActiveAddres
-    public void _getActive(CommandInterpreter ci) {
-        if (this.icluster == null) {
-            ci.println("\nNo Clustering services available");
-            return;
-        }
-        ci.println("Current active address is "
-                + this.icluster.getActiveAddress());
-    }
-
     @SuppressWarnings("deprecation") //TODO: remove use of deprecated listenRoleChange
     public void _listenActive(CommandInterpreter ci) {
         if (this.icluster == null) {
@@ -640,16 +636,12 @@ public class SimpleClient implements CommandProvider {
         help.append("\tunlistenActive   - UNListen to Active updates\n");
         help.append("\tdestroy          - Destroy a cache\n");
         help.append("\tcreate           - Create a cache\n");
-        help.append("\tgetRole          - Tell if active or standby\n");
-        help.append("\tgetActive        - Report the IP address of Active\n");
-        help
-                .append("\tputComplex       - Fill a more complex data structure\n");
-        help
-                .append("\tupdateComplex    - Update the value of a more complex data structure\n");
-        help
-                .append("\tgetLogLevel      - Get the loglevel for the logger specified\n");
-        help
-                .append("\tsetLogLevel      - Set the loglevel for the logger specified\n");
+        help.append("\tmyController     - Print this controller's Cluster identifier\n");
+        help.append("\tgetClusterNodes  - Print all the controllers that make this cluster\n");
+        help.append("\tputComplex       - Fill a more complex data structure\n");
+        help.append("\tupdateComplex    - Update the value of a more complex data structure\n");
+        help.append("\tgetLogLevel      - Get the loglevel for the logger specified\n");
+        help.append("\tsetLogLevel      - Set the loglevel for the logger specified\n");
         return help.toString();
     }
 }
index 5ebbfe2ea8cfb26dd6523b54acc147709b56bf2e..4bee2537656e3ac1ddfda162cd20735939a50a58 100644 (file)
@@ -17,6 +17,7 @@ import java.util.Set;
 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
 import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
 import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.sal.connection.IConnectionListener;
 import org.opendaylight.controller.sal.connection.IConnectionService;
@@ -77,7 +78,9 @@ public class Activator extends ComponentActivatorAbstractBase {
         if (imp.equals(ConnectionManager.class)) {
             Dictionary<String, Object> props = new Hashtable<String, Object>();
             Set<String> propSet = new HashSet<String>();
-            propSet.add("connectionmanager.nodeconnections");
+            for (ConnectionMgmtScheme scheme:ConnectionMgmtScheme.values()) {
+                propSet.add("connectionmanager."+scheme.name()+".nodeconnections");
+            }
             props.put("cachenames", propSet);
             props.put("scope", "Global");
 
index c4b7d4fe6cf52b9b517d75c53d596778700528d8..fdba533b5bfa827bf2ccd79bf2f061785d06d597 100644 (file)
@@ -176,9 +176,6 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
 
     @Override
     public void coordinatorChanged() {
-        AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return;
-        scheme.handleClusterViewChanged();
         notifyClusterViewChanged();
     }
 
@@ -202,8 +199,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
 
     @Override
     public void entryCreated(Node key, String cacheName, boolean originLocal) {
-        AbstractScheme scheme = schemes.get(activeScheme);
-        logger.debug("Created : {} cache : {} existingValue : {}", key, cacheName, scheme.getNodeConnections().get(key));
+        if (originLocal) return;
     }
 
     /*
@@ -277,6 +273,9 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
                         connectionService.notifyNodeDisconnectFromMaster(node);
                         break;
                     case CLUSTER_VIEW_CHANGED:
+                        AbstractScheme scheme = schemes.get(activeScheme);
+                        if (scheme == null) return;
+                        scheme.handleClusterViewChanged();
                         connectionService.notifyClusterViewChanged();
                         break;
                     default:
index a490916da2f6bb65a8e3c2ba288b71f356320386..6b20909a877ce81de268c585c5e309be9fe7e2cf 100644 (file)
@@ -10,6 +10,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.transaction.SystemException;
+
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
@@ -82,6 +84,7 @@ public abstract class AbstractScheme {
             }
         }
 
+        boolean retry = false;
         for (InetAddress c : toRemove) {
             log.debug("Removing Controller : {} from the Connections table", c);
             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
@@ -89,23 +92,35 @@ public abstract class AbstractScheme {
                 Set <InetAddress> oldControllers = nodeConnections.get(node);
                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
                 if (newControllers.remove(c)) {
-                    boolean replaced = false;
                     try {
-                        replaced = nodeConnections.replace(node, oldControllers, newControllers);
+                        clusterServices.tbegin();
+                        if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                            log.debug("Replace Failed for {} ", node.toString());
+                            retry = true;
+                            clusterServices.trollback();
+                            break;
+                        } else {
+                            clusterServices.tcommit();
+                        }
                     } catch (Exception e) {
-                        log.debug("Replace exception : ", e);
-                        replaced = false;
-                    }
-                    if (!replaced) {
+                        log.error("Exception in replacing nodeConnections ", e);
+                        retry = false;
                         try {
-                            Thread.sleep(10);
-                        } catch (InterruptedException e) {
-                        }
-                        handleClusterViewChanged();
+                            clusterServices.trollback();
+                        } catch (Exception e1) {}
+                        break;
                     }
                 }
             }
         }
+        if (retry) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            handleClusterViewChanged();
+        }
     }
 
     public Set<Node> getNodes(InetAddress controller) {
@@ -155,24 +170,30 @@ public abstract class AbstractScheme {
         if (oldControllers != null && oldControllers.contains(controller)) {
             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
             if (newControllers.remove(controller)) {
-                if (newControllers.size() > 0) {
-                    boolean replaced = false;
-                    try {
-                    replaced = nodeConnections.replace(node, oldControllers, newControllers);
-                    } catch (Exception e) {
-                        log.debug("Replace exception : ", e);
-                        replaced = false;
-                    }
-                    if (!replaced) {
-                        try {
-                            Thread.sleep(10);
-                        } catch (InterruptedException e) {
+                try {
+                    clusterServices.tbegin();
+                    if (newControllers.size() > 0) {
+                        if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                            clusterServices.trollback();
+                            try {
+                                Thread.sleep(100);
+                            } catch ( InterruptedException e) {}
+                            return removeNodeFromController(node, controller);
                         }
-                        return removeNodeFromController(node, controller);
+                    } else {
+                        nodeConnections.remove(node);
                     }
-                } else {
-                    nodeConnections.remove(node);
+                    clusterServices.tcommit();
+                } catch (Exception e) {
+                    log.error("Excepion in removing Controller from a Node", e);
+                    try {
+                        clusterServices.trollback();
+                    } catch (Exception e1) {
+                        log.error("Error Rolling back the node Connections Changes ", e);
+                    }
+                    return new Status(StatusCode.INTERNALERROR);
                 }
+
             }
         }
         return new Status(StatusCode.SUCCESS);
@@ -206,34 +227,45 @@ public abstract class AbstractScheme {
         }
         newControllers.add(controller);
 
-        if (nodeConnections.putIfAbsent(node, newControllers) != null) {
-            log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
-            /*
-             * This check is needed again to take care of the case where some schemes
-             * would not allow nodes to be connected to multiple controllers.
-             * Hence, if putIfAbsent fails, that means, some other controller is competing
-             * with this controller to take hold of a Node.
-             */
-            if (isConnectionAllowed(node)) {
-                log.debug("Trying to replace old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
-                        controller.getHostAddress(), node.toString());
-                if (!nodeConnections.replace(node, oldControllers, newControllers)) {
-                    try {
-                        Thread.sleep(10);
-                    } catch (InterruptedException e) {
+        try {
+            clusterServices.tbegin();
+            if (nodeConnections.putIfAbsent(node, newControllers) != null) {
+                log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
+                /*
+                 * This check is needed again to take care of the case where some schemes
+                 * would not allow nodes to be connected to multiple controllers.
+                 * Hence, if putIfAbsent fails, that means, some other controller is competing
+                 * with this controller to take hold of a Node.
+                 */
+                if (isConnectionAllowed(node)) {
+                    if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                        clusterServices.trollback();
+                        try {
+                            Thread.sleep(100);
+                        } catch ( InterruptedException e) {}
+                        log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                                controller.getHostAddress(), node.toString());
+                        return putNodeToController(node, controller);
+                    } else {
+                        log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                                controller.getHostAddress(), node.toString());
                     }
-                    log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
-                            controller.getHostAddress(), node.toString());
-                    return putNodeToController(node, controller);
                 } else {
-                    log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
-                            controller.getHostAddress(), node.toString());
+                    clusterServices.trollback();
+                    return new Status(StatusCode.CONFLICT);
                 }
             } else {
-                return new Status(StatusCode.CONFLICT);
+                log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
             }
-        } else {
-            log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
+            clusterServices.tcommit();
+        } catch (Exception e) {
+            log.error("Excepion in adding Controller to a Node", e);
+            try {
+                clusterServices.trollback();
+            } catch (Exception e1) {
+                log.error("Error Rolling back the node Connections Changes ", e);
+            }
+            return new Status(StatusCode.INTERNALERROR);
         }
         return new Status(StatusCode.SUCCESS);
     }
@@ -277,7 +309,7 @@ public abstract class AbstractScheme {
         }
 
         try {
-            clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
         } catch (CacheExistException cee) {
             log.error("\nCache already exists - destroy and recreate if needed");
         } catch (CacheConfigException cce) {