2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.connectionmanager.scheme;
10 import java.net.InetAddress;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.EnumSet;
14 import java.util.HashSet;
15 import java.util.Iterator;
16 import java.util.List;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
21 import org.opendaylight.controller.clustering.services.CacheConfigException;
22 import org.opendaylight.controller.clustering.services.CacheExistException;
23 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
24 import org.opendaylight.controller.clustering.services.IClusterServices;
25 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
26 import org.opendaylight.controller.sal.connection.ConnectionLocality;
27 import org.opendaylight.controller.sal.core.Node;
28 import org.opendaylight.controller.sal.utils.Status;
29 import org.opendaylight.controller.sal.utils.StatusCode;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 public abstract class AbstractScheme {
34 private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
35 protected IClusterGlobalServices clusterServices = null;
37 * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
38 * But, such a data-structure results in some complex event processing during the Cluster operations
39 * to sync up the Connection states.
41 * A data-structure with Node as the key and set of controllers provides a good balance
42 * between the needed functionality and simpler clustering implementation for Connection Manager.
44 protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
45 protected abstract boolean isConnectionAllowedInternal(Node node);
46 private final String name;
47 private final String nodeConnectionsCacheName;
49 protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
50 this.clusterServices = clusterServices;
51 name = (type != null ? type.name() : "UNKNOWN");
52 nodeConnectionsCacheName = "connectionmanager."+name+".nodeconnections";
53 if (clusterServices != null) {
57 log.error("Couldn't retrieve caches for scheme {}. Clustering service unavailable", name);
61 protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
62 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
63 for (Node node : nodeConnections.keySet()) {
64 Set<InetAddress> controllers = nodeConnections.get(node);
65 if (controllers == null) continue;
66 for (InetAddress controller : controllers) {
67 Set<Node> nodes = controllerNodesMap.get(controller);
69 nodes = new HashSet<Node>();
72 controllerNodesMap.put(controller, nodes);
75 return controllerNodesMap;
78 public boolean isConnectionAllowed (Node node) {
79 if (clusterServices == null || nodeConnections == null) {
83 return isConnectionAllowedInternal(node);
86 public void handleClusterViewChanged() {
87 log.debug("Handling Cluster View changed notification");
88 List<InetAddress> controllers = clusterServices.getClusteredControllers();
89 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
90 List<InetAddress> toRemove = new ArrayList<InetAddress>();
91 for (InetAddress c : controllerNodesMap.keySet()) {
92 if (!controllers.contains(c)) {
97 boolean retry = false;
98 for (InetAddress c : toRemove) {
99 log.debug("Removing Controller : {} from the Connections table", c);
100 for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
101 Node node = nodeIterator.next();
102 Set <InetAddress> oldControllers = nodeConnections.get(node);
103 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
104 if (newControllers.remove(c)) {
106 clusterServices.tbegin();
107 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
108 log.debug("Replace Failed for {} ", node.toString());
110 clusterServices.trollback();
113 clusterServices.tcommit();
115 } catch (Exception e) {
116 log.debug("Exception in replacing nodeConnections ", e);
119 clusterServices.trollback();
120 } catch (Exception e1) {}
129 } catch (InterruptedException e) {}
130 handleClusterViewChanged();
134 public Set<Node> getNodes(InetAddress controller) {
135 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
136 return controllerNodesMap.get(controller);
139 public Set<Node> getNodes() {
140 return getNodes(clusterServices.getMyAddress());
143 public Set<InetAddress> getControllers(Node node) {
144 if (nodeConnections != null) return nodeConnections.get(node);
145 return Collections.emptySet();
148 public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
149 return nodeConnections;
152 public boolean isLocal(Node node) {
153 if (nodeConnections == null) return false;
154 InetAddress myController = clusterServices.getMyAddress();
155 Set<InetAddress> controllers = nodeConnections.get(node);
156 return (controllers != null && controllers.contains(myController));
159 public ConnectionLocality getLocalityStatus(Node node) {
160 if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
161 Set<InetAddress> controllers = nodeConnections.get(node);
162 if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
163 InetAddress myController = clusterServices.getMyAddress();
164 return controllers.contains(myController) ? ConnectionLocality.LOCAL:
165 ConnectionLocality.NOT_LOCAL;
168 public Status removeNode (Node node) {
169 return removeNodeFromController(node, clusterServices.getMyAddress());
172 protected Status removeNodeFromController (Node node, InetAddress controller) {
173 if (node == null || controller == null) {
174 return new Status(StatusCode.BADREQUEST, "Invalid Node or Controller Address Specified.");
177 if (clusterServices == null || nodeConnections == null) {
178 return new Status(StatusCode.SUCCESS);
181 Set<InetAddress> oldControllers = nodeConnections.get(node);
183 if (oldControllers != null && oldControllers.contains(controller)) {
184 Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
185 if (newControllers.remove(controller)) {
187 clusterServices.tbegin();
188 if (newControllers.size() > 0) {
189 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
190 clusterServices.trollback();
193 } catch ( InterruptedException e) {}
194 return removeNodeFromController(node, controller);
197 nodeConnections.remove(node);
199 clusterServices.tcommit();
200 } catch (Exception e) {
201 log.error("Exception in removing Controller from a Node", e);
203 clusterServices.trollback();
204 } catch (Exception e1) {
205 log.error("Error Rolling back the node Connections Changes ", e);
207 return new Status(StatusCode.INTERNALERROR);
212 return new Status(StatusCode.SUCCESS);
217 * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
218 * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
220 private Status putNodeToController (Node node, InetAddress controller) {
221 if (clusterServices == null || nodeConnections == null) {
222 return new Status(StatusCode.INTERNALERROR, "Cluster service unavailable, or node connections info missing.");
224 log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
226 Set <InetAddress> oldControllers = nodeConnections.get(node);
227 Set <InetAddress> newControllers = null;
228 if (oldControllers == null) {
229 newControllers = new HashSet<InetAddress>();
231 if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
233 * In certain race conditions, the putIfAbsent fails to be atomic.
234 * This check is added to identify such cases and report an warning
237 log.warn("States Exists for {} : {}", node, oldControllers.toString());
239 newControllers = new HashSet<InetAddress>(oldControllers);
241 newControllers.add(controller);
244 clusterServices.tbegin();
245 if (nodeConnections.putIfAbsent(node, newControllers) != null) {
246 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
248 * This check is needed again to take care of the case where some schemes
249 * would not allow nodes to be connected to multiple controllers.
250 * Hence, if putIfAbsent fails, that means, some other controller is competing
251 * with this controller to take hold of a Node.
253 if (isConnectionAllowed(node)) {
254 if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
255 clusterServices.trollback();
258 } catch ( InterruptedException e) {}
259 log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
260 return putNodeToController(node, controller);
262 log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
263 controller.getHostAddress(), node.toString());
266 clusterServices.trollback();
267 return new Status(StatusCode.CONFLICT);
270 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
272 clusterServices.tcommit();
273 } catch (Exception e) {
274 log.error("Excepion in adding Controller to a Node", e);
276 clusterServices.trollback();
277 } catch (Exception e1) {
278 log.error("Error Rolling back the node Connections Changes ", e);
280 return new Status(StatusCode.INTERNALERROR);
282 return new Status(StatusCode.SUCCESS);
285 public Status addNode (Node node, InetAddress controller) {
286 if (node == null || controller == null) {
288 log.warn("addNode: node is null");
289 } else if (controller == null) {
290 log.error("Failed to add node {}. The controller address retrieved from clusterServices is null.", node);
292 return new Status(StatusCode.BADREQUEST);
295 return new Status(StatusCode.SUCCESS);
297 if (isConnectionAllowed(node)) {
298 return putNodeToController(node, controller);
300 return new Status(StatusCode.NOTALLOWED);
304 public Status addNode (Node node) {
305 return addNode(node, clusterServices.getMyAddress());
308 @SuppressWarnings({ "unchecked" })
309 private void retrieveCaches() {
310 if (this.clusterServices == null) {
311 log.error("Un-initialized Cluster Services, can't retrieve caches for scheme: {}", name);
315 nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache(nodeConnectionsCacheName);
317 if (nodeConnections == null) {
318 log.error("\nFailed to get cache: {}", nodeConnectionsCacheName);
322 private void allocateCaches() {
323 if (this.clusterServices == null) {
324 log.error("Un-initialized clusterServices, can't create cache");
329 clusterServices.createCache(nodeConnectionsCacheName, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
330 } catch (CacheExistException cee) {
331 log.debug("\nCache already exists: {}", nodeConnectionsCacheName);
332 } catch (CacheConfigException cce) {
333 log.error("\nCache configuration invalid - check cache mode");
334 } catch (Exception e) {
335 log.error("An error occured",e);
340 * @see java.lang.Object#hashCode()
343 public int hashCode() {
344 final int prime = 31;
346 result = prime * result + ((name == null) ? 0 : name.hashCode());
347 result = prime * result + ((nodeConnections == null) ? 0 : nodeConnections.hashCode());
348 result = prime * result + ((nodeConnectionsCacheName == null) ? 0 : nodeConnectionsCacheName.hashCode());
353 * @see java.lang.Object#equals(java.lang.Object)
356 public boolean equals(Object obj) {
363 if (!(obj instanceof AbstractScheme)) {
366 AbstractScheme other = (AbstractScheme) obj;
368 if (other.name != null) {
371 } else if (!name.equals(other.name)) {
374 if (nodeConnections == null) {
375 if (other.nodeConnections != null) {
378 } else if (!nodeConnections.equals(other.nodeConnections)) {
381 if (nodeConnectionsCacheName == null) {
382 if (other.nodeConnectionsCacheName != null) {
385 } else if (!nodeConnectionsCacheName.equals(other.nodeConnectionsCacheName)) {