Merge "Conversion class ToSalConversionsUtils."
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import org.opendaylight.controller.clustering.services.CacheConfigException;
14 import org.opendaylight.controller.clustering.services.CacheExistException;
15 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
16 import org.opendaylight.controller.clustering.services.IClusterServices;
17 import org.opendaylight.controller.connectionmanager.ConnectionLocality;
18 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
19 import org.opendaylight.controller.sal.core.Node;
20 import org.opendaylight.controller.sal.utils.Status;
21 import org.opendaylight.controller.sal.utils.StatusCode;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 public abstract class AbstractScheme {
26     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
27     protected IClusterGlobalServices clusterServices = null;
28     /*
29      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
30      * But, such a data-structure results in some complex event processing during the Cluster operations
31      * to sync up the Connection states.
32      *
33      * A data-structure with Node as the key and set of controllers provides a good balance
34      * between the needed functionality and simpler clustering implementation for Connection Manager.
35      */
36     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
37     protected abstract boolean isConnectionAllowedInternal(Node node);
38     private String name;
39
40     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
41         this.clusterServices = clusterServices;
42         if (type != null) name = type.name();
43         else name = "UNKNOWN";
44         if (clusterServices != null) {
45             allocateCaches();
46             retrieveCaches();
47         }
48     }
49
50     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
51         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
52         for (Node node : nodeConnections.keySet()) {
53             Set<InetAddress> controllers = nodeConnections.get(node);
54             if (controllers == null) continue;
55             for (InetAddress controller : controllers) {
56                 Set<Node> nodes = controllerNodesMap.get(controller);
57                 if (nodes == null) {
58                     nodes = new HashSet<Node>();
59                 }
60                 nodes.add(node);
61                 controllerNodesMap.put(controller, nodes);
62             }
63         }
64         return controllerNodesMap;
65     }
66
67     public boolean isConnectionAllowed (Node node) {
68         if (clusterServices == null || nodeConnections == null) {
69             return false;
70         }
71
72         return isConnectionAllowedInternal(node);
73     }
74
75     @SuppressWarnings("deprecation")
76     public void handleClusterViewChanged() {
77         log.debug("Handling Cluster View changed notification");
78         List<InetAddress> controllers = clusterServices.getClusteredControllers();
79         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
80         List<InetAddress> toRemove = new ArrayList<InetAddress>();
81         for (InetAddress c : controllerNodesMap.keySet()) {
82             if (!controllers.contains(c)) {
83                 toRemove.add(c);
84             }
85         }
86
87         boolean retry = false;
88         for (InetAddress c : toRemove) {
89             log.debug("Removing Controller : {} from the Connections table", c);
90             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
91                 Node node = nodeIterator.next();
92                 Set <InetAddress> oldControllers = nodeConnections.get(node);
93                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
94                 if (newControllers.remove(c)) {
95                     try {
96                         clusterServices.tbegin();
97                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
98                             log.debug("Replace Failed for {} ", node.toString());
99                             retry = true;
100                             clusterServices.trollback();
101                             break;
102                         } else {
103                             clusterServices.tcommit();
104                         }
105                     } catch (Exception e) {
106                         log.debug("Exception in replacing nodeConnections ", e);
107                         retry = true;
108                         try {
109                             clusterServices.trollback();
110                         } catch (Exception e1) {}
111                         break;
112                     }
113                 }
114             }
115         }
116         if (retry) {
117             try {
118                 Thread.sleep(1000);
119             } catch (InterruptedException e) {}
120             handleClusterViewChanged();
121         }
122     }
123
124     public Set<Node> getNodes(InetAddress controller) {
125         if (nodeConnections == null) return null;
126         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
127         return controllerNodesMap.get(controller);
128     }
129
130     public Set<Node> getNodes() {
131         return getNodes(clusterServices.getMyAddress());
132     }
133
134     public Set<InetAddress> getControllers(Node node) {
135         if (nodeConnections != null) return nodeConnections.get(node);
136         return null;
137     }
138
139     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
140         return nodeConnections;
141     }
142
143     public boolean isLocal(Node node) {
144         if (nodeConnections == null) return false;
145         InetAddress myController = clusterServices.getMyAddress();
146         Set<InetAddress> controllers = nodeConnections.get(node);
147         return (controllers != null && controllers.contains(myController));
148     }
149
150     public ConnectionLocality getLocalityStatus(Node node) {
151         if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
152         Set<InetAddress> controllers = nodeConnections.get(node);
153         if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
154         InetAddress myController = clusterServices.getMyAddress();
155         return controllers.contains(myController) ? ConnectionLocality.LOCAL:
156                                                     ConnectionLocality.NOT_LOCAL;
157     }
158
159     public Status removeNode (Node node) {
160         return removeNodeFromController(node, clusterServices.getMyAddress());
161     }
162
163     protected Status removeNodeFromController (Node node, InetAddress controller) {
164         if (node == null || controller == null) {
165             return new Status(StatusCode.BADREQUEST);
166         }
167
168         if (clusterServices == null || nodeConnections == null) {
169             return new Status(StatusCode.SUCCESS);
170         }
171
172         Set<InetAddress> oldControllers = nodeConnections.get(node);
173
174         if (oldControllers != null && oldControllers.contains(controller)) {
175             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
176             if (newControllers.remove(controller)) {
177                 try {
178                     clusterServices.tbegin();
179                     if (newControllers.size() > 0) {
180                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
181                             clusterServices.trollback();
182                             try {
183                                 Thread.sleep(100);
184                             } catch ( InterruptedException e) {}
185                             return removeNodeFromController(node, controller);
186                         }
187                     } else {
188                         nodeConnections.remove(node);
189                     }
190                     clusterServices.tcommit();
191                 } catch (Exception e) {
192                     log.error("Excepion in removing Controller from a Node", e);
193                     try {
194                         clusterServices.trollback();
195                     } catch (Exception e1) {
196                         log.error("Error Rolling back the node Connections Changes ", e);
197                     }
198                     return new Status(StatusCode.INTERNALERROR);
199                 }
200
201             }
202         }
203         return new Status(StatusCode.SUCCESS);
204
205     }
206
207     /*
208      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
209      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
210      */
211     private Status putNodeToController (Node node, InetAddress controller) {
212         if (clusterServices == null || nodeConnections == null) {
213             return new Status(StatusCode.SUCCESS);
214         }
215         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
216
217         Set <InetAddress> oldControllers = nodeConnections.get(node);
218         Set <InetAddress> newControllers = null;
219         if (oldControllers == null) {
220             newControllers = new HashSet<InetAddress>();
221         } else {
222             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
223                 /*
224                  * In certain race conditions, the putIfAbsent fails to be atomic.
225                  * This check is added to identify such cases and report an warning
226                  * for debugging.
227                  */
228                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
229             }
230             newControllers = new HashSet<InetAddress>(oldControllers);
231         }
232         newControllers.add(controller);
233
234         try {
235             clusterServices.tbegin();
236             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
237                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
238                 /*
239                  * This check is needed again to take care of the case where some schemes
240                  * would not allow nodes to be connected to multiple controllers.
241                  * Hence, if putIfAbsent fails, that means, some other controller is competing
242                  * with this controller to take hold of a Node.
243                  */
244                 if (isConnectionAllowed(node)) {
245                     if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
246                         clusterServices.trollback();
247                         try {
248                             Thread.sleep(100);
249                         } catch ( InterruptedException e) {}
250                         log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
251                         return putNodeToController(node, controller);
252                     } else {
253                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
254                                 controller.getHostAddress(), node.toString());
255                     }
256                 } else {
257                     clusterServices.trollback();
258                     return new Status(StatusCode.CONFLICT);
259                 }
260             } else {
261                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
262             }
263             clusterServices.tcommit();
264         } catch (Exception e) {
265             log.error("Excepion in adding Controller to a Node", e);
266             try {
267                 clusterServices.trollback();
268             } catch (Exception e1) {
269                 log.error("Error Rolling back the node Connections Changes ", e);
270             }
271             return new Status(StatusCode.INTERNALERROR);
272         }
273         return new Status(StatusCode.SUCCESS);
274     }
275
276     public Status addNode (Node node, InetAddress controller) {
277         if (node == null || controller == null) {
278             return new Status(StatusCode.BADREQUEST);
279         }
280         if (isLocal(node)) return new Status(StatusCode.SUCCESS);
281         if (isConnectionAllowed(node)) {
282             return putNodeToController(node, controller);
283         } else {
284             return new Status(StatusCode.NOTALLOWED);
285         }
286     }
287
288     @SuppressWarnings("deprecation")
289     public Status addNode (Node node) {
290         return addNode(node, clusterServices.getMyAddress());
291     }
292
293     @SuppressWarnings({ "unchecked", "deprecation" })
294     private void retrieveCaches() {
295         if (this.clusterServices == null) {
296             log.error("un-initialized clusterServices, can't retrieve cache");
297             return;
298         }
299
300         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
301
302         if (nodeConnections == null) {
303             log.error("\nFailed to get caches");
304         }
305     }
306
307     @SuppressWarnings("deprecation")
308     private void allocateCaches() {
309         if (this.clusterServices == null) {
310             log.error("un-initialized clusterServices, can't create cache");
311             return;
312         }
313
314         try {
315             clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
316         } catch (CacheExistException cee) {
317             log.error("\nCache already exists - destroy and recreate if needed");
318         } catch (CacheConfigException cce) {
319             log.error("\nCache configuration invalid - check cache mode");
320         } catch (Exception e) {
321             log.error("An error occured",e);
322         }
323     }
324 }