re-activate FindBugs
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
old mode 100644 (file)
new mode 100755 (executable)
index 2704e06..99d37f4
@@ -10,32 +10,31 @@ package org.opendaylight.openflowplugin.impl.statistics;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.text.SimpleDateFormat;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
-import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
-import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
+import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -54,12 +53,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Utils for gathering statistics
+ * Utils for gathering statistics.
  */
 public final class StatisticsGatheringUtils {
 
@@ -71,127 +69,80 @@ public final class StatisticsGatheringUtils {
         throw new IllegalStateException("This class should not be instantiated.");
     }
 
-    static <T extends OfHeader>ListenableFuture<Boolean> gatherStatistics(final StatisticsGatherer<T> statisticsGatheringService,
-                                                                          final DeviceInfo deviceInfo,
-                                                                          final MultipartType type,
-                                                                          final TxFacade txFacade,
-                                                                          final DeviceRegistry registry,
-                                                                          final Boolean initial,
-                                                                          final ConvertorExecutor convertorExecutor,
-                                                                          final MultipartWriterProvider statisticsWriterProvider) {
-
-        final EventIdentifier eventIdentifier;
-        if (MultipartType.OFPMPFLOW.equals(type)) {
-            eventIdentifier = new EventIdentifier(type.toString(), deviceInfo.getNodeId().getValue());
-            EventsTimeCounter.markStart(eventIdentifier);
-        } else {
-            eventIdentifier = null;
-        }
-
-        return Futures.transform(
-            statisticsGatheringService.getStatisticsOfType(
-                new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
-                type),
-            new AsyncFunction<RpcResult<List<T>>, Boolean>() {
-                @Nullable
-                @Override
-                public ListenableFuture<Boolean> apply(@Nonnull final RpcResult<List<T>> rpcResult) {
-                    boolean isMultipartProcessed = Boolean.TRUE;
-
-                    if (rpcResult.isSuccessful()) {
-                        LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
-
-                        // TODO: in case the result value is null then multipart data probably got processed on the fly -
-                        // TODO: this contract should by clearly stated and enforced - now simple true value is returned
-                        if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
-                            final List<DataContainer> allMultipartData;
-
-                            try {
-                                allMultipartData = rpcResult
-                                    .getResult()
-                                    .stream()
-                                    .map(reply ->  MultipartReplyTranslatorUtil
-                                        .translate(reply, deviceInfo, convertorExecutor, null))
-                                    .filter(java.util.Optional::isPresent)
-                                    .map(java.util.Optional::get)
-                                    .collect(Collectors.toList());
-                            } catch (final Exception e) {
-                                LOG.warn("Stats processing of type {} for node {} failed during transformation step",
-                                    type, deviceInfo.getLOGValue(), e);
-                                return Futures.immediateFailedFuture(e);
-                            }
-
-                            try {
-                                return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
-                                    statisticsWriterProvider,
-                                    eventIdentifier, initial);
-                            } catch (final Exception e) {
-                                LOG.warn("Stats processing of type {} for node {} failed during processing step",
-                                    type, deviceInfo.getNodeId(), e);
-                                return Futures.immediateFailedFuture(e);
-                            }
-                        } else {
-                            LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
-                        }
+    static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
+            final StatisticsGatherer<T> statisticsGatheringService, final DeviceInfo deviceInfo,
+            final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry,
+            final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider,
+            final ListeningExecutorService executorService) {
+        return Futures.transformAsync(statisticsGatheringService.getStatisticsOfType(
+           new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
+            rpcResult -> executorService.submit(() -> {
+                final boolean rpcResultIsNull = rpcResult == null;
+
+                if (!rpcResultIsNull && rpcResult.isSuccessful()) {
+                    LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
+                        // TODO: in case the result value is null then multipart data probably got processed
+                        // TODO: on the fly. This contract should by clearly stated and enforced.
+                        // TODO: Now simple true value is returned
+                    if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
+                        final List<DataContainer> allMultipartData = rpcResult.getResult().stream()
+                                .map(reply -> MultipartReplyTranslatorUtil
+                                                    .translate(reply, deviceInfo, convertorExecutor, null))
+                                .filter(java.util.Optional::isPresent).map(java.util.Optional::get)
+                                            .collect(Collectors.toList());
+
+                        return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
+                                        statisticsWriterProvider);
                     } else {
-                        LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
-                            rpcResult.getErrors());
-                        isMultipartProcessed = Boolean.FALSE;
+                        LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
                     }
-
-                    return Futures.immediateFuture(isMultipartProcessed);
+                } else {
+                    LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
+                                rpcResultIsNull ? "" : rpcResult.getErrors());
                 }
-            });
+                return false;
+            }), MoreExecutors.directExecutor());
     }
 
-    private static ListenableFuture<Boolean> processStatistics(final MultipartType type,
-                                                               final List<? extends DataContainer> statistics,
-                                                               final TxFacade txFacade,
-                                                               final DeviceRegistry deviceRegistry,
-                                                               final DeviceInfo deviceInfo,
-                                                               final MultipartWriterProvider statisticsWriterProvider,
-                                                               final EventIdentifier eventIdentifier,
-                                                               final boolean initial) {
-
-        ListenableFuture<Void> future = Futures.immediateFuture(null);
-
-        final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
-            .getNodeInstanceIdentifier()
-            .augmentation(FlowCapableNode.class);
+    private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
+                                             final TxFacade txFacade, final DeviceRegistry deviceRegistry,
+                                             final DeviceInfo deviceInfo,
+                                             final MultipartWriterProvider statisticsWriterProvider) {
+        final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class);
 
         switch (type) {
             case OFPMPFLOW:
-                future = deleteAllKnownFlows(txFacade, instanceIdentifier, initial);
+                deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
+                deviceRegistry.getDeviceFlowRegistry().processMarks();
                 break;
             case OFPMPMETERCONFIG:
                 deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
+                deviceRegistry.getDeviceMeterRegistry().processMarks();
                 break;
             case OFPMPGROUPDESC:
                 deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
+                deviceRegistry.getDeviceGroupRegistry().processMarks();
                 break;
+            default:
+                // no operation
         }
 
-        return Futures.transform(future, (Function<Void, Boolean>) input -> {
-            if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
-                txFacade.submitTransaction();
-
-                if (MultipartType.OFPMPFLOW.equals(type)) {
-                    EventsTimeCounter.markEnd(eventIdentifier);
-                }
+        if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
+            txFacade.submitTransaction();
 
-                LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
-                return Boolean.TRUE;
-            }
+            LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
+            return true;
+        }
 
-            LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue());
-            return Boolean.FALSE;
-        });
+        LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);
+        return false;
     }
 
-    private static boolean writeStatistics(final MultipartType type,
-                                          final List<? extends DataContainer> statistics,
-                                          final DeviceInfo deviceInfo,
-                                          final MultipartWriterProvider statisticsWriterProvider) {
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private static boolean writeStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
+                                           final DeviceInfo deviceInfo,
+                                           final MultipartWriterProvider statisticsWriterProvider) {
         final AtomicBoolean result = new AtomicBoolean(false);
 
         try {
@@ -203,113 +154,123 @@ public final class StatisticsGatheringUtils {
                 }
             }));
         } catch (final Exception ex) {
-            LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue(), ex);
+            LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo,
+                     ex);
         }
 
         return result.get();
     }
 
-    public static ListenableFuture<Void> deleteAllKnownFlows(final TxFacade txFacade,
-                                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
-                                                             final boolean initial) {
-        if (initial) {
-            return Futures.immediateFuture(null);
+    public static void deleteAllKnownFlows(final TxFacade txFacade,
+                                           final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
+                                           final DeviceFlowRegistry deviceFlowRegistry) {
+        if (!txFacade.isTransactionsEnabled()) {
+            return;
         }
 
-        final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
-        return Futures.transform(Futures
-            .withFallback(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), t -> {
-                // we wish to close readTx for fallBack
-                readTx.close();
-                return Futures.immediateFailedFuture(t);
-            }), (Function<Optional<FlowCapableNode>, Void>)
-            flowCapNodeOpt -> {
-                // we have to read actual tables with all information before we set empty Flow list, merge is expensive and
-                // not applicable for lists
-                if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
-                    for (final Table tableData : flowCapNodeOpt.get().getTable()) {
-                        final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
-                        final InstanceIdentifier<Table> iiToTable = instanceIdentifier.child(Table.class, tableData.getKey());
-                        txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
-                    }
-                }
+        final ListenableFuture<Optional<FlowCapableNode>> future;
+        try (ReadOnlyTransaction readTx = txFacade.getReadTransaction()) {
+            future = readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier);
+        }
 
-                readTx.close();
-                return null;
-            });
+        try {
+            Futures.transform(Futures.catchingAsync(future, Throwable.class, throwable -> {
+                return Futures.immediateFailedFuture(throwable);
+            }, MoreExecutors.directExecutor()), (Function<Optional<FlowCapableNode>, Void>) flowCapNodeOpt -> {
+                    // we have to read actual tables with all information before we set empty Flow list,
+                    // merge is expensive and not applicable for lists
+                    if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
+                        for (final Table tableData : flowCapNodeOpt.get().getTable()) {
+                            final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
+                            final InstanceIdentifier<Table> iiToTable = instanceIdentifier
+                                    .child(Table.class, tableData.key());
+                            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+                        }
+                    }
+                    return null;
+                }, MoreExecutors.directExecutor()).get();
+        } catch (InterruptedException | ExecutionException ex) {
+            LOG.debug("Failed to delete {} flows", deviceFlowRegistry.size(), ex);
+        }
     }
 
-    private static void deleteAllKnownMeters(final TxFacade txFacade,
-                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
-                                             final DeviceMeterRegistry meterRegistry) {
-        meterRegistry.getAllMeterIds().forEach(meterId -> txFacade
-            .addDeleteToTxChain(
-                LogicalDatastoreType.OPERATIONAL,
-                instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
-
-        meterRegistry.removeMarked();
+    public static void deleteAllKnownMeters(final TxFacade txFacade,
+                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
+                                            final DeviceMeterRegistry meterRegistry) {
+        meterRegistry.forEach(meterId -> {
+            txFacade
+                    .addDeleteToTxChain(
+                            LogicalDatastoreType.OPERATIONAL,
+                            instanceIdentifier.child(Meter.class, new MeterKey(meterId)));
+            meterRegistry.addMark(meterId);
+        });
     }
 
-    private static void deleteAllKnownGroups(final TxFacade txFacade,
-                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
-                                             final DeviceGroupRegistry groupRegistry) {
-        groupRegistry.getAllGroupIds().forEach(groupId -> txFacade
-            .addDeleteToTxChain(
-                LogicalDatastoreType.OPERATIONAL,
-                instanceIdentifier.child(Group.class, new GroupKey(groupId))));
-
-        groupRegistry.removeMarked();
+    public static void deleteAllKnownGroups(final TxFacade txFacade,
+                                            final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
+                                            final DeviceGroupRegistry groupRegistry) {
+        LOG.debug("deleteAllKnownGroups on device targetType {}", instanceIdentifier.getTargetType());
+        groupRegistry.forEach(groupId -> {
+            txFacade
+                    .addDeleteToTxChain(
+                            LogicalDatastoreType.OPERATIONAL,
+                            instanceIdentifier.child(Group.class, new GroupKey(groupId)));
+            groupRegistry.addMark(groupId);
+        });
     }
 
     /**
-     * Writes snapshot gathering start timestamp + cleans end mark
+     * Writes snapshot gathering start timestamp + cleans end mark.
      *
-     * @param deviceContext txManager + node path keeper
+     * @param deviceInfo device info
+     * @param txFacade tx manager
      */
-    static void markDeviceStateSnapshotStart(final DeviceContext deviceContext) {
-        final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceContext.getDeviceInfo()
-            .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
+    static void markDeviceStateSnapshotStart(final DeviceInfo deviceInfo, final TxFacade txFacade) {
+        final InstanceIdentifier<FlowCapableStatisticsGatheringStatus> statusPath = deviceInfo
+                .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class);
 
         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
         final FlowCapableStatisticsGatheringStatus gatheringStatus = new FlowCapableStatisticsGatheringStatusBuilder()
-            .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
-                .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
-                .build())
-            .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
-            .build();
+                .setSnapshotGatheringStatusStart(new SnapshotGatheringStatusStartBuilder()
+                        .setBegin(new DateAndTime(simpleDateFormat.format(new Date())))
+                        .build())
+                .setSnapshotGatheringStatusEnd(null) // TODO: reconsider if really need to clean end mark here
+                .build();
         try {
-            deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
+            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
         } catch (final TransactionChainClosedException e) {
             LOG.warn("Can't write to transaction, transaction chain probably closed.");
             LOG.trace("Write to transaction exception: ", e);
         }
 
-        deviceContext.submitTransaction();
+        txFacade.submitTransaction();
     }
 
     /**
-     * Writes snapshot gathering end timestamp + outcome
+     * Writes snapshot gathering end timestamp + outcome.
      *
-     * @param deviceContext txManager + node path keeper
+     * @param deviceInfo device info
+     * @param txFacade tx manager
      * @param succeeded     outcome of currently finished gathering
      */
-    static void markDeviceStateSnapshotEnd(final DeviceContext deviceContext, final boolean succeeded) {
-        final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceContext.getDeviceInfo()
-            .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
-            .child(SnapshotGatheringStatusEnd.class);
+    static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo,
+                                           final TxFacade txFacade, final boolean succeeded) {
+        final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
+                .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
+                .child(SnapshotGatheringStatusEnd.class);
 
         final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
         final SnapshotGatheringStatusEnd gatheringStatus = new SnapshotGatheringStatusEndBuilder()
-            .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
-            .setSucceeded(succeeded)
-            .build();
+                .setEnd(new DateAndTime(simpleDateFormat.format(new Date())))
+                .setSucceeded(succeeded)
+                .build();
         try {
-            deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
+            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
         } catch (TransactionChainClosedException e) {
             LOG.warn("Can't write to transaction, transaction chain probably closed.");
             LOG.trace("Write to transaction exception: ", e);
         }
 
-        deviceContext.submitTransaction();
+        txFacade.submitTransaction();
     }
 }