Fix statistics race condition on big flows
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
index 3893b9f391ef6ccef71bff37be4aa56fbd742a1a..966608079424ad29c84e875d2eda890e1a989e8c 100644 (file)
@@ -16,11 +16,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -111,6 +106,13 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
 /**
  * Utils for gathering statistics.
  */
@@ -314,12 +316,12 @@ public final class StatisticsGatheringUtils {
                                                                    final boolean initial,
                                                                    final EventIdentifier eventIdentifier) {
         final ListenableFuture<Void> deleteFuture
-                = initial ? Futures.immediateFuture(null) : deleteAllKnownFlows(deviceInfo,
-                flowRegistry, txFacade);
+                = initial ? Futures.immediateFuture(null) : deleteAllKnownFlows(deviceInfo, flowRegistry, txFacade);
         return Futures.transform(deleteFuture, (Function<Void, Boolean>) input -> {
             writeFlowStatistics(data, deviceInfo, flowRegistry, txFacade);
             txFacade.submitTransaction();
             EventsTimeCounter.markEnd(eventIdentifier);
+            flowRegistry.processMarks();
             return Boolean.TRUE;
         });
     }
@@ -333,13 +335,13 @@ public final class StatisticsGatheringUtils {
             for (final FlowsStatisticsUpdate flowsStatistics : data) {
                 for (final FlowAndStatisticsMapList flowStat : flowsStatistics.getFlowAndStatisticsMapList()) {
                     final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
-                    flowBuilder.addAugmentation(
-                            FlowStatisticsData.class,
+                    flowBuilder.addAugmentation(FlowStatisticsData.class,
                             refineFlowStatisticsAugmentation(flowStat).build());
 
                     final short tableId = flowStat.getTableId();
-                    final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(flowBuilder.build());
-                    final FlowId flowId = registry.storeIfNecessary(flowRegistryKey);
+                    final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceInfo.getVersion(), flowBuilder.build());
+                    registry.store(flowRegistryKey);
+                    final FlowId flowId = registry.retrieveDescriptor(flowRegistryKey).getFlowId();
 
                     final FlowKey flowKey = new FlowKey(flowId);
                     flowBuilder.setKey(flowKey);
@@ -350,7 +352,7 @@ public final class StatisticsGatheringUtils {
                 }
             }
         } catch (TransactionChainClosedException e) {
-            LOG.warn("Not able to write to transaction: {}", e.getMessage());
+            LOG.warn("Not able to write to transaction chain: {}", e.getMessage());
         }
     }
 
@@ -387,17 +389,45 @@ public final class StatisticsGatheringUtils {
          */
         return Futures.transform(
                 flowCapableNodeFuture, (AsyncFunction<Optional<FlowCapableNode>, Void>) flowCapNodeOpt -> {
-                if (flowCapNodeOpt.isPresent()) {
-                    for (final Table tableData : flowCapNodeOpt.get().getTable()) {
-                        final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
-                        final InstanceIdentifier<Table> iiToTable
-                                = flowCapableNodePath.child(Table.class, tableData.getKey());
-                        txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+                    if (flowCapNodeOpt.isPresent()) {
+                        for (final Table tableData : flowCapNodeOpt.get().getTable()) {
+                            final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
+                            final InstanceIdentifier<Table> iiToTable
+                                    = flowCapableNodePath.child(Table.class, tableData.getKey());
+                            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+                        }
                     }
+                    readTx.close();
+                    return Futures.immediateFuture(null);
+                });
+    }
+
+    public static Optional<FlowCapableNode> deleteAllKnownFlows(final DeviceInfo deviceInfo,
+                                                             final TxFacade txFacade) {
+        final InstanceIdentifier<FlowCapableNode> flowCapableNodePath
+                = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
+        final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowCapableNodeFuture = readTx.read(
+                LogicalDatastoreType.OPERATIONAL, flowCapableNodePath);
+
+        try {
+            Optional<FlowCapableNode> fcNodeOpt = flowCapableNodeFuture.get();
+            if ( fcNodeOpt != null && fcNodeOpt.isPresent()){
+                for (final Table tableData : flowCapableNodeFuture.get().get().getTable()) {
+                    final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
+                    final InstanceIdentifier<Table> iiToTable = flowCapableNodePath.child(Table.class, tableData.getKey());
+                    txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
                 }
+                return fcNodeOpt;
+            }
+        } catch (InterruptedException|ExecutionException e) {
+            LOG.error("Failed to read current OpenFlow node {] operational data", deviceInfo.getNodeId());
+        } finally {
+            if (readTx != null){
                 readTx.close();
-                return Futures.immediateFuture(null);
-            });
+            }
+        }
+        return Optional.absent();
     }
 
     private static void processQueueStatistics(
@@ -494,11 +524,11 @@ public final class StatisticsGatheringUtils {
             final DeviceMeterRegistry meterRegistry,
             final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
             final TxFacade txFacade) throws TransactionChainClosedException {
-        for (final MeterId meterId : meterRegistry.getAllMeterIds()) {
+        meterRegistry.forEach(meterId -> {
             final InstanceIdentifier<Meter> meterIdent = flowNodeIdent.child(Meter.class, new MeterKey(meterId));
             txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, meterIdent);
-        }
-        meterRegistry.removeMarked();
+        });
+        meterRegistry.processMarks();
     }
 
     private static void processGroupDescStats(
@@ -530,11 +560,11 @@ public final class StatisticsGatheringUtils {
             final TxFacade txFacade,
             final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
             final DeviceGroupRegistry groupRegistry) throws TransactionChainClosedException {
-        for (final GroupId groupId : groupRegistry.getAllGroupIds()) {
+        groupRegistry.forEach(groupId -> {
             final InstanceIdentifier<Group> groupIdent = flowNodeIdent.child(Group.class, new GroupKey(groupId));
             txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, groupIdent);
-        }
-        groupRegistry.removeMarked();
+        });
+        groupRegistry.processMarks();
     }
 
     private static void processGroupStatistics(