ce0325d0727114de303dacd3e5f610dce848d8b1
[affinity.git] / affinity / implementation / src / main / java / org / opendaylight / affinity / affinity / internal / AffinityManagerImpl.java
1 /*
2  * Copyright (c) 2013 Plexxi, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.affinity.affinity.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.AbstractMap;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
54 import org.opendaylight.controller.sal.core.IContainer;
55 import org.opendaylight.controller.sal.core.Node;
56 import org.opendaylight.controller.sal.core.Host;
57 import org.opendaylight.controller.sal.core.NodeConnector;
58 import org.opendaylight.controller.sal.core.NodeTable;
59 import org.opendaylight.controller.sal.core.Property;
60 import org.opendaylight.controller.sal.core.UpdateType;
61 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
62 import org.opendaylight.controller.sal.flowprogrammer.Flow;
63 import org.opendaylight.controller.sal.match.Match;
64 import org.opendaylight.controller.sal.match.MatchType;
65 import org.opendaylight.controller.sal.match.MatchField;
66 import org.opendaylight.controller.sal.action.Action;
67 import org.opendaylight.controller.sal.action.Output;
68
69 import org.opendaylight.controller.sal.reader.FlowOnNode;
70 import org.opendaylight.controller.sal.reader.IReadService;
71 import org.opendaylight.controller.sal.reader.IReadServiceListener;
72 import org.opendaylight.controller.sal.utils.GlobalConstants;
73 import org.opendaylight.controller.sal.utils.IObjectReader;
74 import org.opendaylight.controller.sal.utils.ObjectReader;
75 import org.opendaylight.controller.sal.utils.ObjectWriter;
76 import org.opendaylight.controller.sal.utils.NetUtils;
77
78 import org.opendaylight.controller.hosttracker.IfIptoHost;
79 import org.opendaylight.controller.sal.utils.Status;
80 import org.opendaylight.controller.sal.utils.StatusCode;
81
82 import org.opendaylight.controller.sal.utils.ServiceHelper;
83 import org.opendaylight.affinity.affinity.AffinityGroup;
84 import org.opendaylight.affinity.affinity.AffinityLink;
85 import org.opendaylight.affinity.affinity.AffinityIdentifier;
86 import org.opendaylight.affinity.affinity.IAffinityManager;
87 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
88
89 import org.opendaylight.controller.hosttracker.IfIptoHost;
90 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
91 import org.opendaylight.controller.switchmanager.ISwitchManager;
92 import org.opendaylight.affinity.l2agent.L2Agent;
93
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97 /**
98  * The class caches latest network nodes statistics as notified by reader
99  * services and provides API to retrieve them.
100  */
101 public class AffinityManagerImpl implements IAffinityManager, IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
102     private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
103
104     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
105     private static final String SAVE = "Save";
106     private String affinityLinkFileName = null;
107     private String affinityGroupFileName = null;
108     private IFlowProgrammerService fps = null;
109     private ISwitchManager switchManager = null;
110     private L2Agent l2agent = null;
111
112     private ConcurrentMap<String, AffinityGroup> affinityGroupList;
113     private ConcurrentMap<String, AffinityLink> affinityLinkList;
114     private ConcurrentMap<Long, String> configSaveEvent;
115
116     private IfIptoHost hostTracker;
117
118     private final Set<IAffinityManagerAware> affinityManagerAware = Collections
119             .synchronizedSet(new HashSet<IAffinityManagerAware>());
120
121     private byte[] MAC;
122     private static boolean hostRefresh = true;
123     private int hostRetryCount = 5;
124     private IClusterContainerServices clusterContainerService = null;
125     private String containerName = GlobalConstants.DEFAULT.toString();
126     private boolean isDefaultContainer = true;
127     private static final int REPLACE_RETRY = 1;
128
129     public enum ReasonCode {
130         SUCCESS("Success"), FAILURE("Failure"), INVALID_CONF(
131                 "Invalid Configuration"), EXIST("Entry Already Exist"), CONFLICT(
132                         "Configuration Conflict with Existing Entry");
133
134         private final String name;
135
136         private ReasonCode(String name) {
137             this.name = name;
138         }
139
140         @Override
141         public String toString() {
142             return name;
143         }
144     }
145
146     /* Only default container. */
147     public String getContainerName() {
148         return containerName;
149     }
150
151     public void startUp() {
152         // Initialize configuration file names
153         affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
154             + ".conf";
155         affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
156             + ".conf";
157         log.debug("configuration file names " + affinityLinkFileName + "and " + affinityGroupFileName);
158         // Instantiate cluster synced variables
159         allocateCaches();
160         retrieveCaches();
161
162         /*
163          * Read startup and build database if we have not already gotten the
164          * configurations synced from another node
165          */
166         if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
167             loadAffinityConfiguration();
168         }
169     }
170
171     public void shutDown() {
172     }
173
174     @SuppressWarnings("deprecation")
175     private void allocateCaches() {
176         if (this.clusterContainerService == null) {
177             this.nonClusterObjectCreate();
178             log.warn("un-initialized clusterContainerService, can't create cache");
179             return;
180         }
181         try {
182             clusterContainerService.createCache(
183                     "affinity.affinityGroupList",
184                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
185             clusterContainerService.createCache(
186                     "affinity.affinityLinkList",
187                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
188             clusterContainerService.createCache(
189                     "affinity.configSaveEvent",
190                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
191         } catch (CacheConfigException cce) {
192             log.error("\nCache configuration invalid - check cache mode");
193         } catch (CacheExistException ce) {
194             log.error("\nCache already exits - destroy and recreate if needed");
195         }
196     }
197
198     @SuppressWarnings({ "unchecked", "deprecation" })
199     private void retrieveCaches() {
200         if (this.clusterContainerService == null) {
201             log.info("un-initialized clusterContainerService, can't retrieve cache");
202             return;
203         }
204         affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
205             .getCache("affinity.affinityGroupList");
206         if (affinityGroupList == null) {
207             log.error("\nFailed to get cache for affinityGroupList");
208         }
209         affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
210             .getCache("affinity.affinityLinkList");
211         if (affinityLinkList == null) {
212             log.error("\nFailed to get cache for affinityLinkList");
213         }
214
215         configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
216             .getCache("affinity.configSaveEvent");
217         if (configSaveEvent == null) {
218             log.error("\nFailed to get cache for configSaveEvent");
219         }
220     }
221
222     private void nonClusterObjectCreate() {
223         affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
224         affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
225         configSaveEvent = new ConcurrentHashMap<Long, String>();
226     }
227
228
229     void setHostTracker(IfIptoHost h) {
230         this.hostTracker = h;
231     }
232
233     void unsetHostTracker(IfIptoHost h) {
234         if (this.hostTracker.equals(h)) {
235             this.hostTracker = null;
236         }
237     }
238     public void setFlowProgrammerService(IFlowProgrammerService s)
239     {
240         this.fps = s;
241     }
242
243     public void unsetFlowProgrammerService(IFlowProgrammerService s) {
244         if (this.fps == s) {
245             this.fps = null;
246         }
247     }
248     public void setL2Agent(L2Agent s)
249     {
250         this.l2agent = s;
251     }
252
253     public void unsetL2Agent(L2Agent s) {
254         if (this.l2agent == s) {
255             this.l2agent = null;
256         }
257     }
258
259     /*
260     public void setForwardingRulesManager(
261             IForwardingRulesManager forwardingRulesManager) {
262         this.ruleManager = forwardingRulesManager;
263     }
264
265     public void unsetForwardingRulesManager(
266             IForwardingRulesManager forwardingRulesManager) {
267         if (this.ruleManager == forwardingRulesManager) {
268             this.ruleManager = null;
269         }
270     }
271     */
272     
273     public Status addAffinityLink(AffinityLink al) {
274         boolean putNewLink = false;
275
276         if (affinityLinkList.containsKey(al.getName())) {
277             return new Status(StatusCode.CONFLICT,
278                               "AffinityLink with the specified name already configured.");
279         }
280
281         
282         AffinityLink alCurr = affinityLinkList.get(al.getName());
283         if (alCurr == null) {
284             if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
285                 putNewLink = true;
286             } 
287         } else {
288             putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
289         }
290
291         if (!putNewLink) {
292             String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
293             return new Status(StatusCode.CONFLICT, msg);
294         }
295         
296         return new Status(StatusCode.SUCCESS);
297     }
298
299
300     /** 
301      * Fetch all node connectors. Each switch port will receive a flow
302      * rule. Do not stop on error. Pass in the waypointMAC address so
303      * that the correct output port can be determined.
304      */
305     public Status pushFlowRule(Flow flow, byte [] waypointMAC) {
306         /* Get all node connectors. */
307         Set<Node> nodes = switchManager.getNodes();
308         Status success = new Status(StatusCode.SUCCESS);
309         Status notfound = new Status(StatusCode.NOTFOUND);
310
311         if (nodes == null) {
312             log.debug("No nodes in network.");
313             return success;
314         } 
315         for (Node node: nodes) {
316             /* Look up the output port leading to the waypoint. */
317             NodeConnector dst_connector = l2agent.lookup(node, waypointMAC);
318             Action action = new Output(dst_connector);
319             flow.addAction(action);
320
321             Status status = fps.addFlow(node, flow);
322             if (!status.isSuccess()) {
323                 log.debug("Error during addFlow: {} on {}. The failure is: {}",
324                           flow, node, status.getDescription());
325             }
326         }
327         return success;
328     }
329
330     /** 
331      * add flow rules for each node connector.
332      */
333     public Status addFlowRulesForRedirect(AffinityLink al) throws Exception {
334         Match match = new Match();
335         List<Action> actions = new ArrayList<Action>();
336
337         InetAddress address1, address2;
338         InetAddress mask;
339         mask = InetAddress.getByName("255.255.255.255");
340
341         Flow f = new Flow(match, actions);
342         String waypoint = al.getWaypoint();
343
344         List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
345         for (Entry<Host,Host> hostPair : hostPairList) {
346             /* Create a match for each host pair in the affinity link. */
347
348             Host host1 = hostPair.getKey();
349             Host host2 = hostPair.getValue();
350             address1 = host1.getNetworkAddress();
351             address2 = host2.getNetworkAddress();
352             
353             match.setField(MatchType.NW_SRC, address1, mask);
354             match.setField(MatchType.NW_DST, address2, mask);
355             
356             /* Send this flow rule to all nodes in the network. */
357             byte [] dstMAC = InetAddressToMAC(waypoint);
358             pushFlowRule(f, dstMAC);
359         }
360         return new Status(StatusCode.SUCCESS);
361     }
362
363     public byte [] InetAddressToMAC(String ipaddress) {
364         InetAddress inetAddr = NetUtils.parseInetAddress(ipaddress);
365         HostNodeConnector host = hostTracker.hostFind(inetAddr);
366         byte [] dst_mac = host.getDataLayerAddressBytes();
367         return dst_mac;
368     }
369
370     public Status removeAffinityLink(String name) {
371         affinityLinkList.remove(name);
372         return new Status(StatusCode.SUCCESS);
373     }
374
375     public Status removeAffinityLink(AffinityLink al) {
376         AffinityLink alCurr = affinityLinkList.get(al.getName());
377         if (alCurr != null) {
378             affinityLinkList.remove(alCurr);
379             return new Status(StatusCode.SUCCESS);
380         } else {
381             String msg = "Affinity Link with specified name does not exist." + al.getName();
382             return new Status(StatusCode.INTERNALERROR, msg);
383         }
384     }
385     
386     @Override
387     public AffinityLink getAffinityLink(String linkName) {
388         return affinityLinkList.get(linkName);
389     }
390
391     @Override
392     public List<AffinityLink> getAllAffinityLinks() {
393         return new ArrayList<AffinityLink>(affinityLinkList.values());
394     }
395
396     @Override
397     public Status addAffinityGroup(AffinityGroup ag) {
398         boolean putNewGroup = false;
399         String name = ag.getName();
400         if (affinityGroupList.containsKey(name)) {
401             return new Status(StatusCode.CONFLICT,
402                               "AffinityGroup with the specified name already configured.");
403         } 
404         AffinityGroup agCurr = affinityGroupList.get(name);
405         if (agCurr == null) {
406             if (affinityGroupList.putIfAbsent(name, ag) == null) {
407                 putNewGroup = true;
408             } 
409         } else {
410             putNewGroup = affinityGroupList.replace(name, agCurr, ag);
411         }
412
413         if (!putNewGroup) {
414             String msg = "Cluster conflict: Conflict while adding the subnet " + name;
415             return new Status(StatusCode.CONFLICT, msg);
416         }
417         
418         return new Status(StatusCode.SUCCESS);
419     }
420
421     /* Check for errors. */
422     @Override
423     public Status removeAffinityGroup(String name) {
424         affinityGroupList.remove(name);
425         return new Status(StatusCode.SUCCESS);
426     }
427
428     @Override
429     public AffinityGroup getAffinityGroup(String groupName) {
430         log.debug("getAffinityGroup" + groupName);
431         return affinityGroupList.get(groupName);
432     }
433
434     @Override
435     public List<AffinityGroup> getAllAffinityGroups() {
436         return new ArrayList<AffinityGroup>(affinityGroupList.values());
437     }
438
439     /* Find where this is used. */
440     @Override
441     public Object readObject(ObjectInputStream ois)
442             throws FileNotFoundException, IOException, ClassNotFoundException {
443         // Perform the class deserialization locally, from inside the package
444         // where the class is defined
445         return ois.readObject();
446     }
447
448     @SuppressWarnings("unchecked")
449     private void loadAffinityConfiguration() {
450         ObjectReader objReader = new ObjectReader();
451         ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
452         ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
453         
454         /* group list */
455         if (groupList != null) {
456             for (AffinityGroup ag : groupList.values()) {
457                 addAffinityGroup(ag);
458             }
459         }
460
461         /* link list */
462         if (linkList != null) {
463             for (AffinityLink al : linkList.values()) {
464                 addAffinityLink(al);
465             }
466         }
467     }
468
469     @Override 
470     public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
471         return ag.getAllElements();
472     }
473  
474     @Override 
475     public List<Host> getAllElementsByHost(AffinityGroup ag) {
476         List<Host> hostList= new ArrayList<Host>();
477
478         for (AffinityIdentifier h : ag.getAllElements()) {
479             /* TBD: Do not assume this to be an InetAddress. */ 
480             h.print();
481             if (hostTracker != null) {
482                 Host host1 = hostTracker.hostFind((InetAddress) h.get());
483                 hostList.add(host1);
484             }
485         }
486         return hostList;
487     }
488
489     @Override
490     public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
491         List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
492
493         AffinityGroup fromGroup = al.getFromGroup();
494         AffinityGroup toGroup = al.getToGroup();
495         
496         for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
497             for (AffinityIdentifier h2 : toGroup.getAllElements()) {
498                 if (hostTracker != null) {
499                     Host host1 = hostTracker.hostFind((InetAddress) h1.get());
500                     Host host2 = hostTracker.hostFind((InetAddress) h2.get());
501                     Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
502                     hostPairList.add(hp1);
503                 }
504             }
505         }
506         return hostPairList;
507     }
508
509     @Override 
510     public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
511         List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
512
513         AffinityGroup fromGroup = al.getFromGroup();
514         AffinityGroup toGroup = al.getToGroup();
515         
516         for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
517             for (AffinityIdentifier h2 : toGroup.getAllElements()) {
518                 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
519                 hostPairList.add(hp1);
520             }
521         }
522         return hostPairList;
523     }
524
525     @Override
526     public Status saveConfiguration() {
527         return saveAffinityConfig();
528     }
529
530     @Override
531     public Status saveAffinityConfig() {
532         // Publish the save config event to the cluster nodes
533         configSaveEvent.put(new Date().getTime(), SAVE);
534         return saveAffinityConfigInternal();
535     }
536
537     public Status saveAffinityConfigInternal() {
538         Status retS = null, retP = null;
539         ObjectWriter objWriter = new ObjectWriter();
540
541         retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
542                 affinityLinkList), affinityLinkFileName);
543
544         retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
545                 affinityGroupList), affinityGroupFileName);
546
547         if (retS.isSuccess() && retP.isSuccess()) {
548             return new Status(StatusCode.SUCCESS, "Configuration saved.");
549         } else {
550             return new Status(StatusCode.INTERNALERROR, "Save failed");
551         }
552     }
553
554     @Override
555     public void entryCreated(Long key, String cacheName, boolean local) {
556     }
557
558     @Override
559     public void entryUpdated(Long key, String new_value, String cacheName,
560             boolean originLocal) {
561         saveAffinityConfigInternal();
562     }
563
564     @Override
565     public void entryDeleted(Long key, String cacheName, boolean originLocal) {
566     }
567
568     /**
569      * Function called by the dependency manager when all the required
570      * dependencies are satisfied
571      *
572      */
573     void init() {
574         log.debug("INIT called!");
575         containerName = GlobalConstants.DEFAULT.toString();
576         startUp();
577     }
578
579     /**
580      * Function called by the dependency manager when at least one
581      * dependency become unsatisfied or when the component is shutting
582      * down because for example bundle is being stopped.
583      *
584      */
585     void destroy() {
586         log.debug("DESTROY called!");
587     }
588
589     /**
590      * Function called by dependency manager after "init ()" is called
591      * and after the services provided by the class are registered in
592      * the service registry
593      *
594      */
595     void start() {
596         log.debug("START called!");
597     }
598
599     /**
600      * Function called after registering the service in OSGi service registry.
601      */
602     void started(){
603         // Retrieve current statistics so we don't have to wait for next refresh
604         IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
605                 IAffinityManager.class, this.getContainerName(), this);
606         if (affinityManager != null) {
607             log.debug("STARTED method called!");
608         }
609     }
610
611     /**
612      * Function called by the dependency manager before the services
613      * exported by the component are unregistered, this will be
614      * followed by a "destroy ()" calls
615      *
616      */
617     void stop() {
618         log.debug("STOP called!");
619     }
620
621     void setClusterContainerService(IClusterContainerServices s) {
622         log.debug("Cluster Service set for affinity mgr");
623         this.clusterContainerService = s;
624     }
625
626     void unsetClusterContainerService(IClusterContainerServices s) {
627         if (this.clusterContainerService == s) {
628             log.debug("Cluster Service removed for affinity mgr!");
629             this.clusterContainerService = null;
630         }
631     }
632 }