2 * Copyright (c) 2013 Plexxi, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.affinity.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.AbstractMap;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
39 import java.util.Map.Entry;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
54 import org.opendaylight.controller.sal.core.IContainer;
55 import org.opendaylight.controller.sal.core.Node;
56 import org.opendaylight.controller.sal.core.Host;
57 import org.opendaylight.controller.sal.core.NodeConnector;
58 import org.opendaylight.controller.sal.core.NodeTable;
59 import org.opendaylight.controller.sal.core.Property;
60 import org.opendaylight.controller.sal.core.UpdateType;
61 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
62 import org.opendaylight.controller.sal.flowprogrammer.Flow;
63 import org.opendaylight.controller.sal.match.Match;
64 import org.opendaylight.controller.sal.match.MatchType;
65 import org.opendaylight.controller.sal.match.MatchField;
66 import org.opendaylight.controller.sal.action.Action;
67 import org.opendaylight.controller.sal.action.Output;
69 import org.opendaylight.controller.sal.reader.FlowOnNode;
70 import org.opendaylight.controller.sal.reader.IReadService;
71 import org.opendaylight.controller.sal.reader.IReadServiceListener;
72 import org.opendaylight.controller.sal.utils.GlobalConstants;
73 import org.opendaylight.controller.sal.utils.IObjectReader;
74 import org.opendaylight.controller.sal.utils.ObjectReader;
75 import org.opendaylight.controller.sal.utils.ObjectWriter;
76 import org.opendaylight.controller.sal.utils.NetUtils;
78 import org.opendaylight.controller.hosttracker.IfIptoHost;
79 import org.opendaylight.controller.sal.utils.Status;
80 import org.opendaylight.controller.sal.utils.StatusCode;
82 import org.opendaylight.controller.sal.utils.ServiceHelper;
83 import org.opendaylight.affinity.affinity.AffinityGroup;
84 import org.opendaylight.affinity.affinity.AffinityLink;
85 import org.opendaylight.affinity.affinity.AffinityIdentifier;
86 import org.opendaylight.affinity.affinity.IAffinityManager;
87 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
89 import org.opendaylight.controller.hosttracker.IfIptoHost;
90 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
91 import org.opendaylight.controller.switchmanager.ISwitchManager;
92 import org.opendaylight.affinity.l2agent.L2Agent;
93 import org.slf4j.Logger;
94 import org.slf4j.LoggerFactory;
97 * The class caches latest network nodes statistics as notified by reader
98 * services and provides API to retrieve them.
100 public class AffinityManagerImpl implements IAffinityManager, IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
101 private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
103 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
104 private static final String SAVE = "Save";
105 private String affinityLinkFileName = null;
106 private String affinityGroupFileName = null;
107 private IFlowProgrammerService fps = null;
108 private ISwitchManager switchManager = null;
109 private L2Agent l2agent = null;
111 private ConcurrentMap<String, AffinityGroup> affinityGroupList;
112 private ConcurrentMap<String, AffinityLink> affinityLinkList;
113 private ConcurrentMap<Long, String> configSaveEvent;
115 private IfIptoHost hostTracker;
117 private final Set<IAffinityManagerAware> affinityManagerAware = Collections
118 .synchronizedSet(new HashSet<IAffinityManagerAware>());
121 private static boolean hostRefresh = true;
122 private int hostRetryCount = 5;
123 private IClusterContainerServices clusterContainerService = null;
124 private String containerName = null;
125 private boolean isDefaultContainer = true;
126 private static final int REPLACE_RETRY = 1;
128 public enum ReasonCode {
129 SUCCESS("Success"), FAILURE("Failure"), INVALID_CONF(
130 "Invalid Configuration"), EXIST("Entry Already Exist"), CONFLICT(
131 "Configuration Conflict with Existing Entry");
133 private final String name;
135 private ReasonCode(String name) {
140 public String toString() {
145 /* Only default container. */
146 public String getContainerName() {
147 return containerName;
150 public void startUp() {
151 // Initialize configuration file names
152 affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
154 affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
157 // Instantiate cluster synced variables
162 * Read startup and build database if we have not already gotten the
163 * configurations synced from another node
165 if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
166 loadAffinityConfiguration();
170 public void shutDown() {
173 @SuppressWarnings("deprecation")
174 private void allocateCaches() {
175 if (this.clusterContainerService == null) {
176 this.nonClusterObjectCreate();
177 log.warn("un-initialized clusterContainerService, can't create cache");
181 clusterContainerService.createCache(
182 "affinity.affinityGroupList",
183 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
184 clusterContainerService.createCache(
185 "affinity.affinityLinkList",
186 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
187 clusterContainerService.createCache(
188 "affinity.configSaveEvent",
189 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
190 } catch (CacheConfigException cce) {
191 log.error("\nCache configuration invalid - check cache mode");
192 } catch (CacheExistException ce) {
193 log.error("\nCache already exits - destroy and recreate if needed");
197 @SuppressWarnings({ "unchecked", "deprecation" })
198 private void retrieveCaches() {
199 if (this.clusterContainerService == null) {
200 log.info("un-initialized clusterContainerService, can't retrieve cache");
203 affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
204 .getCache("affinity.affinityGroupList");
205 if (affinityGroupList == null) {
206 log.error("\nFailed to get cache for affinityGroupList");
208 affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
209 .getCache("affinity.affinityLinkList");
210 if (affinityLinkList == null) {
211 log.error("\nFailed to get cache for affinityLinkList");
214 configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
215 .getCache("affinity.configSaveEvent");
216 if (configSaveEvent == null) {
217 log.error("\nFailed to get cache for configSaveEvent");
221 private void nonClusterObjectCreate() {
222 affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
223 affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
224 configSaveEvent = new ConcurrentHashMap<Long, String>();
228 void setHostTracker(IfIptoHost h) {
229 this.hostTracker = h;
232 void unsetHostTracker(IfIptoHost h) {
233 if (this.hostTracker.equals(h)) {
234 this.hostTracker = null;
237 public void setFlowProgrammerService(IFlowProgrammerService s)
242 public void unsetFlowProgrammerService(IFlowProgrammerService s) {
248 public void setL2Agent(L2Agent s)
250 public void setL2Agent(TutorialL2Forwarding s)
251 >>>>>>> Add flow pushing rules.
257 public void unsetL2Agent(L2Agent s) {
259 public void unsetL2Agent(TutorialL2Forwarding s) {
260 >>>>>>> Add flow pushing rules.
261 if (this.l2agent == s) {
267 public void setForwardingRulesManager(
268 IForwardingRulesManager forwardingRulesManager) {
269 this.ruleManager = forwardingRulesManager;
272 public void unsetForwardingRulesManager(
273 IForwardingRulesManager forwardingRulesManager) {
274 if (this.ruleManager == forwardingRulesManager) {
275 this.ruleManager = null;
280 public Status addAffinityLink(AffinityLink al) {
281 boolean putNewLink = false;
283 if (affinityLinkList.containsKey(al.getName())) {
284 return new Status(StatusCode.CONFLICT,
285 "AffinityLink with the specified name already configured.");
289 AffinityLink alCurr = affinityLinkList.get(al.getName());
290 if (alCurr == null) {
291 if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
295 putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
299 String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
300 return new Status(StatusCode.CONFLICT, msg);
303 return new Status(StatusCode.SUCCESS);
308 * Fetch all node connectors. Each switch port will receive a flow rule. Do not stop on error.
310 public Status pushFlowRule(Flow flow) {
311 /* Get all node connectors. */
312 Set<Node> nodes = switchManager.getNodes();
313 Status success = new Status(StatusCode.SUCCESS);
314 Status notfound = new Status(StatusCode.NOTFOUND);
317 log.debug("No nodes in network.");
320 for (Node node: nodes) {
321 Set<NodeConnector> ncs = switchManager.getNodeConnectors(node);
325 Status status = fps.addFlow(node, flow);
326 if (!status.isSuccess()) {
327 log.debug("Error during addFlow: {} on {}. The failure is: {}",
328 flow, node, status.getDescription());
335 * add flow rules for each node connector.
337 public Status addFlowRulesForRedirect(AffinityLink al) throws Exception {
338 Match match = new Match();
339 List<Action> actions = new ArrayList<Action>();
341 InetAddress address1, address2;
343 mask = InetAddress.getByName("255.255.255.255");
345 Flow f = new Flow(match, actions);
347 List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
348 for (Entry<Host,Host> hostPair : hostPairList) {
349 /* Create a match for each host pair in the affinity link. */
351 Host host1 = hostPair.getKey();
352 Host host2 = hostPair.getValue();
353 address1 = host1.getNetworkAddress();
354 address2 = host2.getNetworkAddress();
356 match.setField(MatchType.NW_SRC, address1, mask);
357 match.setField(MatchType.NW_DST, address2, mask);
360 /* For each end point, discover the mac address of the
361 * host. Then lookup the L2 table to find the port to send
362 * this flow along. Program the flow. */
364 byte [] mac = ((HostNodeConnector) host1).getDataLayerAddressBytes();
365 /* Tbd: use hosttracker for this. */
366 // NodeConnector dst_connector = l2agent.lookupMacAddress(mac);
367 // actions.add(new Output(dst_connector));
369 return new Status(StatusCode.SUCCESS);
372 public Status removeAffinityLink(String name) {
373 affinityLinkList.remove(name);
374 return new Status(StatusCode.SUCCESS);
377 public Status removeAffinityLink(AffinityLink al) {
378 AffinityLink alCurr = affinityLinkList.get(al.getName());
379 if (alCurr != null) {
380 affinityLinkList.remove(alCurr);
381 return new Status(StatusCode.SUCCESS);
383 String msg = "Affinity Link with specified name does not exist." + al.getName();
384 return new Status(StatusCode.INTERNALERROR, msg);
389 public AffinityLink getAffinityLink(String linkName) {
390 return affinityLinkList.get(linkName);
394 public List<AffinityLink> getAllAffinityLinks() {
395 return new ArrayList<AffinityLink>(affinityLinkList.values());
399 public Status addAffinityGroup(AffinityGroup ag) {
400 boolean putNewGroup = false;
401 String name = ag.getName();
402 if (affinityGroupList.containsKey(name)) {
403 return new Status(StatusCode.CONFLICT,
404 "AffinityGroup with the specified name already configured.");
406 AffinityGroup agCurr = affinityGroupList.get(name);
407 if (agCurr == null) {
408 if (affinityGroupList.putIfAbsent(name, ag) == null) {
412 putNewGroup = affinityGroupList.replace(name, agCurr, ag);
416 String msg = "Cluster conflict: Conflict while adding the subnet " + name;
417 return new Status(StatusCode.CONFLICT, msg);
420 return new Status(StatusCode.SUCCESS);
423 /* Check for errors. */
425 public Status removeAffinityGroup(String name) {
426 affinityGroupList.remove(name);
427 return new Status(StatusCode.SUCCESS);
431 public AffinityGroup getAffinityGroup(String groupName) {
432 return affinityGroupList.get(groupName);
436 public List<AffinityGroup> getAllAffinityGroups() {
437 return new ArrayList<AffinityGroup>(affinityGroupList.values());
440 /* Find where this is used. */
442 public Object readObject(ObjectInputStream ois)
443 throws FileNotFoundException, IOException, ClassNotFoundException {
444 // Perform the class deserialization locally, from inside the package
445 // where the class is defined
446 return ois.readObject();
449 @SuppressWarnings("unchecked")
450 private void loadAffinityConfiguration() {
451 ObjectReader objReader = new ObjectReader();
452 ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
453 ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
456 if (groupList != null) {
457 for (AffinityGroup ag : groupList.values()) {
458 addAffinityGroup(ag);
463 if (linkList != null) {
464 for (AffinityLink al : linkList.values()) {
471 public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
472 return ag.getAllElements();
476 public List<Host> getAllElementsByHost(AffinityGroup ag) {
477 List<Host> hostList= new ArrayList<Host>();
479 for (AffinityIdentifier h : ag.getAllElements()) {
480 /* TBD: Do not assume this to be an InetAddress. */
482 if (hostTracker != null) {
483 Host host1 = hostTracker.hostFind((InetAddress) h.get());
491 public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
492 List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
494 AffinityGroup fromGroup = al.getFromGroup();
495 AffinityGroup toGroup = al.getToGroup();
497 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
498 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
499 if (hostTracker != null) {
500 Host host1 = hostTracker.hostFind((InetAddress) h1.get());
501 Host host2 = hostTracker.hostFind((InetAddress) h2.get());
502 Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
503 hostPairList.add(hp1);
511 public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
512 List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
514 AffinityGroup fromGroup = al.getFromGroup();
515 AffinityGroup toGroup = al.getToGroup();
517 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
518 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
519 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
520 hostPairList.add(hp1);
527 public Status saveConfiguration() {
528 return saveAffinityConfig();
532 public Status saveAffinityConfig() {
533 // Publish the save config event to the cluster nodes
534 configSaveEvent.put(new Date().getTime(), SAVE);
535 return saveAffinityConfigInternal();
538 public Status saveAffinityConfigInternal() {
539 Status retS = null, retP = null;
540 ObjectWriter objWriter = new ObjectWriter();
542 retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
543 affinityLinkList), affinityLinkFileName);
545 retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
546 affinityGroupList), affinityGroupFileName);
548 if (retS.isSuccess() && retP.isSuccess()) {
549 return new Status(StatusCode.SUCCESS, "Configuration saved.");
551 return new Status(StatusCode.INTERNALERROR, "Save failed");
556 public void entryCreated(Long key, String cacheName, boolean local) {
560 public void entryUpdated(Long key, String new_value, String cacheName,
561 boolean originLocal) {
562 saveAffinityConfigInternal();
566 public void entryDeleted(Long key, String cacheName, boolean originLocal) {
570 * Function called by the dependency manager when all the required
571 * dependencies are satisfied
575 log.debug("INIT called!");
576 containerName = GlobalConstants.DEFAULT.toString();
581 * Function called by the dependency manager when at least one
582 * dependency become unsatisfied or when the component is shutting
583 * down because for example bundle is being stopped.
587 log.debug("DESTROY called!");
591 * Function called by dependency manager after "init ()" is called
592 * and after the services provided by the class are registered in
593 * the service registry
597 log.debug("START called!");
601 * Function called after registering the service in OSGi service registry.
604 // Retrieve current statistics so we don't have to wait for next refresh
605 IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
606 IAffinityManager.class, this.getContainerName(), this);
607 if (affinityManager != null) {
608 log.debug("STARTED method called!");
613 * Function called by the dependency manager before the services
614 * exported by the component are unregistered, this will be
615 * followed by a "destroy ()" calls
619 log.debug("STOP called!");
622 void setClusterContainerService(IClusterContainerServices s) {
623 log.debug("Cluster Service set for affinity mgr");
624 this.clusterContainerService = s;
627 void unsetClusterContainerService(IClusterContainerServices s) {
628 if (this.clusterContainerService == s) {
629 log.debug("Cluster Service removed for affinity mgr!");
630 this.clusterContainerService = null;