Add flow pushing rules.
[affinity.git] / affinity / implementation / src / main / java / org / opendaylight / affinity / affinity / internal / AffinityManagerImpl.java
1 /*
2  * Copyright (c) 2013 Plexxi, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.affinity.affinity.internal;
10
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.AbstractMap;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
54 import org.opendaylight.controller.sal.core.IContainer;
55 import org.opendaylight.controller.sal.core.Node;
56 import org.opendaylight.controller.sal.core.Host;
57 import org.opendaylight.controller.sal.core.NodeConnector;
58 import org.opendaylight.controller.sal.core.NodeTable;
59 import org.opendaylight.controller.sal.core.Property;
60 import org.opendaylight.controller.sal.core.UpdateType;
61 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
62 import org.opendaylight.controller.sal.flowprogrammer.Flow;
63 import org.opendaylight.controller.sal.match.Match;
64 import org.opendaylight.controller.sal.match.MatchType;
65 import org.opendaylight.controller.sal.match.MatchField;
66 import org.opendaylight.controller.sal.action.Action;
67 import org.opendaylight.controller.sal.action.Output;
68
69 import org.opendaylight.controller.sal.reader.FlowOnNode;
70 import org.opendaylight.controller.sal.reader.IReadService;
71 import org.opendaylight.controller.sal.reader.IReadServiceListener;
72 import org.opendaylight.controller.sal.utils.GlobalConstants;
73 import org.opendaylight.controller.sal.utils.IObjectReader;
74 import org.opendaylight.controller.sal.utils.ObjectReader;
75 import org.opendaylight.controller.sal.utils.ObjectWriter;
76 import org.opendaylight.controller.sal.utils.NetUtils;
77
78 import org.opendaylight.controller.hosttracker.IfIptoHost;
79 import org.opendaylight.controller.sal.utils.Status;
80 import org.opendaylight.controller.sal.utils.StatusCode;
81
82 import org.opendaylight.controller.sal.utils.ServiceHelper;
83 import org.opendaylight.affinity.affinity.AffinityGroup;
84 import org.opendaylight.affinity.affinity.AffinityLink;
85 import org.opendaylight.affinity.affinity.AffinityIdentifier;
86 import org.opendaylight.affinity.affinity.IAffinityManager;
87 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
88
89 import org.opendaylight.controller.hosttracker.IfIptoHost;
90 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
91 import org.opendaylight.controller.switchmanager.ISwitchManager;
92 import org.opendaylight.affinity.l2agent.L2Agent;
93 import org.slf4j.Logger;
94 import org.slf4j.LoggerFactory;
95
96 /**
97  * The class caches latest network nodes statistics as notified by reader
98  * services and provides API to retrieve them.
99  */
100 public class AffinityManagerImpl implements IAffinityManager, IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
101     private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
102
103     private static String ROOT = GlobalConstants.STARTUPHOME.toString();
104     private static final String SAVE = "Save";
105     private String affinityLinkFileName = null;
106     private String affinityGroupFileName = null;
107     private IFlowProgrammerService fps = null;
108     private ISwitchManager switchManager = null;
109     private L2Agent l2agent = null;
110
111     private ConcurrentMap<String, AffinityGroup> affinityGroupList;
112     private ConcurrentMap<String, AffinityLink> affinityLinkList;
113     private ConcurrentMap<Long, String> configSaveEvent;
114
115     private IfIptoHost hostTracker;
116
117     private final Set<IAffinityManagerAware> affinityManagerAware = Collections
118             .synchronizedSet(new HashSet<IAffinityManagerAware>());
119
120     private byte[] MAC;
121     private static boolean hostRefresh = true;
122     private int hostRetryCount = 5;
123     private IClusterContainerServices clusterContainerService = null;
124     private String containerName = GlobalConstants.DEFAULT.toString();
125     private boolean isDefaultContainer = true;
126     private static final int REPLACE_RETRY = 1;
127
128     public enum ReasonCode {
129         SUCCESS("Success"), FAILURE("Failure"), INVALID_CONF(
130                 "Invalid Configuration"), EXIST("Entry Already Exist"), CONFLICT(
131                         "Configuration Conflict with Existing Entry");
132
133         private final String name;
134
135         private ReasonCode(String name) {
136             this.name = name;
137         }
138
139         @Override
140         public String toString() {
141             return name;
142         }
143     }
144
145     /* Only default container. */
146     public String getContainerName() {
147         return containerName;
148     }
149
150     public void startUp() {
151         // Initialize configuration file names
152         affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
153             + ".conf";
154         affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
155             + ".conf";
156         log.debug("configuration file names " + affinityLinkFileName + "and " + affinityGroupFileName);
157         // Instantiate cluster synced variables
158         allocateCaches();
159         retrieveCaches();
160
161         /*
162          * Read startup and build database if we have not already gotten the
163          * configurations synced from another node
164          */
165         if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
166             loadAffinityConfiguration();
167         }
168     }
169
170     public void shutDown() {
171     }
172
173     @SuppressWarnings("deprecation")
174     private void allocateCaches() {
175         if (this.clusterContainerService == null) {
176             this.nonClusterObjectCreate();
177             log.warn("un-initialized clusterContainerService, can't create cache");
178             return;
179         }
180         try {
181             clusterContainerService.createCache(
182                     "affinity.affinityGroupList",
183                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
184             clusterContainerService.createCache(
185                     "affinity.affinityLinkList",
186                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
187             clusterContainerService.createCache(
188                     "affinity.configSaveEvent",
189                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
190         } catch (CacheConfigException cce) {
191             log.error("\nCache configuration invalid - check cache mode");
192         } catch (CacheExistException ce) {
193             log.error("\nCache already exits - destroy and recreate if needed");
194         }
195     }
196
197     @SuppressWarnings({ "unchecked", "deprecation" })
198     private void retrieveCaches() {
199         if (this.clusterContainerService == null) {
200             log.info("un-initialized clusterContainerService, can't retrieve cache");
201             return;
202         }
203         affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
204             .getCache("affinity.affinityGroupList");
205         if (affinityGroupList == null) {
206             log.error("\nFailed to get cache for affinityGroupList");
207         }
208         affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
209             .getCache("affinity.affinityLinkList");
210         if (affinityLinkList == null) {
211             log.error("\nFailed to get cache for affinityLinkList");
212         }
213
214         configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
215             .getCache("affinity.configSaveEvent");
216         if (configSaveEvent == null) {
217             log.error("\nFailed to get cache for configSaveEvent");
218         }
219     }
220
221     private void nonClusterObjectCreate() {
222         affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
223         affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
224         configSaveEvent = new ConcurrentHashMap<Long, String>();
225     }
226
227
228     void setHostTracker(IfIptoHost h) {
229         this.hostTracker = h;
230     }
231
232     void unsetHostTracker(IfIptoHost h) {
233         if (this.hostTracker.equals(h)) {
234             this.hostTracker = null;
235         }
236     }
237     public void setFlowProgrammerService(IFlowProgrammerService s)
238     {
239         this.fps = s;
240     }
241
242     public void unsetFlowProgrammerService(IFlowProgrammerService s) {
243         if (this.fps == s) {
244             this.fps = null;
245         }
246     }
247 <<<<<<< HEAD
248     public void setL2Agent(L2Agent s)
249 =======
250     public void setL2Agent(TutorialL2Forwarding s)
251 >>>>>>> Add flow pushing rules.
252     {
253         this.l2agent = s;
254     }
255
256 <<<<<<< HEAD
257     public void unsetL2Agent(L2Agent s) {
258 =======
259     public void unsetL2Agent(TutorialL2Forwarding s) {
260 >>>>>>> Add flow pushing rules.
261         if (this.l2agent == s) {
262             this.l2agent = null;
263         }
264     }
265
266     /*
267     public void setForwardingRulesManager(
268             IForwardingRulesManager forwardingRulesManager) {
269         this.ruleManager = forwardingRulesManager;
270     }
271
272     public void unsetForwardingRulesManager(
273             IForwardingRulesManager forwardingRulesManager) {
274         if (this.ruleManager == forwardingRulesManager) {
275             this.ruleManager = null;
276         }
277     }
278     */
279     
280     public Status addAffinityLink(AffinityLink al) {
281         boolean putNewLink = false;
282
283         if (affinityLinkList.containsKey(al.getName())) {
284             return new Status(StatusCode.CONFLICT,
285                               "AffinityLink with the specified name already configured.");
286         }
287
288         
289         AffinityLink alCurr = affinityLinkList.get(al.getName());
290         if (alCurr == null) {
291             if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
292                 putNewLink = true;
293             } 
294         } else {
295             putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
296         }
297
298         if (!putNewLink) {
299             String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
300             return new Status(StatusCode.CONFLICT, msg);
301         }
302         
303         return new Status(StatusCode.SUCCESS);
304     }
305
306
307     /** 
308      * Fetch all node connectors. Each switch port will receive a flow
309      * rule. Do not stop on error. Pass in the waypointMAC address so
310      * that the correct output port can be determined.
311      */
312     public Status pushFlowRule(Flow flow, byte [] waypointMAC) {
313         /* Get all node connectors. */
314         Set<Node> nodes = switchManager.getNodes();
315         Status success = new Status(StatusCode.SUCCESS);
316         Status notfound = new Status(StatusCode.NOTFOUND);
317
318         if (nodes == null) {
319             log.debug("No nodes in network.");
320             return success;
321         } 
322         for (Node node: nodes) {
323             /* Look up the output port leading to the waypoint. */
324             NodeConnector dst_connector = l2agent.lookup(node, waypointMAC);
325             Action action = new Output(dst_connector);
326             flow.addAction(action);
327
328             Status status = fps.addFlow(node, flow);
329             if (!status.isSuccess()) {
330                 log.debug("Error during addFlow: {} on {}. The failure is: {}",
331                           flow, node, status.getDescription());
332             }
333         }
334         return success;
335     }
336
337     /** 
338      * add flow rules for each node connector.
339      */
340     public Status addFlowRulesForRedirect(AffinityLink al) throws Exception {
341         Match match = new Match();
342         List<Action> actions = new ArrayList<Action>();
343
344         InetAddress address1, address2;
345         InetAddress mask;
346         mask = InetAddress.getByName("255.255.255.255");
347
348         Flow f = new Flow(match, actions);
349         String waypoint = al.getWaypoint();
350
351         List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
352         for (Entry<Host,Host> hostPair : hostPairList) {
353             /* Create a match for each host pair in the affinity link. */
354
355             Host host1 = hostPair.getKey();
356             Host host2 = hostPair.getValue();
357             address1 = host1.getNetworkAddress();
358             address2 = host2.getNetworkAddress();
359             
360             match.setField(MatchType.NW_SRC, address1, mask);
361             match.setField(MatchType.NW_DST, address2, mask);
362             
363             /* Send this flow rule to all nodes in the network. */
364             byte [] dstMAC = InetAddressToMAC(waypoint);
365             pushFlowRule(f, dstMAC);
366         }
367         return new Status(StatusCode.SUCCESS);
368     }
369
370     public byte [] InetAddressToMAC(String ipaddress) {
371         InetAddress inetAddr = NetUtils.parseInetAddress(ipaddress);
372         HostNodeConnector host = hostTracker.hostFind(inetAddr);
373         byte [] dst_mac = host.getDataLayerAddressBytes();
374         return dst_mac;
375     }
376
377     public Status removeAffinityLink(String name) {
378         affinityLinkList.remove(name);
379         return new Status(StatusCode.SUCCESS);
380     }
381
382     public Status removeAffinityLink(AffinityLink al) {
383         AffinityLink alCurr = affinityLinkList.get(al.getName());
384         if (alCurr != null) {
385             affinityLinkList.remove(alCurr);
386             return new Status(StatusCode.SUCCESS);
387         } else {
388             String msg = "Affinity Link with specified name does not exist." + al.getName();
389             return new Status(StatusCode.INTERNALERROR, msg);
390         }
391     }
392     
393     @Override
394     public AffinityLink getAffinityLink(String linkName) {
395         return affinityLinkList.get(linkName);
396     }
397
398     @Override
399     public List<AffinityLink> getAllAffinityLinks() {
400         return new ArrayList<AffinityLink>(affinityLinkList.values());
401     }
402
403     @Override
404     public Status addAffinityGroup(AffinityGroup ag) {
405         boolean putNewGroup = false;
406         String name = ag.getName();
407         if (affinityGroupList.containsKey(name)) {
408             return new Status(StatusCode.CONFLICT,
409                               "AffinityGroup with the specified name already configured.");
410         } 
411         AffinityGroup agCurr = affinityGroupList.get(name);
412         if (agCurr == null) {
413             if (affinityGroupList.putIfAbsent(name, ag) == null) {
414                 putNewGroup = true;
415             } 
416         } else {
417             putNewGroup = affinityGroupList.replace(name, agCurr, ag);
418         }
419
420         if (!putNewGroup) {
421             String msg = "Cluster conflict: Conflict while adding the subnet " + name;
422             return new Status(StatusCode.CONFLICT, msg);
423         }
424         
425         return new Status(StatusCode.SUCCESS);
426     }
427
428     /* Check for errors. */
429     @Override
430     public Status removeAffinityGroup(String name) {
431         affinityGroupList.remove(name);
432         return new Status(StatusCode.SUCCESS);
433     }
434
435     @Override
436     public AffinityGroup getAffinityGroup(String groupName) {
437         log.debug("getAffinityGroup" + groupName);
438         return affinityGroupList.get(groupName);
439     }
440
441     @Override
442     public List<AffinityGroup> getAllAffinityGroups() {
443         return new ArrayList<AffinityGroup>(affinityGroupList.values());
444     }
445
446     /* Find where this is used. */
447     @Override
448     public Object readObject(ObjectInputStream ois)
449             throws FileNotFoundException, IOException, ClassNotFoundException {
450         // Perform the class deserialization locally, from inside the package
451         // where the class is defined
452         return ois.readObject();
453     }
454
455     @SuppressWarnings("unchecked")
456     private void loadAffinityConfiguration() {
457         ObjectReader objReader = new ObjectReader();
458         ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
459         ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
460         
461         /* group list */
462         if (groupList != null) {
463             for (AffinityGroup ag : groupList.values()) {
464                 addAffinityGroup(ag);
465             }
466         }
467
468         /* link list */
469         if (linkList != null) {
470             for (AffinityLink al : linkList.values()) {
471                 addAffinityLink(al);
472             }
473         }
474     }
475
476     @Override 
477     public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
478         return ag.getAllElements();
479     }
480  
481     @Override 
482     public List<Host> getAllElementsByHost(AffinityGroup ag) {
483         List<Host> hostList= new ArrayList<Host>();
484
485         for (AffinityIdentifier h : ag.getAllElements()) {
486             /* TBD: Do not assume this to be an InetAddress. */ 
487             h.print();
488             if (hostTracker != null) {
489                 Host host1 = hostTracker.hostFind((InetAddress) h.get());
490                 hostList.add(host1);
491             }
492         }
493         return hostList;
494     }
495
496     @Override
497     public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
498         List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
499
500         AffinityGroup fromGroup = al.getFromGroup();
501         AffinityGroup toGroup = al.getToGroup();
502         
503         for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
504             for (AffinityIdentifier h2 : toGroup.getAllElements()) {
505                 if (hostTracker != null) {
506                     Host host1 = hostTracker.hostFind((InetAddress) h1.get());
507                     Host host2 = hostTracker.hostFind((InetAddress) h2.get());
508                     Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
509                     hostPairList.add(hp1);
510                 }
511             }
512         }
513         return hostPairList;
514     }
515
516     @Override 
517     public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
518         List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
519
520         AffinityGroup fromGroup = al.getFromGroup();
521         AffinityGroup toGroup = al.getToGroup();
522         
523         for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
524             for (AffinityIdentifier h2 : toGroup.getAllElements()) {
525                 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
526                 hostPairList.add(hp1);
527             }
528         }
529         return hostPairList;
530     }
531
532     @Override
533     public Status saveConfiguration() {
534         return saveAffinityConfig();
535     }
536
537     @Override
538     public Status saveAffinityConfig() {
539         // Publish the save config event to the cluster nodes
540         configSaveEvent.put(new Date().getTime(), SAVE);
541         return saveAffinityConfigInternal();
542     }
543
544     public Status saveAffinityConfigInternal() {
545         Status retS = null, retP = null;
546         ObjectWriter objWriter = new ObjectWriter();
547
548         retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
549                 affinityLinkList), affinityLinkFileName);
550
551         retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
552                 affinityGroupList), affinityGroupFileName);
553
554         if (retS.isSuccess() && retP.isSuccess()) {
555             return new Status(StatusCode.SUCCESS, "Configuration saved.");
556         } else {
557             return new Status(StatusCode.INTERNALERROR, "Save failed");
558         }
559     }
560
561     @Override
562     public void entryCreated(Long key, String cacheName, boolean local) {
563     }
564
565     @Override
566     public void entryUpdated(Long key, String new_value, String cacheName,
567             boolean originLocal) {
568         saveAffinityConfigInternal();
569     }
570
571     @Override
572     public void entryDeleted(Long key, String cacheName, boolean originLocal) {
573     }
574
575     /**
576      * Function called by the dependency manager when all the required
577      * dependencies are satisfied
578      *
579      */
580     void init() {
581         log.debug("INIT called!");
582         containerName = GlobalConstants.DEFAULT.toString();
583         startUp();
584     }
585
586     /**
587      * Function called by the dependency manager when at least one
588      * dependency become unsatisfied or when the component is shutting
589      * down because for example bundle is being stopped.
590      *
591      */
592     void destroy() {
593         log.debug("DESTROY called!");
594     }
595
596     /**
597      * Function called by dependency manager after "init ()" is called
598      * and after the services provided by the class are registered in
599      * the service registry
600      *
601      */
602     void start() {
603         log.debug("START called!");
604     }
605
606     /**
607      * Function called after registering the service in OSGi service registry.
608      */
609     void started(){
610         // Retrieve current statistics so we don't have to wait for next refresh
611         IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
612                 IAffinityManager.class, this.getContainerName(), this);
613         if (affinityManager != null) {
614             log.debug("STARTED method called!");
615         }
616     }
617
618     /**
619      * Function called by the dependency manager before the services
620      * exported by the component are unregistered, this will be
621      * followed by a "destroy ()" calls
622      *
623      */
624     void stop() {
625         log.debug("STOP called!");
626     }
627
628     void setClusterContainerService(IClusterContainerServices s) {
629         log.debug("Cluster Service set for affinity mgr");
630         this.clusterContainerService = s;
631     }
632
633     void unsetClusterContainerService(IClusterContainerServices s) {
634         if (this.clusterContainerService == s) {
635             log.debug("Cluster Service removed for affinity mgr!");
636             this.clusterContainerService = null;
637         }
638     }
639 }