1 package org.opendaylight.controller.connectionmanager.scheme;
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
13 import org.opendaylight.controller.clustering.services.CacheConfigException;
14 import org.opendaylight.controller.clustering.services.CacheExistException;
15 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
16 import org.opendaylight.controller.clustering.services.IClusterServices;
17 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
18 import org.opendaylight.controller.sal.connection.ConnectionLocality;
19 import org.opendaylight.controller.sal.core.Node;
20 import org.opendaylight.controller.sal.utils.Status;
21 import org.opendaylight.controller.sal.utils.StatusCode;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 public abstract class AbstractScheme {
26 private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
27 protected IClusterGlobalServices clusterServices = null;
29 * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
30 * But, such a data-structure results in some complex event processing during the Cluster operations
31 * to sync up the Connection states.
33 * A data-structure with Node as the key and set of controllers provides a good balance
34 * between the needed functionality and simpler clustering implementation for Connection Manager.
36 protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
37 protected abstract boolean isConnectionAllowedInternal(Node node);
38 private final String name;
39 private final String nodeConnectionsCacheName;
41 protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
42 this.clusterServices = clusterServices;
43 name = (type != null ? type.name() : "UNKNOWN");
44 nodeConnectionsCacheName = "connectionmanager."+name+".nodeconnections";
45 if (clusterServices != null) {
49 log.error("Couldn't retrieve caches for scheme %s. Clustering service unavailable", name);
53 protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
54 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
55 for (Node node : nodeConnections.keySet()) {
56 Set<InetAddress> controllers = nodeConnections.get(node);
57 if (controllers == null) continue;
58 for (InetAddress controller : controllers) {
59 Set<Node> nodes = controllerNodesMap.get(controller);
61 nodes = new HashSet<Node>();
64 controllerNodesMap.put(controller, nodes);
67 return controllerNodesMap;
70 public boolean isConnectionAllowed (Node node) {
71 if (clusterServices == null || nodeConnections == null) {
75 return isConnectionAllowedInternal(node);
78 public void handleClusterViewChanged() {
79 log.debug("Handling Cluster View changed notification");
80 List<InetAddress> controllers = clusterServices.getClusteredControllers();
81 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
82 List<InetAddress> toRemove = new ArrayList<InetAddress>();
83 for (InetAddress c : controllerNodesMap.keySet()) {
84 if (!controllers.contains(c)) {
89 boolean retry = false;
90 for (InetAddress c : toRemove) {
91 log.debug("Removing Controller : {} from the Connections table", c);
92 for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
93 Node node = nodeIterator.next();
94 Set <InetAddress> oldControllers = nodeConnections.get(node);
95 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
96 if (newControllers.remove(c)) {
98 clusterServices.tbegin();
99 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
100 log.debug("Replace Failed for {} ", node.toString());
102 clusterServices.trollback();
105 clusterServices.tcommit();
107 } catch (Exception e) {
108 log.debug("Exception in replacing nodeConnections ", e);
111 clusterServices.trollback();
112 } catch (Exception e1) {}
121 } catch (InterruptedException e) {}
122 handleClusterViewChanged();
126 public Set<Node> getNodes(InetAddress controller) {
127 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
128 return controllerNodesMap.get(controller);
131 public Set<Node> getNodes() {
132 return getNodes(clusterServices.getMyAddress());
135 public Set<InetAddress> getControllers(Node node) {
136 if (nodeConnections != null) return nodeConnections.get(node);
140 public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
141 return nodeConnections;
144 public boolean isLocal(Node node) {
145 if (nodeConnections == null) return false;
146 InetAddress myController = clusterServices.getMyAddress();
147 Set<InetAddress> controllers = nodeConnections.get(node);
148 return (controllers != null && controllers.contains(myController));
151 public ConnectionLocality getLocalityStatus(Node node) {
152 if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
153 Set<InetAddress> controllers = nodeConnections.get(node);
154 if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
155 InetAddress myController = clusterServices.getMyAddress();
156 return controllers.contains(myController) ? ConnectionLocality.LOCAL:
157 ConnectionLocality.NOT_LOCAL;
160 public Status removeNode (Node node) {
161 return removeNodeFromController(node, clusterServices.getMyAddress());
164 protected Status removeNodeFromController (Node node, InetAddress controller) {
165 if (node == null || controller == null) {
166 return new Status(StatusCode.BADREQUEST, "Invalid Node or Controller Address Specified.");
169 if (clusterServices == null || nodeConnections == null) {
170 return new Status(StatusCode.SUCCESS);
173 Set<InetAddress> oldControllers = nodeConnections.get(node);
175 if (oldControllers != null && oldControllers.contains(controller)) {
176 Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
177 if (newControllers.remove(controller)) {
179 clusterServices.tbegin();
180 if (newControllers.size() > 0) {
181 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
182 clusterServices.trollback();
185 } catch ( InterruptedException e) {}
186 return removeNodeFromController(node, controller);
189 nodeConnections.remove(node);
191 clusterServices.tcommit();
192 } catch (Exception e) {
193 log.error("Exception in removing Controller from a Node", e);
195 clusterServices.trollback();
196 } catch (Exception e1) {
197 log.error("Error Rolling back the node Connections Changes ", e);
199 return new Status(StatusCode.INTERNALERROR);
204 return new Status(StatusCode.SUCCESS);
209 * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
210 * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
212 private Status putNodeToController (Node node, InetAddress controller) {
213 if (clusterServices == null || nodeConnections == null) {
214 return new Status(StatusCode.INTERNALERROR, "Cluster service unavailable, or node connections info missing.");
216 log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
218 Set <InetAddress> oldControllers = nodeConnections.get(node);
219 Set <InetAddress> newControllers = null;
220 if (oldControllers == null) {
221 newControllers = new HashSet<InetAddress>();
223 if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
225 * In certain race conditions, the putIfAbsent fails to be atomic.
226 * This check is added to identify such cases and report an warning
229 log.warn("States Exists for {} : {}", node, oldControllers.toString());
231 newControllers = new HashSet<InetAddress>(oldControllers);
233 newControllers.add(controller);
236 clusterServices.tbegin();
237 if (nodeConnections.putIfAbsent(node, newControllers) != null) {
238 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
240 * This check is needed again to take care of the case where some schemes
241 * would not allow nodes to be connected to multiple controllers.
242 * Hence, if putIfAbsent fails, that means, some other controller is competing
243 * with this controller to take hold of a Node.
245 if (isConnectionAllowed(node)) {
246 if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
247 clusterServices.trollback();
250 } catch ( InterruptedException e) {}
251 log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
252 return putNodeToController(node, controller);
254 log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
255 controller.getHostAddress(), node.toString());
258 clusterServices.trollback();
259 return new Status(StatusCode.CONFLICT);
262 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
264 clusterServices.tcommit();
265 } catch (Exception e) {
266 log.error("Excepion in adding Controller to a Node", e);
268 clusterServices.trollback();
269 } catch (Exception e1) {
270 log.error("Error Rolling back the node Connections Changes ", e);
272 return new Status(StatusCode.INTERNALERROR);
274 return new Status(StatusCode.SUCCESS);
277 public Status addNode (Node node, InetAddress controller) {
278 if (node == null || controller == null) {
279 return new Status(StatusCode.BADREQUEST);
282 return new Status(StatusCode.SUCCESS);
284 if (isConnectionAllowed(node)) {
285 return putNodeToController(node, controller);
287 return new Status(StatusCode.NOTALLOWED);
291 public Status addNode (Node node) {
292 return addNode(node, clusterServices.getMyAddress());
295 @SuppressWarnings({ "unchecked" })
296 private void retrieveCaches() {
297 if (this.clusterServices == null) {
298 log.error("Un-initialized Cluster Services, can't retrieve caches for scheme: {}", name);
302 nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache(nodeConnectionsCacheName);
304 if (nodeConnections == null) {
305 log.error("\nFailed to get cache: {}", nodeConnectionsCacheName);
309 private void allocateCaches() {
310 if (this.clusterServices == null) {
311 log.error("Un-initialized clusterServices, can't create cache");
316 clusterServices.createCache(nodeConnectionsCacheName, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
317 } catch (CacheExistException cee) {
318 log.debug("\nCache already exists: {}", nodeConnectionsCacheName);
319 } catch (CacheConfigException cce) {
320 log.error("\nCache configuration invalid - check cache mode");
321 } catch (Exception e) {
322 log.error("An error occured",e);