2 * Copyright (c) 2013 Plexxi, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.affinity.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.AbstractMap;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
39 import java.util.Map.Entry;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 //import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
54 //import org.opendaylight.controller.forwardingrulesmanager.IForwardingRulesManager;
55 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
56 import org.opendaylight.controller.sal.flowprogrammer.Flow;
57 import org.opendaylight.controller.sal.utils.IPProtocols;
59 import org.opendaylight.controller.sal.core.IContainer;
60 import org.opendaylight.controller.sal.core.Node;
61 import org.opendaylight.controller.sal.core.Host;
62 import org.opendaylight.controller.sal.core.NodeConnector;
63 import org.opendaylight.controller.sal.core.NodeTable;
64 import org.opendaylight.controller.sal.core.Property;
65 import org.opendaylight.controller.sal.core.UpdateType;
67 import org.opendaylight.controller.sal.flowprogrammer.Flow;
68 import org.opendaylight.controller.sal.match.Match;
69 import org.opendaylight.controller.sal.match.MatchType;
70 import org.opendaylight.controller.sal.match.MatchField;
71 import org.opendaylight.controller.sal.action.Action;
72 import org.opendaylight.controller.sal.action.Output;
73 import org.opendaylight.controller.sal.utils.EtherTypes;
75 import org.opendaylight.controller.sal.reader.FlowOnNode;
76 import org.opendaylight.controller.sal.reader.IReadService;
77 import org.opendaylight.controller.sal.reader.IReadServiceListener;
78 import org.opendaylight.controller.sal.utils.GlobalConstants;
79 import org.opendaylight.controller.sal.utils.IObjectReader;
80 import org.opendaylight.controller.sal.utils.ObjectReader;
81 import org.opendaylight.controller.sal.utils.ObjectWriter;
82 import org.opendaylight.controller.sal.utils.NetUtils;
84 import org.opendaylight.controller.hosttracker.IfIptoHost;
85 import org.opendaylight.controller.sal.utils.Status;
86 import org.opendaylight.controller.sal.utils.StatusCode;
88 import org.opendaylight.controller.sal.utils.ServiceHelper;
89 import org.opendaylight.affinity.affinity.AffinityGroup;
90 import org.opendaylight.affinity.affinity.AffinityLink;
91 import org.opendaylight.affinity.affinity.AffinityIdentifier;
92 import org.opendaylight.affinity.affinity.IAffinityManager;
93 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
95 import org.opendaylight.controller.hosttracker.IfIptoHost;
96 import org.opendaylight.controller.hosttracker.IfNewHostNotify;
97 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
98 import org.opendaylight.controller.switchmanager.ISwitchManager;
99 import org.opendaylight.affinity.l2agent.IfL2Agent;
100 import org.opendaylight.affinity.nfchainagent.NFchainAgent;
101 import org.opendaylight.affinity.nfchainagent.NFchainconfig;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
107 * Affinity configuration.
109 public class AffinityManagerImpl implements IAffinityManager, IfNewHostNotify,
110 IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
// Prefix prepended to both configuration file names in startUp().
private static String ROOT = GlobalConstants.STARTUPHOME.toString();
// Value written into configSaveEvent by saveAffinityConfig().
private static final String SAVE = "Save";
// File names built in startUp(); written by saveAffinityConfigInternal()
// and read back by loadAffinityConfiguration().
private String affinityLinkFileName = null;
private String affinityGroupFileName = null;
// private IForwardingRulesManager ruleManager;
// Service references handed in through the set*/unset* methods below;
// each stays null until its setter has been called.
private IFlowProgrammerService programmer = null;
private NFchainAgent nfchainagent = null;
private ISwitchManager switchManager = null;
private IfL2Agent l2agent = null;
private IfIptoHost hostTracker = null;
// Configuration maps keyed by group name / link name. They are assigned
// either from cluster caches (retrieveCaches) or as local maps
// (nonClusterObjectCreate).
private ConcurrentMap<String, AffinityGroup> affinityGroupList;
private ConcurrentMap<String, AffinityLink> affinityLinkList;
// Save events; saveAffinityConfig() keys each entry by the current time in ms.
private ConcurrentMap<Long, String> configSaveEvent;
// Synchronized set of IAffinityManagerAware listeners.
// NOTE(review): no reader or writer of this set is visible in this part of the file.
private final Set<IAffinityManagerAware> affinityManagerAware = Collections
        .synchronizedSet(new HashSet<IAffinityManagerAware>());
// NOTE(review): hostRefresh and hostRetryCount are not referenced in the visible code.
private static boolean hostRefresh = true;
private int hostRetryCount = 5;
// Cluster service used to create and fetch the caches; null means local maps are used.
private IClusterContainerServices clusterContainerService = null;
// Container name; appended to the configuration file names and reset in init().
private String containerName = GlobalConstants.DEFAULT.toString();
// NOTE(review): isDefaultContainer and REPLACE_RETRY are not referenced in the visible code.
private boolean isDefaultContainer = true;
private static final int REPLACE_RETRY = 1;
// Priority assigned to every flow built in getFlowlist().
private static short REDIRECT_IPSWITCH_PRIORITY = 3;
/**
 * Result codes for configuration operations. Each constant carries a
 * human-readable label, which is what {@link #toString()} returns.
 */
public enum ReasonCode {
    SUCCESS("Success"),
    FAILURE("Failure"),
    INVALID_CONF("Invalid Configuration"),
    EXIST("Entry Already Exist"),
    CONFLICT("Configuration Conflict with Existing Entry");

    /** Human-readable label of this code. */
    private final String name;

    ReasonCode(String label) {
        this.name = label;
    }

    /**
     * @return the human-readable label rather than the constant's identifier
     */
    @Override
    public String toString() {
        return this.name;
    }
}
160 /* Only default container. */
161 public String getContainerName() {
162 return containerName;
165 public void startUp() {
166 // Initialize configuration file names
167 affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
169 affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
171 log.debug("configuration file names " + affinityLinkFileName + "and " + affinityGroupFileName);
172 // Instantiate cluster synced variables
177 * Read startup and build database if we have not already gotten the
178 * configurations synced from another node
180 if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
181 loadAffinityConfiguration();
185 public void shutDown() {
188 @SuppressWarnings("deprecation")
189 private void allocateCaches() {
190 if (this.clusterContainerService == null) {
191 this.nonClusterObjectCreate();
192 log.warn("un-initialized clusterContainerService, can't create cache");
196 clusterContainerService.createCache(
197 "affinity.affinityGroupList",
198 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
199 clusterContainerService.createCache(
200 "affinity.affinityLinkList",
201 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
202 clusterContainerService.createCache(
203 "affinity.configSaveEvent",
204 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
205 } catch (CacheConfigException cce) {
206 log.error("\nCache configuration invalid - check cache mode");
207 } catch (CacheExistException ce) {
208 log.error("\nCache already exits - destroy and recreate if needed");
212 @SuppressWarnings({ "unchecked", "deprecation" })
213 private void retrieveCaches() {
214 if (this.clusterContainerService == null) {
215 log.info("un-initialized clusterContainerService, can't retrieve cache");
218 affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
219 .getCache("affinity.affinityGroupList");
220 if (affinityGroupList == null) {
221 log.error("\nFailed to get cache for affinityGroupList");
223 affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
224 .getCache("affinity.affinityLinkList");
225 if (affinityLinkList == null) {
226 log.error("\nFailed to get cache for affinityLinkList");
229 configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
230 .getCache("affinity.configSaveEvent");
231 if (configSaveEvent == null) {
232 log.error("\nFailed to get cache for configSaveEvent");
236 private void nonClusterObjectCreate() {
237 affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
238 affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
239 configSaveEvent = new ConcurrentHashMap<Long, String>();
243 void setHostTracker(IfIptoHost h) {
244 log.info("Setting hosttracker {}", h);
245 this.hostTracker = h;
248 void unsetHostTracker(IfIptoHost h) {
249 if (this.hostTracker.equals(h)) {
250 this.hostTracker = null;
253 /* public void setForwardingRulesManager(
254 IForwardingRulesManager forwardingRulesManager) {
255 log.debug("Setting ForwardingRulesManager");
256 this.ruleManager = forwardingRulesManager;
259 public void unsetForwardingRulesManager(
260 IForwardingRulesManager forwardingRulesManager) {
261 if (this.ruleManager == forwardingRulesManager) {
262 this.ruleManager = null;
266 public void setFlowProgrammerService(IFlowProgrammerService s)
271 public void unsetFlowProgrammerService(IFlowProgrammerService s) {
272 if (this.programmer == s) {
273 this.programmer = null;
277 void setNFchainAgent(NFchainAgent s)
279 log.info("Setting nfchainagent {}", s);
280 this.nfchainagent = s;
283 void unsetNFchainAgent(NFchainAgent s) {
284 if (this.nfchainagent == s) {
285 this.nfchainagent = null;
289 void setL2Agent(IfL2Agent s)
291 log.info("Setting l2agent {}", s);
295 void unsetL2Agent(IfL2Agent s) {
296 if (this.l2agent == s) {
301 void setSwitchManager(ISwitchManager s)
303 this.switchManager = s;
306 void unsetSwitchManager(ISwitchManager s) {
307 if (this.switchManager == s) {
308 this.switchManager = null;
313 public void setForwardingRulesManager(
314 IForwardingRulesManager forwardingRulesManager) {
315 this.ruleManager = forwardingRulesManager;
318 public void unsetForwardingRulesManager(
319 IForwardingRulesManager forwardingRulesManager) {
320 if (this.ruleManager == forwardingRulesManager) {
321 this.ruleManager = null;
326 public Status addAffinityLink(AffinityLink al) {
327 boolean putNewLink = false;
329 if (affinityLinkList.containsKey(al.getName())) {
330 return new Status(StatusCode.CONFLICT,
331 "AffinityLink with the specified name already configured.");
335 AffinityLink alCurr = affinityLinkList.get(al.getName());
336 if (alCurr == null) {
337 if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
341 putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
345 String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
346 return new Status(StatusCode.CONFLICT, msg);
349 return new Status(StatusCode.SUCCESS);
352 /* public byte [] InetAddressToMAC(InetAddress inetAddr) {
354 log.debug("Find {} -> {} using hostTracker {}", inetAddr, host, hostTracker);
355 byte [] dst_mac = host.getDataLayerAddressBytes();
360 public Status removeAffinityLink(String name) {
361 affinityLinkList.remove(name);
362 return new Status(StatusCode.SUCCESS);
365 public Status removeAffinityLink(AffinityLink al) {
366 AffinityLink alCurr = affinityLinkList.get(al.getName());
367 if (alCurr != null) {
368 affinityLinkList.remove(alCurr);
369 return new Status(StatusCode.SUCCESS);
371 String msg = "Affinity Link with specified name does not exist." + al.getName();
372 return new Status(StatusCode.INTERNALERROR, msg);
377 public AffinityLink getAffinityLink(String linkName) {
378 return affinityLinkList.get(linkName);
382 public List<AffinityLink> getAllAffinityLinks() {
383 return new ArrayList<AffinityLink>(affinityLinkList.values());
387 public Status addAffinityGroup(AffinityGroup ag) {
388 boolean putNewGroup = false;
389 String name = ag.getName();
390 if (affinityGroupList.containsKey(name)) {
391 return new Status(StatusCode.CONFLICT,
392 "AffinityGroup with the specified name already configured.");
394 AffinityGroup agCurr = affinityGroupList.get(name);
395 if (agCurr == null) {
396 if (affinityGroupList.putIfAbsent(name, ag) == null) {
400 putNewGroup = affinityGroupList.replace(name, agCurr, ag);
404 String msg = "Cluster conflict: Conflict while adding the subnet " + name;
405 return new Status(StatusCode.CONFLICT, msg);
408 return new Status(StatusCode.SUCCESS);
411 /* Check for errors. */
413 public Status removeAffinityGroup(String name) {
414 affinityGroupList.remove(name);
415 return new Status(StatusCode.SUCCESS);
419 public AffinityGroup getAffinityGroup(String groupName) {
420 log.debug("getAffinityGroup" + groupName);
421 return affinityGroupList.get(groupName);
425 public List<AffinityGroup> getAllAffinityGroups() {
426 return new ArrayList<AffinityGroup>(affinityGroupList.values());
429 /* Find where this is used. */
431 public Object readObject(ObjectInputStream ois)
432 throws FileNotFoundException, IOException, ClassNotFoundException {
433 // Perform the class deserialization locally, from inside the package
434 // where the class is defined
435 return ois.readObject();
438 @SuppressWarnings("unchecked")
439 private void loadAffinityConfiguration() {
440 ObjectReader objReader = new ObjectReader();
441 ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
442 ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
445 if (groupList != null) {
446 for (AffinityGroup ag : groupList.values()) {
447 addAffinityGroup(ag);
452 if (linkList != null) {
453 for (AffinityLink al : linkList.values()) {
460 public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
461 return ag.getAllElements();
465 public List<Host> getAllElementsByHost(AffinityGroup ag) {
466 List<Host> hostList= new ArrayList<Host>();
468 for (AffinityIdentifier h : ag.getAllElements()) {
469 /* TBD: Do not assume this to be an InetAddress. */
471 if (hostTracker != null) {
472 Host host1 = hostTracker.hostFind((InetAddress) h.get());
480 public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
481 List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
483 AffinityGroup fromGroup = al.getFromGroup();
484 AffinityGroup toGroup = al.getToGroup();
486 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
487 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
488 if (hostTracker != null) {
489 Host host1 = hostTracker.hostFind((InetAddress) h1.get());
490 Host host2 = hostTracker.hostFind((InetAddress) h2.get());
491 log.debug("Flow between {}, {}", host1, host2);
492 Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
493 hostPairList.add(hp1);
501 public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
502 List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
504 AffinityGroup fromGroup = al.getFromGroup();
505 AffinityGroup toGroup = al.getToGroup();
507 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
508 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
509 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
510 log.debug("Adding hostPair {} -> {}", h1, h2);
511 hostPairList.add(hp1);
517 private void notifyHostUpdate(HostNodeConnector host, boolean added) {
521 log.info("Host update received (new = {}).", added);
525 public void notifyHTClient(HostNodeConnector host) {
526 notifyHostUpdate(host, true);
530 public void notifyHTClientHostRemoved(HostNodeConnector host) {
531 notifyHostUpdate(host, false);
536 public Status saveConfiguration() {
537 return saveAffinityConfig();
541 public Status saveAffinityConfig() {
542 // Publish the save config event to the cluster nodes
543 configSaveEvent.put(new Date().getTime(), SAVE);
544 return saveAffinityConfigInternal();
547 public Status saveAffinityConfigInternal() {
548 Status retS = null, retP = null;
549 ObjectWriter objWriter = new ObjectWriter();
551 retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
552 affinityLinkList), affinityLinkFileName);
554 retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
555 affinityGroupList), affinityGroupFileName);
557 if (retS.isSuccess() && retP.isSuccess()) {
558 return new Status(StatusCode.SUCCESS, "Configuration saved.");
560 return new Status(StatusCode.INTERNALERROR, "Save failed");
565 public void entryCreated(Long key, String cacheName, boolean local) {
569 public void entryUpdated(Long key, String new_value, String cacheName,
570 boolean originLocal) {
571 saveAffinityConfigInternal();
575 public void entryDeleted(Long key, String cacheName, boolean originLocal) {
579 * Function called by the dependency manager when all the required
580 * dependencies are satisfied
584 log.debug("INIT called!");
585 containerName = GlobalConstants.DEFAULT.toString();
 * Function called by the dependency manager when at least one
 * dependency becomes unsatisfied or when the component is shutting
 * down because, for example, the bundle is being stopped.
596 log.debug("DESTROY called!");
600 * Function called by dependency manager after "init ()" is called
601 * and after the services provided by the class are registered in
602 * the service registry
606 log.debug("START called!");
610 * Function called after registering the service in OSGi service registry.
613 // Retrieve current statistics so we don't have to wait for next refresh
614 IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
615 IAffinityManager.class, this.getContainerName(), this);
616 if (affinityManager != null) {
617 log.debug("STARTED method called!");
 * Function called by the dependency manager before the services
 * exported by the component are unregistered; this will be
 * followed by a "destroy ()" call.
628 log.debug("STOP called!");
631 void setClusterContainerService(IClusterContainerServices s) {
632 log.debug("Cluster Service set for affinity mgr");
633 this.clusterContainerService = s;
636 void unsetClusterContainerService(IClusterContainerServices s) {
637 if (this.clusterContainerService == s) {
638 log.debug("Cluster Service removed for affinity mgr!");
639 this.clusterContainerService = null;
643 /* Add a nfchain config for this affinity link. */
644 List<Flow> getFlowlist(AffinityLink al) {
645 InetAddress from, to;
647 log.debug("get flowlist affinity link = {}", al.getName());
648 List<Flow> flowlist = new ArrayList<Flow>();
649 List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
651 /* Create a Flow for each host pair in the affinity link. */
652 for (Entry<Host,Host> hostPair : hostPairList) {
653 log.debug("Processing next hostPair {}", hostPair);
655 Match match = new Match();
656 from = hostPair.getKey().getNetworkAddress();
657 to = hostPair.getValue().getNetworkAddress();
658 log.debug("Adding a flow for {} -> {}", from, to);
660 if (from == null ||to == null) {
661 /* Skip host pairs if one end is null. */
662 log.debug("Hosts in hostpair {} -> {} not found in hosttracker.", from, to);
665 match.setField(new MatchField(MatchType.NW_SRC, from, null));
666 match.setField(new MatchField(MatchType.NW_DST, to, null));
667 match.setField(MatchType.DL_TYPE, EtherTypes.IPv4.shortValue());
668 Flow flow = new Flow(match, null);
669 flow.setPriority(REDIRECT_IPSWITCH_PRIORITY);
676 /* From affinity link, create a nfc config. Pass this nfc config to nfchainagent. */
677 public Status addNfchain(AffinityLink al) {
678 List<Flow> flowlist = getFlowlist(al);
679 InetAddress waypoint = NetUtils.parseInetAddress(al.getWaypoint());
680 NFchainconfig nfcc = new NFchainconfig(al.getName(), flowlist, waypoint);
681 /* Only one hop initially... */
682 List<NFchainconfig> nfclist = new ArrayList<NFchainconfig>();
683 String key = al.getName();
685 nfchainagent.addNfchain(key, nfclist);
686 return new Status(StatusCode.SUCCESS);
689 public Status enableRedirect(AffinityLink al) throws Exception {
690 String nfccname = al.getName();
691 return nfchainagent.enable(nfccname);