Centralize targetNodeRef in the NodeStatisticsHandler 98/5298/7
authorRobert Varga <rovarga@cisco.com>
Thu, 13 Feb 2014 03:41:07 +0000 (04:41 +0100)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 15 Feb 2014 02:09:39 +0000 (02:09 +0000)
There's no reason to allocate it over and over again, we will be using
for a long time. Also push the DataProviderService into
NodeStatisticsHandler such that we do not have circular references.

Change-Id: I1e2444644a27516c87ca93d386f5dc2ac334ffaf
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java

index 075348d54c5b6b988427d443dea972d82264b6b2..bb1544c57abea8d87a598b271b33d50e6edee7c2 100644 (file)
@@ -68,6 +68,8 @@ final class FlowCapableTracker implements DataChangeListener {
      */
     @Override
     public synchronized void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        logger.debug("Tracker at root {} processing notification", root);
+
         /*
          * First process all the identifiers which were removed, trying to figure out
          * whether they constitute removal of FlowCapableNode.
@@ -103,5 +105,7 @@ final class FlowCapableTracker implements DataChangeListener {
                 }
             }), Predicates.notNull());
         stats.startNodeHandlers(addedNodes);
+
+        logger.debug("Tracker at root {} finished processing notification", root);
     }
 }
index 6a7033d795a63bc60d792d11583c1d8204ac2352..5ef4b36563173815af981977e542f88a956e948b 100644 (file)
@@ -19,6 +19,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
@@ -64,6 +65,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
@@ -107,7 +109,7 @@ import com.google.common.base.Preconditions;
  *
  * @author avishnoi@in.ibm.com
  */
-public class NodeStatisticsHandler implements AutoCloseable {
+public final class NodeStatisticsHandler implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
@@ -116,15 +118,17 @@ public class NodeStatisticsHandler implements AutoCloseable {
     private final Map<FlowEntry,Long> flowStatsUpdate = new HashMap<>();
     private final Map<QueueEntry,Long> queuesStatsUpdate = new HashMap<>();
     private final InstanceIdentifier<Node> targetNodeIdentifier;
-    private final StatisticsProvider statisticsProvider;
+    private final DataProviderService dps;
+    private final NodeRef targetNodeRef;
     private final NodeKey targetNodeKey;
     private Collection<TableKey> knownTables = Collections.emptySet();
     private int unaccountedFlowsCounter = 1;
 
-    public NodeStatisticsHandler(StatisticsProvider statisticsProvider, NodeKey nodeKey){
-        this.statisticsProvider = Preconditions.checkNotNull(statisticsProvider);
+    public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey) {
+        this.dps = Preconditions.checkNotNull(dps);
         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
+        this.targetNodeRef = new NodeRef(targetNodeIdentifier);
     }
 
     private static class FlowEntry {
@@ -235,9 +239,17 @@ public class NodeStatisticsHandler implements AutoCloseable {
         return knownTables;
     }
 
+    public InstanceIdentifier<Node> getTargetNodeIdentifier() {
+        return targetNodeIdentifier;
+    }
+
+    public NodeRef getTargetNodeRef() {
+        return targetNodeRef;
+    }
+
     public synchronized void updateGroupDescStats(List<GroupDescStats> list){
         final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         for (GroupDescStats groupDescStats : list) {
             GroupBuilder groupBuilder = new GroupBuilder();
@@ -264,7 +276,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void updateGroupStats(List<GroupStats> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         for(GroupStats groupStats : list) {
             GroupBuilder groupBuilder = new GroupBuilder();
@@ -292,7 +304,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
 
     public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
         final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         for(MeterConfigStats meterConfigStats : list) {
             MeterBuilder meterBuilder = new MeterBuilder();
@@ -320,7 +332,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
 
 
     public synchronized void updateMeterStats(List<MeterStats> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         for(MeterStats meterStats : list) {
             MeterBuilder meterBuilder = new MeterBuilder();
@@ -348,7 +360,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
 
     public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
         final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         for (QueueIdAndStatisticsMap swQueueStats : list) {
 
@@ -387,7 +399,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         final Set<TableKey> knownTables = new HashSet<>(list.size());
         for (FlowTableAndStatisticsMap ftStats : list) {
@@ -415,7 +427,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         for(NodeConnectorStatisticsAndPortNumberMap portStats : list) {
 
@@ -461,7 +473,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
 
     public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
         if (tableId != null) {
-            final DataModificationTransaction trans = statisticsProvider.startChange();
+            final DataModificationTransaction trans = dps.beginTransaction();
 
 
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
@@ -486,7 +498,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void updateGroupFeatures(GroupFeatures notification) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -504,7 +516,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void updateMeterFeatures(MeterFeatures features) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -523,7 +535,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
 
     public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
         final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         for(FlowAndStatisticsMapList map : list) {
             short tableId = map.getTableId();
@@ -683,7 +695,7 @@ public class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void cleanStaleStatistics(){
-        final DataModificationTransaction trans = this.statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
         final long now = System.nanoTime();
 
         //Clean stale statistics related to group
index ce2a0b3eeb40bdc4f0be621730e98b472a3988e0..3b9b4dc39e29ebaabb7d0bae2b944dbf0624abba 100644 (file)
@@ -92,12 +92,10 @@ public class StatisticsProvider implements AutoCloseable {
 
     private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
 
+    private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
     private final DataProviderService dps;
 
-    //Local caching of stats
-    private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
-
     private OpendaylightGroupStatisticsService groupStatsService;
 
     private OpendaylightMeterStatisticsService meterStatsService;
@@ -147,6 +145,7 @@ public class StatisticsProvider implements AutoCloseable {
         // Register for switch connect/disconnect notifications
         final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
                 .child(Node.class).augmentation(FlowCapableNode.class).build();
+        spLogger.debug("Registering FlowCapable tracker to {}", fcnId);
         this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
                 new FlowCapableTracker(this, fcnId));
 
@@ -240,38 +239,34 @@ public class StatisticsProvider implements AutoCloseable {
         }
     }
 
-    private void sendStatisticsRequestsToNode(NodeStatisticsHandler h) {
+    private void sendStatisticsRequestsToNode(final NodeStatisticsHandler h) {
         NodeKey targetNode = h.getTargetNodeKey();
         spLogger.debug("Send requests for statistics collection to node : {}", targetNode.getId());
 
-        InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNode).build();
-
-        NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
         try{
             if(flowTableStatsService != null){
-                sendAllFlowTablesStatisticsRequest(targetNodeRef);
+                sendAllFlowTablesStatisticsRequest(h);
             }
             if(flowStatsService != null){
                 // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
                 //        comes back -- we do not have any tables anyway.
                 sendAggregateFlowsStatsFromAllTablesRequest(h);
 
-                sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
+                sendAllFlowsStatsFromAllTablesRequest(h);
             }
             if(portStatsService != null){
-                sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
+                sendAllNodeConnectorsStatisticsRequest(h);
             }
             if(groupStatsService != null){
-                sendAllGroupStatisticsRequest(targetNodeRef);
-                sendGroupDescriptionRequest(targetNodeRef);
+                sendAllGroupStatisticsRequest(h);
+                sendGroupDescriptionRequest(h.getTargetNodeRef());
             }
             if(meterStatsService != null){
-                sendAllMeterStatisticsRequest(targetNodeRef);
-                sendMeterConfigStatisticsRequest(targetNodeRef);
+                sendAllMeterStatisticsRequest(h);
+                sendMeterConfigStatisticsRequest(h.getTargetNodeRef());
             }
             if(queueStatsService != null){
-                sendAllQueueStatsFromAllNodeConnector(targetNodeRef);
+                sendAllQueueStatsFromAllNodeConnector(h);
             }
         }catch(Exception e){
             spLogger.error("Exception occured while sending statistics requests : {}", e);
@@ -279,30 +274,30 @@ public class StatisticsProvider implements AutoCloseable {
     }
 
 
-    private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
+    private void sendAllFlowTablesStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
         final GetFlowTablesStatisticsInputBuilder input =
                 new GetFlowTablesStatisticsInputBuilder();
 
-        input.setNode(targetNodeRef);
+        input.setNode(h.getTargetNodeRef());
 
         Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
                 flowTableStatsService.getFlowTablesStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW_TABLE);
 
     }
 
-    private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
 
-        input.setNode(targetNode);
+        input.setNode(h.getTargetNodeRef());
 
         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW);
 
     }
@@ -347,29 +342,29 @@ public class StatisticsProvider implements AutoCloseable {
                 , StatsRequestType.AGGR_FLOW);
     }
 
-    private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
 
         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
 
-        input.setNode(targetNode);
+        input.setNode(h.getTargetNodeRef());
 
         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
                 portStatsService.getAllNodeConnectorsStatistics(input.build());
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_PORT);
 
     }
 
-    private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
 
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
 
-        input.setNode(targetNode);
+        input.setNode(h.getTargetNodeRef());
 
         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
                 groupStatsService.getAllGroupStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_GROUP);
 
     }
@@ -387,16 +382,16 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
-    private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
 
         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
 
-        input.setNode(targetNode);
+        input.setNode(h.getTargetNodeRef());
 
         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
                 meterStatsService.getAllMeterStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_METER);;
 
     }
@@ -415,15 +410,15 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
-    private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
+    private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
 
-        input.setNode(targetNode);
+        input.setNode(h.getTargetNodeRef());
 
         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_QUEUE_STATS);;
 
     }
@@ -484,23 +479,27 @@ public class StatisticsProvider implements AutoCloseable {
         }
     }
 
-    synchronized void startNodeHandlers(final Collection<NodeKey> addedNodes) {
+    void startNodeHandlers(final Collection<NodeKey> addedNodes) {
         for (NodeKey key : addedNodes) {
             if (handlers.containsKey(key.getId())) {
                 spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
                 continue;
             }
 
-            final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key);
-            handlers.put(key.getId(), h);
-            spLogger.debug("Started node handler for {}", key.getId());
+            final NodeStatisticsHandler h = new NodeStatisticsHandler(dps, key);
+            final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
+            if (old == null) {
+                spLogger.debug("Started node handler for {}", key.getId());
 
-            // FIXME: this should be in the NodeStatisticsHandler itself
-            sendStatisticsRequestsToNode(h);
+                // FIXME: this should be in the NodeStatisticsHandler itself
+                sendStatisticsRequestsToNode(h);
+            } else {
+                spLogger.debug("Prevented race on handler for {}", key.getId());
+            }
         }
     }
 
-    synchronized void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
+    void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
         for (NodeKey key : removedNodes) {
             final NodeStatisticsHandler s = handlers.remove(key.getId());
             if (s != null) {