Connection Manager infrastructure supporting the Clustering Active-Active requirement.
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java
new file mode 100644 (file)
index 0000000..a490916
--- /dev/null
@@ -0,0 +1,289 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.opendaylight.controller.clustering.services.CacheConfigException;
+import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractScheme {
+    private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
+    protected IClusterGlobalServices clusterServices = null;
+    /*
+     * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
+     * But, such a data-structure results in some complex event processing during the Cluster operations
+     * to sync up the Connection states.
+     *
+     * A data-structure with Node as the key and set of controllers provides a good balance
+     * between the needed functionality and simpler clustering implementation for Connection Manager.
+     */
+    protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
+    protected abstract boolean isConnectionAllowedInternal(Node node);
+    private String name;
+
+    protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
+        this.clusterServices = clusterServices;
+        if (type != null) name = type.name();
+        else name = "UNKNOWN";
+        if (clusterServices != null) {
+            allocateCaches();
+            retrieveCaches();
+        }
+    }
+
+    protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
+        ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
+        for (Node node : nodeConnections.keySet()) {
+            Set<InetAddress> controllers = nodeConnections.get(node);
+            if (controllers == null) continue;
+            for (InetAddress controller : controllers) {
+                Set<Node> nodes = controllerNodesMap.get(controller);
+                if (nodes == null) {
+                    nodes = new HashSet<Node>();
+                }
+                nodes.add(node);
+                controllerNodesMap.put(controller, nodes);
+            }
+        }
+        return controllerNodesMap;
+    }
+
+    public boolean isConnectionAllowed (Node node) {
+        if (clusterServices == null || nodeConnections == null) {
+            return false;
+        }
+
+        return isConnectionAllowedInternal(node);
+    }
+
+    @SuppressWarnings("deprecation")
+    public void handleClusterViewChanged() {
+        List<InetAddress> controllers = clusterServices.getClusteredControllers();
+        ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
+        List<InetAddress> toRemove = new ArrayList<InetAddress>();
+        for (InetAddress c : controllerNodesMap.keySet()) {
+            if (!controllers.contains(c)) {
+                toRemove.add(c);
+            }
+        }
+
+        for (InetAddress c : toRemove) {
+            log.debug("Removing Controller : {} from the Connections table", c);
+            for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
+                Node node = nodeIterator.next();
+                Set <InetAddress> oldControllers = nodeConnections.get(node);
+                Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
+                if (newControllers.remove(c)) {
+                    boolean replaced = false;
+                    try {
+                        replaced = nodeConnections.replace(node, oldControllers, newControllers);
+                    } catch (Exception e) {
+                        log.debug("Replace exception : ", e);
+                        replaced = false;
+                    }
+                    if (!replaced) {
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                        }
+                        handleClusterViewChanged();
+                    }
+                }
+            }
+        }
+    }
+
+    public Set<Node> getNodes(InetAddress controller) {
+        if (nodeConnections == null) return null;
+        ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
+        return controllerNodesMap.get(controller);
+    }
+
+    @SuppressWarnings("deprecation")
+    public Set<Node> getNodes() {
+        return getNodes(clusterServices.getMyAddress());
+    }
+
+    public Set<InetAddress> getControllers(Node node) {
+        if (nodeConnections != null) return nodeConnections.get(node);
+        return null;
+    }
+
+    public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
+        return nodeConnections;
+    }
+
+    @SuppressWarnings("deprecation")
+    public boolean isLocal(Node node) {
+        if (nodeConnections == null) return false;
+        InetAddress myController = clusterServices.getMyAddress();
+        Set<InetAddress> controllers = nodeConnections.get(node);
+        return (controllers != null && controllers.contains(myController));
+    }
+
+    @SuppressWarnings("deprecation")
+    public Status removeNode (Node node) {
+        return removeNodeFromController(node, clusterServices.getMyAddress());
+    }
+
+    protected Status removeNodeFromController (Node node, InetAddress controller) {
+        if (node == null || controller == null) {
+            return new Status(StatusCode.BADREQUEST);
+        }
+
+        if (clusterServices == null || nodeConnections == null) {
+            return new Status(StatusCode.SUCCESS);
+        }
+
+        Set<InetAddress> oldControllers = nodeConnections.get(node);
+
+        if (oldControllers != null && oldControllers.contains(controller)) {
+            Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
+            if (newControllers.remove(controller)) {
+                if (newControllers.size() > 0) {
+                    boolean replaced = false;
+                    try {
+                    replaced = nodeConnections.replace(node, oldControllers, newControllers);
+                    } catch (Exception e) {
+                        log.debug("Replace exception : ", e);
+                        replaced = false;
+                    }
+                    if (!replaced) {
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                        }
+                        return removeNodeFromController(node, controller);
+                    }
+                } else {
+                    nodeConnections.remove(node);
+                }
+            }
+        }
+        return new Status(StatusCode.SUCCESS);
+
+    }
+
+    /*
+     * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
+     * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
+     */
+    private Status putNodeToController (Node node, InetAddress controller) {
+        if (clusterServices == null || nodeConnections == null) {
+            return new Status(StatusCode.SUCCESS);
+        }
+        log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
+
+        Set <InetAddress> oldControllers = nodeConnections.get(node);
+        Set <InetAddress> newControllers = null;
+        if (oldControllers == null) {
+            newControllers = new HashSet<InetAddress>();
+        } else {
+            if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
+                /*
+                 * In certain race conditions, the putIfAbsent fails to be atomic.
+                 * This check is added to identify such cases and report an warning
+                 * for debugging.
+                 */
+                log.warn("States Exists for {} : {}", node, oldControllers.toString());
+            }
+            newControllers = new HashSet<InetAddress>(oldControllers);
+        }
+        newControllers.add(controller);
+
+        if (nodeConnections.putIfAbsent(node, newControllers) != null) {
+            log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
+            /*
+             * This check is needed again to take care of the case where some schemes
+             * would not allow nodes to be connected to multiple controllers.
+             * Hence, if putIfAbsent fails, that means, some other controller is competing
+             * with this controller to take hold of a Node.
+             */
+            if (isConnectionAllowed(node)) {
+                log.debug("Trying to replace old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                        controller.getHostAddress(), node.toString());
+                if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                    }
+                    log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                            controller.getHostAddress(), node.toString());
+                    return putNodeToController(node, controller);
+                } else {
+                    log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                            controller.getHostAddress(), node.toString());
+                }
+            } else {
+                return new Status(StatusCode.CONFLICT);
+            }
+        } else {
+            log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
+        }
+        return new Status(StatusCode.SUCCESS);
+    }
+
+    public Status addNode (Node node, InetAddress controller) {
+        if (node == null || controller == null) {
+            return new Status(StatusCode.BADREQUEST);
+        }
+        if (isLocal(node)) return new Status(StatusCode.SUCCESS);
+        if (isConnectionAllowed(node)) {
+            return putNodeToController(node, controller);
+        } else {
+            return new Status(StatusCode.NOTALLOWED);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public Status addNode (Node node) {
+        return addNode(node, clusterServices.getMyAddress());
+    }
+
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void retrieveCaches() {
+        if (this.clusterServices == null) {
+            log.error("un-initialized clusterServices, can't retrieve cache");
+            return;
+        }
+
+        nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
+
+        if (nodeConnections == null) {
+            log.error("\nFailed to get caches");
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void allocateCaches() {
+        if (this.clusterServices == null) {
+            log.error("un-initialized clusterServices, can't create cache");
+            return;
+        }
+
+        try {
+            clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+        } catch (CacheExistException cee) {
+            log.error("\nCache already exists - destroy and recreate if needed");
+        } catch (CacheConfigException cce) {
+            log.error("\nCache configuration invalid - check cache mode");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}