-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.Dictionary;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.felix.dm.Component;
import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.sal.core.ConstructionException;
import org.opendaylight.controller.sal.core.Edge;
import org.opendaylight.controller.sal.core.Host;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.UpdateType;
import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
import org.opendaylight.controller.sal.topology.ITopologyService;
-import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
import org.opendaylight.controller.sal.utils.GlobalConstants;
-import org.opendaylight.controller.sal.utils.HexEncode;
import org.opendaylight.controller.sal.utils.IObjectReader;
import org.opendaylight.controller.sal.utils.ObjectReader;
import org.opendaylight.controller.sal.utils.ObjectWriter;
import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
* network topology. It provides service for applications to interact with
* topology database and notifies all the listeners of topology changes.
*/
-public class TopologyManagerImpl implements ITopologyManager,
- IConfigurationContainerAware, IListenTopoUpdates, IObjectReader,
+public class TopologyManagerImpl implements
+ ICacheUpdateAware,
+ ITopologyManager,
+ IConfigurationContainerAware,
+ IListenTopoUpdates,
+ IObjectReader,
CommandProvider {
- private static final Logger log = LoggerFactory
- .getLogger(TopologyManagerImpl.class);
- private ITopologyService topoService = null;
- private IClusterContainerServices clusterContainerService = null;
+ static final String TOPOEDGESDB = "topologymanager.edgesDB";
+ static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+ static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+ static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
+ private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+ private static final String SAVE = "Save";
+ private ITopologyService topoService;
+ private IClusterContainerServices clusterContainerService;
// DB of all the Edges with properties which constitute our topology
- private ConcurrentMap<Edge, Set<Property>> edgesDB = null;
- // DB of all NodeConnector which are part of Edges, meaning they
- // are connected to another NodeConnector on the other side
- private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB = null;
+ private ConcurrentMap<Edge, Set<Property>> edgesDB;
+ // DB of all NodeConnector which are part of ISL Edges, meaning they
+ // are connected to another NodeConnector on the other side of an ISL link.
+ // NodeConnector of a Production Edge is not part of this DB.
+ private ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorsDB;
// DB of all the NodeConnectors with an Host attached to it
- private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB = null;
+ private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
// Topology Manager Aware listeners
- private Set<ITopologyManagerAware> topologyManagerAware = Collections
- .synchronizedSet(new HashSet<ITopologyManagerAware>());
-
+ private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
+ // Topology Manager Aware listeners - for clusterwide updates
+ private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
+ new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
private static String ROOT = GlobalConstants.STARTUPHOME.toString();
- private String userLinksFileName = null;
- private ConcurrentMap<String, TopologyUserLinkConfig> userLinks;
-
+ private String userLinksFileName;
+ private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
+ private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private volatile Boolean shuttingDown = false;
+ private Thread notifyThread;
+
+
void nonClusterObjectCreate() {
- edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
- hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
- userLinks = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
- nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
+ edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
+ hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
+ nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
+ userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
}
-
void setTopologyManagerAware(ITopologyManagerAware s) {
if (this.topologyManagerAware != null) {
- log.debug("Adding ITopologyManagerAware: " + s);
+ log.debug("Adding ITopologyManagerAware: {}", s);
this.topologyManagerAware.add(s);
}
}
void unsetTopologyManagerAware(ITopologyManagerAware s) {
if (this.topologyManagerAware != null) {
+ log.debug("Removing ITopologyManagerAware: {}", s);
this.topologyManagerAware.remove(s);
}
}
+ void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+ if (this.topologyManagerClusterWideAware != null) {
+ log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
+ this.topologyManagerClusterWideAware.add(s);
+ }
+ }
+
+ void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+ if (this.topologyManagerClusterWideAware != null) {
+ log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
+ this.topologyManagerClusterWideAware.remove(s);
+ }
+ }
+
void setTopoService(ITopologyService s) {
+ log.debug("Adding ITopologyService: {}", s);
this.topoService = s;
}
void unsetTopoService(ITopologyService s) {
if (this.topoService == s) {
+ log.debug("Removing ITopologyService: {}", s);
this.topoService = null;
}
}
*
*/
void init(Component c) {
+ allocateCaches();
+ retrieveCaches();
String containerName = null;
- Dictionary props = c.getServiceProperties();
+ Dictionary<?, ?> props = c.getServiceProperties();
if (props != null) {
containerName = (String) props.get("containerName");
} else {
containerName = "UNKNOWN";
}
- if (this.clusterContainerService == null) {
- log.error("Cluster Services is null, not expected!");
- return;
- }
+ userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
+ registerWithOSGIConsole();
+ loadConfiguration();
+ // Restore the shuttingDown status on init of the component
+ shuttingDown = false;
+ notifyThread = new Thread(new TopologyNotify(notifyQ));
+ }
- if (this.topoService == null) {
- log.error("Topology Services is null, not expected!");
- return;
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ private void allocateCaches() {
+ try {
+ this.edgesDB =
+ (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ } catch (CacheExistException cee) {
+ log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
+ } catch (CacheConfigException cce) {
+ log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
}
try {
- this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService
- .createCache("topologymanager.edgesDB", EnumSet
- .of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.hostsDB =
+ (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
+ TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.error("topologymanager.edgesDB Cache already exists - "
- + "destroy and recreate if needed");
+ log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.edgesDB Cache configuration invalid - "
- + "check cache mode");
+ log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
}
try {
- this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
- .createCache("topologymanager.hostsDB", EnumSet
- .of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.nodeConnectorsDB =
+ (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+ TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.error("topologymanager.hostsDB Cache already exists - "
- + "destroy and recreate if needed");
+ log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.hostsDB Cache configuration invalid - "
- + "check cache mode");
+ log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
}
try {
- this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
- .createCache("topologymanager.nodeConnectorDB", EnumSet
- .of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.userLinksDB =
+ (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+ TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.error("topologymanager.nodeConnectorDB Cache already exists"
- + " - destroy and recreate if needed");
+ log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.nodeConnectorDB Cache configuration "
- + "invalid - check cache mode");
+ log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
}
+ }
- userLinks = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ private void retrieveCaches() {
+ if (this.clusterContainerService == null) {
+ log.error("Cluster Services is null, can't retrieve caches.");
+ return;
+ }
- userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
- registerWithOSGIConsole();
- loadConfiguration();
+ this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
+ if (edgesDB == null) {
+ log.error("Failed to get cache for " + TOPOEDGESDB);
+ }
+
+ this.hostsDB =
+ (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
+ if (hostsDB == null) {
+ log.error("Failed to get cache for " + TOPOHOSTSDB);
+ }
+
+ this.nodeConnectorsDB =
+ (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
+ if (nodeConnectorsDB == null) {
+ log.error("Failed to get cache for " + TOPONODECONNECTORDB);
+ }
+
+ this.userLinksDB =
+ (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
+ if (userLinksDB == null) {
+ log.error("Failed to get cache for " + TOPOUSERLINKSDB);
+ }
}
/**
- * Function called after the topology manager has registered the
- * service in OSGi service registry.
+ * Function called after the topology manager has registered the service in
+ * OSGi service registry.
*
*/
void started() {
+ // Start the batcher thread for the cluster wide topology updates
+ notifyThread.start();
// SollicitRefresh MUST be called here else if called at init
// time it may sollicit refresh too soon.
log.debug("Sollicit topology refresh");
topoService.sollicitRefresh();
}
+ void stop() {
+ shuttingDown = true;
+ notifyThread.interrupt();
+ }
+
/**
- * Function called by the dependency manager when at least one
- * dependency become unsatisfied or when the component is shutting
- * down because for example bundle is being stopped.
+ * Function called by the dependency manager when at least one dependency
+ * become unsatisfied or when the component is shutting down because for
+ * example bundle is being stopped.
*
*/
void destroy() {
- if (this.clusterContainerService == null) {
- log.error("Cluster Services is null, not expected!");
- this.edgesDB = null;
- this.hostsDB = null;
- this.nodeConnectorsDB = null;
- return;
- }
- this.clusterContainerService.destroyCache("topologymanager.edgesDB");
- this.edgesDB = null;
- this.clusterContainerService.destroyCache("topologymanager.hostsDB");
- this.hostsDB = null;
- this.clusterContainerService
- .destroyCache("topologymanager.nodeConnectorDB");
- this.nodeConnectorsDB = null;
- log.debug("Topology Manager DB DE-allocated");
+ notifyQ.clear();
+ notifyThread = null;
}
@SuppressWarnings("unchecked")
private void loadConfiguration() {
ObjectReader objReader = new ObjectReader();
- ConcurrentMap<String, TopologyUserLinkConfig> confList = (ConcurrentMap<String, TopologyUserLinkConfig>) objReader
- .read(this, userLinksFileName);
-
- if (confList == null) {
- return;
- }
+ ConcurrentMap<String, TopologyUserLinkConfig> confList =
+ (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
- for (TopologyUserLinkConfig conf : confList.values()) {
- addUserLink(conf);
+ if (confList != null) {
+ for (TopologyUserLinkConfig conf : confList.values()) {
+ addUserLink(conf);
+ }
}
}
@Override
public Status saveConfig() {
- // Publish the save config event to the cluster nodes
- /**
- * Get the CLUSTERING SERVICES WORKING BEFORE TRYING THIS
-
- configSaveEvent.put(new Date().getTime(), SAVE);
- */
return saveConfigInternal();
}
public Status saveConfigInternal() {
- Status retS;
ObjectWriter objWriter = new ObjectWriter();
- retS = objWriter.write(
- new ConcurrentHashMap<String, TopologyUserLinkConfig>(
- userLinks), userLinksFileName);
+ Status saveStatus = objWriter.write(
+ new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
- if (retS.isSuccess()) {
- return retS;
- } else {
- return new Status(StatusCode.INTERNALERROR, "Save failed");
+ if (! saveStatus.isSuccess()) {
+ return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
}
+ return saveStatus;
}
@Override
return null;
}
- HashMap<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
- for (Edge key : this.edgesDB.keySet()) {
+ Map<Node, Set<Edge>> res = new HashMap<Node, Set<Edge>>();
+ for (Edge edge : this.edgesDB.keySet()) {
// Lets analyze the tail
- Node node = key.getTailNodeConnector().getNode();
+ Node node = edge.getTailNodeConnector().getNode();
Set<Edge> nodeEdges = res.get(node);
if (nodeEdges == null) {
nodeEdges = new HashSet<Edge>();
+ res.put(node, nodeEdges);
}
- nodeEdges.add(key);
- // We need to re-add to the MAP even if the element was
- // already there so in case of clustered services the map
- // gets updated in the cluster
- res.put(node, nodeEdges);
+ nodeEdges.add(edge);
// Lets analyze the head
- node = key.getHeadNodeConnector().getNode();
+ node = edge.getHeadNodeConnector().getNode();
nodeEdges = res.get(node);
if (nodeEdges == null) {
nodeEdges = new HashSet<Edge>();
+ res.put(node, nodeEdges);
}
- nodeEdges.add(key);
- // We need to re-add to the MAP even if the element was
- // already there so in case of clustered services the map
- // gets updated in the cluster
- res.put(node, nodeEdges);
+ nodeEdges.add(edge);
}
return res;
}
/**
- * The Map returned is a copy of the current topology hence if the
- * topology changes the copy doesn't
+ * This method returns true if the edge is an ISL link.
+ *
+ * @param e
+ * The edge
+ * @return true if it is an ISL link
+ */
+ public boolean isISLink(Edge e) {
+ return (!isProductionLink(e));
+ }
+
+ /**
+ * This method returns true if the edge is a production link.
+ *
+ * @param e
+ * The edge
+ * @return true if it is a production link
+ */
+ public boolean isProductionLink(Edge e) {
+ return (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)
+ || e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
+ }
+
+ /**
+ * The Map returned is a copy of the current topology hence if the topology
+ * changes the copy doesn't
*
- * @return A Map representing the current topology expressed as
- * edges of the network
+ * @return A Map representing the current topology expressed as edges of the
+ * network
*/
@Override
public Map<Edge, Set<Property>> getEdges() {
return null;
}
- HashMap<Edge, Set<Property>> res = new HashMap<Edge, Set<Property>>();
- for (Edge key : this.edgesDB.keySet()) {
+ Map<Edge, Set<Property>> edgeMap = new HashMap<Edge, Set<Property>>();
+ Set<Property> props;
+ for (Map.Entry<Edge, Set<Property>> edgeEntry : edgesDB.entrySet()) {
// Sets of props are copied because the composition of
// those properties could change with time
- HashSet<Property> prop = new HashSet<Property>(this.edgesDB
- .get(key));
+ props = new HashSet<Property>(edgeEntry.getValue());
// We can simply reuse the key because the object is
// immutable so doesn't really matter that we are
// referencing the only owned by a different table, the
// meaning is the same because doesn't change with time.
- res.put(key, prop);
+ edgeMap.put(edgeEntry.getKey(), props);
}
- return res;
- }
-
- // TODO remove with spring-dm removal
- /**
- * @param set the topologyAware to set
- */
- public void setTopologyAware(Set<Object> set) {
- for (Object s : set) {
- setTopologyManagerAware((ITopologyManagerAware) s);
- }
+ return edgeMap;
}
@Override
return null;
}
- return (this.hostsDB.keySet());
+ return (new HashSet<NodeConnector>(this.hostsDB.keySet()));
}
@Override
return null;
}
HashMap<Node, Set<NodeConnector>> res = new HashMap<Node, Set<NodeConnector>>();
-
- for (NodeConnector p : this.hostsDB.keySet()) {
- Node n = p.getNode();
- Set<NodeConnector> pSet = res.get(n);
- if (pSet == null) {
+ Node node;
+ Set<NodeConnector> portSet;
+ for (NodeConnector nc : this.hostsDB.keySet()) {
+ node = nc.getNode();
+ portSet = res.get(node);
+ if (portSet == null) {
// Create the HashSet if null
- pSet = new HashSet<NodeConnector>();
- res.put(n, pSet);
+ portSet = new HashSet<NodeConnector>();
+ res.put(node, portSet);
}
// Keep updating the HashSet, given this is not a
// clustered map we can just update the set without
// worrying to update the hashmap.
- pSet.add(p);
+ portSet.add(nc);
}
return (res);
}
@Override
- public Host getHostAttachedToNodeConnector(NodeConnector p) {
- if (this.hostsDB == null) {
+ public Host getHostAttachedToNodeConnector(NodeConnector port) {
+ ImmutablePair<Host, Set<Property>> host;
+ if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
return null;
}
-
- return (this.hostsDB.get(p).getLeft());
+ return host.getLeft();
}
@Override
- public void updateHostLink(NodeConnector p, Host h, UpdateType t,
- Set<Property> props) {
- if (this.hostsDB == null) {
- return;
+ public void updateHostLink(NodeConnector port, Host h, UpdateType t, Set<Property> props) {
+
+ // Clone the property set in case non null else just
+ // create an empty one. Caches allocated via infinispan
+ // don't allow null values
+ if (props == null) {
+ props = new HashSet<Property>();
+ } else {
+ props = new HashSet<Property>(props);
}
+ ImmutablePair<Host, Set<Property>> thisHost = new ImmutablePair<Host, Set<Property>>(h, props);
switch (t) {
case ADDED:
case CHANGED:
- // Clone the property set in case non null else just
- // create an empty one. Caches allocated via infinispan
- // don't allow null values
- if (props == null) {
- props = new HashSet<Property>();
- } else {
- props = new HashSet<Property>(props);
- }
-
- this.hostsDB.put(p, new ImmutablePair(h, props));
+ this.hostsDB.put(port, thisHost);
break;
case REMOVED:
- this.hostsDB.remove(p);
+ //remove only if hasn't been concurrently modified
+ this.hostsDB.remove(port, thisHost);
break;
}
}
- @Override
- public void edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
+ private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
switch (type) {
case ADDED:
// Make sure the props are non-null
if (props == null) {
- props = (Set<Property>) new HashSet();
+ props = new HashSet<Property>();
} else {
- // Copy the set so noone is going to change the content
- props = (Set<Property>) new HashSet(props);
+ props = new HashSet<Property>(props);
+ }
+
+ //in case of node switch-over to a different cluster controller,
+ //let's retain edge props
+ Set<Property> currentProps = this.edgesDB.get(e);
+ if (currentProps != null){
+ props.addAll(currentProps);
}
- // Now make sure thre is the creation timestamp for the
- // edge, if not there timestamp with the first update
+ // Now make sure there is the creation timestamp for the
+ // edge, if not there, stamp with the first update
boolean found_create = false;
for (Property prop : props) {
if (prop instanceof TimeStamp) {
TimeStamp t = (TimeStamp) prop;
if (t.getTimeStampName().equals("creation")) {
found_create = true;
+ break;
}
}
}
if (!found_create) {
- TimeStamp t = new TimeStamp(System.currentTimeMillis(),
- "creation");
+ TimeStamp t = new TimeStamp(System.currentTimeMillis(), "creation");
props.add(t);
}
this.edgesDB.put(e, props);
// Now populate the DB of NodeConnectors
- // NOTE WELL: properties are empy sets, not really needed
+ // NOTE WELL: properties are empty sets, not really needed
// for now.
- this.nodeConnectorsDB.put(e.getHeadNodeConnector(),
- new HashSet<Property>());
- this.nodeConnectorsDB.put(e.getTailNodeConnector(),
- new HashSet<Property>());
+ // The DB only contains ISL ports
+ if (isISLink(e)) {
+ this.nodeConnectorsDB.put(e.getHeadNodeConnector(), new HashSet<Property>(1));
+ this.nodeConnectorsDB.put(e.getTailNodeConnector(), new HashSet<Property>(1));
+ }
+ log.trace("Edge {} {}", e.toString(), type.name());
break;
case REMOVED:
// Now remove the edge from edgesDB
// should be safe to assume that won't happen.
this.nodeConnectorsDB.remove(e.getHeadNodeConnector());
this.nodeConnectorsDB.remove(e.getTailNodeConnector());
+ log.trace("Edge {} {}", e.toString(), type.name());
break;
case CHANGED:
- Set<Property> old_props = this.edgesDB.get(e);
+ Set<Property> oldProps = this.edgesDB.get(e);
// When property changes lets make sure we can change it
// all except the creation time stamp because that should
// be changed only when the edge is destroyed and created
// again
- TimeStamp tc = null;
- for (Property prop : old_props) {
+ TimeStamp timeStamp = null;
+ for (Property prop : oldProps) {
if (prop instanceof TimeStamp) {
- TimeStamp t = (TimeStamp) prop;
- if (t.getTimeStampName().equals("creation")) {
- tc = t;
+ TimeStamp tsProp = (TimeStamp) prop;
+ if (tsProp.getTimeStampName().equals("creation")) {
+ timeStamp = tsProp;
+ break;
}
}
}
- // Now lest make sure new properties are non-null
- // Make sure the props are non-null
+ // Now lets make sure new properties are non-null
if (props == null) {
- props = (Set<Property>) new HashSet();
+ props = new HashSet<Property>();
} else {
// Copy the set so noone is going to change the content
- props = (Set<Property>) new HashSet(props);
+ props = new HashSet<Property>(props);
}
// Now lets remove the creation property if exist in the
TimeStamp t = (TimeStamp) prop;
if (t.getTimeStampName().equals("creation")) {
i.remove();
+ break;
}
}
}
// Now lets add the creation timestamp in it
- if (tc != null) {
- props.add(tc);
+ if (timeStamp != null) {
+ props.add(timeStamp);
}
// Finally update
this.edgesDB.put(e, props);
+ log.trace("Edge {} {}", e.toString(), type.name());
break;
}
+ return new TopoEdgeUpdate(e, props, type);
+ }
+
+ @Override
+ public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+ for (int i = 0; i < topoedgeupdateList.size(); i++) {
+ Edge e = topoedgeupdateList.get(i).getEdge();
+ Set<Property> p = topoedgeupdateList.get(i).getProperty();
+ UpdateType type = topoedgeupdateList.get(i).getUpdateType();
+ TopoEdgeUpdate teu = edgeUpdate(e, type, p);
+ teuList.add(teu);
+ }
// Now update the listeners
for (ITopologyManagerAware s : this.topologyManagerAware) {
try {
- s.edgeUpdate(e, type, props);
+ s.edgeUpdate(teuList);
} catch (Exception exc) {
- log.error("Exception on callback", exc);
+ log.error("Exception on edge update:", exc);
}
}
+
}
private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
- TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(link
- .getName(), link.getDstSwitchId(), link.getDstPort(), link
- .getSrcSwitchId(), link.getSrcPort());
+ TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
+ link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
return getLinkTuple(rLink);
}
- private Edge getLinkTuple(TopologyUserLinkConfig link) {
- Edge linkTuple = null;
- Long sID = link.getSrcSwitchIDLong();
- Long dID = link.getDstSwitchIDLong();
- Short srcPort = Short.valueOf((short) 0);
- Short dstPort = Short.valueOf((short) 0);
- if (link.isSrcPortByName()) {
- // TODO find the inventory service to do this, for now 0
- //srcPort = srcSw.getPortNumber(link.getSrcPort());
- } else {
- srcPort = Short.parseShort(link.getSrcPort());
- }
- if (link.isDstPortByName()) {
- //dstPort = dstSw.getPortNumber(link.getDstPort());;
- } else {
- dstPort = Short.parseShort(link.getDstPort());
- }
-
- // if atleast 1 link exists for the srcPort and atleast 1 link exists for the dstPort
- // that makes it ineligible for the Manual link addition
- // This is just an extra protection to avoid mis-programming.
- boolean srcLinkExists = false;
- boolean dstLinkExists = false;
- /**
- * Disabling this optimization for now to understand the real benefit of doing this.
- * Since this is a Manual Link addition, the user knows what he is doing and it is
- * not good to restrict such creativity...
- */
- /*
- Set <Edge> links = oneTopology.getLinks().keySet();
- if (links != null) {
- for (Edge eLink : links) {
- if (!eLink.isUserCreated() &&
- eLink.getSrc().getSid().equals(link.getSrcSwitchIDLong()) &&
- eLink.getSrc().getPort().equals(srcPort)) {
- srcLinkExists = true;
- }
-
- if (!eLink.isUserCreated() &&
- eLink.getSrc().getSid().equals(link.getSrcSwitchIDLong()) &&
- eLink.getSrc().getPort().equals(srcPort)) {
- dstLinkExists = true;
- }
-
- if (!eLink.isUserCreated() &&
- eLink.getDst().getSid().equals(link.getSrcSwitchIDLong()) &&
- eLink.getDst().getPort().equals(srcPort)) {
- srcLinkExists = true;
- }
-
- if (!eLink.isUserCreated() &&
- eLink.getDst().getSid().equals(link.getSrcSwitchIDLong()) &&
- eLink.getDst().getPort().equals(srcPort)) {
- dstLinkExists = true;
- }
- }
- }
- */
- //TODO check a way to validate the port with inventory services
- //if (srcSw.getPorts().contains(srcPort) &&
- //dstSw.getPorts().contains(srcPort) &&
- if (!srcLinkExists && !dstLinkExists) {
- Node sNode = null;
- Node dNode = null;
- NodeConnector sPort = null;
- NodeConnector dPort = null;
- linkTuple = null;
- try {
- sNode = new Node(Node.NodeIDType.OPENFLOW, sID);
- dNode = new Node(Node.NodeIDType.OPENFLOW, dID);
- sPort = new NodeConnector(
- NodeConnector.NodeConnectorIDType.OPENFLOW, srcPort,
- sNode);
- dPort = new NodeConnector(
- NodeConnector.NodeConnectorIDType.OPENFLOW, dstPort,
- dNode);
- linkTuple = new Edge(sPort, dPort);
- } catch (ConstructionException cex) {
- }
- return linkTuple;
- }
-
- if (srcLinkExists && dstLinkExists) {
- link.setStatus(TopologyUserLinkConfig.STATUS.INCORRECT);
+ private Edge getLinkTuple(TopologyUserLinkConfig link) {
+ NodeConnector srcNodeConnector = NodeConnector.fromString(link.getSrcNodeConnector());
+ NodeConnector dstNodeConnector = NodeConnector.fromString(link.getDstNodeConnector());
+ try {
+ return new Edge(srcNodeConnector, dstNodeConnector);
+ } catch (Exception e) {
+ return null;
}
- return null;
}
@Override
public ConcurrentMap<String, TopologyUserLinkConfig> getUserLinks() {
- return userLinks;
+ return new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB);
}
@Override
- public Status addUserLink(TopologyUserLinkConfig link) {
- if (!link.isValid()) {
- return new Status(StatusCode.BADREQUEST,
- "Configuration Invalid. Please check the parameters");
- }
- if (userLinks.get(link.getName()) != null) {
- return new Status(StatusCode.CONFLICT,
- "Link with name : " + link.getName()
- + " already exists. Please use another name");
+ public Status addUserLink(TopologyUserLinkConfig userLink) {
+ if (!userLink.isValid()) {
+ return new Status(StatusCode.BADREQUEST,
+ "User link configuration invalid.");
+ }
+ userLink.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
+
+ //Check if this link already configured
+ //NOTE: infinispan cache doesn't support Map.containsValue()
+ // (which is linear time in most ConcurrentMap impl anyway)
+ for (TopologyUserLinkConfig existingLink : userLinksDB.values()) {
+ if (existingLink.equals(userLink)) {
+ return new Status(StatusCode.CONFLICT, "Link configuration exists");
+ }
}
- if (userLinks.containsValue(link)) {
- return new Status(StatusCode.CONFLICT, "Link configuration exists");
+ //attempt put, if mapping for this key already existed return conflict
+ if (userLinksDB.putIfAbsent(userLink.getName(), userLink) != null) {
+ return new Status(StatusCode.CONFLICT, "Link with name : " + userLink.getName()
+ + " already exists. Please use another name");
}
- link.setStatus(TopologyUserLinkConfig.STATUS.LINKDOWN);
- userLinks.put(link.getName(), link);
-
- Edge linkTuple = getLinkTuple(link);
+ Edge linkTuple = getLinkTuple(userLink);
if (linkTuple != null) {
- try {
- // TODO The onetopology will be gone too, topology
- //manager is the master of the topology at this point
- //if (oneTopology.addUserConfiguredLink(linkTuple)) {
- linkTuple = getReverseLinkTuple(link);
- //if (oneTopology.addUserConfiguredLink(linkTuple)) {
- link.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
- //}
- //}
- } catch (Exception e) {
- return new Status(StatusCode.INTERNALERROR,
- "Exception while adding custom link : " +
- e.getMessage());
+ if (!isProductionLink(linkTuple)) {
+ edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
+ }
+
+ linkTuple = getReverseLinkTuple(userLink);
+ if (linkTuple != null) {
+ userLink.setStatus(TopologyUserLinkConfig.STATUS.SUCCESS);
+ if (!isProductionLink(linkTuple)) {
+ edgeUpdate(linkTuple, UpdateType.ADDED, new HashSet<Property>());
+ }
}
}
- return new Status(StatusCode.SUCCESS, null);
+ return new Status(StatusCode.SUCCESS);
}
@Override
public Status deleteUserLink(String linkName) {
if (linkName == null) {
- return new Status(StatusCode.BADREQUEST,
- "A valid linkName is required to Delete a link");
+ return new Status(StatusCode.BADREQUEST, "User link name cannot be null.");
}
- TopologyUserLinkConfig link = userLinks.get(linkName);
-
- Edge linkTuple = getLinkTuple(link);
- userLinks.remove(linkName);
- if (linkTuple != null) {
- try {
- //oneTopology.deleteUserConfiguredLink(linkTuple);
- } catch (Exception e) {
- log
- .warn("Harmless : Exception while Deleting User Configured link "
- + link + " " + e.toString());
+ TopologyUserLinkConfig link = userLinksDB.remove(linkName);
+ Edge linkTuple;
+ if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
+ if (! isProductionLink(linkTuple)) {
+ edgeUpdate(linkTuple, UpdateType.REMOVED, null);
}
+
linkTuple = getReverseLinkTuple(link);
- try {
- //oneTopology.deleteUserConfiguredLink(linkTuple);
- } catch (Exception e) {
- log
- .error("Harmless : Exception while Deleting User Configured Reverse link "
- + link + " " + e.toString());
+ if (! isProductionLink(linkTuple)) {
+ edgeUpdate(linkTuple, UpdateType.REMOVED, null);
}
}
- return new Status(StatusCode.SUCCESS, null);
+ return new Status(StatusCode.SUCCESS);
}
private void registerWithOSGIConsole() {
public String getHelp() {
StringBuffer help = new StringBuffer();
help.append("---Topology Manager---\n");
- help
- .append("\t addTopo name <src-sw-id> <port-number> <dst-sw-id> <port-number>\n");
- help.append("\t delTopo name\n");
- help.append("\t _printTopo\n");
+ help.append("\t addUserLink <name> <node connector string> <node connector string>\n");
+ help.append("\t deleteUserLink <name>\n");
+ help.append("\t printUserLink\n");
+ help.append("\t printNodeEdges\n");
return help.toString();
}
- public void _printTopo(CommandInterpreter ci) {
- for (String name : this.userLinks.keySet()) {
- ci.println(name + " : " + userLinks.get(name));
+ public void _printUserLink(CommandInterpreter ci) {
+ for (String name : this.userLinksDB.keySet()) {
+ TopologyUserLinkConfig linkConfig = userLinksDB.get(name);
+ ci.println("Name : " + name);
+ ci.println(linkConfig);
+ ci.println("Edge " + getLinkTuple(linkConfig));
+ ci.println("Reverse Edge " + getReverseLinkTuple(linkConfig));
}
}
- public void _addTopo(CommandInterpreter ci) {
+ public void _addUserLink(CommandInterpreter ci) {
String name = ci.nextArgument();
if ((name == null)) {
ci.println("Please enter a valid Name");
return;
}
- String dpid = ci.nextArgument();
- if (dpid == null) {
- ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
- return;
- }
- try {
- HexEncode.stringToLong(dpid);
- } catch (Exception e) {
- ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
+ String ncStr1 = ci.nextArgument();
+ if (ncStr1 == null) {
+ ci.println("Please enter two node connector strings");
return;
}
-
- String port = ci.nextArgument();
- if (port == null) {
- ci.println("Invalid port number");
+ String ncStr2 = ci.nextArgument();
+ if (ncStr2 == null) {
+ ci.println("Please enter second node connector string");
return;
}
- String ddpid = ci.nextArgument();
- if (ddpid == null) {
- ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
+ NodeConnector nc1 = NodeConnector.fromString(ncStr1);
+ if (nc1 == null) {
+ ci.println("Invalid input node connector 1 string: " + ncStr1);
return;
}
- try {
- HexEncode.stringToLong(ddpid);
- } catch (Exception e) {
- ci.println("Invalid Switch ID. Format xx:xx:xx:xx:xx:xx:xx:xx");
+ NodeConnector nc2 = NodeConnector.fromString(ncStr2);
+ if (nc2 == null) {
+ ci.println("Invalid input node connector 2 string: " + ncStr2);
return;
}
- String dport = ci.nextArgument();
- if (dport == null) {
- ci.println("Invalid port number");
- return;
- }
- TopologyUserLinkConfig config = new TopologyUserLinkConfig(name,
- dpid, port, ddpid, dport);
+ TopologyUserLinkConfig config = new TopologyUserLinkConfig(name, ncStr1, ncStr2);
ci.println(this.addUserLink(config));
}
- public void _delTopo(CommandInterpreter ci) {
+ public void _deleteUserLink(CommandInterpreter ci) {
String name = ci.nextArgument();
if ((name == null)) {
ci.println("Please enter a valid Name");
this.deleteUserLink(name);
}
+ public void _printNodeEdges(CommandInterpreter ci) {
+ Map<Node, Set<Edge>> nodeEdges = getNodeEdges();
+ if (nodeEdges == null) {
+ return;
+ }
+ Set<Node> nodeSet = nodeEdges.keySet();
+ if (nodeSet == null) {
+ return;
+ }
+ ci.println(" Node Edge");
+ for (Node node : nodeSet) {
+ Set<Edge> edgeSet = nodeEdges.get(node);
+ if (edgeSet == null) {
+ continue;
+ }
+ for (Edge edge : edgeSet) {
+ ci.println(node + " " + edge);
+ }
+ }
+ }
+
@Override
public Object readObject(ObjectInputStream ois)
throws FileNotFoundException, IOException, ClassNotFoundException {
- // TODO Auto-generated method stub
return ois.readObject();
}
@Override
public void edgeOverUtilized(Edge edge) {
- log.warn("Link Utilization above normal: " + edge);
+ log.warn("Link Utilization above normal: {}", edge);
}
@Override
public void edgeUtilBackToNormal(Edge edge) {
- log.warn("Link Utilization back to normal: " + edge);
+ log.warn("Link Utilization back to normal: {}", edge);
}
+ private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+ TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+ upd.setLocal(isLocal);
+ notifyQ.add(upd);
+ }
+
+ @Override
+ public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ // This is the case of an Edge being added to the topology DB
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+ }
+ }
+
+ @Override
+ public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+ final Set<Property> props = (Set<Property>) new_value;
+ edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+ }
+ }
+
+ @Override
+ public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+ }
+ }
+
+ class TopologyNotify implements Runnable {
+ private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+ private TopoEdgeUpdate entry;
+ private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+ private boolean notifyListeners;
+
+ TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+ this.notifyQ = notifyQ;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ log.trace("New run of TopologyNotify");
+ notifyListeners = false;
+ // First we block waiting for an element to get in
+ entry = notifyQ.take();
+ // Then we drain the whole queue if elements are
+ // in it without getting into any blocking condition
+ for (; entry != null; entry = notifyQ.poll()) {
+ teuList.add(entry);
+ notifyListeners = true;
+ }
+
+ // Notify listeners only if there were updates drained else
+ // give up
+ if (notifyListeners) {
+ log.trace("Notifier thread, notified a listener");
+ // Now update the listeners
+ for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+ try {
+ s.edgeUpdate(teuList);
+ } catch (Exception exc) {
+ log.error("Exception on edge update:", exc);
+ }
+ }
+ }
+ teuList.clear();
+
+ // Lets sleep for sometime to allow aggregation of event
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ log.warn("TopologyNotify interrupted {}", e1.getMessage());
+ if (shuttingDown) {
+ return;
+ }
+ } catch (Exception e2) {
+ log.error("", e2);
+ }
+ }
+ }
+ }
}