import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
*/
protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
protected abstract boolean isConnectionAllowedInternal(Node node);
- private String name;
+ private final String name;
+ private final String nodeConnectionsCacheName;
protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
this.clusterServices = clusterServices;
- if (type != null) name = type.name();
- else name = "UNKNOWN";
+ name = (type != null ? type.name() : "UNKNOWN");
+ nodeConnectionsCacheName = "connectionmanager."+name+".nodeconnections";
if (clusterServices != null) {
allocateCaches();
retrieveCaches();
+ } else {
+ log.error("Couldn't retrieve caches for scheme %s. Clustering service unavailable", name);
}
}
return isConnectionAllowedInternal(node);
}
- @SuppressWarnings("deprecation")
public void handleClusterViewChanged() {
+ log.debug("Handling Cluster View changed notification");
List<InetAddress> controllers = clusterServices.getClusteredControllers();
ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
List<InetAddress> toRemove = new ArrayList<InetAddress>();
}
}
+ boolean retry = false;
for (InetAddress c : toRemove) {
log.debug("Removing Controller : {} from the Connections table", c);
for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
Set <InetAddress> oldControllers = nodeConnections.get(node);
Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
if (newControllers.remove(c)) {
- boolean replaced = false;
try {
- replaced = nodeConnections.replace(node, oldControllers, newControllers);
+ clusterServices.tbegin();
+ if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+ log.debug("Replace Failed for {} ", node.toString());
+ retry = true;
+ clusterServices.trollback();
+ break;
+ } else {
+ clusterServices.tcommit();
+ }
} catch (Exception e) {
- log.debug("Replace exception : ", e);
- replaced = false;
- }
- if (!replaced) {
+ log.debug("Exception in replacing nodeConnections ", e);
+ retry = true;
try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- }
- handleClusterViewChanged();
+ clusterServices.trollback();
+ } catch (Exception e1) {}
+ break;
}
}
}
}
+ if (retry) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+ handleClusterViewChanged();
+ }
}
public Set<Node> getNodes(InetAddress controller) {
- if (nodeConnections == null) return null;
ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
return controllerNodesMap.get(controller);
}
- @SuppressWarnings("deprecation")
public Set<Node> getNodes() {
return getNodes(clusterServices.getMyAddress());
}
public Set<InetAddress> getControllers(Node node) {
if (nodeConnections != null) return nodeConnections.get(node);
- return null;
+ return Collections.emptySet();
}
public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
return nodeConnections;
}
- @SuppressWarnings("deprecation")
public boolean isLocal(Node node) {
if (nodeConnections == null) return false;
InetAddress myController = clusterServices.getMyAddress();
return (controllers != null && controllers.contains(myController));
}
- @SuppressWarnings("deprecation")
+ public ConnectionLocality getLocalityStatus(Node node) {
+ if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
+ Set<InetAddress> controllers = nodeConnections.get(node);
+ if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
+ InetAddress myController = clusterServices.getMyAddress();
+ return controllers.contains(myController) ? ConnectionLocality.LOCAL:
+ ConnectionLocality.NOT_LOCAL;
+ }
+
public Status removeNode (Node node) {
return removeNodeFromController(node, clusterServices.getMyAddress());
}
protected Status removeNodeFromController (Node node, InetAddress controller) {
if (node == null || controller == null) {
- return new Status(StatusCode.BADREQUEST);
+ return new Status(StatusCode.BADREQUEST, "Invalid Node or Controller Address Specified.");
}
if (clusterServices == null || nodeConnections == null) {
if (oldControllers != null && oldControllers.contains(controller)) {
Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
if (newControllers.remove(controller)) {
- if (newControllers.size() > 0) {
- boolean replaced = false;
- try {
- replaced = nodeConnections.replace(node, oldControllers, newControllers);
- } catch (Exception e) {
- log.debug("Replace exception : ", e);
- replaced = false;
- }
- if (!replaced) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
+ try {
+ clusterServices.tbegin();
+ if (newControllers.size() > 0) {
+ if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+ clusterServices.trollback();
+ try {
+ Thread.sleep(100);
+ } catch ( InterruptedException e) {}
+ return removeNodeFromController(node, controller);
}
- return removeNodeFromController(node, controller);
+ } else {
+ nodeConnections.remove(node);
}
- } else {
- nodeConnections.remove(node);
+ clusterServices.tcommit();
+ } catch (Exception e) {
+ log.error("Exception in removing Controller from a Node", e);
+ try {
+ clusterServices.trollback();
+ } catch (Exception e1) {
+ log.error("Error Rolling back the node Connections Changes ", e);
+ }
+ return new Status(StatusCode.INTERNALERROR);
}
+
}
}
return new Status(StatusCode.SUCCESS);
*/
private Status putNodeToController (Node node, InetAddress controller) {
if (clusterServices == null || nodeConnections == null) {
- return new Status(StatusCode.SUCCESS);
+ return new Status(StatusCode.INTERNALERROR, "Cluster service unavailable, or node connections info missing.");
}
log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
}
newControllers.add(controller);
- if (nodeConnections.putIfAbsent(node, newControllers) != null) {
- log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
- /*
- * This check is needed again to take care of the case where some schemes
- * would not allow nodes to be connected to multiple controllers.
- * Hence, if putIfAbsent fails, that means, some other controller is competing
- * with this controller to take hold of a Node.
- */
- if (isConnectionAllowed(node)) {
- log.debug("Trying to replace old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
- controller.getHostAddress(), node.toString());
- if (!nodeConnections.replace(node, oldControllers, newControllers)) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
+ try {
+ clusterServices.tbegin();
+ if (nodeConnections.putIfAbsent(node, newControllers) != null) {
+ log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
+ /*
+ * This check is needed again to take care of the case where some schemes
+ * would not allow nodes to be connected to multiple controllers.
+ * Hence, if putIfAbsent fails, that means, some other controller is competing
+ * with this controller to take hold of a Node.
+ */
+ if (isConnectionAllowed(node)) {
+ if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
+ clusterServices.trollback();
+ try {
+ Thread.sleep(100);
+ } catch ( InterruptedException e) {}
+ log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
+ return putNodeToController(node, controller);
+ } else {
+ log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+ controller.getHostAddress(), node.toString());
}
- log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
- controller.getHostAddress(), node.toString());
- return putNodeToController(node, controller);
} else {
- log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
- controller.getHostAddress(), node.toString());
+ clusterServices.trollback();
+ return new Status(StatusCode.CONFLICT);
}
} else {
- return new Status(StatusCode.CONFLICT);
+ log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
}
- } else {
- log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
+ clusterServices.tcommit();
+ } catch (Exception e) {
+ log.error("Excepion in adding Controller to a Node", e);
+ try {
+ clusterServices.trollback();
+ } catch (Exception e1) {
+ log.error("Error Rolling back the node Connections Changes ", e);
+ }
+ return new Status(StatusCode.INTERNALERROR);
}
return new Status(StatusCode.SUCCESS);
}
public Status addNode (Node node, InetAddress controller) {
if (node == null || controller == null) {
+ if (node == null) {
+ log.warn("addNode: node is null");
+ } else if (controller == null) {
+ log.error("Failed to add node {}. The controller address retrieved from clusterServices is null.", node);
+ }
return new Status(StatusCode.BADREQUEST);
}
- if (isLocal(node)) return new Status(StatusCode.SUCCESS);
+ if (isLocal(node)) {
+ return new Status(StatusCode.SUCCESS);
+ }
if (isConnectionAllowed(node)) {
return putNodeToController(node, controller);
} else {
}
}
- @SuppressWarnings("deprecation")
public Status addNode (Node node) {
return addNode(node, clusterServices.getMyAddress());
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void retrieveCaches() {
if (this.clusterServices == null) {
- log.error("un-initialized clusterServices, can't retrieve cache");
+ log.error("Un-initialized Cluster Services, can't retrieve caches for scheme: {}", name);
return;
}
- nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
+ nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache(nodeConnectionsCacheName);
if (nodeConnections == null) {
- log.error("\nFailed to get caches");
+ log.error("\nFailed to get cache: {}", nodeConnectionsCacheName);
}
}
- @SuppressWarnings("deprecation")
private void allocateCaches() {
if (this.clusterServices == null) {
- log.error("un-initialized clusterServices, can't create cache");
+ log.error("Un-initialized clusterServices, can't create cache");
return;
}
try {
- clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ clusterServices.createCache(nodeConnectionsCacheName, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.error("\nCache already exists - destroy and recreate if needed");
+ log.debug("\nCache already exists: {}", nodeConnectionsCacheName);
} catch (CacheConfigException cce) {
log.error("\nCache configuration invalid - check cache mode");
} catch (Exception e) {
- e.printStackTrace();
+ log.error("An error occured",e);
}
}
}