Northbound cleanup for Switch Manager
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import javax.transaction.SystemException;
14
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
20 import org.opendaylight.controller.sal.core.Node;
21 import org.opendaylight.controller.sal.utils.Status;
22 import org.opendaylight.controller.sal.utils.StatusCode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public abstract class AbstractScheme {
27     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
28     protected IClusterGlobalServices clusterServices = null;
29     /*
30      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
31      * But, such a data-structure results in some complex event processing during the Cluster operations
32      * to sync up the Connection states.
33      *
34      * A data-structure with Node as the key and set of controllers provides a good balance
35      * between the needed functionality and simpler clustering implementation for Connection Manager.
36      */
37     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
38     protected abstract boolean isConnectionAllowedInternal(Node node);
39     private String name;
40
41     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
42         this.clusterServices = clusterServices;
43         if (type != null) name = type.name();
44         else name = "UNKNOWN";
45         if (clusterServices != null) {
46             allocateCaches();
47             retrieveCaches();
48         }
49     }
50
51     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
52         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
53         for (Node node : nodeConnections.keySet()) {
54             Set<InetAddress> controllers = nodeConnections.get(node);
55             if (controllers == null) continue;
56             for (InetAddress controller : controllers) {
57                 Set<Node> nodes = controllerNodesMap.get(controller);
58                 if (nodes == null) {
59                     nodes = new HashSet<Node>();
60                 }
61                 nodes.add(node);
62                 controllerNodesMap.put(controller, nodes);
63             }
64         }
65         return controllerNodesMap;
66     }
67
68     public boolean isConnectionAllowed (Node node) {
69         if (clusterServices == null || nodeConnections == null) {
70             return false;
71         }
72
73         return isConnectionAllowedInternal(node);
74     }
75
76     @SuppressWarnings("deprecation")
77     public void handleClusterViewChanged() {
78         log.debug("Handling Cluster View changed notification");
79         List<InetAddress> controllers = clusterServices.getClusteredControllers();
80         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
81         List<InetAddress> toRemove = new ArrayList<InetAddress>();
82         for (InetAddress c : controllerNodesMap.keySet()) {
83             if (!controllers.contains(c)) {
84                 toRemove.add(c);
85             }
86         }
87
88         boolean retry = false;
89         for (InetAddress c : toRemove) {
90             log.debug("Removing Controller : {} from the Connections table", c);
91             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
92                 Node node = nodeIterator.next();
93                 Set <InetAddress> oldControllers = nodeConnections.get(node);
94                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
95                 if (newControllers.remove(c)) {
96                     try {
97                         clusterServices.tbegin();
98                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
99                             log.debug("Replace Failed for {} ", node.toString());
100                             retry = true;
101                             clusterServices.trollback();
102                             break;
103                         } else {
104                             clusterServices.tcommit();
105                         }
106                     } catch (Exception e) {
107                         log.debug("Exception in replacing nodeConnections ", e);
108                         retry = true;
109                         try {
110                             clusterServices.trollback();
111                         } catch (Exception e1) {}
112                         break;
113                     }
114                 }
115             }
116         }
117         if (retry) {
118             try {
119                 Thread.sleep(1000);
120             } catch (InterruptedException e) {}
121             handleClusterViewChanged();
122         }
123     }
124
125     public Set<Node> getNodes(InetAddress controller) {
126         if (nodeConnections == null) return null;
127         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
128         return controllerNodesMap.get(controller);
129     }
130
131     @SuppressWarnings("deprecation")
132     public Set<Node> getNodes() {
133         return getNodes(clusterServices.getMyAddress());
134     }
135
136     public Set<InetAddress> getControllers(Node node) {
137         if (nodeConnections != null) return nodeConnections.get(node);
138         return null;
139     }
140
141     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
142         return nodeConnections;
143     }
144
145     @SuppressWarnings("deprecation")
146     public boolean isLocal(Node node) {
147         if (nodeConnections == null) return false;
148         InetAddress myController = clusterServices.getMyAddress();
149         Set<InetAddress> controllers = nodeConnections.get(node);
150         return (controllers != null && controllers.contains(myController));
151     }
152
153     @SuppressWarnings("deprecation")
154     public Status removeNode (Node node) {
155         return removeNodeFromController(node, clusterServices.getMyAddress());
156     }
157
158     protected Status removeNodeFromController (Node node, InetAddress controller) {
159         if (node == null || controller == null) {
160             return new Status(StatusCode.BADREQUEST);
161         }
162
163         if (clusterServices == null || nodeConnections == null) {
164             return new Status(StatusCode.SUCCESS);
165         }
166
167         Set<InetAddress> oldControllers = nodeConnections.get(node);
168
169         if (oldControllers != null && oldControllers.contains(controller)) {
170             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
171             if (newControllers.remove(controller)) {
172                 try {
173                     clusterServices.tbegin();
174                     if (newControllers.size() > 0) {
175                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
176                             clusterServices.trollback();
177                             try {
178                                 Thread.sleep(100);
179                             } catch ( InterruptedException e) {}
180                             return removeNodeFromController(node, controller);
181                         }
182                     } else {
183                         nodeConnections.remove(node);
184                     }
185                     clusterServices.tcommit();
186                 } catch (Exception e) {
187                     log.error("Excepion in removing Controller from a Node", e);
188                     try {
189                         clusterServices.trollback();
190                     } catch (Exception e1) {
191                         log.error("Error Rolling back the node Connections Changes ", e);
192                     }
193                     return new Status(StatusCode.INTERNALERROR);
194                 }
195
196             }
197         }
198         return new Status(StatusCode.SUCCESS);
199
200     }
201
202     /*
203      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
204      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
205      */
206     private Status putNodeToController (Node node, InetAddress controller) {
207         if (clusterServices == null || nodeConnections == null) {
208             return new Status(StatusCode.SUCCESS);
209         }
210         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
211
212         Set <InetAddress> oldControllers = nodeConnections.get(node);
213         Set <InetAddress> newControllers = null;
214         if (oldControllers == null) {
215             newControllers = new HashSet<InetAddress>();
216         } else {
217             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
218                 /*
219                  * In certain race conditions, the putIfAbsent fails to be atomic.
220                  * This check is added to identify such cases and report an warning
221                  * for debugging.
222                  */
223                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
224             }
225             newControllers = new HashSet<InetAddress>(oldControllers);
226         }
227         newControllers.add(controller);
228
229         try {
230             clusterServices.tbegin();
231             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
232                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
233                 /*
234                  * This check is needed again to take care of the case where some schemes
235                  * would not allow nodes to be connected to multiple controllers.
236                  * Hence, if putIfAbsent fails, that means, some other controller is competing
237                  * with this controller to take hold of a Node.
238                  */
239                 if (isConnectionAllowed(node)) {
240                     if (!nodeConnections.replace(node, oldControllers, newControllers)) {
241                         clusterServices.trollback();
242                         try {
243                             Thread.sleep(100);
244                         } catch ( InterruptedException e) {}
245                         log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
246                                 controller.getHostAddress(), node.toString());
247                         return putNodeToController(node, controller);
248                     } else {
249                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
250                                 controller.getHostAddress(), node.toString());
251                     }
252                 } else {
253                     clusterServices.trollback();
254                     return new Status(StatusCode.CONFLICT);
255                 }
256             } else {
257                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
258             }
259             clusterServices.tcommit();
260         } catch (Exception e) {
261             log.error("Excepion in adding Controller to a Node", e);
262             try {
263                 clusterServices.trollback();
264             } catch (Exception e1) {
265                 log.error("Error Rolling back the node Connections Changes ", e);
266             }
267             return new Status(StatusCode.INTERNALERROR);
268         }
269         return new Status(StatusCode.SUCCESS);
270     }
271
272     public Status addNode (Node node, InetAddress controller) {
273         if (node == null || controller == null) {
274             return new Status(StatusCode.BADREQUEST);
275         }
276         if (isLocal(node)) return new Status(StatusCode.SUCCESS);
277         if (isConnectionAllowed(node)) {
278             return putNodeToController(node, controller);
279         } else {
280             return new Status(StatusCode.NOTALLOWED);
281         }
282     }
283
284     @SuppressWarnings("deprecation")
285     public Status addNode (Node node) {
286         return addNode(node, clusterServices.getMyAddress());
287     }
288
289     @SuppressWarnings({ "unchecked", "deprecation" })
290     private void retrieveCaches() {
291         if (this.clusterServices == null) {
292             log.error("un-initialized clusterServices, can't retrieve cache");
293             return;
294         }
295
296         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
297
298         if (nodeConnections == null) {
299             log.error("\nFailed to get caches");
300         }
301     }
302
303     @SuppressWarnings("deprecation")
304     private void allocateCaches() {
305         if (this.clusterServices == null) {
306             log.error("un-initialized clusterServices, can't create cache");
307             return;
308         }
309
310         try {
311             clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
312         } catch (CacheExistException cee) {
313             log.error("\nCache already exists - destroy and recreate if needed");
314         } catch (CacheConfigException cce) {
315             log.error("\nCache configuration invalid - check cache mode");
316         } catch (Exception e) {
317             e.printStackTrace();
318         }
319     }
320 }