2 * Copyright (c) 2013 Plexxi, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.affinity.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.AbstractMap;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
39 import java.util.Map.Entry;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
54 import org.opendaylight.controller.sal.core.IContainer;
55 import org.opendaylight.controller.sal.core.Node;
56 import org.opendaylight.controller.sal.core.Host;
57 import org.opendaylight.controller.sal.core.NodeConnector;
58 import org.opendaylight.controller.sal.core.NodeTable;
59 import org.opendaylight.controller.sal.core.Property;
60 import org.opendaylight.controller.sal.core.UpdateType;
61 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
62 import org.opendaylight.controller.sal.flowprogrammer.Flow;
63 import org.opendaylight.controller.sal.match.Match;
64 import org.opendaylight.controller.sal.match.MatchType;
65 import org.opendaylight.controller.sal.match.MatchField;
66 import org.opendaylight.controller.sal.action.Action;
67 import org.opendaylight.controller.sal.action.Output;
69 import org.opendaylight.controller.sal.reader.FlowOnNode;
70 import org.opendaylight.controller.sal.reader.IReadService;
71 import org.opendaylight.controller.sal.reader.IReadServiceListener;
72 import org.opendaylight.controller.sal.utils.GlobalConstants;
73 import org.opendaylight.controller.sal.utils.IObjectReader;
74 import org.opendaylight.controller.sal.utils.ObjectReader;
75 import org.opendaylight.controller.sal.utils.ObjectWriter;
76 import org.opendaylight.controller.sal.utils.NetUtils;
78 import org.opendaylight.controller.hosttracker.IfIptoHost;
79 import org.opendaylight.controller.sal.utils.Status;
80 import org.opendaylight.controller.sal.utils.StatusCode;
82 import org.opendaylight.controller.sal.utils.ServiceHelper;
83 import org.opendaylight.affinity.affinity.AffinityGroup;
84 import org.opendaylight.affinity.affinity.AffinityLink;
85 import org.opendaylight.affinity.affinity.AffinityIdentifier;
86 import org.opendaylight.affinity.affinity.IAffinityManager;
87 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
89 import org.opendaylight.controller.hosttracker.IfIptoHost;
90 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
91 import org.opendaylight.controller.switchmanager.ISwitchManager;
92 import org.opendaylight.affinity.l2agent.L2Agent;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
98 * The class keeps the configured affinity groups and affinity links, saves
99 * them to and loads them from configuration files, and provides API to retrieve them.
101 public class AffinityManagerImpl implements IAffinityManager, IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
// Logger shared by all instances of this class.
102 private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
// Prefix used by startUp() when building the configuration file names.
104 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
// Value written into configSaveEvent by saveAffinityConfig().
105 private static final String SAVE = "Save";
// Configuration file names; assigned in startUp().
106 private String affinityLinkFileName = null;
107 private String affinityGroupFileName = null;
// Services used by pushFlowRule(): fps.addFlow(), switchManager.getNodes()
// and l2agent.lookup().
108 private IFlowProgrammerService fps = null;
109 private ISwitchManager switchManager = null;
110 private L2Agent l2agent = null;
// Affinity groups and links keyed by name, plus the save-event map. They are
// assigned either in retrieveCaches() or in nonClusterObjectCreate().
112 private ConcurrentMap<String, AffinityGroup> affinityGroupList;
113 private ConcurrentMap<String, AffinityLink> affinityLinkList;
114 private ConcurrentMap<Long, String> configSaveEvent;
// Host lookup service; may be null (getAllFlowsByHost() checks before use).
116 private IfIptoHost hostTracker;
// Set of IAffinityManagerAware instances, wrapped by Collections.synchronizedSet.
118 private final Set<IAffinityManagerAware> affinityManagerAware = Collections
119 .synchronizedSet(new HashSet<IAffinityManagerAware>());
// NOTE(review): hostRefresh and hostRetryCount are not referenced in the
// visible part of this file - confirm they are still needed.
122 private static boolean hostRefresh = true;
123 private int hostRetryCount = 5;
// Cluster cache provider; null until setClusterContainerService() is called.
124 private IClusterContainerServices clusterContainerService = null;
// Container name returned by getContainerName().
125 private String containerName = GlobalConstants.DEFAULT.toString();
// NOTE(review): isDefaultContainer and REPLACE_RETRY are not referenced in
// the visible part of this file - confirm they are still needed.
126 private boolean isDefaultContainer = true;
127 private static final int REPLACE_RETRY = 1;
/**
 * Outcome codes, each declared with a human-readable description string.
 * NOTE(review): the bodies of the constructor and of toString() are not
 * visible in this excerpt; presumably they store and return the
 * description - confirm.
 */
129 public enum ReasonCode {
130 SUCCESS("Success"), FAILURE("Failure"), INVALID_CONF(
131 "Invalid Configuration"), EXIST("Entry Already Exist"), CONFLICT(
132 "Configuration Conflict with Existing Entry");
// Description text passed to the constructor.
134 private final String name;
136 private ReasonCode(String name) {
141 public String toString() {
146 /* Only default container. */
147 public String getContainerName() {
148 return containerName;
/**
 * Builds the link and group configuration file names from ROOT, a fixed
 * prefix and the container name, logs them, and loads the saved affinity
 * configuration when either the group map or the link map is empty.
 * NOTE(review): several lines of this method are not visible in this
 * excerpt (the tail of each file-name expression and the statements
 * between the log call and the emptiness check) - confirm before editing.
 */
151 public void startUp() {
152 // Initialize configuration file names
153 affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
155 affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
157 log.debug("configuration file names " + affinityLinkFileName + "and " + affinityGroupFileName);
158 // Instantiate cluster synced variables
163 * Read startup and build database if we have not already gotten the
164 * configurations synced from another node
166 if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
167 loadAffinityConfiguration();
171 public void shutDown() {
/**
 * Creates the three caches "affinity.affinityGroupList",
 * "affinity.affinityLinkList" and "affinity.configSaveEvent" in
 * NON_TRANSACTIONAL mode through the cluster container service.
 * When no cluster container service is set, plain local maps are created
 * via nonClusterObjectCreate() and a warning is logged instead.
 * Cache configuration and cache-exists failures are logged as errors.
 * NOTE(review): the lines between the warning and the first createCache
 * call are not visible in this excerpt - presumably an early return and
 * the opening of the try block; confirm.
 */
174 @SuppressWarnings("deprecation")
175 private void allocateCaches() {
176 if (this.clusterContainerService == null) {
177 this.nonClusterObjectCreate();
178 log.warn("un-initialized clusterContainerService, can't create cache");
182 clusterContainerService.createCache(
183 "affinity.affinityGroupList",
184 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
185 clusterContainerService.createCache(
186 "affinity.affinityLinkList",
187 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
188 clusterContainerService.createCache(
189 "affinity.configSaveEvent",
190 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
191 } catch (CacheConfigException cce) {
192 log.error("\nCache configuration invalid - check cache mode");
193 } catch (CacheExistException ce) {
194 log.error("\nCache already exits - destroy and recreate if needed");
/**
 * Fetches the three affinity caches from the cluster container service and
 * assigns them to affinityGroupList, affinityLinkList and configSaveEvent.
 * A missing cluster container service is logged at info level; a cache
 * that comes back null is logged as an error.
 * The casts are unchecked because getCache() is not typed to these maps.
 * NOTE(review): the lines following each log call are not visible in this
 * excerpt (presumably an early return after the info message) - confirm.
 */
198 @SuppressWarnings({ "unchecked", "deprecation" })
199 private void retrieveCaches() {
200 if (this.clusterContainerService == null) {
201 log.info("un-initialized clusterContainerService, can't retrieve cache");
204 affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
205 .getCache("affinity.affinityGroupList");
206 if (affinityGroupList == null) {
207 log.error("\nFailed to get cache for affinityGroupList");
209 affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
210 .getCache("affinity.affinityLinkList");
211 if (affinityLinkList == null) {
212 log.error("\nFailed to get cache for affinityLinkList");
215 configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
216 .getCache("affinity.configSaveEvent");
217 if (configSaveEvent == null) {
218 log.error("\nFailed to get cache for configSaveEvent");
222 private void nonClusterObjectCreate() {
223 affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
224 affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
225 configSaveEvent = new ConcurrentHashMap<Long, String>();
229 void setHostTracker(IfIptoHost h) {
230 this.hostTracker = h;
233 void unsetHostTracker(IfIptoHost h) {
234 if (this.hostTracker.equals(h)) {
235 this.hostTracker = null;
238 public void setFlowProgrammerService(IFlowProgrammerService s)
243 public void unsetFlowProgrammerService(IFlowProgrammerService s) {
248 public void setL2Agent(L2Agent s)
/**
 * Handles removal of the L2 agent; acts only when {@code s} is the very
 * instance currently stored in the l2agent field.
 * NOTE(review): the statement inside the if is not visible in this
 * excerpt - presumably it sets l2agent to null; confirm.
 */
253 public void unsetL2Agent(L2Agent s) {
254 if (this.l2agent == s) {
// Stores the given forwarding rules manager in ruleManager.
// NOTE(review): neither a ruleManager field nor an IForwardingRulesManager
// import is visible in this excerpt - this may be commented-out code; confirm.
260 public void setForwardingRulesManager(
261 IForwardingRulesManager forwardingRulesManager) {
262 this.ruleManager = forwardingRulesManager;
// Clears ruleManager when it is the very instance being removed.
// NOTE(review): neither a ruleManager field nor an IForwardingRulesManager
// import is visible in this excerpt - this may be commented-out code; confirm.
265 public void unsetForwardingRulesManager(
266 IForwardingRulesManager forwardingRulesManager) {
267 if (this.ruleManager == forwardingRulesManager) {
268 this.ruleManager = null;
273 public Status addAffinityLink(AffinityLink al) {
274 boolean putNewLink = false;
276 if (affinityLinkList.containsKey(al.getName())) {
277 return new Status(StatusCode.CONFLICT,
278 "AffinityLink with the specified name already configured.");
282 AffinityLink alCurr = affinityLinkList.get(al.getName());
283 if (alCurr == null) {
284 if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
288 putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
292 String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
293 return new Status(StatusCode.CONFLICT, msg);
296 return new Status(StatusCode.SUCCESS);
301 * Fetch all node connectors. Each switch port will receive a flow
302 * rule. Do not stop on error. Pass in the waypointMAC address so
303 * that the correct output port can be determined.
/**
 * Installs the given flow on the nodes reported by the switch manager.
 * For each node, the connector returned by l2agent.lookup() for
 * waypointMAC is wrapped in an Output action, the action is added to the
 * flow, and the flow is handed to fps.addFlow(). A failed addFlow is only
 * logged at debug level and the loop continues.
 *
 * @param flow        the flow to install; it is modified by this method
 * @param waypointMAC MAC address passed to the l2agent lookup
 * NOTE(review): the null/empty check on nodes and the return statements
 * are not visible in this excerpt - confirm what is returned.
 */
305 public Status pushFlowRule(Flow flow, byte [] waypointMAC) {
306 /* Get all node connectors. */
307 Set<Node> nodes = switchManager.getNodes();
308 Status success = new Status(StatusCode.SUCCESS);
309 Status notfound = new Status(StatusCode.NOTFOUND);
312 log.debug("No nodes in network.");
315 for (Node node: nodes) {
316 /* Look up the output port leading to the waypoint. */
// NOTE(review): the lookup result is used without a null check - confirm
// that lookup() cannot return null for an unknown MAC.
317 NodeConnector dst_connector = l2agent.lookup(node, waypointMAC);
318 Action action = new Output(dst_connector);
// NOTE(review): the same Flow object receives one more action per node;
// presumably actions accumulate across iterations - confirm this is intended.
319 flow.addAction(action);
321 Status status = fps.addFlow(node, flow);
322 if (!status.isSuccess()) {
323 log.debug("Error during addFlow: {} on {}. The failure is: {}",
324 flow, node, status.getDescription());
331 * add flow rules for each node connector.
/**
 * Pushes a redirect flow for every host pair of the given affinity link.
 * For each pair returned by getAllFlowsByHost(), the source and destination
 * network addresses are set on the match with a 255.255.255.255 mask, the
 * link's waypoint is resolved to a MAC address, and the flow is passed to
 * pushFlowRule().
 *
 * @param al the affinity link whose host pairs and waypoint are used
 * @return a SUCCESS status; the status returned by pushFlowRule() is not inspected
 * NOTE(review): the declaration of mask is not visible in this excerpt.
 */
333 public Status addFlowRulesForRedirect(AffinityLink al) throws Exception {
334 Match match = new Match();
335 List<Action> actions = new ArrayList<Action>();
337 InetAddress address1, address2;
339 mask = InetAddress.getByName("255.255.255.255");
// A single Flow object, built once, is reused for every host pair below.
341 Flow f = new Flow(match, actions);
342 String waypoint = al.getWaypoint();
344 List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
345 for (Entry<Host,Host> hostPair : hostPairList) {
346 /* Create a match for each host pair in the affinity link. */
348 Host host1 = hostPair.getKey();
349 Host host2 = hostPair.getValue();
// NOTE(review): host1/host2 come straight from hostTracker.hostFind() and
// are dereferenced without a null check - confirm hostFind() never returns null.
350 address1 = host1.getNetworkAddress();
351 address2 = host2.getNetworkAddress();
353 match.setField(MatchType.NW_SRC, address1, mask);
354 match.setField(MatchType.NW_DST, address2, mask);
356 /* Send this flow rule to all nodes in the network. */
// NOTE(review): waypoint does not change inside the loop, so this lookup
// repeats the same work for every pair.
357 byte [] dstMAC = InetAddressToMAC(waypoint);
358 pushFlowRule(f, dstMAC);
360 return new Status(StatusCode.SUCCESS);
/**
 * Resolves an IP address string to a data-layer address: the string is
 * parsed with NetUtils.parseInetAddress(), the host is looked up through
 * the host tracker, and the host's data-layer address bytes are read.
 *
 * @param ipaddress textual IP address of the host to resolve
 * NOTE(review): the return statement is not visible in this excerpt -
 * presumably dst_mac is returned; confirm.
 */
363 public byte [] InetAddressToMAC(String ipaddress) {
364 InetAddress inetAddr = NetUtils.parseInetAddress(ipaddress);
// NOTE(review): hostTracker is used without a null check here, unlike
// getAllFlowsByHost(); the hostFind() result is also dereferenced unchecked.
365 HostNodeConnector host = hostTracker.hostFind(inetAddr);
366 byte [] dst_mac = host.getDataLayerAddressBytes();
370 public Status removeAffinityLink(String name) {
371 affinityLinkList.remove(name);
372 return new Status(StatusCode.SUCCESS);
375 public Status removeAffinityLink(AffinityLink al) {
376 AffinityLink alCurr = affinityLinkList.get(al.getName());
377 if (alCurr != null) {
378 affinityLinkList.remove(alCurr);
379 return new Status(StatusCode.SUCCESS);
381 String msg = "Affinity Link with specified name does not exist." + al.getName();
382 return new Status(StatusCode.INTERNALERROR, msg);
387 public AffinityLink getAffinityLink(String linkName) {
388 return affinityLinkList.get(linkName);
392 public List<AffinityLink> getAllAffinityLinks() {
393 return new ArrayList<AffinityLink>(affinityLinkList.values());
397 public Status addAffinityGroup(AffinityGroup ag) {
398 boolean putNewGroup = false;
399 String name = ag.getName();
400 if (affinityGroupList.containsKey(name)) {
401 return new Status(StatusCode.CONFLICT,
402 "AffinityGroup with the specified name already configured.");
404 AffinityGroup agCurr = affinityGroupList.get(name);
405 if (agCurr == null) {
406 if (affinityGroupList.putIfAbsent(name, ag) == null) {
410 putNewGroup = affinityGroupList.replace(name, agCurr, ag);
414 String msg = "Cluster conflict: Conflict while adding the subnet " + name;
415 return new Status(StatusCode.CONFLICT, msg);
418 return new Status(StatusCode.SUCCESS);
421 /* Check for errors. */
423 public Status removeAffinityGroup(String name) {
424 affinityGroupList.remove(name);
425 return new Status(StatusCode.SUCCESS);
429 public AffinityGroup getAffinityGroup(String groupName) {
430 log.debug("getAffinityGroup" + groupName);
431 return affinityGroupList.get(groupName);
435 public List<AffinityGroup> getAllAffinityGroups() {
436 return new ArrayList<AffinityGroup>(affinityGroupList.values());
439 /* Find where this is used. */
441 public Object readObject(ObjectInputStream ois)
442 throws FileNotFoundException, IOException, ClassNotFoundException {
443 // Perform the class deserialization locally, from inside the package
444 // where the class is defined
445 return ois.readObject();
/**
 * Reads the saved group and link maps from their configuration files and
 * re-adds every saved group through addAffinityGroup().
 * The casts are unchecked because ObjectReader.read() is not typed to
 * these maps; a null result skips the corresponding loop.
 * NOTE(review): the body of the link loop is not visible in this excerpt -
 * presumably it calls addAffinityLink(al); confirm.
 */
448 @SuppressWarnings("unchecked")
449 private void loadAffinityConfiguration() {
450 ObjectReader objReader = new ObjectReader();
451 ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
452 ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
455 if (groupList != null) {
456 for (AffinityGroup ag : groupList.values()) {
457 addAffinityGroup(ag);
462 if (linkList != null) {
463 for (AffinityLink al : linkList.values()) {
470 public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
471 return ag.getAllElements();
/**
 * Resolves the elements of an affinity group through the host tracker.
 * Each element's value is cast to InetAddress and looked up with
 * hostTracker.hostFind(); the lookup is skipped when no host tracker is set.
 *
 * @param ag the group whose elements are resolved
 * NOTE(review): the lines that use host1 and the return statement are not
 * visible in this excerpt - presumably found hosts are added to hostList,
 * which is then returned; confirm.
 */
475 public List<Host> getAllElementsByHost(AffinityGroup ag) {
476 List<Host> hostList= new ArrayList<Host>();
478 for (AffinityIdentifier h : ag.getAllElements()) {
479 /* TBD: Do not assume this to be an InetAddress. */
481 if (hostTracker != null) {
482 Host host1 = hostTracker.hostFind((InetAddress) h.get());
490 public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
491 List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
493 AffinityGroup fromGroup = al.getFromGroup();
494 AffinityGroup toGroup = al.getToGroup();
496 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
497 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
498 if (hostTracker != null) {
499 Host host1 = hostTracker.hostFind((InetAddress) h1.get());
500 Host host2 = hostTracker.hostFind((InetAddress) h2.get());
501 Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
502 hostPairList.add(hp1);
510 public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
511 List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
513 AffinityGroup fromGroup = al.getFromGroup();
514 AffinityGroup toGroup = al.getToGroup();
516 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
517 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
518 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
519 hostPairList.add(hp1);
526 public Status saveConfiguration() {
527 return saveAffinityConfig();
531 public Status saveAffinityConfig() {
532 // Publish the save config event to the cluster nodes
533 configSaveEvent.put(new Date().getTime(), SAVE);
534 return saveAffinityConfigInternal();
537 public Status saveAffinityConfigInternal() {
538 Status retS = null, retP = null;
539 ObjectWriter objWriter = new ObjectWriter();
541 retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
542 affinityLinkList), affinityLinkFileName);
544 retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
545 affinityGroupList), affinityGroupFileName);
547 if (retS.isSuccess() && retP.isSuccess()) {
548 return new Status(StatusCode.SUCCESS, "Configuration saved.");
550 return new Status(StatusCode.INTERNALERROR, "Save failed");
555 public void entryCreated(Long key, String cacheName, boolean local) {
559 public void entryUpdated(Long key, String new_value, String cacheName,
560 boolean originLocal) {
561 saveAffinityConfigInternal();
565 public void entryDeleted(Long key, String cacheName, boolean originLocal) {
569 * Function called by the dependency manager when all the required
570 * dependencies are satisfied
574 log.debug("INIT called!");
575 containerName = GlobalConstants.DEFAULT.toString();
580 * Function called by the dependency manager when at least one
581 * dependency becomes unsatisfied, or when the component is shutting
582 * down (for example, because the bundle is being stopped).
586 log.debug("DESTROY called!");
590 * Function called by dependency manager after "init ()" is called
591 * and after the services provided by the class are registered in
592 * the service registry
596 log.debug("START called!");
600 * Function called after registering the service in OSGi service registry.
603 // Retrieve current statistics so we don't have to wait for next refresh
604 IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
605 IAffinityManager.class, this.getContainerName(), this);
606 if (affinityManager != null) {
607 log.debug("STARTED method called!");
612 * Function called by the dependency manager before the services
613 * exported by the component are unregistered; this will be
614 * followed by a call to "destroy ()".
618 log.debug("STOP called!");
621 void setClusterContainerService(IClusterContainerServices s) {
622 log.debug("Cluster Service set for affinity mgr");
623 this.clusterContainerService = s;
626 void unsetClusterContainerService(IClusterContainerServices s) {
627 if (this.clusterContainerService == s) {
628 log.debug("Cluster Service removed for affinity mgr!");
629 this.clusterContainerService = null;