1 package org.opendaylight.controller.connectionmanager.scheme;
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
13 import org.opendaylight.controller.clustering.services.CacheConfigException;
14 import org.opendaylight.controller.clustering.services.CacheExistException;
15 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
16 import org.opendaylight.controller.clustering.services.IClusterServices;
17 import org.opendaylight.controller.connectionmanager.ConnectionLocality;
18 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
19 import org.opendaylight.controller.sal.core.Node;
20 import org.opendaylight.controller.sal.utils.Status;
21 import org.opendaylight.controller.sal.utils.StatusCode;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 public abstract class AbstractScheme {
26 private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
27 protected IClusterGlobalServices clusterServices = null;
29 * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
30 * But, such a data-structure results in some complex event processing during the Cluster operations
31 * to sync up the Connection states.
33 * A data-structure with Node as the key and set of controllers provides a good balance
34 * between the needed functionality and simpler clustering implementation for Connection Manager.
36 protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
37 protected abstract boolean isConnectionAllowedInternal(Node node);
/**
 * Creates a scheme bound to the given cluster services.
 *
 * @param clusterServices cluster services backing the shared connection cache; may be
 *                        {@code null}, in which case no cache is set up
 * @param type            scheme type whose {@code name()} labels this scheme's cache;
 *                        {@code "UNKNOWN"} is used when {@code null}
 */
protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
    this.clusterServices = clusterServices;
    this.name = (type == null) ? "UNKNOWN" : type.name();
    if (this.clusterServices == null) {
        return;
    }
    // NOTE(review): cache setup (create first, then look up) is assumed here — confirm
    // against the original constructor body.
    allocateCaches();
    retrieveCaches();
}
50 protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
51 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
52 for (Node node : nodeConnections.keySet()) {
53 Set<InetAddress> controllers = nodeConnections.get(node);
54 if (controllers == null) continue;
55 for (InetAddress controller : controllers) {
56 Set<Node> nodes = controllerNodesMap.get(controller);
58 nodes = new HashSet<Node>();
61 controllerNodesMap.put(controller, nodes);
64 return controllerNodesMap;
67 public boolean isConnectionAllowed (Node node) {
68 if (clusterServices == null || nodeConnections == null) {
72 return isConnectionAllowedInternal(node);
75 @SuppressWarnings("deprecation")
76 public void handleClusterViewChanged() {
77 log.debug("Handling Cluster View changed notification");
78 List<InetAddress> controllers = clusterServices.getClusteredControllers();
79 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
80 List<InetAddress> toRemove = new ArrayList<InetAddress>();
81 for (InetAddress c : controllerNodesMap.keySet()) {
82 if (!controllers.contains(c)) {
87 boolean retry = false;
88 for (InetAddress c : toRemove) {
89 log.debug("Removing Controller : {} from the Connections table", c);
90 for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
91 Node node = nodeIterator.next();
92 Set <InetAddress> oldControllers = nodeConnections.get(node);
93 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
94 if (newControllers.remove(c)) {
96 clusterServices.tbegin();
97 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
98 log.debug("Replace Failed for {} ", node.toString());
100 clusterServices.trollback();
103 clusterServices.tcommit();
105 } catch (Exception e) {
106 log.debug("Exception in replacing nodeConnections ", e);
109 clusterServices.trollback();
110 } catch (Exception e1) {}
119 } catch (InterruptedException e) {}
120 handleClusterViewChanged();
124 public Set<Node> getNodes(InetAddress controller) {
125 if (nodeConnections == null) return null;
126 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
127 return controllerNodesMap.get(controller);
130 public Set<Node> getNodes() {
131 return getNodes(clusterServices.getMyAddress());
134 public Set<InetAddress> getControllers(Node node) {
135 if (nodeConnections != null) return nodeConnections.get(node);
139 public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
140 return nodeConnections;
143 public boolean isLocal(Node node) {
144 if (nodeConnections == null) return false;
145 InetAddress myController = clusterServices.getMyAddress();
146 Set<InetAddress> controllers = nodeConnections.get(node);
147 return (controllers != null && controllers.contains(myController));
150 public ConnectionLocality getLocalityStatus(Node node) {
151 if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
152 Set<InetAddress> controllers = nodeConnections.get(node);
153 if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
154 InetAddress myController = clusterServices.getMyAddress();
155 return controllers.contains(myController) ? ConnectionLocality.LOCAL:
156 ConnectionLocality.NOT_LOCAL;
159 public Status removeNode (Node node) {
160 return removeNodeFromController(node, clusterServices.getMyAddress());
163 protected Status removeNodeFromController (Node node, InetAddress controller) {
164 if (node == null || controller == null) {
165 return new Status(StatusCode.BADREQUEST);
168 if (clusterServices == null || nodeConnections == null) {
169 return new Status(StatusCode.SUCCESS);
172 Set<InetAddress> oldControllers = nodeConnections.get(node);
174 if (oldControllers != null && oldControllers.contains(controller)) {
175 Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
176 if (newControllers.remove(controller)) {
178 clusterServices.tbegin();
179 if (newControllers.size() > 0) {
180 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
181 clusterServices.trollback();
184 } catch ( InterruptedException e) {}
185 return removeNodeFromController(node, controller);
188 nodeConnections.remove(node);
190 clusterServices.tcommit();
191 } catch (Exception e) {
192 log.error("Excepion in removing Controller from a Node", e);
194 clusterServices.trollback();
195 } catch (Exception e1) {
196 log.error("Error Rolling back the node Connections Changes ", e);
198 return new Status(StatusCode.INTERNALERROR);
203 return new Status(StatusCode.SUCCESS);
208 * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
209 * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
211 private Status putNodeToController (Node node, InetAddress controller) {
212 if (clusterServices == null || nodeConnections == null) {
213 return new Status(StatusCode.SUCCESS);
215 log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
217 Set <InetAddress> oldControllers = nodeConnections.get(node);
218 Set <InetAddress> newControllers = null;
219 if (oldControllers == null) {
220 newControllers = new HashSet<InetAddress>();
222 if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
224 * In certain race conditions, the putIfAbsent fails to be atomic.
225 * This check is added to identify such cases and report an warning
228 log.warn("States Exists for {} : {}", node, oldControllers.toString());
230 newControllers = new HashSet<InetAddress>(oldControllers);
232 newControllers.add(controller);
235 clusterServices.tbegin();
236 if (nodeConnections.putIfAbsent(node, newControllers) != null) {
237 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
239 * This check is needed again to take care of the case where some schemes
240 * would not allow nodes to be connected to multiple controllers.
241 * Hence, if putIfAbsent fails, that means, some other controller is competing
242 * with this controller to take hold of a Node.
244 if (isConnectionAllowed(node)) {
245 if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
246 clusterServices.trollback();
249 } catch ( InterruptedException e) {}
250 log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
251 return putNodeToController(node, controller);
253 log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
254 controller.getHostAddress(), node.toString());
257 clusterServices.trollback();
258 return new Status(StatusCode.CONFLICT);
261 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
263 clusterServices.tcommit();
264 } catch (Exception e) {
265 log.error("Excepion in adding Controller to a Node", e);
267 clusterServices.trollback();
268 } catch (Exception e1) {
269 log.error("Error Rolling back the node Connections Changes ", e);
271 return new Status(StatusCode.INTERNALERROR);
273 return new Status(StatusCode.SUCCESS);
276 public Status addNode (Node node, InetAddress controller) {
277 if (node == null || controller == null) {
278 return new Status(StatusCode.BADREQUEST);
280 if (isLocal(node)) return new Status(StatusCode.SUCCESS);
281 if (isConnectionAllowed(node)) {
282 return putNodeToController(node, controller);
284 return new Status(StatusCode.NOTALLOWED);
288 @SuppressWarnings("deprecation")
289 public Status addNode (Node node) {
290 return addNode(node, clusterServices.getMyAddress());
293 @SuppressWarnings({ "unchecked", "deprecation" })
294 private void retrieveCaches() {
295 if (this.clusterServices == null) {
296 log.error("un-initialized clusterServices, can't retrieve cache");
300 nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
302 if (nodeConnections == null) {
303 log.error("\nFailed to get caches");
307 @SuppressWarnings("deprecation")
308 private void allocateCaches() {
309 if (this.clusterServices == null) {
310 log.error("un-initialized clusterServices, can't create cache");
315 clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
316 } catch (CacheExistException cee) {
317 log.error("\nCache already exists - destroy and recreate if needed");
318 } catch (CacheConfigException cce) {
319 log.error("\nCache configuration invalid - check cache mode");
320 } catch (Exception e) {
321 log.error("An error occured",e);