1 package org.opendaylight.controller.connectionmanager.scheme;
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
13 import org.opendaylight.controller.clustering.services.CacheConfigException;
14 import org.opendaylight.controller.clustering.services.CacheExistException;
15 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
16 import org.opendaylight.controller.clustering.services.IClusterServices;
17 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
18 import org.opendaylight.controller.sal.core.Node;
19 import org.opendaylight.controller.sal.utils.Status;
20 import org.opendaylight.controller.sal.utils.StatusCode;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 public abstract class AbstractScheme {
25 private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
26 protected IClusterGlobalServices clusterServices = null;
28 * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
29 * But, such a data-structure results in some complex event processing during the Cluster operations
30 * to sync up the Connection states.
32 * A data-structure with Node as the key and set of controllers provides a good balance
33 * between the needed functionality and simpler clustering implementation for Connection Manager.
35 protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
36 protected abstract boolean isConnectionAllowedInternal(Node node);
39 protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
40 this.clusterServices = clusterServices;
41 if (type != null) name = type.name();
42 else name = "UNKNOWN";
43 if (clusterServices != null) {
49 protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
50 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
51 for (Node node : nodeConnections.keySet()) {
52 Set<InetAddress> controllers = nodeConnections.get(node);
53 if (controllers == null) continue;
54 for (InetAddress controller : controllers) {
55 Set<Node> nodes = controllerNodesMap.get(controller);
57 nodes = new HashSet<Node>();
60 controllerNodesMap.put(controller, nodes);
63 return controllerNodesMap;
66 public boolean isConnectionAllowed (Node node) {
67 if (clusterServices == null || nodeConnections == null) {
71 return isConnectionAllowedInternal(node);
/**
 * Reconciles the connection cache with current cluster membership: every controller
 * that has left the cluster is removed from each node's controller set, using a
 * copy-on-write set plus an atomic replace() inside a cluster transaction.
 *
 * NOTE(review): this view of the file is missing several source lines (toRemove
 * population, the try opener, back-off sleep, closing braces, and the bookkeeping
 * around the 'retry' flag). Comments below describe only the visible statements.
 */
@SuppressWarnings("deprecation")
public void handleClusterViewChanged() {
log.debug("Handling Cluster View changed notification");
// Live controllers as currently reported by the clustering service.
List<InetAddress> controllers = clusterServices.getClusteredControllers();
ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
// Controllers present in the cache but no longer members of the cluster.
List<InetAddress> toRemove = new ArrayList<InetAddress>();
for (InetAddress c : controllerNodesMap.keySet()) {
if (!controllers.contains(c)) {
// NOTE(review): presumably toRemove.add(c) here — line missing from this view.
boolean retry = false;
for (InetAddress c : toRemove) {
log.debug("Removing Controller : {} from the Connections table", c);
for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
Node node = nodeIterator.next();
Set <InetAddress> oldControllers = nodeConnections.get(node);
// Copy-on-write: mutate a copy, then CAS it in via replace(old, new).
Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
if (newControllers.remove(c)) {
// NOTE(review): a try-block opener presumably precedes tbegin() (line missing).
clusterServices.tbegin();
if (!nodeConnections.replace(node, oldControllers, newControllers)) {
// Lost the replace() race against a concurrent writer.
log.debug("Replace Failed for {} ", node.toString());
clusterServices.trollback();
clusterServices.tcommit();
} catch (Exception e) {
log.debug("Exception in replacing nodeConnections ", e);
// Best-effort rollback; its own failure is intentionally ignored below.
clusterServices.trollback();
} catch (Exception e1) {}
// NOTE(review): swallowed interrupt loses the thread's interrupted status —
// callers cannot observe cancellation; confirm this is intentional.
} catch (InterruptedException e) {}
// Retry the whole reconciliation pass after a failed replace.
handleClusterViewChanged();
123 public Set<Node> getNodes(InetAddress controller) {
124 if (nodeConnections == null) return null;
125 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
126 return controllerNodesMap.get(controller);
129 public Set<Node> getNodes() {
130 return getNodes(clusterServices.getMyAddress());
133 public Set<InetAddress> getControllers(Node node) {
134 if (nodeConnections != null) return nodeConnections.get(node);
138 public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
139 return nodeConnections;
142 public boolean isLocal(Node node) {
143 if (nodeConnections == null) return false;
144 InetAddress myController = clusterServices.getMyAddress();
145 Set<InetAddress> controllers = nodeConnections.get(node);
146 return (controllers != null && controllers.contains(myController));
149 public Status removeNode (Node node) {
150 return removeNodeFromController(node, clusterServices.getMyAddress());
/**
 * Removes the given controller from the given node's controller set, deleting the
 * node entry entirely when no controllers remain. Uses copy-on-write plus an atomic
 * replace() inside a cluster transaction; a lost replace() race triggers a retry.
 *
 * Returns BADREQUEST on null arguments, SUCCESS when there is nothing to remove
 * (no caches, node unknown, or controller not present), INTERNALERROR on a
 * transaction failure.
 *
 * NOTE(review): this view is missing several lines (try openers, the back-off
 * sleep, closing braces); comments describe only the visible statements.
 */
protected Status removeNodeFromController (Node node, InetAddress controller) {
if (node == null || controller == null) {
return new Status(StatusCode.BADREQUEST);
// Without clustering caches there is nothing to remove; treat as success.
if (clusterServices == null || nodeConnections == null) {
return new Status(StatusCode.SUCCESS);
Set<InetAddress> oldControllers = nodeConnections.get(node);
if (oldControllers != null && oldControllers.contains(controller)) {
// Copy-on-write: mutate a copy, then CAS it in with replace(old, new).
Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
if (newControllers.remove(controller)) {
clusterServices.tbegin();
if (newControllers.size() > 0) {
if (!nodeConnections.replace(node, oldControllers, newControllers)) {
// Lost the replace() race: roll back, back off, retry recursively.
clusterServices.trollback();
// NOTE(review): swallowed interrupt — interrupted status is lost.
} catch ( InterruptedException e) {}
return removeNodeFromController(node, controller);
// Last controller removed: drop the node entry altogether.
nodeConnections.remove(node);
clusterServices.tcommit();
} catch (Exception e) {
log.error("Excepion in removing Controller from a Node", e);
// Best-effort rollback; its own failure is logged below.
clusterServices.trollback();
} catch (Exception e1) {
log.error("Error Rolling back the node Connections Changes ", e);
return new Status(StatusCode.INTERNALERROR);
return new Status(StatusCode.SUCCESS);
/*
 * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
 * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
 */
/**
 * Associates the given controller with the given node in the clustered cache,
 * via putIfAbsent for a new node or copy-on-write + replace() for an existing one,
 * all inside a cluster transaction. Retries on a lost replace() race; returns
 * CONFLICT when the scheme forbids a competing connection, INTERNALERROR on a
 * transaction failure, SUCCESS otherwise (including the no-cache case).
 *
 * NOTE(review): this view is missing lines (try openers, sleep, else branches,
 * closing braces); comments describe only the visible statements.
 */
private Status putNodeToController (Node node, InetAddress controller) {
if (clusterServices == null || nodeConnections == null) {
return new Status(StatusCode.SUCCESS);
log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
Set <InetAddress> oldControllers = nodeConnections.get(node);
Set <InetAddress> newControllers = null;
if (oldControllers == null) {
// First controller for this node: start from an empty set.
newControllers = new HashSet<InetAddress>();
if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
/*
 * In certain race conditions, the putIfAbsent fails to be atomic.
 * This check is added to identify such cases and report a warning
 */
log.warn("States Exists for {} : {}", node, oldControllers.toString());
// Copy-on-write: extend a copy of the existing controller set.
newControllers = new HashSet<InetAddress>(oldControllers);
newControllers.add(controller);
clusterServices.tbegin();
if (nodeConnections.putIfAbsent(node, newControllers) != null) {
log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
/*
 * This check is needed again to take care of the case where some schemes
 * would not allow nodes to be connected to multiple controllers.
 * Hence, if putIfAbsent fails, that means, some other controller is competing
 * with this controller to take hold of a Node.
 */
if (isConnectionAllowed(node)) {
if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
// Lost the CAS race: roll back, back off, and retry recursively.
clusterServices.trollback();
// NOTE(review): swallowed interrupt — interrupted status is lost.
} catch ( InterruptedException e) {}
log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
return putNodeToController(node, controller);
log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
controller.getHostAddress(), node.toString());
// Scheme forbids a second controller and a competitor won the putIfAbsent.
clusterServices.trollback();
return new Status(StatusCode.CONFLICT);
log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
clusterServices.tcommit();
} catch (Exception e) {
log.error("Excepion in adding Controller to a Node", e);
// Best-effort rollback; its own failure is logged below.
clusterServices.trollback();
} catch (Exception e1) {
log.error("Error Rolling back the node Connections Changes ", e);
return new Status(StatusCode.INTERNALERROR);
return new Status(StatusCode.SUCCESS);
266 public Status addNode (Node node, InetAddress controller) {
267 if (node == null || controller == null) {
268 return new Status(StatusCode.BADREQUEST);
270 if (isLocal(node)) return new Status(StatusCode.SUCCESS);
271 if (isConnectionAllowed(node)) {
272 return putNodeToController(node, controller);
274 return new Status(StatusCode.NOTALLOWED);
278 @SuppressWarnings("deprecation")
279 public Status addNode (Node node) {
280 return addNode(node, clusterServices.getMyAddress());
283 @SuppressWarnings({ "unchecked", "deprecation" })
284 private void retrieveCaches() {
285 if (this.clusterServices == null) {
286 log.error("un-initialized clusterServices, can't retrieve cache");
290 nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
292 if (nodeConnections == null) {
293 log.error("\nFailed to get caches");
/**
 * Creates the per-scheme clustered cache "connectionmanager.&lt;name&gt;.nodeconnections"
 * in TRANSACTIONAL mode. A pre-existing cache is reported but treated as benign.
 *
 * NOTE(review): the method's tail (and the class's closing brace) lies beyond this
 * view; the try opener around createCache is also among the missing lines.
 */
@SuppressWarnings("deprecation")
private void allocateCaches() {
if (this.clusterServices == null) {
log.error("un-initialized clusterServices, can't create cache");
// NOTE(review): a return presumably follows here — confirm against full source.
clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
} catch (CacheExistException cee) {
// Cache survives controller restarts within a running cluster; not fatal.
log.error("\nCache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
log.error("\nCache configuration invalid - check cache mode");
} catch (Exception e) {
log.error("An error occured",e);