import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.felix.dm.Component;
import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
+import org.opendaylight.controller.topologymanager.ITopologyManagerClusterWideAware;
import org.opendaylight.controller.topologymanager.TopologyUserLinkConfig;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
* network topology. It provides service for applications to interact with
* topology database and notifies all the listeners of topology changes.
*/
-public class TopologyManagerImpl implements ITopologyManager,
-IConfigurationContainerAware, IListenTopoUpdates, IObjectReader,
-CommandProvider {
+public class TopologyManagerImpl implements
+ ICacheUpdateAware,
+ ITopologyManager,
+ IConfigurationContainerAware,
+ IListenTopoUpdates,
+ IObjectReader,
+ CommandProvider {
+ static final String TOPOEDGESDB = "topologymanager.edgesDB";
+ static final String TOPOHOSTSDB = "topologymanager.hostsDB";
+ static final String TOPONODECONNECTORDB = "topologymanager.nodeConnectorDB";
+ static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
private static final String SAVE = "Save";
private ITopologyService topoService;
// DB of all the NodeConnectors with an Host attached to it
private ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>> hostsDB;
// Topology Manager Aware listeners
- private Set<ITopologyManagerAware> topologyManagerAware =
- new CopyOnWriteArraySet<ITopologyManagerAware>();;
-
+ private Set<ITopologyManagerAware> topologyManagerAware = new CopyOnWriteArraySet<ITopologyManagerAware>();
+ // Topology Manager Aware listeners - for clusterwide updates
+ private Set<ITopologyManagerClusterWideAware> topologyManagerClusterWideAware =
+ new CopyOnWriteArraySet<ITopologyManagerClusterWideAware>();
private static String ROOT = GlobalConstants.STARTUPHOME.toString();
private String userLinksFileName;
private ConcurrentMap<String, TopologyUserLinkConfig> userLinksDB;
- private ConcurrentMap<Long, String> configSaveEvent;
+ private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private volatile Boolean shuttingDown = false;
+ private Thread notifyThread;
void nonClusterObjectCreate() {
hostsDB = new ConcurrentHashMap<NodeConnector, ImmutablePair<Host, Set<Property>>>();
nodeConnectorsDB = new ConcurrentHashMap<NodeConnector, Set<Property>>();
userLinksDB = new ConcurrentHashMap<String, TopologyUserLinkConfig>();
- configSaveEvent = new ConcurrentHashMap<Long, String>();
}
void setTopologyManagerAware(ITopologyManagerAware s) {
}
}
+ void setTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+ if (this.topologyManagerClusterWideAware != null) {
+ log.debug("Adding ITopologyManagerClusterWideAware: {}", s);
+ this.topologyManagerClusterWideAware.add(s);
+ }
+ }
+
+ void unsetTopologyManagerClusterWideAware(ITopologyManagerClusterWideAware s) {
+ if (this.topologyManagerClusterWideAware != null) {
+ log.debug("Removing ITopologyManagerClusterWideAware: {}", s);
+ this.topologyManagerClusterWideAware.remove(s);
+ }
+ }
+
void setTopoService(ITopologyService s) {
log.debug("Adding ITopologyService: {}", s);
this.topoService = s;
*
*/
void init(Component c) {
-
allocateCaches();
retrieveCaches();
-
String containerName = null;
Dictionary<?, ?> props = c.getServiceProperties();
if (props != null) {
userLinksFileName = ROOT + "userTopology_" + containerName + ".conf";
registerWithOSGIConsole();
loadConfiguration();
+ // Restore the shuttingDown status on init of the component
+ shuttingDown = false;
+ notifyThread = new Thread(new TopologyNotify(notifyQ));
}
@SuppressWarnings({ "unchecked", "deprecation" })
- private void allocateCaches(){
- if (this.clusterContainerService == null) {
- nonClusterObjectCreate();
- log.error("Cluster Services unavailable, allocated non-cluster caches!");
- return;
- }
-
+ private void allocateCaches() {
try {
- this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(
- "topologymanager.edgesDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.edgesDB =
+ (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.createCache(TOPOEDGESDB,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.edgesDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPOEDGESDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.edgesDB Cache configuration invalid - check cache mode");
+ log.error(TOPOEDGESDB + " Cache configuration invalid - check cache mode");
}
try {
- this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
- .createCache("topologymanager.hostsDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.hostsDB =
+ (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.createCache(
+ TOPOHOSTSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.hostsDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPOHOSTSDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.hostsDB Cache configuration invalid - check cache mode");
+ log.error(TOPOHOSTSDB + " Cache configuration invalid - check cache mode");
}
try {
- this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
- .createCache("topologymanager.nodeConnectorDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.nodeConnectorsDB =
+ (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.createCache(
+ TOPONODECONNECTORDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.nodeConnectorDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPONODECONNECTORDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.nodeConnectorDB Cache configuration invalid - check cache mode");
+ log.error(TOPONODECONNECTORDB + " Cache configuration invalid - check cache mode");
}
try {
- this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
- .createCache("topologymanager.userLinksDB", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ this.userLinksDB =
+ (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.createCache(
+ TOPOUSERLINKSDB, EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
} catch (CacheExistException cee) {
- log.debug("topologymanager.userLinksDB Cache already exists - destroy and recreate if needed");
+ log.debug(TOPOUSERLINKSDB + " Cache already exists - destroy and recreate if needed");
} catch (CacheConfigException cce) {
- log.error("topologymanager.userLinksDB Cache configuration invalid - check cache mode");
+ log.error(TOPOUSERLINKSDB + " Cache configuration invalid - check cache mode");
}
-
- try {
- this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
- .createCache("topologymanager.configSaveEvent", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
- } catch (CacheExistException cee) {
- log.debug("topologymanager.configSaveEvent Cache already exists - destroy and recreate if needed");
- } catch (CacheConfigException cce) {
- log.error("topologymanager.configSaveEvent Cache configuration invalid - check cache mode");
- }
-
}
@SuppressWarnings({ "unchecked", "deprecation" })
return;
}
- this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService
- .getCache("topologymanager.edgesDB");
+ this.edgesDB = (ConcurrentMap<Edge, Set<Property>>) this.clusterContainerService.getCache(TOPOEDGESDB);
if (edgesDB == null) {
- log.error("Failed to get cache for topologymanager.edgesDB");
+ log.error("Failed to get cache for " + TOPOEDGESDB);
}
- this.hostsDB = (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService
- .getCache("topologymanager.hostsDB");
+ this.hostsDB =
+ (ConcurrentMap<NodeConnector, ImmutablePair<Host, Set<Property>>>) this.clusterContainerService.getCache(TOPOHOSTSDB);
if (hostsDB == null) {
- log.error("Failed to get cache for topologymanager.hostsDB");
+ log.error("Failed to get cache for " + TOPOHOSTSDB);
}
- this.nodeConnectorsDB = (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService
- .getCache("topologymanager.nodeConnectorDB");
+ this.nodeConnectorsDB =
+ (ConcurrentMap<NodeConnector, Set<Property>>) this.clusterContainerService.getCache(TOPONODECONNECTORDB);
if (nodeConnectorsDB == null) {
- log.error("Failed to get cache for topologymanager.nodeConnectorDB");
+ log.error("Failed to get cache for " + TOPONODECONNECTORDB);
}
- this.userLinksDB = (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService
- .getCache("topologymanager.userLinksDB");
+ this.userLinksDB =
+ (ConcurrentMap<String, TopologyUserLinkConfig>) this.clusterContainerService.getCache(TOPOUSERLINKSDB);
if (userLinksDB == null) {
- log.error("Failed to get cache for topologymanager.userLinksDB");
+ log.error("Failed to get cache for " + TOPOUSERLINKSDB);
}
-
- this.configSaveEvent = (ConcurrentMap<Long, String>) this.clusterContainerService
- .getCache("topologymanager.configSaveEvent");
- if (configSaveEvent == null) {
- log.error("Failed to get cache for topologymanager.configSaveEvent");
- }
-
}
/**
*
*/
void started() {
+ // Start the batcher thread for the cluster wide topology updates
+ notifyThread.start();
// SollicitRefresh MUST be called here else if called at init
// time it may sollicit refresh too soon.
log.debug("Sollicit topology refresh");
topoService.sollicitRefresh();
}
+ void stop() {
+ shuttingDown = true;
+ notifyThread.interrupt();
+ }
+
/**
* Function called by the dependency manager when at least one dependency
* become unsatisfied or when the component is shutting down because for
*
*/
void destroy() {
+ notifyQ.clear();
+ notifyThread = null;
}
@SuppressWarnings("unchecked")
@Override
public Status saveConfig() {
- // Publish the save config event to the cluster
- configSaveEvent.put(new Date().getTime(), SAVE );
return saveConfigInternal();
}
@Override
public Host getHostAttachedToNodeConnector(NodeConnector port) {
ImmutablePair<Host, Set<Property>> host;
- if (this.hostsDB == null || (host = this.hostsDB.get(port)) == null) {
+ if ((this.hostsDB == null) || ((host = this.hostsDB.get(port)) == null)) {
return null;
}
return host.getLeft();
TopologyUserLinkConfig link = userLinksDB.remove(linkName);
Edge linkTuple;
- if (link != null && (linkTuple = getLinkTuple(link)) != null) {
+ if ((link != null) && ((linkTuple = getLinkTuple(link)) != null)) {
if (! isProductionLink(linkTuple)) {
edgeUpdate(linkTuple, UpdateType.REMOVED, null);
}
log.warn("Link Utilization back to normal: {}", edge);
}
+ private void edgeUpdateClusterWide(Edge e, UpdateType type, Set<Property> props, boolean isLocal) {
+ TopoEdgeUpdate upd = new TopoEdgeUpdate(e, props, type);
+ upd.setLocal(isLocal);
+ notifyQ.add(upd);
+ }
+
+ @Override
+ public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ // This is the case of an Edge being added to the topology DB
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CREATED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.ADDED, null, originLocal);
+ }
+ }
+
+ @Override
+ public void entryUpdated(final Object key, final Object new_value, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} CHANGED isLocal:{}", e, originLocal);
+ final Set<Property> props = (Set<Property>) new_value;
+ edgeUpdateClusterWide(e, UpdateType.CHANGED, props, originLocal);
+ }
+ }
+
+ @Override
+ public void entryDeleted(final Object key, final String cacheName, final boolean originLocal) {
+ if (cacheName.equals(TOPOEDGESDB)) {
+ final Edge e = (Edge) key;
+ log.trace("Edge {} DELETED isLocal:{}", e, originLocal);
+ edgeUpdateClusterWide(e, UpdateType.REMOVED, null, originLocal);
+ }
+ }
+
+ class TopologyNotify implements Runnable {
+ private final BlockingQueue<TopoEdgeUpdate> notifyQ;
+ private TopoEdgeUpdate entry;
+ private List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
+ private boolean notifyListeners;
+
+ TopologyNotify(BlockingQueue<TopoEdgeUpdate> notifyQ) {
+ this.notifyQ = notifyQ;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ log.trace("New run of TopologyNotify");
+ notifyListeners = false;
+ // First we block waiting for an element to get in
+ entry = notifyQ.take();
+ // Then we drain the whole queue if elements are
+ // in it without getting into any blocking condition
+ for (; entry != null; entry = notifyQ.poll()) {
+ teuList.add(entry);
+ notifyListeners = true;
+ }
+
+ // Notify listeners only if there were updates drained else
+ // give up
+ if (notifyListeners) {
+ log.trace("Notifier thread, notified a listener");
+ // Now update the listeners
+ for (ITopologyManagerClusterWideAware s : topologyManagerClusterWideAware) {
+ try {
+ s.edgeUpdate(teuList);
+ } catch (Exception exc) {
+ log.error("Exception on edge update:", exc);
+ }
+ }
+ }
+ teuList.clear();
+
+ // Lets sleep for sometime to allow aggregation of event
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ log.warn("TopologyNotify interrupted {}", e1.getMessage());
+ if (shuttingDown) {
+ return;
+ }
+ } catch (Exception e2) {
+ log.error("", e2);
+ }
+ }
+ }
+ }
}