1 package org.opendaylight.controller.connectionmanager.scheme;
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
13 import javax.transaction.SystemException;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
20 import org.opendaylight.controller.sal.core.Node;
21 import org.opendaylight.controller.sal.utils.Status;
22 import org.opendaylight.controller.sal.utils.StatusCode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
/**
 * Base class for connection-management schemes. It holds the cluster services
 * handle and the node-to-controllers table shared by the methods below, and
 * leaves the actual admission decision to subclasses through
 * isConnectionAllowedInternal(Node).
 */
26 public abstract class AbstractScheme {
// Class-wide SLF4J logger.
27 private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
// Cluster services handle; assigned by the constructor and null-checked by
// several methods, so it may legitimately be null.
28 protected IClusterGlobalServices clusterServices = null;
/*
 * A more natural map would be keyed by controller IP address, with a set of Nodes as the value.
 * However, such a layout requires complex event processing during cluster operations
 * to keep the connection states in sync.
 *
 * Keying the map by Node, with a set of controllers as the value, strikes a good balance
 * between the needed functionality and a simpler clustering implementation for Connection Manager.
 */
// Connection table: each Node maps to the set of controller addresses recorded
// for it. Assigned in retrieveCaches() from the cluster cache named
// "connectionmanager." + name + ".nodeconnections"; callers null-check it
// before use.
37 protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
// Scheme-specific admission decision; isConnectionAllowed(Node) delegates to
// this once clusterServices and nodeConnections are both available.
38 protected abstract boolean isConnectionAllowedInternal(Node node);
/**
 * Creates a scheme bound to the given cluster services and scheme type.
 *
 * @param clusterServices cluster services handle to store; may be null
 * @param type scheme type whose name() becomes this scheme's name; when null
 *             the name is set to "UNKNOWN"
 */
41 protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
42 this.clusterServices = clusterServices;
// The name is later embedded in the cache name used by retrieveCaches() and
// allocateCaches().
43 if (type != null) name = type.name();
44 else name = "UNKNOWN";
// Cluster-dependent setup only runs when cluster services were supplied.
// NOTE(review): the body of this branch is not visible in this extract;
// presumably it calls allocateCaches() and retrieveCaches() -- confirm
// against the full file.
45 if (clusterServices != null) {
/**
 * Builds a controller-keyed view of the connection table.
 *
 * Iterates over every Node key of nodeConnections and, for each controller
 * address recorded for that node, stores a Set of Nodes under that controller
 * in a newly created ConcurrentHashMap.
 *
 * @return the newly created map; nodeConnections itself is not modified here
 */
51 protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
52 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
// NOTE(review): nodeConnections is dereferenced without a null check; callers
// such as getNodes(InetAddress) check it before calling.
53 for (Node node : nodeConnections.keySet()) {
54 Set<InetAddress> controllers = nodeConnections.get(node);
// get() can return null for a key that was just iterated (presumably removed
// in the meantime); such nodes are skipped.
55 if (controllers == null) continue;
56 for (InetAddress controller : controllers) {
57 Set<Node> nodes = controllerNodesMap.get(controller);
// NOTE(review): lines are missing from this extract around the next two
// statements; presumably the HashSet is only created when no set exists yet
// and the current node is added to it before the put -- confirm against the
// full file.
59 nodes = new HashSet<Node>();
62 controllerNodesMap.put(controller, nodes);
65 return controllerNodesMap;
/**
 * Reports whether a connection for the given node may be accepted.
 *
 * When both clusterServices and nodeConnections are available, the decision is
 * delegated to the subclass through isConnectionAllowedInternal(Node).
 *
 * @param node node asking to connect
 * @return the subclass decision in the normal case
 */
68 public boolean isConnectionAllowed (Node node) {
// NOTE(review): the value returned when clustering state is unavailable is
// not visible in this extract -- confirm against the full file.
69 if (clusterServices == null || nodeConnections == null) {
73 return isConnectionAllowedInternal(node);
/**
 * Reconciles the connection table with the current cluster membership.
 *
 * Obtains the clustered controllers from clusterServices, compares them with
 * the controllers referenced by the controller-to-nodes view, and for every
 * controller queued in toRemove strips that address from each node's
 * controller set. Each change is made on a copy of the set and installed with
 * nodeConnections.replace(...) between clusterServices.tbegin() and
 * tcommit()/trollback().
 *
 * NOTE(review): several lines of this method are missing from this extract
 * (the statement filling toRemove, the try opening, the retry bookkeeping and
 * whatever precedes the recursive call); comments below only describe what is
 * visible.
 */
76 @SuppressWarnings("deprecation")
77 public void handleClusterViewChanged() {
78 List<InetAddress> controllers = clusterServices.getClusteredControllers();
79 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
80 List<InetAddress> toRemove = new ArrayList<InetAddress>();
// Look for controllers still referenced by the table but absent from the
// clustered-controllers list.
81 for (InetAddress c : controllerNodesMap.keySet()) {
82 if (!controllers.contains(c)) {
// NOTE(review): the assignments that change this flag are not visible here.
87 boolean retry = false;
88 for (InetAddress c : toRemove) {
89 log.debug("Removing Controller : {} from the Connections table", c);
90 for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
91 Node node = nodeIterator.next();
92 Set <InetAddress> oldControllers = nodeConnections.get(node);
// The copy is edited so the untouched original can serve as the expected
// value for replace().
// NOTE(review): oldControllers is not null-checked, unlike the same lookup in
// getControllerToNodesMap(); a null result makes this constructor throw
// NullPointerException.
93 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
// Only nodes that actually listed this controller are written back.
94 if (newControllers.remove(c)) {
96 clusterServices.tbegin();
// replace(key, old, new) only succeeds while the stored value still equals
// oldControllers; a failed swap is rolled back.
97 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
98 log.debug("Replace Failed for {} ", node.toString());
100 clusterServices.trollback();
103 clusterServices.tcommit();
105 } catch (Exception e) {
106 log.error("Exception in replacing nodeConnections ", e);
109 clusterServices.trollback();
// NOTE(review): a failure of the rollback itself is discarded silently here.
110 } catch (Exception e1) {}
// NOTE(review): the statement that can be interrupted and the body of this
// handler are not visible in this extract.
119 } catch (InterruptedException e) {
// The method invokes itself again; NOTE(review): the guarding condition is
// not visible here (presumably the retry flag) -- confirm.
122 handleClusterViewChanged();
126 public Set<Node> getNodes(InetAddress controller) {
127 if (nodeConnections == null) return null;
128 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
129 return controllerNodesMap.get(controller);
132 @SuppressWarnings("deprecation")
133 public Set<Node> getNodes() {
134 return getNodes(clusterServices.getMyAddress());
/**
 * Looks up the controller addresses recorded for a node.
 *
 * @param node node to look up
 * @return nodeConnections.get(node) when the connection table is available
 */
137 public Set<InetAddress> getControllers(Node node) {
138 if (nodeConnections != null) return nodeConnections.get(node);
// NOTE(review): the fall-through return used when nodeConnections is null is
// not visible in this extract -- confirm against the full file.
142 public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
143 return nodeConnections;
146 @SuppressWarnings("deprecation")
147 public boolean isLocal(Node node) {
148 if (nodeConnections == null) return false;
149 InetAddress myController = clusterServices.getMyAddress();
150 Set<InetAddress> controllers = nodeConnections.get(node);
151 return (controllers != null && controllers.contains(myController));
154 @SuppressWarnings("deprecation")
155 public Status removeNode (Node node) {
156 return removeNodeFromController(node, clusterServices.getMyAddress());
/**
 * Removes one controller from the set of controllers recorded for a node.
 *
 * The update is made on a copy of the stored set and written back between
 * clusterServices.tbegin() and tcommit(); a failed replace is rolled back and
 * the method calls itself again.
 *
 * NOTE(review): several lines of this method are missing from this extract
 * (try openings, the branch structure around nodeConnections.remove(node) and
 * the statement executed before the recursive call); comments below only
 * describe what is visible.
 *
 * @param node node whose controller set is updated
 * @param controller controller address to remove
 * @return BADREQUEST when either argument is null; SUCCESS when
 *         clusterServices or nodeConnections is null and as the final
 *         fall-through result; INTERNALERROR after an exception in the
 *         transactional update
 */
159 protected Status removeNodeFromController (Node node, InetAddress controller) {
160 if (node == null || controller == null) {
161 return new Status(StatusCode.BADREQUEST);
// Nothing to update when clustering state is unavailable.
164 if (clusterServices == null || nodeConnections == null) {
165 return new Status(StatusCode.SUCCESS);
168 Set<InetAddress> oldControllers = nodeConnections.get(node);
// Only act when the node currently lists this controller.
170 if (oldControllers != null && oldControllers.contains(controller)) {
// Edit a copy so the original can serve as the expected value for replace().
171 Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
172 if (newControllers.remove(controller)) {
174 clusterServices.tbegin();
// A non-empty remainder is swapped in with replace(). NOTE(review): the
// branch holding nodeConnections.remove(node) below is presumably the
// empty-remainder case -- confirm.
175 if (newControllers.size() > 0) {
176 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
177 clusterServices.trollback();
// NOTE(review): the interrupt is swallowed by an empty handler, so the
// thread's interrupted status is not restored.
180 } catch ( InterruptedException e) {}
// Try again from the top after a failed replace.
181 return removeNodeFromController(node, controller);
184 nodeConnections.remove(node);
186 clusterServices.tcommit();
187 } catch (Exception e) {
188 log.error("Excepion in removing Controller from a Node", e);
190 clusterServices.trollback();
191 } catch (Exception e1) {
// NOTE(review): this logs the outer exception e; the rollback failure e1
// caught here is never reported -- likely meant to pass e1.
192 log.error("Error Rolling back the node Connections Changes ", e);
194 return new Status(StatusCode.INTERNALERROR);
199 return new Status(StatusCode.SUCCESS);
/*
 * A few race conditions were seen with the clustered caches in the putIfAbsent and replace
 * functions. A few debug logs are left in place to help with debugging if strange things happen.
 */
/**
 * Records the given controller in the set of controllers stored for a node.
 *
 * A new set containing the controller (plus any controllers already stored)
 * is installed with putIfAbsent; when an entry already exists and
 * isConnectionAllowed(node) permits it, replace(...) is used instead. The
 * update runs between clusterServices.tbegin() and tcommit(); a failed replace
 * is rolled back and the method calls itself again.
 *
 * NOTE(review): several lines of this method are missing from this extract
 * (else/try openings, comment delimiters and the statement executed before
 * the recursive call); comments below only describe what is visible.
 *
 * @param node node being connected
 * @param controller controller address to add
 * @return SUCCESS when clusterServices or nodeConnections is null and as the
 *         final fall-through result; CONFLICT on the path that rolls back
 *         after the second isConnectionAllowed check; INTERNALERROR after an
 *         exception in the transactional update
 */
207 private Status putNodeToController (Node node, InetAddress controller) {
208 if (clusterServices == null || nodeConnections == null) {
209 return new Status(StatusCode.SUCCESS);
211 log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
213 Set <InetAddress> oldControllers = nodeConnections.get(node);
214 Set <InetAddress> newControllers = null;
// No entry yet: start from an empty set.
215 if (oldControllers == null) {
216 newControllers = new HashSet<InetAddress>();
// Existing, non-empty entry while the scheme refuses the connection: only a
// warning is logged here.
218 if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
220 * In certain race conditions, the putIfAbsent fails to be atomic.
221 * This check is added to identify such cases and report an warning
224 log.warn("States Exists for {} : {}", node, oldControllers.toString());
// Edit a copy so the original can serve as the expected value for replace().
226 newControllers = new HashSet<InetAddress>(oldControllers);
228 newControllers.add(controller);
231 clusterServices.tbegin();
// A non-null result means an entry for this node already exists.
232 if (nodeConnections.putIfAbsent(node, newControllers) != null) {
233 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
235 * This check is needed again to take care of the case where some schemes
236 * would not allow nodes to be connected to multiple controllers.
237 * Hence, if putIfAbsent fails, that means, some other controller is competing
238 * with this controller to take hold of a Node.
240 if (isConnectionAllowed(node)) {
// NOTE(review): oldControllers can still be null here when the entry appeared
// after the initial get(); replace() and the toString() calls below would
// then be handed a null -- confirm how the cache implementation reacts.
241 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
242 clusterServices.trollback();
// NOTE(review): the interrupt is swallowed by an empty handler, so the
// thread's interrupted status is not restored.
245 } catch ( InterruptedException e) {}
246 log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
247 controller.getHostAddress(), node.toString());
// Try again from the top after a failed replace.
248 return putNodeToController(node, controller);
250 log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
251 controller.getHostAddress(), node.toString());
// Path taken when the connection is refused: undo and report the conflict.
254 clusterServices.trollback();
255 return new Status(StatusCode.CONFLICT);
258 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
260 clusterServices.tcommit();
261 } catch (Exception e) {
262 log.error("Excepion in adding Controller to a Node", e);
264 clusterServices.trollback();
265 } catch (Exception e1) {
// NOTE(review): this logs the outer exception e; the rollback failure e1
// caught here is never reported -- likely meant to pass e1.
266 log.error("Error Rolling back the node Connections Changes ", e);
268 return new Status(StatusCode.INTERNALERROR);
270 return new Status(StatusCode.SUCCESS);
273 public Status addNode (Node node, InetAddress controller) {
274 if (node == null || controller == null) {
275 return new Status(StatusCode.BADREQUEST);
277 if (isLocal(node)) return new Status(StatusCode.SUCCESS);
278 if (isConnectionAllowed(node)) {
279 return putNodeToController(node, controller);
281 return new Status(StatusCode.NOTALLOWED);
285 @SuppressWarnings("deprecation")
286 public Status addNode (Node node) {
287 return addNode(node, clusterServices.getMyAddress());
/**
 * Fetches the node-connections cache from the cluster services and stores it
 * in nodeConnections.
 *
 * The cache is looked up under the name
 * "connectionmanager." + name + ".nodeconnections". An error is logged when
 * clusterServices is null and when the lookup yields null.
 */
290 @SuppressWarnings({ "unchecked", "deprecation" })
291 private void retrieveCaches() {
292 if (this.clusterServices == null) {
293 log.error("un-initialized clusterServices, can't retrieve cache");
// NOTE(review): the lines following the log call above are missing from this
// extract; presumably the method returns early there -- confirm.
// The cast to the parameterized map type is unchecked; the annotation on this
// method suppresses the resulting warning.
297 nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
299 if (nodeConnections == null) {
300 log.error("\nFailed to get caches");
304 @SuppressWarnings("deprecation")
305 private void allocateCaches() {
306 if (this.clusterServices == null) {
307 log.error("un-initialized clusterServices, can't create cache");
312 clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
313 } catch (CacheExistException cee) {
314 log.error("\nCache already exists - destroy and recreate if needed");
315 } catch (CacheConfigException cce) {
316 log.error("\nCache configuration invalid - check cache mode");
317 } catch (Exception e) {