1 package org.opendaylight.controller.connectionmanager.scheme;
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.Collections;
6 import java.util.EnumSet;
7 import java.util.HashSet;
8 import java.util.Iterator;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentMap;
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
19 import org.opendaylight.controller.sal.connection.ConnectionLocality;
20 import org.opendaylight.controller.sal.core.Node;
21 import org.opendaylight.controller.sal.utils.Status;
22 import org.opendaylight.controller.sal.utils.StatusCode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 public abstract class AbstractScheme {
/** Logger shared by this base class. */
private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
/** Clustering services handle passed to the constructor; may be null, and several methods check for that. */
protected IClusterGlobalServices clusterServices = null;
/*
 * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
 * But, such a data-structure results in some complex event processing during the Cluster operations
 * to sync up the Connection states.
 *
 * A data-structure with Node as the key and set of controllers provides a good balance
 * between the needed functionality and simpler clustering implementation for Connection Manager.
 *
 * The reference is obtained from the clustered cache in retrieveCaches() and stays null when
 * that lookup fails, so readers must be prepared for null.
 */
protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
/** Scheme-specific admission check; isConnectionAllowed(Node) delegates to it after its own availability guard. */
protected abstract boolean isConnectionAllowedInternal(Node node);
/** Scheme name: the ConnectionMgmtScheme constant's name, or "UNKNOWN" when no type was supplied. */
private final String name;
/** Name of the clustered cache behind nodeConnections: "connectionmanager." + name + ".nodeconnections". */
private final String nodeConnectionsCacheName;
/**
 * Binds the scheme to the clustering services and sets up its node-connections cache.
 *
 * @param clusterServices clustering services used for the cache and its transactions; when
 *                        null no cache is set up and an error is logged
 * @param type            scheme type whose constant name becomes part of the cache name;
 *                        null yields the name "UNKNOWN"
 */
protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
    this.clusterServices = clusterServices;
    name = (type != null ? type.name() : "UNKNOWN");
    nodeConnectionsCacheName = "connectionmanager." + name + ".nodeconnections";
    if (clusterServices != null) {
        // Create the cache first (a no-op if it already exists), then fetch the reference.
        allocateCaches();
        retrieveCaches();
    } else {
        // SLF4J only substitutes "{}" placeholders; a printf-style "%s" is printed literally
        // and the scheme name would be lost from the message.
        log.error("Couldn't retrieve caches for scheme {}. Clustering service unavailable", name);
    }
}
54 protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
55 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
56 for (Node node : nodeConnections.keySet()) {
57 Set<InetAddress> controllers = nodeConnections.get(node);
58 if (controllers == null) continue;
59 for (InetAddress controller : controllers) {
60 Set<Node> nodes = controllerNodesMap.get(controller);
62 nodes = new HashSet<Node>();
65 controllerNodesMap.put(controller, nodes);
68 return controllerNodesMap;
71 public boolean isConnectionAllowed (Node node) {
72 if (clusterServices == null || nodeConnections == null) {
76 return isConnectionAllowedInternal(node);
79 public void handleClusterViewChanged() {
80 log.debug("Handling Cluster View changed notification");
81 List<InetAddress> controllers = clusterServices.getClusteredControllers();
82 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
83 List<InetAddress> toRemove = new ArrayList<InetAddress>();
84 for (InetAddress c : controllerNodesMap.keySet()) {
85 if (!controllers.contains(c)) {
90 boolean retry = false;
91 for (InetAddress c : toRemove) {
92 log.debug("Removing Controller : {} from the Connections table", c);
93 for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
94 Node node = nodeIterator.next();
95 Set <InetAddress> oldControllers = nodeConnections.get(node);
96 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
97 if (newControllers.remove(c)) {
99 clusterServices.tbegin();
100 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
101 log.debug("Replace Failed for {} ", node.toString());
103 clusterServices.trollback();
106 clusterServices.tcommit();
108 } catch (Exception e) {
109 log.debug("Exception in replacing nodeConnections ", e);
112 clusterServices.trollback();
113 } catch (Exception e1) {}
122 } catch (InterruptedException e) {}
123 handleClusterViewChanged();
127 public Set<Node> getNodes(InetAddress controller) {
128 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
129 return controllerNodesMap.get(controller);
132 public Set<Node> getNodes() {
133 return getNodes(clusterServices.getMyAddress());
136 public Set<InetAddress> getControllers(Node node) {
137 if (nodeConnections != null) return nodeConnections.get(node);
138 return Collections.emptySet();
141 public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
142 return nodeConnections;
145 public boolean isLocal(Node node) {
146 if (nodeConnections == null) return false;
147 InetAddress myController = clusterServices.getMyAddress();
148 Set<InetAddress> controllers = nodeConnections.get(node);
149 return (controllers != null && controllers.contains(myController));
152 public ConnectionLocality getLocalityStatus(Node node) {
153 if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
154 Set<InetAddress> controllers = nodeConnections.get(node);
155 if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
156 InetAddress myController = clusterServices.getMyAddress();
157 return controllers.contains(myController) ? ConnectionLocality.LOCAL:
158 ConnectionLocality.NOT_LOCAL;
161 public Status removeNode (Node node) {
162 return removeNodeFromController(node, clusterServices.getMyAddress());
165 protected Status removeNodeFromController (Node node, InetAddress controller) {
166 if (node == null || controller == null) {
167 return new Status(StatusCode.BADREQUEST, "Invalid Node or Controller Address Specified.");
170 if (clusterServices == null || nodeConnections == null) {
171 return new Status(StatusCode.SUCCESS);
174 Set<InetAddress> oldControllers = nodeConnections.get(node);
176 if (oldControllers != null && oldControllers.contains(controller)) {
177 Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
178 if (newControllers.remove(controller)) {
180 clusterServices.tbegin();
181 if (newControllers.size() > 0) {
182 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
183 clusterServices.trollback();
186 } catch ( InterruptedException e) {}
187 return removeNodeFromController(node, controller);
190 nodeConnections.remove(node);
192 clusterServices.tcommit();
193 } catch (Exception e) {
194 log.error("Exception in removing Controller from a Node", e);
196 clusterServices.trollback();
197 } catch (Exception e1) {
198 log.error("Error Rolling back the node Connections Changes ", e);
200 return new Status(StatusCode.INTERNALERROR);
205 return new Status(StatusCode.SUCCESS);
210 * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
211 * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
213 private Status putNodeToController (Node node, InetAddress controller) {
214 if (clusterServices == null || nodeConnections == null) {
215 return new Status(StatusCode.INTERNALERROR, "Cluster service unavailable, or node connections info missing.");
217 log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
219 Set <InetAddress> oldControllers = nodeConnections.get(node);
220 Set <InetAddress> newControllers = null;
221 if (oldControllers == null) {
222 newControllers = new HashSet<InetAddress>();
224 if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
226 * In certain race conditions, the putIfAbsent fails to be atomic.
 * This check is added to identify such cases and report a warning
230 log.warn("States Exists for {} : {}", node, oldControllers.toString());
232 newControllers = new HashSet<InetAddress>(oldControllers);
234 newControllers.add(controller);
237 clusterServices.tbegin();
238 if (nodeConnections.putIfAbsent(node, newControllers) != null) {
239 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
241 * This check is needed again to take care of the case where some schemes
242 * would not allow nodes to be connected to multiple controllers.
243 * Hence, if putIfAbsent fails, that means, some other controller is competing
244 * with this controller to take hold of a Node.
246 if (isConnectionAllowed(node)) {
247 if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
248 clusterServices.trollback();
251 } catch ( InterruptedException e) {}
252 log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
253 return putNodeToController(node, controller);
255 log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
256 controller.getHostAddress(), node.toString());
259 clusterServices.trollback();
260 return new Status(StatusCode.CONFLICT);
263 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
265 clusterServices.tcommit();
266 } catch (Exception e) {
267 log.error("Excepion in adding Controller to a Node", e);
269 clusterServices.trollback();
270 } catch (Exception e1) {
271 log.error("Error Rolling back the node Connections Changes ", e);
273 return new Status(StatusCode.INTERNALERROR);
275 return new Status(StatusCode.SUCCESS);
278 public Status addNode (Node node, InetAddress controller) {
279 if (node == null || controller == null) {
281 log.warn("addNode: node is null");
282 } else if (controller == null) {
283 log.error("Failed to add node {}. The controller address retrieved from clusterServices is null.", node);
285 return new Status(StatusCode.BADREQUEST);
288 return new Status(StatusCode.SUCCESS);
290 if (isConnectionAllowed(node)) {
291 return putNodeToController(node, controller);
293 return new Status(StatusCode.NOTALLOWED);
297 public Status addNode (Node node) {
298 return addNode(node, clusterServices.getMyAddress());
301 @SuppressWarnings({ "unchecked" })
302 private void retrieveCaches() {
303 if (this.clusterServices == null) {
304 log.error("Un-initialized Cluster Services, can't retrieve caches for scheme: {}", name);
308 nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache(nodeConnectionsCacheName);
310 if (nodeConnections == null) {
311 log.error("\nFailed to get cache: {}", nodeConnectionsCacheName);
315 private void allocateCaches() {
316 if (this.clusterServices == null) {
317 log.error("Un-initialized clusterServices, can't create cache");
322 clusterServices.createCache(nodeConnectionsCacheName, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
323 } catch (CacheExistException cee) {
324 log.debug("\nCache already exists: {}", nodeConnectionsCacheName);
325 } catch (CacheConfigException cce) {
326 log.error("\nCache configuration invalid - check cache mode");
327 } catch (Exception e) {
328 log.error("An error occured",e);