package org.opendaylight.controller.statisticsmanager.internal;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.core.IContainer;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
* The class caches latest network nodes statistics as notified by reader
* services and provides API to retrieve them.
*/
-public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates {
+public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates,
+ ICacheUpdateAware<Object,Object> {
private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
private IContainer container;
private IClusterContainerServices clusterContainerService;
private IReadService reader;
+ private IConnectionManager connectionManager;
//statistics caches
private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
+ // data structure for latches
+ // this is not a cluster cache
+ private ConcurrentMap<Node, CountDownLatch> latches = new ConcurrentHashMap<Node, CountDownLatch>();
+ // 30 seconds is the timeout.
+ // the value of this can be tweaked based on performance tests.
+ private static long latchTimeout = 30;
+
+ // cache for flow stats refresh triggers
+ // an entry added to this map triggers the statistics manager
+ // to which the node is connected to get the latest flow stats from that node
+ // this is a cluster cache
+ private ConcurrentMap<Integer, Node> triggers;
+
+ // use an atomic integer for the triggers key
+ private AtomicInteger triggerKey = new AtomicInteger();
+
+ // single thread executor for the triggers
+ private ExecutorService triggerExecutor;
+
+ static final String TRIGGERS_CACHE = "statisticsmanager.triggers";
+ static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics";
+
private void nonClusterObjectCreate() {
flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
+ triggers = new ConcurrentHashMap<Integer, Node>();
}
- @SuppressWarnings("deprecation")
private void allocateCaches() {
if (clusterContainerService == null) {
nonClusterObjectCreate();
}
try {
- clusterContainerService.createCache("statisticsmanager.flowStatistics",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ clusterContainerService.createCache(FLOW_STATISTICS_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("statisticsmanager.tableStatistics",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
-
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ clusterContainerService.createCache(TRIGGERS_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
} catch (CacheConfigException cce) {
log.error("Statistics cache configuration invalid - check cache mode");
} catch (CacheExistException ce) {
log.debug("Skipping statistics cache creation - already present");
}
}
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private void retrieveCaches() {
ConcurrentMap<?, ?> map;
log.debug("Statistics Manager - retrieveCaches for Container {}", container);
- map = clusterContainerService.getCache("statisticsmanager.flowStatistics");
+ map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE);
if (map != null) {
this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
} else {
} else {
log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
}
+
+ map = clusterContainerService.getCache(TRIGGERS_CACHE);
+ if (map != null) {
+ this.triggers = (ConcurrentMap<Integer, Node>) map;
+ } else {
+ log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName());
+ }
}
/**
*/
void start() {
log.debug("START called!");
+ this.triggerExecutor = Executors.newSingleThreadExecutor();
}
/**
*/
void stop() {
log.debug("STOP called!");
+ this.triggerExecutor.shutdownNow();
}
void setClusterContainerService(IClusterContainerServices s) {
@Override
public List<FlowOnNode> getFlows(Node node) {
if (node == null) {
- return null;
+ return Collections.emptyList();
}
List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
return flowList;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FlowOnNode> getFlowsNoCache(Node node) {
+ if (node == null) {
+ return Collections.emptyList();
+ }
+ // check if the node is local to this controller
+ ConnectionLocality locality = ConnectionLocality.LOCAL;
+ if(this.connectionManager != null) {
+ locality = this.connectionManager.getLocalityStatus(node);
+ }
+ if (locality == ConnectionLocality.NOT_LOCAL) {
+ // send a trigger to all and wait for either a response or timeout
+ CountDownLatch newLatch = new CountDownLatch(1);
+ CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch);
+ this.triggers.put(this.triggerKey.incrementAndGet(), node);
+ try {
+ boolean retStatus;
+ if(oldLatch != null) {
+ retStatus = oldLatch.await(StatisticsManager.latchTimeout, TimeUnit.SECONDS);
+ } else {
+ retStatus = newLatch.await(StatisticsManager.latchTimeout, TimeUnit.SECONDS);
+ }
+ // log the return code as it will give us, if
+ // the latch timed out.
+ log.debug("latch timed out {}", !retStatus);
+ } catch (InterruptedException e) {
+ // log the error and move on
+ log.warn("Waiting for statistics response interrupted", e);
+ // restore the interrupt status
+ // its a good practice to restore the interrupt status
+ // if you are not propagating the InterruptedException
+ Thread.currentThread().interrupt();
+ }
+ // now that the wait is over
+ // remove the latch entry
+ this.latches.remove(node);
+ } else {
+ // the node is local.
+ // call the read service
+ if (this.reader != null) {
+ List<FlowOnNode> flows = reader.nonCachedReadAllFlows(node);
+ if (flows != null) {
+ nodeFlowStatisticsUpdated(node, flows);
+ }
+ }
+ }
+ // at this point we are ready to return the cached value.
+ // this cached value will be up to date with a very high probability
+ // due to what we have done previously ie:- send a trigger for cache update
+ // or refreshed the cache if the node is local.
+ return getFlows(node);
+ }
+
@Override
public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
@Override
public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
if (node == null){
- return null;
+ return Collections.emptyList();
}
List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
@Override
public List<NodeTableStatistics> getNodeTableStatistics(Node node){
if (node == null){
- return null;
+ return Collections.emptyList();
}
List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
List<NodeTableStatistics> cachedList = tableStatistics.get(node);
@Override
public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
- List<FlowOnNode> currentStat = this.flowStatistics.get(node);
- // Update cache only if changed to avoid unnecessary cache sync operations
- if (! flowStatsList.equals(currentStat)){
- this.flowStatistics.put(node, flowStatsList);
- }
+ // No equality check because duration fields change constantly
+ this.flowStatistics.put(node, flowStatsList);
}
@Override
public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
// Not interested in this update
}
+
+ public void unsetIConnectionManager(IConnectionManager s) {
+ if (s == this.connectionManager) {
+ this.connectionManager = null;
+ }
+ }
+
+ public void setIConnectionManager(IConnectionManager s) {
+ this.connectionManager = s;
+ }
+
+ @Override
+ public void entryCreated(Object key, String cacheName, boolean originLocal) {
+ /*
+ * Do nothing
+ */
+ }
+
+ @Override
+ public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+ if (originLocal) {
+ /*
+ * Local updates are of no interest
+ */
+ return;
+ }
+ if (cacheName.equals(TRIGGERS_CACHE)) {
+ log.trace("Got a trigger for key {} : value {}", key, new_value);
+ final Node n = (Node) new_value;
+ // check if the node is local to this controller
+ ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
+ if(this.connectionManager != null) {
+ locality = this.connectionManager.getLocalityStatus(n);
+ }
+ if (locality == ConnectionLocality.LOCAL) {
+ log.trace("trigger for node {} processes locally", n);
+ // delete the trigger and proceed with handling the trigger
+ this.triggers.remove(key);
+ // this is a potentially long running task
+ // off load it from the listener thread
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ // the node is local.
+ // call the read service
+ if (reader != null) {
+ List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
+ if (flows != null) {
+ flowStatistics.put(n, flows);
+ }
+ }
+ }
+ };
+ // submit the runnable for execution
+ if(this.triggerExecutor != null) {
+ this.triggerExecutor.execute(r);
+ }
+ }
+ } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
+ // flow statistics cache updated
+ // get the node
+ log.trace("Got a flow statistics cache update for key {}", key);
+ // this is a short running task
+ // no need of off loading from the listener thread
+ final Node n = (Node) key;
+ // check if an outstanding trigger exists for this node
+ CountDownLatch l = this.latches.get(n);
+ if(l != null) {
+ // someone was waiting for this update
+ // let him know
+ l.countDown();
+ }
+ }
+ }
+
+ @Override
+ public void entryDeleted(Object key, String cacheName, boolean originLocal) {
+ /*
+ * Do nothing
+ */
+ }
}