package org.opendaylight.controller.md.statistics.manager;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.eclipse.xtext.xbase.lib.Exceptions;
import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
- private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
private final DataProviderService dps;
//Local caching of stats
- private final ConcurrentMap<NodeId,NodeStatisticsHandler> statisticsCache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
private OpendaylightGroupStatisticsService groupStatsService;
private Registration<NotificationListener> listenerRegistration;
- public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
-
- this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+ private ListenerRegistration<DataChangeListener> flowCapableTrackerRegistration;
- statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
- registerDataStoreUpdateListener(dbs);
+ public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
- // Get Group/Meter statistics service instance
+ // Get Group/Meter statistics service instances
groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
+ // Start receiving notifications
+ this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+
+ // Register for switch connect/disconnect notifications
+ final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class).augmentation(FlowCapableNode.class).build();
+ this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
+ new FlowCapableTracker(this, fcnId));
+
+ statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
+ registerDataStoreUpdateListener(dbs);
+
statisticsRequesterThread = new Thread( new Runnable(){
@Override
public void run() {
while(true){
try {
- for(NodeStatisticsHandler nodeStatisticsAger : statisticsCache.values()){
+ for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
nodeStatisticsAger.cleanStaleStatistics();
}
multipartMessageManager.cleanStaleTransactionIds();
}
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
- //Register for Node updates
- InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
- .child(Node.class).toInstance();
- dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+ // FIXME: the below should be broken out into StatisticsUpdateHandler
//Register for flow updates
InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
return dps.beginTransaction();
}
- private void statsRequestSender(){
-
- List<Node> targetNodes = getAllConnectedNodes();
-
- if(targetNodes == null)
- return;
-
-
- for (Node targetNode : targetNodes){
-
- if(targetNode.getAugmentation(FlowCapableNode.class) != null){
- sendStatisticsRequestsToNode(targetNode);
- }
+ private void statsRequestSender() {
+ for (NodeStatisticsHandler h : handlers.values()) {
+ sendStatisticsRequestsToNode(h.getTargetNodeKey());
}
}
- public void sendStatisticsRequestsToNode(Node targetNode){
+ private void sendStatisticsRequestsToNode(NodeKey targetNode){
spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
- InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+ InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode).toInstance();
NodeRef targetNodeRef = new NodeRef(targetInstanceId);
try{
if(flowStatsService != null){
- sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+ sendAggregateFlowsStatsFromAllTablesRequest(targetNode);
sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
}
if(flowTableStatsService != null){
}
- public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
+ private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
final GetFlowTablesStatisticsInputBuilder input =
new GetFlowTablesStatisticsInputBuilder();
}
- public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
}
- public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
+ private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
List<Short> tablesId = getTablesFromNode(targetNodeKey);
}
}
- public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+ private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
, StatsRequestType.AGGR_FLOW);
}
- public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
}
- public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
}
- public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
}
- public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
+ private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
input.setNode(targetNode);
*/
public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
Preconditions.checkNotNull(nodeId);
- NodeStatisticsHandler ager = statisticsCache.get(nodeId);
- if (ager == null) {
- ager = new NodeStatisticsHandler(this, new NodeKey(nodeId));
- statisticsCache.put(nodeId, ager);
+ NodeStatisticsHandler handler = handlers.get(nodeId);
+ if (handler == null) {
+ spLogger.info("Attempted to get non-existing handler for {}", nodeId);
}
-
- return ager;
- }
-
- private List<Node> getAllConnectedNodes(){
- Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
- if(nodes == null)
- return null;
-
- spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
- return nodes.getNode();
+ return handler;
}
private List<Short> getTablesFromNode(NodeKey nodeKey){
return nodeKey.getId();
}
- @SuppressWarnings("deprecation")
@Override
- public void close(){
-
+ public void close() {
try {
- spLogger.info("Statistics Provider stopped.");
if (this.listenerRegistration != null) {
-
this.listenerRegistration.close();
-
this.statisticsRequesterThread.destroy();
-
this.statisticsAgerThread.destroy();
+ }
+ if (this.flowCapableTrackerRegistration != null) {
+ this.flowCapableTrackerRegistration.close();
+ this.flowCapableTrackerRegistration = null;
+ }
+ } catch (Exception e) {
+ spLogger.warn("Failed to stop Statistics Provider completely", e);
+ } finally {
+ spLogger.info("Statistics Provider stopped.");
+ }
+ }
+ synchronized void startNodeHandlers(final Collection<NodeKey> addedNodes) {
+ for (NodeKey key : addedNodes) {
+ if (handlers.containsKey(key.getId())) {
+ spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
+ continue;
}
- } catch (Throwable e) {
- throw Exceptions.sneakyThrow(e);
- }
+
+ final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key);
+ handlers.put(key.getId(), h);
+ spLogger.debug("Started node handler for {}", key.getId());
+
+ // FIXME: this should be in the NodeStatisticsHandler itself
+ sendStatisticsRequestsToNode(key);
+ }
}
+ synchronized void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
+ for (NodeKey key : removedNodes) {
+ final NodeStatisticsHandler s = handlers.remove(key.getId());
+ if (s != null) {
+ spLogger.debug("Stopping node handler for {}", key.getId());
+ s.close();
+ } else {
+ spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId());
+ }
+ }
+ }
}