1 package org.opendaylight.controller.connectionmanager.scheme;
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
13 import javax.transaction.SystemException;
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
20 import org.opendaylight.controller.sal.core.Node;
21 import org.opendaylight.controller.sal.utils.Status;
22 import org.opendaylight.controller.sal.utils.StatusCode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 public abstract class AbstractScheme {
27 private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
28 protected IClusterGlobalServices clusterServices = null;
30 * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
31 * But, such a data-structure results in some complex event processing during the Cluster operations
32 * to sync up the Connection states.
34 * A data-structure with Node as the key and set of controllers provides a good balance
35 * between the needed functionality and simpler clustering implementation for Connection Manager.
37 protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
38 protected abstract boolean isConnectionAllowedInternal(Node node);
41 protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
42 this.clusterServices = clusterServices;
43 if (type != null) name = type.name();
44 else name = "UNKNOWN";
45 if (clusterServices != null) {
51 protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
52 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
53 for (Node node : nodeConnections.keySet()) {
54 Set<InetAddress> controllers = nodeConnections.get(node);
55 if (controllers == null) continue;
56 for (InetAddress controller : controllers) {
57 Set<Node> nodes = controllerNodesMap.get(controller);
59 nodes = new HashSet<Node>();
62 controllerNodesMap.put(controller, nodes);
65 return controllerNodesMap;
68 public boolean isConnectionAllowed (Node node) {
69 if (clusterServices == null || nodeConnections == null) {
73 return isConnectionAllowedInternal(node);
76 @SuppressWarnings("deprecation")
77 public void handleClusterViewChanged() {
78 log.debug("Handling Cluster View changed notification");
79 List<InetAddress> controllers = clusterServices.getClusteredControllers();
80 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
81 List<InetAddress> toRemove = new ArrayList<InetAddress>();
82 for (InetAddress c : controllerNodesMap.keySet()) {
83 if (!controllers.contains(c)) {
88 boolean retry = false;
89 for (InetAddress c : toRemove) {
90 log.debug("Removing Controller : {} from the Connections table", c);
91 for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
92 Node node = nodeIterator.next();
93 Set <InetAddress> oldControllers = nodeConnections.get(node);
94 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
95 if (newControllers.remove(c)) {
97 clusterServices.tbegin();
98 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
99 log.debug("Replace Failed for {} ", node.toString());
101 clusterServices.trollback();
104 clusterServices.tcommit();
106 } catch (Exception e) {
107 log.debug("Exception in replacing nodeConnections ", e);
110 clusterServices.trollback();
111 } catch (Exception e1) {}
120 } catch (InterruptedException e) {}
121 handleClusterViewChanged();
125 public Set<Node> getNodes(InetAddress controller) {
126 if (nodeConnections == null) return null;
127 ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
128 return controllerNodesMap.get(controller);
131 @SuppressWarnings("deprecation")
132 public Set<Node> getNodes() {
133 return getNodes(clusterServices.getMyAddress());
136 public Set<InetAddress> getControllers(Node node) {
137 if (nodeConnections != null) return nodeConnections.get(node);
141 public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
142 return nodeConnections;
145 @SuppressWarnings("deprecation")
146 public boolean isLocal(Node node) {
147 if (nodeConnections == null) return false;
148 InetAddress myController = clusterServices.getMyAddress();
149 Set<InetAddress> controllers = nodeConnections.get(node);
150 return (controllers != null && controllers.contains(myController));
153 @SuppressWarnings("deprecation")
154 public Status removeNode (Node node) {
155 return removeNodeFromController(node, clusterServices.getMyAddress());
158 protected Status removeNodeFromController (Node node, InetAddress controller) {
159 if (node == null || controller == null) {
160 return new Status(StatusCode.BADREQUEST);
163 if (clusterServices == null || nodeConnections == null) {
164 return new Status(StatusCode.SUCCESS);
167 Set<InetAddress> oldControllers = nodeConnections.get(node);
169 if (oldControllers != null && oldControllers.contains(controller)) {
170 Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
171 if (newControllers.remove(controller)) {
173 clusterServices.tbegin();
174 if (newControllers.size() > 0) {
175 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
176 clusterServices.trollback();
179 } catch ( InterruptedException e) {}
180 return removeNodeFromController(node, controller);
183 nodeConnections.remove(node);
185 clusterServices.tcommit();
186 } catch (Exception e) {
187 log.error("Excepion in removing Controller from a Node", e);
189 clusterServices.trollback();
190 } catch (Exception e1) {
191 log.error("Error Rolling back the node Connections Changes ", e);
193 return new Status(StatusCode.INTERNALERROR);
198 return new Status(StatusCode.SUCCESS);
203 * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
204 * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
206 private Status putNodeToController (Node node, InetAddress controller) {
207 if (clusterServices == null || nodeConnections == null) {
208 return new Status(StatusCode.SUCCESS);
210 log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
212 Set <InetAddress> oldControllers = nodeConnections.get(node);
213 Set <InetAddress> newControllers = null;
214 if (oldControllers == null) {
215 newControllers = new HashSet<InetAddress>();
217 if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
219 * In certain race conditions, the putIfAbsent fails to be atomic.
220 * This check is added to identify such cases and report an warning
223 log.warn("States Exists for {} : {}", node, oldControllers.toString());
225 newControllers = new HashSet<InetAddress>(oldControllers);
227 newControllers.add(controller);
230 clusterServices.tbegin();
231 if (nodeConnections.putIfAbsent(node, newControllers) != null) {
232 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
234 * This check is needed again to take care of the case where some schemes
235 * would not allow nodes to be connected to multiple controllers.
236 * Hence, if putIfAbsent fails, that means, some other controller is competing
237 * with this controller to take hold of a Node.
239 if (isConnectionAllowed(node)) {
240 if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
241 clusterServices.trollback();
244 } catch ( InterruptedException e) {}
245 log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
246 return putNodeToController(node, controller);
248 log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
249 controller.getHostAddress(), node.toString());
252 clusterServices.trollback();
253 return new Status(StatusCode.CONFLICT);
256 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
258 clusterServices.tcommit();
259 } catch (Exception e) {
260 log.error("Excepion in adding Controller to a Node", e);
262 clusterServices.trollback();
263 } catch (Exception e1) {
264 log.error("Error Rolling back the node Connections Changes ", e);
266 return new Status(StatusCode.INTERNALERROR);
268 return new Status(StatusCode.SUCCESS);
271 public Status addNode (Node node, InetAddress controller) {
272 if (node == null || controller == null) {
273 return new Status(StatusCode.BADREQUEST);
275 if (isLocal(node)) return new Status(StatusCode.SUCCESS);
276 if (isConnectionAllowed(node)) {
277 return putNodeToController(node, controller);
279 return new Status(StatusCode.NOTALLOWED);
283 @SuppressWarnings("deprecation")
284 public Status addNode (Node node) {
285 return addNode(node, clusterServices.getMyAddress());
288 @SuppressWarnings({ "unchecked", "deprecation" })
289 private void retrieveCaches() {
290 if (this.clusterServices == null) {
291 log.error("un-initialized clusterServices, can't retrieve cache");
295 nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
297 if (nodeConnections == null) {
298 log.error("\nFailed to get caches");
302 @SuppressWarnings("deprecation")
303 private void allocateCaches() {
304 if (this.clusterServices == null) {
305 log.error("un-initialized clusterServices, can't create cache");
310 clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
311 } catch (CacheExistException cee) {
312 log.error("\nCache already exists - destroy and recreate if needed");
313 } catch (CacheConfigException cce) {
314 log.error("\nCache configuration invalid - check cache mode");
315 } catch (Exception e) {
316 log.error("An error occured",e);