Revert "Fix statistics race condition on big flows"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
index 8a47762e07ca22566a212a5bc0db3ed1237a4dd0..bb789b62712adcb86e0667410cb2bfd40a68dcac 100644 (file)
@@ -16,11 +16,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -111,6 +106,13 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
 /**
  * Utils for gathering statistics.
  */
@@ -170,7 +172,10 @@ public final class StatisticsGatheringUtils {
             public ListenableFuture<Boolean> apply(final RpcResult<List<MultipartReply>> rpcResult) {
                 boolean isMultipartProcessed = Boolean.TRUE;
                 if (rpcResult.isSuccessful()) {
-                    LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Stats reply successfully received for node {} of type {}",
+                                deviceInfo.getNodeId(), type);
+                    }
 
                     // TODO: in case the result value is null then multipart data probably got processed on the fly -
                     // TODO: this contract should by clearly stated and enforced - now simple true value is returned
@@ -187,8 +192,8 @@ public final class StatisticsGatheringUtils {
                                 multipartData = multipartDataList.get(0);
                                 allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
                             }
-                        } catch (final Exception e) {
-                            LOG.warn("stats processing of type {} for node {} failed during transfomation step",
+                        } catch (final TransactionChainClosedException e) {
+                            LOG.warn("stats processing of type {} for node {} failed during transformation step",
                                     type, deviceInfo.getNodeId(), e);
                             return Futures.immediateFailedFuture(e);
                         }
@@ -247,24 +252,30 @@ public final class StatisticsGatheringUtils {
                             } else {
                                 isMultipartProcessed = Boolean.FALSE;
                             }
-                        } catch (final Exception e) {
+                        } catch (final TransactionChainClosedException e) {
                             LOG.warn("stats processing of type {} for node {} failed during write-to-tx step",
                                     type, deviceInfo.getNodeId(), e);
                             return Futures.immediateFailedFuture(e);
                         }
 
-                        LOG.debug("Stats reply added to transaction for node {} of type {}",
-                                deviceInfo.getNodeId(), type);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Stats reply added to transaction for node {} of type {}",
+                                    deviceInfo.getNodeId(), type);
+                        }
 
                         //TODO : implement experimenter
                     } else {
-                        LOG.debug("Stats reply was empty for node {} of type {}",
-                                deviceInfo.getNodeId(), type);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Stats reply was empty for node {} of type {}",
+                                    deviceInfo.getNodeId(), type);
+                        }
                     }
 
                 } else {
-                    LOG.debug("Stats reply FAILED for node {} of type {}: {}",
-                            deviceInfo.getNodeId(), type, rpcResult.getErrors());
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Stats reply FAILED for node {} of type {}: {}",
+                                deviceInfo.getNodeId(), type, rpcResult.getErrors());
+                    }
                     isMultipartProcessed = Boolean.FALSE;
                 }
                 return Futures.immediateFuture(isMultipartProcessed);
@@ -275,7 +286,8 @@ public final class StatisticsGatheringUtils {
     private static void processMeterConfigStatsUpdated(final Iterable<MeterConfigStatsUpdated> data,
                                                        final DeviceInfo deviceInfo,
                                                        final TxFacade txFacade,
-                                                       final DeviceMeterRegistry meterRegistry) throws Exception {
+                                                       final DeviceMeterRegistry meterRegistry)
+            throws TransactionChainClosedException {
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
         deleteAllKnownMeters(meterRegistry, fNodeIdent, txFacade);
         for (final MeterConfigStatsUpdated meterConfigStatsUpdated : data) {
@@ -304,17 +316,12 @@ public final class StatisticsGatheringUtils {
                                                                    final boolean initial,
                                                                    final EventIdentifier eventIdentifier) {
         final ListenableFuture<Void> deleteFuture
-                = initial ? Futures.immediateFuture(null) : deleteAllKnownFlows(deviceInfo,
-                flowRegistry, txFacade);
-        return Futures.transform(deleteFuture, new Function<Void, Boolean>() {
-
-            @Override
-            public Boolean apply(final Void input) {
-                writeFlowStatistics(data, deviceInfo, flowRegistry, txFacade);
-                txFacade.submitTransaction();
-                EventsTimeCounter.markEnd(eventIdentifier);
-                return Boolean.TRUE;
-            }
+                = initial ? Futures.immediateFuture(null) : deleteAllKnownFlows(deviceInfo, flowRegistry, txFacade);
+        return Futures.transform(deleteFuture, (Function<Void, Boolean>) input -> {
+            writeFlowStatistics(data, deviceInfo, flowRegistry, txFacade);
+            txFacade.submitTransaction();
+            EventsTimeCounter.markEnd(eventIdentifier);
+            return Boolean.TRUE;
         });
     }
 
@@ -327,12 +334,11 @@ public final class StatisticsGatheringUtils {
             for (final FlowsStatisticsUpdate flowsStatistics : data) {
                 for (final FlowAndStatisticsMapList flowStat : flowsStatistics.getFlowAndStatisticsMapList()) {
                     final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
-                    flowBuilder.addAugmentation(
-                            FlowStatisticsData.class,
+                    flowBuilder.addAugmentation(FlowStatisticsData.class,
                             refineFlowStatisticsAugmentation(flowStat).build());
 
                     final short tableId = flowStat.getTableId();
-                    final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(flowBuilder.build());
+                    final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceInfo.getVersion(), flowBuilder.build());
                     final FlowId flowId = registry.storeIfNecessary(flowRegistryKey);
 
                     final FlowKey flowKey = new FlowKey(flowId);
@@ -343,8 +349,8 @@ public final class StatisticsGatheringUtils {
                     txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
                 }
             }
-        } catch (Exception e) {
-            LOG.warn("Not able to write to transaction: {}", e.getMessage());
+        } catch (TransactionChainClosedException e) {
+            LOG.warn("Not able to write to transaction chain: {}", e.getMessage());
         }
     }
 
@@ -381,23 +387,51 @@ public final class StatisticsGatheringUtils {
          */
         return Futures.transform(
                 flowCapableNodeFuture, (AsyncFunction<Optional<FlowCapableNode>, Void>) flowCapNodeOpt -> {
-                if (flowCapNodeOpt.isPresent()) {
-                    for (final Table tableData : flowCapNodeOpt.get().getTable()) {
-                        final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
-                        final InstanceIdentifier<Table> iiToTable
-                                = flowCapableNodePath.child(Table.class, tableData.getKey());
-                        txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+                    if (flowCapNodeOpt.isPresent()) {
+                        for (final Table tableData : flowCapNodeOpt.get().getTable()) {
+                            final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
+                            final InstanceIdentifier<Table> iiToTable
+                                    = flowCapableNodePath.child(Table.class, tableData.getKey());
+                            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+                        }
                     }
+                    readTx.close();
+                    return Futures.immediateFuture(null);
+                });
+    }
+
+    public static Optional<FlowCapableNode> deleteAllKnownFlows(final DeviceInfo deviceInfo,
+                                                             final TxFacade txFacade) {
+        final InstanceIdentifier<FlowCapableNode> flowCapableNodePath
+                = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
+        final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowCapableNodeFuture = readTx.read(
+                LogicalDatastoreType.OPERATIONAL, flowCapableNodePath);
+
+        try {
+            Optional<FlowCapableNode> fcNodeOpt = flowCapableNodeFuture.get();
+            if ( fcNodeOpt != null && fcNodeOpt.isPresent()){
+                for (final Table tableData : flowCapableNodeFuture.get().get().getTable()) {
+                    final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
+                    final InstanceIdentifier<Table> iiToTable = flowCapableNodePath.child(Table.class, tableData.getKey());
+                    txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
                 }
+                return fcNodeOpt;
+            }
+        } catch (InterruptedException|ExecutionException e) {
+            LOG.error("Failed to read current OpenFlow node {] operational data", deviceInfo.getNodeId());
+        } finally {
+            if (readTx != null){
                 readTx.close();
-                return Futures.immediateFuture(null);
-            });
+            }
+        }
+        return Optional.absent();
     }
 
     private static void processQueueStatistics(
             final Iterable<QueueStatisticsUpdate> data,
             final DeviceInfo deviceInfo,
-            final TxFacade txFacade) throws Exception {
+            final TxFacade txFacade) throws TransactionChainClosedException {
         // TODO: clean all queues of all node-connectors before writing up-to-date stats
         final InstanceIdentifier<Node> nodeIdent = deviceInfo.getNodeInstanceIdentifier();
         for (final QueueStatisticsUpdate queueStatisticsUpdate : data) {
@@ -429,7 +463,7 @@ public final class StatisticsGatheringUtils {
     private static void processFlowTableStatistics(
             final Iterable<FlowTableStatisticsUpdate> data,
             final DeviceInfo deviceInfo,
-            final TxFacade txFacade) throws Exception {
+            final TxFacade txFacade) throws TransactionChainClosedException {
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
         for (final FlowTableStatisticsUpdate flowTableStatisticsUpdate : data) {
 
@@ -447,7 +481,7 @@ public final class StatisticsGatheringUtils {
     private static void processNodeConnectorStatistics(
             final Iterable<NodeConnectorStatisticsUpdate> data,
             final DeviceInfo deviceInfo,
-            final TxFacade txFacade) throws Exception {
+            final TxFacade txFacade) throws TransactionChainClosedException {
         final InstanceIdentifier<Node> nodeIdent = deviceInfo.getNodeInstanceIdentifier();
         for (final NodeConnectorStatisticsUpdate nodeConnectorStatisticsUpdate : data) {
             for (final NodeConnectorStatisticsAndPortNumberMap nodeConnectPort :
@@ -468,7 +502,7 @@ public final class StatisticsGatheringUtils {
 
     private static void processMetersStatistics(final Iterable<MeterStatisticsUpdated> data,
                                                 final DeviceInfo deviceInfo,
-                                                final TxFacade txFacade) throws Exception {
+                                                final TxFacade txFacade) throws TransactionChainClosedException {
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
         for (final MeterStatisticsUpdated meterStatisticsUpdated : data) {
             for (final MeterStats meterStats : meterStatisticsUpdated.getMeterStats()) {
@@ -487,7 +521,7 @@ public final class StatisticsGatheringUtils {
     private static void deleteAllKnownMeters(
             final DeviceMeterRegistry meterRegistry,
             final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
-            final TxFacade txFacade) throws Exception {
+            final TxFacade txFacade) throws TransactionChainClosedException {
         for (final MeterId meterId : meterRegistry.getAllMeterIds()) {
             final InstanceIdentifier<Meter> meterIdent = flowNodeIdent.child(Meter.class, new MeterKey(meterId));
             txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, meterIdent);
@@ -499,7 +533,7 @@ public final class StatisticsGatheringUtils {
             final Iterable<GroupDescStatsUpdated> data,
             final DeviceInfo deviceInfo,
             final TxFacade txFacade,
-            final DeviceGroupRegistry groupRegistry) throws Exception {
+            final DeviceGroupRegistry groupRegistry) throws TransactionChainClosedException {
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
         deleteAllKnownGroups(txFacade, fNodeIdent, groupRegistry);
 
@@ -523,7 +557,7 @@ public final class StatisticsGatheringUtils {
     private static void deleteAllKnownGroups(
             final TxFacade txFacade,
             final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
-            final DeviceGroupRegistry groupRegistry) throws Exception {
+            final DeviceGroupRegistry groupRegistry) throws TransactionChainClosedException {
         for (final GroupId groupId : groupRegistry.getAllGroupIds()) {
             final InstanceIdentifier<Group> groupIdent = flowNodeIdent.child(Group.class, new GroupKey(groupId));
             txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, groupIdent);
@@ -534,7 +568,7 @@ public final class StatisticsGatheringUtils {
     private static void processGroupStatistics(
             final Iterable<GroupStatisticsUpdated> data,
             final DeviceInfo deviceInfo,
-            final TxFacade txFacade) throws Exception {
+            final TxFacade txFacade) throws TransactionChainClosedException {
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
         for (final GroupStatisticsUpdated groupStatistics : data) {
             for (final GroupStats groupStats : groupStatistics.getGroupStats()) {