2 * Copyright (c) 2013 Plexxi, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.affinity.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.AbstractMap;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
39 import java.util.Map.Entry;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
54 import org.opendaylight.controller.sal.core.IContainer;
55 import org.opendaylight.controller.sal.core.Node;
56 import org.opendaylight.controller.sal.core.Host;
57 import org.opendaylight.controller.sal.core.NodeConnector;
58 import org.opendaylight.controller.sal.core.NodeTable;
59 import org.opendaylight.controller.sal.core.Property;
60 import org.opendaylight.controller.sal.core.UpdateType;
61 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
62 import org.opendaylight.controller.sal.flowprogrammer.Flow;
63 import org.opendaylight.controller.sal.match.Match;
64 import org.opendaylight.controller.sal.match.MatchType;
65 import org.opendaylight.controller.sal.match.MatchField;
66 import org.opendaylight.controller.sal.action.Action;
67 import org.opendaylight.controller.sal.action.Output;
69 import org.opendaylight.controller.sal.reader.FlowOnNode;
70 import org.opendaylight.controller.sal.reader.IReadService;
71 import org.opendaylight.controller.sal.reader.IReadServiceListener;
72 import org.opendaylight.controller.sal.utils.GlobalConstants;
73 import org.opendaylight.controller.sal.utils.IObjectReader;
74 import org.opendaylight.controller.sal.utils.ObjectReader;
75 import org.opendaylight.controller.sal.utils.ObjectWriter;
76 import org.opendaylight.controller.sal.utils.NetUtils;
78 import org.opendaylight.controller.hosttracker.IfIptoHost;
79 import org.opendaylight.controller.sal.utils.Status;
80 import org.opendaylight.controller.sal.utils.StatusCode;
82 import org.opendaylight.controller.sal.utils.ServiceHelper;
83 import org.opendaylight.affinity.affinity.AffinityGroup;
84 import org.opendaylight.affinity.affinity.AffinityLink;
85 import org.opendaylight.affinity.affinity.AffinityIdentifier;
86 import org.opendaylight.affinity.affinity.IAffinityManager;
87 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
89 import org.opendaylight.controller.hosttracker.IfIptoHost;
90 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
91 import org.opendaylight.controller.switchmanager.ISwitchManager;
92 import org.opendaylight.affinity.l2agent.L2Agent;
93 import org.slf4j.Logger;
94 import org.slf4j.LoggerFactory;
97 * The class caches latest network nodes statistics as notified by reader
98 * services and provides API to retrieve them.
100 public class AffinityManagerImpl implements IAffinityManager, IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
101 private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
103 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
104 private static final String SAVE = "Save";
105 private String affinityLinkFileName = null;
106 private String affinityGroupFileName = null;
107 private IFlowProgrammerService fps = null;
108 private ISwitchManager switchManager = null;
109 private L2Agent l2agent = null;
111 private ConcurrentMap<String, AffinityGroup> affinityGroupList;
112 private ConcurrentMap<String, AffinityLink> affinityLinkList;
113 private ConcurrentMap<Long, String> configSaveEvent;
115 private IfIptoHost hostTracker;
117 private final Set<IAffinityManagerAware> affinityManagerAware = Collections
118 .synchronizedSet(new HashSet<IAffinityManagerAware>());
121 private static boolean hostRefresh = true;
122 private int hostRetryCount = 5;
123 private IClusterContainerServices clusterContainerService = null;
124 private String containerName = GlobalConstants.DEFAULT.toString();
125 private boolean isDefaultContainer = true;
126 private static final int REPLACE_RETRY = 1;
128 public enum ReasonCode {
129 SUCCESS("Success"), FAILURE("Failure"), INVALID_CONF(
130 "Invalid Configuration"), EXIST("Entry Already Exist"), CONFLICT(
131 "Configuration Conflict with Existing Entry");
133 private final String name;
135 private ReasonCode(String name) {
140 public String toString() {
145 /* Only default container. */
146 public String getContainerName() {
147 return containerName;
150 public void startUp() {
151 // Initialize configuration file names
152 affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
154 affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
156 log.debug("configuration file names " + affinityLinkFileName + "and " + affinityGroupFileName);
157 // Instantiate cluster synced variables
162 * Read startup and build database if we have not already gotten the
163 * configurations synced from another node
165 if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
166 loadAffinityConfiguration();
170 public void shutDown() {
173 @SuppressWarnings("deprecation")
174 private void allocateCaches() {
175 if (this.clusterContainerService == null) {
176 this.nonClusterObjectCreate();
177 log.warn("un-initialized clusterContainerService, can't create cache");
181 clusterContainerService.createCache(
182 "affinity.affinityGroupList",
183 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
184 clusterContainerService.createCache(
185 "affinity.affinityLinkList",
186 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
187 clusterContainerService.createCache(
188 "affinity.configSaveEvent",
189 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
190 } catch (CacheConfigException cce) {
191 log.error("\nCache configuration invalid - check cache mode");
192 } catch (CacheExistException ce) {
193 log.error("\nCache already exits - destroy and recreate if needed");
197 @SuppressWarnings({ "unchecked", "deprecation" })
198 private void retrieveCaches() {
199 if (this.clusterContainerService == null) {
200 log.info("un-initialized clusterContainerService, can't retrieve cache");
203 affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
204 .getCache("affinity.affinityGroupList");
205 if (affinityGroupList == null) {
206 log.error("\nFailed to get cache for affinityGroupList");
208 affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
209 .getCache("affinity.affinityLinkList");
210 if (affinityLinkList == null) {
211 log.error("\nFailed to get cache for affinityLinkList");
214 configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
215 .getCache("affinity.configSaveEvent");
216 if (configSaveEvent == null) {
217 log.error("\nFailed to get cache for configSaveEvent");
221 private void nonClusterObjectCreate() {
222 affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
223 affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
224 configSaveEvent = new ConcurrentHashMap<Long, String>();
228 void setHostTracker(IfIptoHost h) {
229 this.hostTracker = h;
232 void unsetHostTracker(IfIptoHost h) {
233 if (this.hostTracker.equals(h)) {
234 this.hostTracker = null;
237 public void setFlowProgrammerService(IFlowProgrammerService s)
242 public void unsetFlowProgrammerService(IFlowProgrammerService s) {
248 public void setL2Agent(L2Agent s)
250 public void setL2Agent(TutorialL2Forwarding s)
251 >>>>>>> Add flow pushing rules.
257 public void unsetL2Agent(L2Agent s) {
259 public void unsetL2Agent(TutorialL2Forwarding s) {
260 >>>>>>> Add flow pushing rules.
261 if (this.l2agent == s) {
267 public void setForwardingRulesManager(
268 IForwardingRulesManager forwardingRulesManager) {
269 this.ruleManager = forwardingRulesManager;
272 public void unsetForwardingRulesManager(
273 IForwardingRulesManager forwardingRulesManager) {
274 if (this.ruleManager == forwardingRulesManager) {
275 this.ruleManager = null;
280 public Status addAffinityLink(AffinityLink al) {
281 boolean putNewLink = false;
283 if (affinityLinkList.containsKey(al.getName())) {
284 return new Status(StatusCode.CONFLICT,
285 "AffinityLink with the specified name already configured.");
289 AffinityLink alCurr = affinityLinkList.get(al.getName());
290 if (alCurr == null) {
291 if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
295 putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
299 String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
300 return new Status(StatusCode.CONFLICT, msg);
303 return new Status(StatusCode.SUCCESS);
308 * Fetch all node connectors. Each switch port will receive a flow
309 * rule. Do not stop on error. Pass in the waypointMAC address so
310 * that the correct output port can be determined.
312 public Status pushFlowRule(Flow flow, byte [] waypointMAC) {
313 /* Get all node connectors. */
314 Set<Node> nodes = switchManager.getNodes();
315 Status success = new Status(StatusCode.SUCCESS);
316 Status notfound = new Status(StatusCode.NOTFOUND);
319 log.debug("No nodes in network.");
322 for (Node node: nodes) {
323 /* Look up the output port leading to the waypoint. */
324 NodeConnector dst_connector = l2agent.lookup(node, waypointMAC);
325 Action action = new Output(dst_connector);
326 flow.addAction(action);
328 Status status = fps.addFlow(node, flow);
329 if (!status.isSuccess()) {
330 log.debug("Error during addFlow: {} on {}. The failure is: {}",
331 flow, node, status.getDescription());
338 * add flow rules for each node connector.
340 public Status addFlowRulesForRedirect(AffinityLink al) throws Exception {
341 Match match = new Match();
342 List<Action> actions = new ArrayList<Action>();
344 InetAddress address1, address2;
346 mask = InetAddress.getByName("255.255.255.255");
348 Flow f = new Flow(match, actions);
349 String waypoint = al.getWaypoint();
351 List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
352 for (Entry<Host,Host> hostPair : hostPairList) {
353 /* Create a match for each host pair in the affinity link. */
355 Host host1 = hostPair.getKey();
356 Host host2 = hostPair.getValue();
357 address1 = host1.getNetworkAddress();
358 address2 = host2.getNetworkAddress();
360 match.setField(MatchType.NW_SRC, address1, mask);
361 match.setField(MatchType.NW_DST, address2, mask);
363 /* Send this flow rule to all nodes in the network. */
364 byte [] dstMAC = InetAddressToMAC(waypoint);
365 pushFlowRule(f, dstMAC);
367 return new Status(StatusCode.SUCCESS);
370 public byte [] InetAddressToMAC(String ipaddress) {
371 InetAddress inetAddr = NetUtils.parseInetAddress(ipaddress);
372 HostNodeConnector host = hostTracker.hostFind(inetAddr);
373 byte [] dst_mac = host.getDataLayerAddressBytes();
377 public Status removeAffinityLink(String name) {
378 affinityLinkList.remove(name);
379 return new Status(StatusCode.SUCCESS);
382 public Status removeAffinityLink(AffinityLink al) {
383 AffinityLink alCurr = affinityLinkList.get(al.getName());
384 if (alCurr != null) {
385 affinityLinkList.remove(alCurr);
386 return new Status(StatusCode.SUCCESS);
388 String msg = "Affinity Link with specified name does not exist." + al.getName();
389 return new Status(StatusCode.INTERNALERROR, msg);
394 public AffinityLink getAffinityLink(String linkName) {
395 return affinityLinkList.get(linkName);
399 public List<AffinityLink> getAllAffinityLinks() {
400 return new ArrayList<AffinityLink>(affinityLinkList.values());
404 public Status addAffinityGroup(AffinityGroup ag) {
405 boolean putNewGroup = false;
406 String name = ag.getName();
407 if (affinityGroupList.containsKey(name)) {
408 return new Status(StatusCode.CONFLICT,
409 "AffinityGroup with the specified name already configured.");
411 AffinityGroup agCurr = affinityGroupList.get(name);
412 if (agCurr == null) {
413 if (affinityGroupList.putIfAbsent(name, ag) == null) {
417 putNewGroup = affinityGroupList.replace(name, agCurr, ag);
421 String msg = "Cluster conflict: Conflict while adding the subnet " + name;
422 return new Status(StatusCode.CONFLICT, msg);
425 return new Status(StatusCode.SUCCESS);
428 /* Check for errors. */
430 public Status removeAffinityGroup(String name) {
431 affinityGroupList.remove(name);
432 return new Status(StatusCode.SUCCESS);
436 public AffinityGroup getAffinityGroup(String groupName) {
437 log.debug("getAffinityGroup" + groupName);
438 return affinityGroupList.get(groupName);
442 public List<AffinityGroup> getAllAffinityGroups() {
443 return new ArrayList<AffinityGroup>(affinityGroupList.values());
446 /* Find where this is used. */
448 public Object readObject(ObjectInputStream ois)
449 throws FileNotFoundException, IOException, ClassNotFoundException {
450 // Perform the class deserialization locally, from inside the package
451 // where the class is defined
452 return ois.readObject();
455 @SuppressWarnings("unchecked")
456 private void loadAffinityConfiguration() {
457 ObjectReader objReader = new ObjectReader();
458 ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
459 ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
462 if (groupList != null) {
463 for (AffinityGroup ag : groupList.values()) {
464 addAffinityGroup(ag);
469 if (linkList != null) {
470 for (AffinityLink al : linkList.values()) {
477 public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
478 return ag.getAllElements();
482 public List<Host> getAllElementsByHost(AffinityGroup ag) {
483 List<Host> hostList= new ArrayList<Host>();
485 for (AffinityIdentifier h : ag.getAllElements()) {
486 /* TBD: Do not assume this to be an InetAddress. */
488 if (hostTracker != null) {
489 Host host1 = hostTracker.hostFind((InetAddress) h.get());
497 public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
498 List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
500 AffinityGroup fromGroup = al.getFromGroup();
501 AffinityGroup toGroup = al.getToGroup();
503 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
504 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
505 if (hostTracker != null) {
506 Host host1 = hostTracker.hostFind((InetAddress) h1.get());
507 Host host2 = hostTracker.hostFind((InetAddress) h2.get());
508 Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
509 hostPairList.add(hp1);
517 public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
518 List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
520 AffinityGroup fromGroup = al.getFromGroup();
521 AffinityGroup toGroup = al.getToGroup();
523 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
524 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
525 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
526 hostPairList.add(hp1);
533 public Status saveConfiguration() {
534 return saveAffinityConfig();
538 public Status saveAffinityConfig() {
539 // Publish the save config event to the cluster nodes
540 configSaveEvent.put(new Date().getTime(), SAVE);
541 return saveAffinityConfigInternal();
544 public Status saveAffinityConfigInternal() {
545 Status retS = null, retP = null;
546 ObjectWriter objWriter = new ObjectWriter();
548 retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
549 affinityLinkList), affinityLinkFileName);
551 retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
552 affinityGroupList), affinityGroupFileName);
554 if (retS.isSuccess() && retP.isSuccess()) {
555 return new Status(StatusCode.SUCCESS, "Configuration saved.");
557 return new Status(StatusCode.INTERNALERROR, "Save failed");
562 public void entryCreated(Long key, String cacheName, boolean local) {
566 public void entryUpdated(Long key, String new_value, String cacheName,
567 boolean originLocal) {
568 saveAffinityConfigInternal();
572 public void entryDeleted(Long key, String cacheName, boolean originLocal) {
576 * Function called by the dependency manager when all the required
577 * dependencies are satisfied
581 log.debug("INIT called!");
582 containerName = GlobalConstants.DEFAULT.toString();
587 * Function called by the dependency manager when at least one
588 * dependency become unsatisfied or when the component is shutting
589 * down because for example bundle is being stopped.
593 log.debug("DESTROY called!");
597 * Function called by dependency manager after "init ()" is called
598 * and after the services provided by the class are registered in
599 * the service registry
603 log.debug("START called!");
607 * Function called after registering the service in OSGi service registry.
610 // Retrieve current statistics so we don't have to wait for next refresh
611 IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
612 IAffinityManager.class, this.getContainerName(), this);
613 if (affinityManager != null) {
614 log.debug("STARTED method called!");
619 * Function called by the dependency manager before the services
620 * exported by the component are unregistered, this will be
621 * followed by a "destroy ()" calls
625 log.debug("STOP called!");
628 void setClusterContainerService(IClusterContainerServices s) {
629 log.debug("Cluster Service set for affinity mgr");
630 this.clusterContainerService = s;
633 void unsetClusterContainerService(IClusterContainerServices s) {
634 if (this.clusterContainerService == s) {
635 log.debug("Cluster Service removed for affinity mgr!");
636 this.clusterContainerService = null;