X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fconnectionmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconnectionmanager%2Fscheme%2FAbstractScheme.java;h=78f274c717585993d13a3deeb86812a32ad4ad5a;hb=b3be9c84aabb7df22d633e9fe6c36d3a92a15722;hp=a490916da2f6bb65a8e3c2ba288b71f356320386;hpb=59fdd1aa84189a4672326f225235f747f6eb9a66;p=controller.git diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java index a490916da2..78f274c717 100644 --- a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java +++ b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java @@ -14,6 +14,7 @@ import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.IClusterGlobalServices; import org.opendaylight.controller.clustering.services.IClusterServices; +import org.opendaylight.controller.connectionmanager.ConnectionLocality; import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.utils.Status; @@ -73,6 +74,7 @@ public abstract class AbstractScheme { @SuppressWarnings("deprecation") public void handleClusterViewChanged() { + log.debug("Handling Cluster View changed notification"); List controllers = clusterServices.getClusteredControllers(); ConcurrentMap > controllerNodesMap = getControllerToNodesMap(); List toRemove = new ArrayList(); @@ -82,6 +84,7 @@ public abstract class AbstractScheme { } } + boolean retry = false; for (InetAddress c : toRemove) { log.debug("Removing Controller : {} from the Connections table", c); for (Iterator nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) { @@ -89,23 +92,33 @@ public abstract class AbstractScheme { Set oldControllers = nodeConnections.get(node); Set newControllers = new HashSet(oldControllers); if (newControllers.remove(c)) { - boolean replaced = false; try { - replaced = nodeConnections.replace(node, oldControllers, newControllers); + clusterServices.tbegin(); + if (!nodeConnections.replace(node, oldControllers, newControllers)) { + log.debug("Replace Failed for {} ", node.toString()); + retry = true; + clusterServices.trollback(); + break; + } else { + clusterServices.tcommit(); + } } catch (Exception e) { - log.debug("Replace exception : ", e); - replaced = false; - } - if (!replaced) { + log.debug("Exception in replacing nodeConnections ", e); + retry = true; try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - handleClusterViewChanged(); + clusterServices.trollback(); + } catch (Exception e1) {} + break; } } } } + if (retry) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + handleClusterViewChanged(); + } } public Set getNodes(InetAddress controller) { @@ -114,7 +127,6 @@ public abstract class AbstractScheme { return controllerNodesMap.get(controller); } - @SuppressWarnings("deprecation") public Set getNodes() { return getNodes(clusterServices.getMyAddress()); } @@ -128,7 +140,6 @@ public abstract class AbstractScheme { return nodeConnections; } - @SuppressWarnings("deprecation") public boolean isLocal(Node node) { if (nodeConnections == null) return false; InetAddress myController = clusterServices.getMyAddress(); @@ -136,7 +147,15 @@ public abstract class AbstractScheme { return (controllers != null && controllers.contains(myController)); } - @SuppressWarnings("deprecation") + public ConnectionLocality getLocalityStatus(Node node) { + if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED; + Set controllers = nodeConnections.get(node); + if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED; + InetAddress myController = clusterServices.getMyAddress(); + return controllers.contains(myController) ? ConnectionLocality.LOCAL: + ConnectionLocality.NOT_LOCAL; + } + public Status removeNode (Node node) { return removeNodeFromController(node, clusterServices.getMyAddress()); } @@ -155,24 +174,30 @@ public abstract class AbstractScheme { if (oldControllers != null && oldControllers.contains(controller)) { Set newControllers = new HashSet(oldControllers); if (newControllers.remove(controller)) { - if (newControllers.size() > 0) { - boolean replaced = false; - try { - replaced = nodeConnections.replace(node, oldControllers, newControllers); - } catch (Exception e) { - log.debug("Replace exception : ", e); - replaced = false; - } - if (!replaced) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { + try { + clusterServices.tbegin(); + if (newControllers.size() > 0) { + if (!nodeConnections.replace(node, oldControllers, newControllers)) { + clusterServices.trollback(); + try { + Thread.sleep(100); + } catch ( InterruptedException e) {} + return removeNodeFromController(node, controller); } - return removeNodeFromController(node, controller); + } else { + nodeConnections.remove(node); } - } else { - nodeConnections.remove(node); + clusterServices.tcommit(); + } catch (Exception e) { + log.error("Excepion in removing Controller from a Node", e); + try { + clusterServices.trollback(); + } catch (Exception e1) { + log.error("Error Rolling back the node Connections Changes ", e); + } + return new Status(StatusCode.INTERNALERROR); } + } } return new Status(StatusCode.SUCCESS); @@ -206,34 +231,44 @@ public abstract class AbstractScheme { } newControllers.add(controller); - if (nodeConnections.putIfAbsent(node, newControllers) != null) { - log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString()); - /* - * This check is needed again to take care of the case where some schemes - * would not allow nodes to be connected to multiple controllers. - * Hence, if putIfAbsent fails, that means, some other controller is competing - * with this controller to take hold of a Node. - */ - if (isConnectionAllowed(node)) { - log.debug("Trying to replace old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(), - controller.getHostAddress(), node.toString()); - if (!nodeConnections.replace(node, oldControllers, newControllers)) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { + try { + clusterServices.tbegin(); + if (nodeConnections.putIfAbsent(node, newControllers) != null) { + log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString()); + /* + * This check is needed again to take care of the case where some schemes + * would not allow nodes to be connected to multiple controllers. + * Hence, if putIfAbsent fails, that means, some other controller is competing + * with this controller to take hold of a Node. + */ + if (isConnectionAllowed(node)) { + if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) { + clusterServices.trollback(); + try { + Thread.sleep(100); + } catch ( InterruptedException e) {} + log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString()); + return putNodeToController(node, controller); + } else { + log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(), + controller.getHostAddress(), node.toString()); } - log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(), - controller.getHostAddress(), node.toString()); - return putNodeToController(node, controller); } else { - log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(), - controller.getHostAddress(), node.toString()); + clusterServices.trollback(); + return new Status(StatusCode.CONFLICT); } } else { - return new Status(StatusCode.CONFLICT); + log.debug("Added {} to {}", controller.getHostAddress(), node.toString()); } - } else { - log.debug("Added {} to {}", controller.getHostAddress(), node.toString()); + clusterServices.tcommit(); + } catch (Exception e) { + log.error("Excepion in adding Controller to a Node", e); + try { + clusterServices.trollback(); + } catch (Exception e1) { + log.error("Error Rolling back the node Connections Changes ", e); + } + return new Status(StatusCode.INTERNALERROR); } return new Status(StatusCode.SUCCESS); } @@ -277,13 +312,13 @@ public abstract class AbstractScheme { } try { - clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); } catch (CacheExistException cee) { log.error("\nCache already exists - destroy and recreate if needed"); } catch (CacheConfigException cce) { log.error("\nCache configuration invalid - check cache mode"); } catch (Exception e) { - e.printStackTrace(); + log.error("An error occured",e); } } }