2 * Copyright (c) 2013 Plexxi, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.affinity.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.AbstractMap;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
39 import java.util.Map.Entry;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
54 import org.opendaylight.controller.sal.core.IContainer;
55 import org.opendaylight.controller.sal.core.Node;
56 import org.opendaylight.controller.sal.core.Host;
57 import org.opendaylight.controller.sal.core.NodeConnector;
58 import org.opendaylight.controller.sal.core.NodeTable;
59 import org.opendaylight.controller.sal.core.Property;
60 import org.opendaylight.controller.sal.core.UpdateType;
61 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerService;
62 import org.opendaylight.controller.sal.flowprogrammer.Flow;
63 import org.opendaylight.controller.sal.match.Match;
64 import org.opendaylight.controller.sal.match.MatchType;
65 import org.opendaylight.controller.sal.match.MatchField;
66 import org.opendaylight.controller.sal.action.Action;
67 import org.opendaylight.controller.sal.action.Output;
69 import org.opendaylight.controller.sal.reader.FlowOnNode;
70 import org.opendaylight.controller.sal.reader.IReadService;
71 import org.opendaylight.controller.sal.reader.IReadServiceListener;
72 import org.opendaylight.controller.sal.utils.GlobalConstants;
73 import org.opendaylight.controller.sal.utils.IObjectReader;
74 import org.opendaylight.controller.sal.utils.ObjectReader;
75 import org.opendaylight.controller.sal.utils.ObjectWriter;
76 import org.opendaylight.controller.sal.utils.NetUtils;
78 import org.opendaylight.controller.hosttracker.IfIptoHost;
79 import org.opendaylight.controller.sal.utils.Status;
80 import org.opendaylight.controller.sal.utils.StatusCode;
82 import org.opendaylight.controller.sal.utils.ServiceHelper;
83 import org.opendaylight.affinity.affinity.AffinityGroup;
84 import org.opendaylight.affinity.affinity.AffinityLink;
85 import org.opendaylight.affinity.affinity.AffinityIdentifier;
86 import org.opendaylight.affinity.affinity.IAffinityManager;
87 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
89 import org.opendaylight.controller.hosttracker.IfIptoHost;
90 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
91 import org.opendaylight.controller.switchmanager.ISwitchManager;
92 import org.opendaylight.affinity.l2agent.L2Agent;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
98 * The class caches latest network nodes statistics as notified by reader
99 * services and provides API to retrieve them.
101 public class AffinityManagerImpl implements IAffinityManager, IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
102 private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
104 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
105 private static final String SAVE = "Save";
106 private String affinityLinkFileName = null;
107 private String affinityGroupFileName = null;
108 private IFlowProgrammerService fps = null;
109 private ISwitchManager switchManager = null;
110 private L2Agent l2agent = null;
112 private ConcurrentMap<String, AffinityGroup> affinityGroupList;
113 private ConcurrentMap<String, AffinityLink> affinityLinkList;
114 private ConcurrentMap<Long, String> configSaveEvent;
116 private IfIptoHost hostTracker;
118 private final Set<IAffinityManagerAware> affinityManagerAware = Collections
119 .synchronizedSet(new HashSet<IAffinityManagerAware>());
122 private static boolean hostRefresh = true;
123 private int hostRetryCount = 5;
124 private IClusterContainerServices clusterContainerService = null;
125 private String containerName = null;
126 private boolean isDefaultContainer = true;
127 private static final int REPLACE_RETRY = 1;
129 public enum ReasonCode {
130 SUCCESS("Success"), FAILURE("Failure"), INVALID_CONF(
131 "Invalid Configuration"), EXIST("Entry Already Exist"), CONFLICT(
132 "Configuration Conflict with Existing Entry");
134 private final String name;
136 private ReasonCode(String name) {
141 public String toString() {
146 /* Only default container. */
147 public String getContainerName() {
148 return containerName;
151 public void startUp() {
152 // Initialize configuration file names
153 affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
155 affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
158 // Instantiate cluster synced variables
163 * Read startup and build database if we have not already gotten the
164 * configurations synced from another node
166 if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
167 loadAffinityConfiguration();
171 public void shutDown() {
174 @SuppressWarnings("deprecation")
175 private void allocateCaches() {
176 if (this.clusterContainerService == null) {
177 this.nonClusterObjectCreate();
178 log.warn("un-initialized clusterContainerService, can't create cache");
182 clusterContainerService.createCache(
183 "affinity.affinityGroupList",
184 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
185 clusterContainerService.createCache(
186 "affinity.affinityLinkList",
187 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
188 clusterContainerService.createCache(
189 "affinity.configSaveEvent",
190 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
191 } catch (CacheConfigException cce) {
192 log.error("\nCache configuration invalid - check cache mode");
193 } catch (CacheExistException ce) {
194 log.error("\nCache already exits - destroy and recreate if needed");
198 @SuppressWarnings({ "unchecked", "deprecation" })
199 private void retrieveCaches() {
200 if (this.clusterContainerService == null) {
201 log.info("un-initialized clusterContainerService, can't retrieve cache");
204 affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
205 .getCache("affinity.affinityGroupList");
206 if (affinityGroupList == null) {
207 log.error("\nFailed to get cache for affinityGroupList");
209 affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
210 .getCache("affinity.affinityLinkList");
211 if (affinityLinkList == null) {
212 log.error("\nFailed to get cache for affinityLinkList");
215 configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
216 .getCache("affinity.configSaveEvent");
217 if (configSaveEvent == null) {
218 log.error("\nFailed to get cache for configSaveEvent");
222 private void nonClusterObjectCreate() {
223 affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
224 affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
225 configSaveEvent = new ConcurrentHashMap<Long, String>();
229 void setHostTracker(IfIptoHost h) {
230 this.hostTracker = h;
233 void unsetHostTracker(IfIptoHost h) {
234 if (this.hostTracker.equals(h)) {
235 this.hostTracker = null;
238 public void setFlowProgrammerService(IFlowProgrammerService s)
243 public void unsetFlowProgrammerService(IFlowProgrammerService s) {
248 public void setL2Agent(L2Agent s)
253 public void unsetL2Agent(L2Agent s) {
254 if (this.l2agent == s) {
260 public void setForwardingRulesManager(
261 IForwardingRulesManager forwardingRulesManager) {
262 this.ruleManager = forwardingRulesManager;
265 public void unsetForwardingRulesManager(
266 IForwardingRulesManager forwardingRulesManager) {
267 if (this.ruleManager == forwardingRulesManager) {
268 this.ruleManager = null;
273 public Status addAffinityLink(AffinityLink al) {
274 boolean putNewLink = false;
276 if (affinityLinkList.containsKey(al.getName())) {
277 return new Status(StatusCode.CONFLICT,
278 "AffinityLink with the specified name already configured.");
282 AffinityLink alCurr = affinityLinkList.get(al.getName());
283 if (alCurr == null) {
284 if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
288 putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
292 String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
293 return new Status(StatusCode.CONFLICT, msg);
296 return new Status(StatusCode.SUCCESS);
301 * Fetch all node connectors. Each switch port will receive a flow rule. Do not stop on error.
303 public Status pushFlowRule(Flow flow) {
304 /* Get all node connectors. */
305 Set<Node> nodes = switchManager.getNodes();
306 Status success = new Status(StatusCode.SUCCESS);
307 Status notfound = new Status(StatusCode.NOTFOUND);
310 log.debug("No nodes in network.");
313 for (Node node: nodes) {
314 Set<NodeConnector> ncs = switchManager.getNodeConnectors(node);
318 Status status = fps.addFlow(node, flow);
319 if (!status.isSuccess()) {
320 log.debug("Error during addFlow: {} on {}. The failure is: {}",
321 flow, node, status.getDescription());
328 * add flow rules for each node connector.
330 public Status addFlowRulesForRedirect(AffinityLink al) throws Exception {
331 Match match = new Match();
332 List<Action> actions = new ArrayList<Action>();
334 InetAddress address1, address2;
336 mask = InetAddress.getByName("255.255.255.255");
338 Flow f = new Flow(match, actions);
340 List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
341 for (Entry<Host,Host> hostPair : hostPairList) {
342 /* Create a match for each host pair in the affinity link. */
344 Host host1 = hostPair.getKey();
345 Host host2 = hostPair.getValue();
346 address1 = host1.getNetworkAddress();
347 address2 = host2.getNetworkAddress();
349 match.setField(MatchType.NW_SRC, address1, mask);
350 match.setField(MatchType.NW_DST, address2, mask);
353 /* For each end point, discover the mac address of the
354 * host. Then lookup the L2 table to find the port to send
355 * this flow along. Program the flow. */
357 byte [] mac = ((HostNodeConnector) host1).getDataLayerAddressBytes();
358 /* Tbd: use hosttracker for this. */
359 // NodeConnector dst_connector = l2agent.lookupMacAddress(mac);
360 // actions.add(new Output(dst_connector));
362 return new Status(StatusCode.SUCCESS);
365 public Status removeAffinityLink(String name) {
366 affinityLinkList.remove(name);
367 return new Status(StatusCode.SUCCESS);
370 public Status removeAffinityLink(AffinityLink al) {
371 AffinityLink alCurr = affinityLinkList.get(al.getName());
372 if (alCurr != null) {
373 affinityLinkList.remove(alCurr);
374 return new Status(StatusCode.SUCCESS);
376 String msg = "Affinity Link with specified name does not exist." + al.getName();
377 return new Status(StatusCode.INTERNALERROR, msg);
382 public AffinityLink getAffinityLink(String linkName) {
383 return affinityLinkList.get(linkName);
387 public List<AffinityLink> getAllAffinityLinks() {
388 return new ArrayList<AffinityLink>(affinityLinkList.values());
392 public Status addAffinityGroup(AffinityGroup ag) {
393 boolean putNewGroup = false;
394 String name = ag.getName();
395 if (affinityGroupList.containsKey(name)) {
396 return new Status(StatusCode.CONFLICT,
397 "AffinityGroup with the specified name already configured.");
399 AffinityGroup agCurr = affinityGroupList.get(name);
400 if (agCurr == null) {
401 if (affinityGroupList.putIfAbsent(name, ag) == null) {
405 putNewGroup = affinityGroupList.replace(name, agCurr, ag);
409 String msg = "Cluster conflict: Conflict while adding the subnet " + name;
410 return new Status(StatusCode.CONFLICT, msg);
413 return new Status(StatusCode.SUCCESS);
416 /* Check for errors. */
418 public Status removeAffinityGroup(String name) {
419 affinityGroupList.remove(name);
420 return new Status(StatusCode.SUCCESS);
424 public AffinityGroup getAffinityGroup(String groupName) {
425 return affinityGroupList.get(groupName);
429 public List<AffinityGroup> getAllAffinityGroups() {
430 return new ArrayList<AffinityGroup>(affinityGroupList.values());
433 /* Find where this is used. */
435 public Object readObject(ObjectInputStream ois)
436 throws FileNotFoundException, IOException, ClassNotFoundException {
437 // Perform the class deserialization locally, from inside the package
438 // where the class is defined
439 return ois.readObject();
442 @SuppressWarnings("unchecked")
443 private void loadAffinityConfiguration() {
444 ObjectReader objReader = new ObjectReader();
445 ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
446 ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
449 if (groupList != null) {
450 for (AffinityGroup ag : groupList.values()) {
451 addAffinityGroup(ag);
456 if (linkList != null) {
457 for (AffinityLink al : linkList.values()) {
464 public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
465 return ag.getAllElements();
469 public List<Host> getAllElementsByHost(AffinityGroup ag) {
470 List<Host> hostList= new ArrayList<Host>();
472 for (AffinityIdentifier h : ag.getAllElements()) {
473 /* TBD: Do not assume this to be an InetAddress. */
475 if (hostTracker != null) {
476 Host host1 = hostTracker.hostFind((InetAddress) h.get());
484 public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
485 List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
487 AffinityGroup fromGroup = al.getFromGroup();
488 AffinityGroup toGroup = al.getToGroup();
490 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
491 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
492 if (hostTracker != null) {
493 Host host1 = hostTracker.hostFind((InetAddress) h1.get());
494 Host host2 = hostTracker.hostFind((InetAddress) h2.get());
495 Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
496 hostPairList.add(hp1);
504 public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
505 List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
507 AffinityGroup fromGroup = al.getFromGroup();
508 AffinityGroup toGroup = al.getToGroup();
510 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
511 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
512 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
513 hostPairList.add(hp1);
520 public Status saveConfiguration() {
521 return saveAffinityConfig();
525 public Status saveAffinityConfig() {
526 // Publish the save config event to the cluster nodes
527 configSaveEvent.put(new Date().getTime(), SAVE);
528 return saveAffinityConfigInternal();
531 public Status saveAffinityConfigInternal() {
532 Status retS = null, retP = null;
533 ObjectWriter objWriter = new ObjectWriter();
535 retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
536 affinityLinkList), affinityLinkFileName);
538 retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
539 affinityGroupList), affinityGroupFileName);
541 if (retS.isSuccess() && retP.isSuccess()) {
542 return new Status(StatusCode.SUCCESS, "Configuration saved.");
544 return new Status(StatusCode.INTERNALERROR, "Save failed");
549 public void entryCreated(Long key, String cacheName, boolean local) {
553 public void entryUpdated(Long key, String new_value, String cacheName,
554 boolean originLocal) {
555 saveAffinityConfigInternal();
559 public void entryDeleted(Long key, String cacheName, boolean originLocal) {
563 * Function called by the dependency manager when all the required
564 * dependencies are satisfied
568 log.debug("INIT called!");
569 containerName = GlobalConstants.DEFAULT.toString();
574 * Function called by the dependency manager when at least one
575 * dependency become unsatisfied or when the component is shutting
576 * down because for example bundle is being stopped.
580 log.debug("DESTROY called!");
584 * Function called by dependency manager after "init ()" is called
585 * and after the services provided by the class are registered in
586 * the service registry
590 log.debug("START called!");
594 * Function called after registering the service in OSGi service registry.
597 // Retrieve current statistics so we don't have to wait for next refresh
598 IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
599 IAffinityManager.class, this.getContainerName(), this);
600 if (affinityManager != null) {
601 log.debug("STARTED method called!");
606 * Function called by the dependency manager before the services
607 * exported by the component are unregistered, this will be
608 * followed by a "destroy ()" calls
612 log.debug("STOP called!");
615 void setClusterContainerService(IClusterContainerServices s) {
616 log.debug("Cluster Service set for affinity mgr");
617 this.clusterContainerService = s;
620 void unsetClusterContainerService(IClusterContainerServices s) {
621 if (this.clusterContainerService == s) {
622 log.debug("Cluster Service removed for affinity mgr!");
623 this.clusterContainerService = null;