2 * Copyright (c) 2013 Plexxi, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.affinity.affinity.internal;
11 import java.io.FileNotFoundException;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.net.UnknownHostException;
15 import java.net.InetAddress;
16 import java.net.NetworkInterface;
17 import java.net.SocketException;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Date;
21 import java.util.Dictionary;
22 import java.util.EnumSet;
23 import java.util.Enumeration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.AbstractMap;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.CopyOnWriteArrayList;
33 import java.util.ArrayList;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.List;
39 import java.util.Map.Entry;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
44 import org.apache.felix.dm.Component;
45 import org.eclipse.osgi.framework.console.CommandInterpreter;
46 import org.eclipse.osgi.framework.console.CommandProvider;
47 import org.opendaylight.controller.clustering.services.CacheConfigException;
48 import org.opendaylight.controller.clustering.services.CacheExistException;
49 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
50 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.configuration.IConfigurationContainerAware;
53 import org.opendaylight.controller.hosttracker.IfIptoHost;
54 import org.opendaylight.controller.sal.flowprogrammer.Flow;
55 import org.opendaylight.controller.sal.utils.IPProtocols;
57 import org.opendaylight.controller.sal.core.IContainer;
58 import org.opendaylight.controller.sal.core.Node;
59 import org.opendaylight.controller.sal.core.Host;
60 import org.opendaylight.controller.sal.core.NodeConnector;
61 import org.opendaylight.controller.sal.core.NodeTable;
62 import org.opendaylight.controller.sal.core.Property;
63 import org.opendaylight.controller.sal.core.UpdateType;
65 import org.opendaylight.controller.sal.flowprogrammer.Flow;
66 import org.opendaylight.controller.sal.match.Match;
67 import org.opendaylight.controller.sal.match.MatchType;
68 import org.opendaylight.controller.sal.match.MatchField;
69 import org.opendaylight.controller.sal.action.Action;
70 import org.opendaylight.controller.sal.action.Output;
71 import org.opendaylight.controller.sal.utils.EtherTypes;
73 import org.opendaylight.controller.sal.reader.FlowOnNode;
74 import org.opendaylight.controller.sal.reader.IReadService;
75 import org.opendaylight.controller.sal.reader.IReadServiceListener;
76 import org.opendaylight.controller.sal.utils.GlobalConstants;
77 import org.opendaylight.controller.sal.utils.IObjectReader;
78 import org.opendaylight.controller.sal.utils.ObjectReader;
79 import org.opendaylight.controller.sal.utils.ObjectWriter;
80 import org.opendaylight.controller.sal.utils.NetUtils;
82 import org.opendaylight.controller.sal.utils.Status;
83 import org.opendaylight.controller.sal.utils.StatusCode;
85 import org.opendaylight.controller.sal.utils.ServiceHelper;
86 import org.opendaylight.affinity.affinity.AffinityGroup;
87 import org.opendaylight.affinity.affinity.AffinityLink;
88 import org.opendaylight.affinity.affinity.AffinityIdentifier;
89 import org.opendaylight.affinity.affinity.AffinityAttributeType;
90 import org.opendaylight.affinity.affinity.AffinityAttribute;
91 import org.opendaylight.affinity.affinity.IAffinityManager;
92 import org.opendaylight.affinity.affinity.IAffinityManagerAware;
93 import org.opendaylight.affinity.affinity.InetAddressMask;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
99 * Affinity configuration.
101 public class AffinityManagerImpl implements IAffinityManager,
102 IConfigurationContainerAware, IObjectReader, ICacheUpdateAware<Long, String> {
103 private static final Logger log = LoggerFactory.getLogger(AffinityManagerImpl.class);
105 private static String ROOT = GlobalConstants.STARTUPHOME.toString();
106 private static final String SAVE = "Save";
108 // write all objects to a single file.
109 private String affinityLinkFileName = null;
110 private String affinityGroupFileName = null;
112 private ConcurrentMap<String, AffinityGroup> affinityGroupList;
113 private ConcurrentMap<String, AffinityLink> affinityLinkList;
114 private ConcurrentMap<Long, String> configSaveEvent;
117 private final Set<IAffinityManagerAware> affinityManagerAware = Collections
118 .synchronizedSet(new HashSet<IAffinityManagerAware>());
120 private static boolean hostRefresh = true;
121 private int hostRetryCount = 5;
122 private IClusterContainerServices clusterContainerService = null;
123 private String containerName = GlobalConstants.DEFAULT.toString();
124 private boolean isDefaultContainer = true;
125 private static final int REPLACE_RETRY = 1;
126 private IfIptoHost hostTracker;
128 private static short REDIRECT_IPSWITCH_PRIORITY = 3;
130 public enum ReasonCode {
131 SUCCESS("Success"), FAILURE("Failure"), INVALID_CONF(
132 "Invalid Configuration"), EXIST("Entry Already Exist"), CONFLICT(
133 "Configuration Conflict with Existing Entry");
135 private final String name;
137 private ReasonCode(String name) {
142 public String toString() {
147 /* Only default container. */
148 public String getContainerName() {
149 return containerName;
152 public void startUp() {
153 // Initialize configuration file names
154 affinityLinkFileName = ROOT + "affinityConfig_link" + this.getContainerName()
156 affinityGroupFileName = ROOT + "affinityConfig_group" + this.getContainerName()
158 log.debug("configuration file names " + affinityLinkFileName + "and " + affinityGroupFileName);
159 // Instantiate cluster synced variables
164 * Read startup and build database if we have not already gotten the
165 * configurations synced from another node
167 if (affinityGroupList.isEmpty() || affinityLinkList.isEmpty()) {
168 loadAffinityConfiguration();
172 public void shutDown() {
175 @SuppressWarnings("deprecation")
176 private void allocateCaches() {
177 if (this.clusterContainerService == null) {
178 this.nonClusterObjectCreate();
179 log.warn("un-initialized clusterContainerService, can't create cache");
183 clusterContainerService.createCache(
184 "affinity.affinityGroupList",
185 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
186 clusterContainerService.createCache(
187 "affinity.affinityLinkList",
188 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
189 clusterContainerService.createCache(
190 "affinity.configSaveEvent",
191 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
192 } catch (CacheConfigException cce) {
193 log.error("\nCache configuration invalid - check cache mode");
194 } catch (CacheExistException ce) {
195 log.error("\nCache already exits - destroy and recreate if needed");
199 @SuppressWarnings({ "unchecked", "deprecation" })
200 private void retrieveCaches() {
201 if (this.clusterContainerService == null) {
202 log.info("un-initialized clusterContainerService, can't retrieve cache");
205 affinityGroupList = (ConcurrentMap<String, AffinityGroup>) clusterContainerService
206 .getCache("affinity.affinityGroupList");
207 if (affinityGroupList == null) {
208 log.error("\nFailed to get cache for affinityGroupList");
210 affinityLinkList = (ConcurrentMap<String, AffinityLink>) clusterContainerService
211 .getCache("affinity.affinityLinkList");
212 if (affinityLinkList == null) {
213 log.error("\nFailed to get cache for affinityLinkList");
216 configSaveEvent = (ConcurrentMap<Long, String>) clusterContainerService
217 .getCache("affinity.configSaveEvent");
218 if (configSaveEvent == null) {
219 log.error("\nFailed to get cache for configSaveEvent");
223 private void nonClusterObjectCreate() {
224 affinityLinkList = new ConcurrentHashMap<String, AffinityLink>();
225 affinityGroupList = new ConcurrentHashMap<String, AffinityGroup>();
226 configSaveEvent = new ConcurrentHashMap<Long, String>();
230 public Status addAffinityLink(AffinityLink al) {
231 boolean putNewLink = false;
233 if (affinityLinkList.containsKey(al.getName())) {
234 return new Status(StatusCode.CONFLICT,
235 "AffinityLink with the specified name already configured.");
239 AffinityLink alCurr = affinityLinkList.get(al.getName());
240 if (alCurr == null) {
241 if (affinityLinkList.putIfAbsent(al.getName(), al) == null) {
245 putNewLink = affinityLinkList.replace(al.getName(), alCurr, al);
249 String msg = "Cluster conflict: Conflict while adding the subnet " + al.getName();
250 return new Status(StatusCode.CONFLICT, msg);
253 return new Status(StatusCode.SUCCESS);
256 public Status removeAffinityLink(String name) {
257 affinityLinkList.remove(name);
258 return new Status(StatusCode.SUCCESS);
261 public Status removeAffinityLink(AffinityLink al) {
262 AffinityLink alCurr = affinityLinkList.get(al.getName());
263 if (alCurr != null) {
264 affinityLinkList.remove(alCurr);
265 return new Status(StatusCode.SUCCESS);
267 String msg = "Affinity Link with specified name does not exist." + al.getName();
268 return new Status(StatusCode.INTERNALERROR, msg);
273 public AffinityLink getAffinityLink(String linkName) {
274 return affinityLinkList.get(linkName);
278 public List<AffinityLink> getAllAffinityLinks() {
279 return new ArrayList<AffinityLink>(affinityLinkList.values());
283 public Status addAffinityGroup(AffinityGroup ag) {
284 boolean putNewGroup = false;
285 String name = ag.getName();
286 if (affinityGroupList.containsKey(name)) {
287 return new Status(StatusCode.CONFLICT,
288 "AffinityGroup with the specified name already configured.");
290 AffinityGroup agCurr = affinityGroupList.get(name);
291 if (agCurr == null) {
292 if (affinityGroupList.putIfAbsent(name, ag) == null) {
296 putNewGroup = affinityGroupList.replace(name, agCurr, ag);
300 String msg = "Cluster conflict: Conflict while adding the subnet " + name;
301 return new Status(StatusCode.CONFLICT, msg);
304 return new Status(StatusCode.SUCCESS);
307 /* Check for errors. */
309 public Status removeAffinityGroup(String name) {
310 affinityGroupList.remove(name);
311 return new Status(StatusCode.SUCCESS);
315 public AffinityGroup getAffinityGroup(String groupName) {
316 log.debug("getAffinityGroup" + groupName);
317 return affinityGroupList.get(groupName);
321 public List<AffinityGroup> getAllAffinityGroups() {
322 return new ArrayList<AffinityGroup>(affinityGroupList.values());
325 /* Find where this is used. */
327 public Object readObject(ObjectInputStream ois)
328 throws FileNotFoundException, IOException, ClassNotFoundException {
329 // Perform the class deserialization locally, from inside the package
330 // where the class is defined
331 return ois.readObject();
334 @SuppressWarnings("unchecked")
335 private void loadAffinityConfiguration() {
336 ObjectReader objReader = new ObjectReader();
337 ConcurrentMap<String, AffinityGroup> groupList = (ConcurrentMap<String, AffinityGroup>) objReader.read(this, affinityGroupFileName);
338 ConcurrentMap<String, AffinityLink> linkList = (ConcurrentMap<String, AffinityLink>) objReader.read(this, affinityLinkFileName);
341 if (groupList != null) {
342 for (AffinityGroup ag : groupList.values()) {
343 addAffinityGroup(ag);
348 if (linkList != null) {
349 for (AffinityLink al : linkList.values()) {
356 public ArrayList<AffinityIdentifier> getAllElementsByAffinityIdentifier(AffinityGroup ag) {
357 return ag.getAllElements();
360 public List<Host> getAllElementsByHost(AffinityGroup ag) {
361 List<Host> hostList= new ArrayList<Host>();
363 for (AffinityIdentifier h : ag.getAllElements()) {
365 if (hostTracker != null) {
366 Host host1 = hostTracker.hostFind((InetAddress) h.get());
374 public List<Entry<Host, Host>> getAllFlowsByHost(AffinityLink al) {
375 List<Entry<Host,Host>> hostPairList= new ArrayList<Entry<Host, Host>>();
377 AffinityGroup fromGroup = al.getFromGroup();
378 AffinityGroup toGroup = al.getToGroup();
380 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
381 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
382 if (hostTracker != null) {
383 Host host1 = hostTracker.hostFind((InetAddress) h1.get());
384 Host host2 = hostTracker.hostFind((InetAddress) h2.get());
385 log.debug("Flow between {}, {}", host1, host2);
386 Entry<Host, Host> hp1=new AbstractMap.SimpleEntry<Host, Host>(host1, host2);
387 hostPairList.add(hp1);
394 public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al) {
395 List<Entry<AffinityIdentifier, AffinityIdentifier>> hostPairList= new ArrayList<Entry<AffinityIdentifier, AffinityIdentifier>>();
397 AffinityGroup fromGroup = al.getFromGroup();
398 AffinityGroup toGroup = al.getToGroup();
400 for (AffinityIdentifier h1 : fromGroup.getAllElements()) {
401 for (AffinityIdentifier h2 : toGroup.getAllElements()) {
402 Entry<AffinityIdentifier, AffinityIdentifier> hp1=new AbstractMap.SimpleEntry<AffinityIdentifier, AffinityIdentifier>(h1, h2);
403 log.debug("Adding hostPair {} -> {}", h1, h2);
404 hostPairList.add(hp1);
411 public Status saveConfiguration() {
412 return saveAffinityConfig();
416 public Status saveAffinityConfig() {
417 // Publish the save config event to the cluster nodes
418 configSaveEvent.put(new Date().getTime(), SAVE);
419 return saveAffinityConfigInternal();
422 public Status saveAffinityConfigInternal() {
423 Status retS = null, retP = null;
424 ObjectWriter objWriter = new ObjectWriter();
426 retS = objWriter.write(new ConcurrentHashMap<String, AffinityLink>(
427 affinityLinkList), affinityLinkFileName);
429 retP = objWriter.write(new ConcurrentHashMap<String, AffinityGroup>(
430 affinityGroupList), affinityGroupFileName);
432 if (retS.isSuccess() && retP.isSuccess()) {
433 return new Status(StatusCode.SUCCESS, "Configuration saved.");
435 return new Status(StatusCode.INTERNALERROR, "Save failed");
440 public void entryCreated(Long key, String cacheName, boolean local) {
444 public void entryUpdated(Long key, String new_value, String cacheName,
445 boolean originLocal) {
446 saveAffinityConfigInternal();
450 public void entryDeleted(Long key, String cacheName, boolean originLocal) {
454 * Function called by the dependency manager when all the required
455 * dependencies are satisfied
459 log.debug("INIT called!");
460 containerName = GlobalConstants.DEFAULT.toString();
465 * Function called by the dependency manager when at least one
466 * dependency become unsatisfied or when the component is shutting
467 * down because for example bundle is being stopped.
471 log.debug("DESTROY called!");
475 * Function called by dependency manager after "init ()" is called
476 * and after the services provided by the class are registered in
477 * the service registry
481 log.debug("START called!");
485 * Function called after registering the service in OSGi service registry.
488 // Retrieve current statistics so we don't have to wait for next refresh
489 IAffinityManager affinityManager = (IAffinityManager) ServiceHelper.getInstance(
490 IAffinityManager.class, this.getContainerName(), this);
491 if (affinityManager != null) {
492 log.debug("STARTED method called!");
497 * Function called by the dependency manager before the services
498 * exported by the component are unregistered, this will be
499 * followed by a "destroy ()" calls
503 log.debug("STOP called!");
506 void setClusterContainerService(IClusterContainerServices s) {
507 log.debug("Cluster Service set for affinity mgr");
508 this.clusterContainerService = s;
511 void unsetClusterContainerService(IClusterContainerServices s) {
512 if (this.clusterContainerService == s) {
513 log.debug("Cluster Service removed for affinity mgr!");
514 this.clusterContainerService = null;
518 void setHostTracker(IfIptoHost h) {
519 log.info("Setting hosttracker {}", h);
520 this.hostTracker = h;
523 void unsetHostTracker(IfIptoHost h) {
524 if (this.hostTracker.equals(h)) {
525 this.hostTracker = null;
529 /* Add a nfchain config for this affinity link. */
530 public List<Flow> getFlowlist(AffinityLink al) {
531 InetAddress from = null, to = null;
533 log.info("get flowlist affinity link = {}", al.getName());
534 List<Flow> flowlist = new ArrayList<Flow>();
535 List<Entry<AffinityIdentifier,AffinityIdentifier>> hostPairList= getAllFlowsByAffinityIdentifier(al);
537 /* Create a Flow for each host pair in the affinity link. */
538 for (Entry<AffinityIdentifier,AffinityIdentifier> hostPair : hostPairList) {
539 log.info("Processing next hostPair {} ", hostPair);
541 Match match = new Match();
545 /* Set source fields. */
546 if (hostPair.getKey().get() instanceof InetAddress) {
547 addr = hostPair.getKey().get();
548 match.setField(new MatchField(MatchType.NW_SRC, (InetAddress) addr, null));
549 } else if (hostPair.getKey().get() instanceof InetAddressMask) {
550 addrmask = hostPair.getKey().get();
551 InetAddress faddr = ((InetAddressMask) addrmask).getNetworkAddress();
552 InetAddress fmask = NetUtils.getInetNetworkMask((int) ((InetAddressMask) addrmask).getMask(), false);
553 match.setField(new MatchField(MatchType.NW_SRC, faddr, fmask));
555 /* xxx mac address ... */
557 /* Set destination fields. */
558 if (hostPair.getValue().get() instanceof InetAddress) {
559 addr = (InetAddress) hostPair.getValue().get();
560 match.setField(new MatchField(MatchType.NW_DST, addr, null));
561 } else if (hostPair.getValue().get() instanceof InetAddressMask) {
562 addrmask = (InetAddressMask) hostPair.getValue().get();
563 InetAddress taddr = ((InetAddressMask) addrmask).getNetworkAddress();
564 InetAddress tmask = NetUtils.getInetNetworkMask((int) ((InetAddressMask) addrmask).getMask(), false);
565 match.setField(new MatchField(MatchType.NW_DST, taddr, tmask));
567 /* xxx mac address ... */
569 /* Set other fields. */
570 match.setField(MatchType.DL_TYPE, EtherTypes.IPv4.shortValue());
571 Flow flow = new Flow(match, null);
572 flow.setPriority(REDIRECT_IPSWITCH_PRIORITY);
578 public HashMap<String, List<Flow>>getAllFlowGroups() {
579 HashMap<String, List<Flow>> flowgroups = new HashMap<String, List<Flow>>();
580 for (AffinityLink al: getAllAffinityLinks()) {
581 List<Flow> flowlist = getFlowlist(al);
582 flowgroups.put(al.getName(), flowlist);
587 public HashMap<String, HashMap<AffinityAttributeType,AffinityAttribute>>getAllAttributes() {
588 HashMap<String, HashMap<AffinityAttributeType, AffinityAttribute>>attributes =
589 new HashMap<String, HashMap<AffinityAttributeType, AffinityAttribute>>();
592 for (AffinityLink al: getAllAffinityLinks()) {
593 HashMap<AffinityAttributeType, AffinityAttribute> pergroupattrs = al.getAttributeList();
594 attributes.put(al.getName(), pergroupattrs);