import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.configuration.ConfigurationObject;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
+import org.opendaylight.controller.configuration.IConfigurationContainerService;
import org.opendaylight.controller.sal.core.Edge;
import org.opendaylight.controller.sal.core.Host;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
import org.opendaylight.controller.sal.topology.ITopologyService;
import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
-import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.IObjectReader;
-import org.opendaylight.controller.sal.utils.ObjectReader;
-import org.opendaylight.controller.sal.utils.ObjectWriter;
+import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.controller.switchmanager.ISwitchManager;
IListenTopoUpdates,
IObjectReader,
CommandProvider {
- static final String TOPOEDGESDB = "topologymanager.edgesDB";
- static final String TOPOHOSTSDB = "topologymanager.hostsDB";
- static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
- static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
+ protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
+ protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+ protected static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+ protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
+ private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
private ITopologyService topoService;
private IClusterContainerServices clusterContainerService;
+ private IConfigurationContainerService configurationService;
private ISwitchManager switchManager;
// DB of all the Edges with properties which constitute our topology
private ConcurrentMap<Edge, Set<Property>> edgesDB;
// Topology Manager Aware listeners - for clusterwide updates
private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
- private static String ROOT = GlobalConstants.STARTUPHOME.toString();
- private String userLinksFileName;
private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
private volatile Boolean shuttingDown = false;
}
}
+ public void setConfigurationContainerService(IConfigurationContainerService service) {
+ log.trace("Got configuration service set request {}", service);
+ this.configurationService = service;
+ }
+
+ public void unsetConfigurationContainerService(IConfigurationContainerService service) {
+ log.trace("Got configuration service UNset request");
+ this.configurationService = null;
+ }
+
void setSwitchManager(ISwitchManager s) {
log.debug("Adding ISwitchManager: {}", s);
this.switchManager = s;
containerName = "UNKNOWN";
}
- userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
registerWithOSGIConsole();
loadConfiguration();
+
// Restore the shuttingDown status on init of the component
shuttingDown = false;
notifyThread = new Thread(new TopologyNotify(notifyQ));
@SuppressWarnings({ "unchecked" })
private void allocateCaches() {
- try {
this.edgesDB =
- (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
- }
+ (ConcurrentMap<Edge, Set<Property>>) allocateCache(TOPOEDGESDB,EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- try {
this.hostsDB =
- (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) this.clusterContainerService.createCache(
- TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
- }
+ (ConcurrentMap<NodeConnector, Set<ImmutablePair<Host, Set<Property>>>>) allocateCache(TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- try {
this.nodeConnectorsDB =
- (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+ (ConcurrentMap<NodeConnector, Set<Property>>) allocateCache(
TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
- }
-
- try {
this.userLinksDB =
- (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+ (ConcurrentMap<String, TopologyUserLinkConfig>) allocateCache(
TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
+ }
+
+ private ConcurrentMap<?, ?> allocateCache(String cacheName, Set<IClusterServices.cacheMode> cacheModes) {
+ ConcurrentMap<?, ?> cache = null;
+ try {
+ cache = this.clusterContainerService.createCache(cacheName, cacheModes);
+ } catch (CacheExistException e) {
+ log.debug(cacheName + " cache already exists - destroy and recreate if needed");
+ } catch (CacheConfigException e) {
+ log.error(cacheName + " cache configuration invalid - check cache mode");
}
+ return cache;
}
@SuppressWarnings({ "unchecked" })
notifyThread = null;
}
- @SuppressWarnings("unchecked")
private void loadConfiguration() {
- ObjectReader objReader = new ObjectReader();
- ConcurrentMap<String, TopologyUserLinkConfig> confList =
- (ConcurrentMap<String, TopologyUserLinkConfig>) objReader.read(this, userLinksFileName);
-
- if (confList != null) {
- for (TopologyUserLinkConfig conf : confList.values()) {
- addUserLink(conf);
- }
+ for (ConfigurationObject conf : configurationService.retrieveConfiguration(this, USER_LINKS_FILE_NAME)) {
+ addUserLink((TopologyUserLinkConfig) conf);
}
}
}
public Status saveConfigInternal() {
- ObjectWriter objWriter = new ObjectWriter();
+ Status saveStatus = configurationService.persistConfiguration(
+ new ArrayList<ConfigurationObject>(userLinksDB.values()), USER_LINKS_FILE_NAME);
- Status saveStatus = objWriter.write(
- new ConcurrentHashMap<String, TopologyUserLinkConfig>(userLinksDB), userLinksFileName);
-
- if (! saveStatus.isSuccess()) {
+ if (!saveStatus.isSuccess()) {
return new Status(StatusCode.INTERNALERROR, "Topology save failed: " + saveStatus.getDescription());
}
return saveStatus;
|| e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION));
}
+ /**
+ * This method cross checks the determination of nodeConnector type by Discovery Service
+ * against the information in SwitchManager and updates it accordingly.
+ * @param e
+ * The edge
+ */
+ private void crossCheckNodeConnectors(Edge e) {
+ NodeConnector nc;
+ if (e.getHeadNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
+ nc = updateNCTypeFromSwitchMgr(e.getHeadNodeConnector());
+ if (nc != null) {
+ e.setHeadNodeConnector(nc);
+ }
+ }
+ if (e.getTailNodeConnector().getType().equals(NodeConnector.NodeConnectorIDType.PRODUCTION)) {
+ nc = updateNCTypeFromSwitchMgr(e.getTailNodeConnector());
+ if (nc != null) {
+ e.setTailNodeConnector(nc);
+ }
+ }
+ }
+
+ /**
+ * A NodeConnector may have been categorized as of type Production by Discovery Service.
+ * But at the time when this determination was made, only OF nodes were known to Discovery
+ * Service. This method checks if the node of nodeConnector is known to SwitchManager. If
+ * so, then it returns a new NodeConnector with correct type.
+ *
+ * @param nc
+ * NodeConnector as passed on in the edge
+ * @return
+ * If Node of the NodeConnector is in SwitchManager, then return a new NodeConnector
+ * with correct type, null otherwise
+ */
+
+ private NodeConnector updateNCTypeFromSwitchMgr(NodeConnector nc) {
+
+ for (Node node : switchManager.getNodes()) {
+ String nodeName = node.getNodeIDString();
+ log.trace("Switch Manager Node Name: {}, NodeConnector Node Name: {}", nodeName,
+ nc.getNode().getNodeIDString());
+ if (nodeName.equals(nc.getNode().getNodeIDString())) {
+ NodeConnector nodeConnector = NodeConnectorCreator
+ .createNodeConnector(node.getType(), nc.getID(), node);
+ return nodeConnector;
+ }
+ }
+ return null;
+ }
+
/**
* The Map returned is a copy of the current topology hence if the topology
* changes the copy doesn't
private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
switch (type) {
case ADDED:
- // Avoid redundant update as notifications trigger expensive tasks
- if (edgesDB.containsKey(e)) {
- log.trace("Skipping redundant edge addition: {}", e);
- return null;
- }
-
- // Ensure that head node connector exists
- if (!headNodeConnectorExist(e)) {
- log.warn("Ignore edge that contains invalid node connector: {}", e);
- return null;
- }
- // Make sure the props are non-null
+ // Make sure the props are non-null or create a copy
if (props == null) {
props = new HashSet<Property>();
} else {
props = new HashSet<Property>(props);
}
- //in case of node switch-over to a different cluster controller,
- //let's retain edge props
Set<Property> currentProps = this.edgesDB.get(e);
- if (currentProps != null){
+ if (currentProps != null) {
+
+ if (currentProps.equals(props)) {
+ // Avoid redundant updates as notifications trigger expensive tasks
+ log.trace("Skipping redundant edge addition: {}", e);
+ return null;
+ }
+
+ // In case of node switch-over to a different cluster controller,
+ // let's retain edge props (e.g. creation time)
props.addAll(currentProps);
}
+ // Ensure that head node connector exists
+ if (!headNodeConnectorExist(e)) {
+ log.warn("Ignore edge that contains invalid node connector: {}", e);
+ return null;
+ }
+
+ // Check if nodeConnectors of the edge were correctly categorized
+ // by protocol plugin
+ crossCheckNodeConnectors(e);
+
// Now make sure there is the creation timestamp for the
// edge, if not there, stamp with the first update
boolean found_create = false;