Modernize codebase
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
old mode 100644 (file)
new mode 100755 (executable)
index 2c13b48..9f6559b
@@ -5,27 +5,23 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.impl.statistics;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.text.SimpleDateFormat;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
@@ -54,7 +50,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,140 +57,94 @@ import org.slf4j.LoggerFactory;
  * Utils for gathering statistics.
  */
 public final class StatisticsGatheringUtils {
-
     private static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
 
     private StatisticsGatheringUtils() {
-        throw new IllegalStateException("This class should not be instantiated.");
+        // Hidden on purpose
     }
 
     static <T extends OfHeader> ListenableFuture<Boolean> gatherStatistics(
-                                                            final StatisticsGatherer<T> statisticsGatheringService,
-                                                            final DeviceInfo deviceInfo,
-                                                            final MultipartType type,
-                                                            final TxFacade txFacade,
-                                                            final DeviceRegistry registry,
-                                                            final ConvertorExecutor convertorExecutor,
-                                                            final MultipartWriterProvider statisticsWriterProvider) {
-        return Futures.transformAsync(
-                statisticsGatheringService.getStatisticsOfType(
-                        new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
-                        type),
-                new AsyncFunction<RpcResult<List<T>>, Boolean>() {
-                    @Nullable
-                    @Override
-                    @SuppressWarnings("checkstyle:IllegalCatch")
-                    public ListenableFuture<Boolean> apply(@Nonnull final RpcResult<List<T>> rpcResult) {
-                        boolean isMultipartProcessed = Boolean.TRUE;
-
-                        if (rpcResult.isSuccessful()) {
-                            LOG.debug("Stats reply successfully received for node {} of type {}",
-                                    deviceInfo.getNodeId(), type);
-
-                            // TODO: in case the result value is null then multipart data probably got processed
-                            // TODO: on the fly. This contract should by clearly stated and enforced.
-                            // TODO: Now simple true value is returned
-                            if (Objects.nonNull(rpcResult.getResult()) && !rpcResult.getResult().isEmpty()) {
-                                final List<DataContainer> allMultipartData;
-
-                                try {
-                                    allMultipartData = rpcResult
-                                            .getResult()
-                                            .stream()
-                                            .map(reply ->  MultipartReplyTranslatorUtil
+            final StatisticsGatherer<T> statisticsGatheringService, final DeviceInfo deviceInfo,
+            final MultipartType type, final TxFacade txFacade, final DeviceRegistry registry,
+            final ConvertorExecutor convertorExecutor, final MultipartWriterProvider statisticsWriterProvider,
+            final Executor executor) {
+        return Futures.transform(statisticsGatheringService.getStatisticsOfType(
+            new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()), type),
+            rpcResult -> {
+                final boolean rpcResultIsNull = rpcResult == null;
+
+                if (!rpcResultIsNull && rpcResult.isSuccessful()) {
+                    LOG.debug("Stats reply successfully received for node {} of type {}", deviceInfo.getNodeId(), type);
+                        // TODO: in case the result value is null then multipart data probably got processed
+                        // TODO: on the fly. This contract should by clearly stated and enforced.
+                        // TODO: Now simple true value is returned
+                    if (rpcResult.getResult() != null && !rpcResult.getResult().isEmpty()) {
+                        final List<DataContainer> allMultipartData = rpcResult.getResult().stream()
+                                .map(reply -> MultipartReplyTranslatorUtil
                                                     .translate(reply, deviceInfo, convertorExecutor, null))
-                                            .filter(java.util.Optional::isPresent)
-                                            .map(java.util.Optional::get)
-                                            .collect(Collectors.toList());
-                                } catch (final Exception e) {
-                                    LOG.warn("Stats processing of type {} for node {} "
-                                                    + "failed during transformation step",
-                                            type, deviceInfo, e);
-                                    return Futures.immediateFailedFuture(e);
-                                }
+                                .filter(Optional::isPresent).map(Optional::orElseThrow)
+                                .collect(Collectors.toList());
 
-                                try {
-                                    return Futures.immediateFuture(processStatistics(
-                                            type,
-                                            allMultipartData,
-                                            txFacade,
-                                            registry,
-                                            deviceInfo,
-                                            statisticsWriterProvider));
-                                } catch (final Exception e) {
-                                    LOG.warn("Stats processing of type {} for node {} failed during processing step",
-                                            type, deviceInfo.getNodeId(), e);
-                                    return Futures.immediateFailedFuture(e);
-                                }
-                            } else {
-                                LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
-                            }
-                        } else {
-                            LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
-                                    rpcResult.getErrors());
-                            isMultipartProcessed = Boolean.FALSE;
-                        }
-
-                        return Futures.immediateFuture(isMultipartProcessed);
+                        return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
+                                        statisticsWriterProvider);
+                    } else {
+                        LOG.debug("Stats reply was empty for node {} of type {}", deviceInfo.getNodeId(), type);
                     }
-                });
+                } else {
+                    LOG.warn("Stats reply FAILED for node {} of type {}: {}", deviceInfo.getNodeId(), type,
+                                rpcResultIsNull ? "" : rpcResult.getErrors());
+                }
+                return false;
+            }, executor);
     }
 
-    private static boolean processStatistics(final MultipartType type,
-                                             final List<? extends DataContainer> statistics,
-                                             final TxFacade txFacade,
-                                             final DeviceRegistry deviceRegistry,
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
+                                             final TxFacade txFacade, final DeviceRegistry deviceRegistry,
                                              final DeviceInfo deviceInfo,
                                              final MultipartWriterProvider statisticsWriterProvider) {
-        final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
-                .getNodeInstanceIdentifier()
+        final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
                 .augmentation(FlowCapableNode.class);
-
-        switch (type) {
-            case OFPMPFLOW:
-                deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
-                break;
-            case OFPMPMETERCONFIG:
-                deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
-                break;
-            case OFPMPGROUPDESC:
-                deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
-                break;
-            default:
-                // no operation
-        }
-
-        if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
-            txFacade.submitTransaction();
-
+        try {
+            txFacade.acquireWriteTransactionLock();
             switch (type) {
                 case OFPMPFLOW:
+                    deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
                     deviceRegistry.getDeviceFlowRegistry().processMarks();
                     break;
                 case OFPMPMETERCONFIG:
+                    deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
                     deviceRegistry.getDeviceMeterRegistry().processMarks();
                     break;
                 case OFPMPGROUPDESC:
+                    deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
                     deviceRegistry.getDeviceGroupRegistry().processMarks();
                     break;
                 default:
                     // no operation
             }
 
-            LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
-            return true;
+            if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
+                txFacade.submitTransaction();
+
+                LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
+                return true;
+            }
+        } catch (Exception e) {
+            LOG.error("Exception while writing statistics to operational inventory for the device {}",
+                    deviceInfo.getLOGValue(), e);
+        } finally {
+            txFacade.releaseWriteTransactionLock();
         }
 
-        LOG.warn("Stats processing of type {} for node {} "
-                + "failed during write-to-tx step", type, deviceInfo);
+        LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);
         return false;
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private static boolean writeStatistics(final MultipartType type,
-                                           final List<? extends DataContainer> statistics,
+    private static boolean writeStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
                                            final DeviceInfo deviceInfo,
                                            final MultipartWriterProvider statisticsWriterProvider) {
         final AtomicBoolean result = new AtomicBoolean(false);
@@ -209,8 +158,8 @@ public final class StatisticsGatheringUtils {
                 }
             }));
         } catch (final Exception ex) {
-            LOG.warn("Stats processing of type {} for node {} "
-                    + "failed during write-to-tx step", type, deviceInfo, ex);
+            LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo,
+                     ex);
         }
 
         return result.get();
@@ -223,53 +172,54 @@ public final class StatisticsGatheringUtils {
             return;
         }
 
-        final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
+        final ListenableFuture<Optional<FlowCapableNode>> future;
+        try (ReadTransaction readTx = txFacade.getReadTransaction()) {
+            future = readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier);
+        }
 
         try {
-            Futures.transform(Futures
-                .catchingAsync(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), Throwable.class,
-                    t -> {
-                        // we wish to close readTx for fallBack
-                        readTx.close();
-                        return Futures.immediateFailedFuture(t);
-                    }), (Function<Optional<FlowCapableNode>, Void>)
-                flowCapNodeOpt -> {
+            Futures.transform(Futures.catchingAsync(future, Throwable.class, Futures::immediateFailedFuture,
+                MoreExecutors.directExecutor()), flowCapNodeOpt -> {
                     // we have to read actual tables with all information before we set empty Flow list,
                     // merge is expensive and not applicable for lists
                     if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
-                        for (final Table tableData : flowCapNodeOpt.get().getTable()) {
-                            final Table table =
-                                    new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
-                            final InstanceIdentifier<Table> iiToTable =
-                                    instanceIdentifier.child(Table.class, tableData.getKey());
+                        for (final Table tableData : flowCapNodeOpt.orElseThrow().nonnullTable().values()) {
+                            final Table table = new TableBuilder(tableData).setFlow(Map.of()).build();
+                            final InstanceIdentifier<Table> iiToTable = instanceIdentifier
+                                .child(Table.class, tableData.key());
                             txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
                         }
                     }
-
-                    readTx.close();
                     return null;
-                }).get();
+                }, MoreExecutors.directExecutor()).get();
         } catch (InterruptedException | ExecutionException ex) {
-            LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex);
+            LOG.debug("Failed to delete {} flows", deviceFlowRegistry.size(), ex);
         }
     }
 
     public static void deleteAllKnownMeters(final TxFacade txFacade,
                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
                                             final DeviceMeterRegistry meterRegistry) {
-        meterRegistry.forEach(meterId -> txFacade
-                .addDeleteToTxChain(
-                        LogicalDatastoreType.OPERATIONAL,
-                        instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
+        meterRegistry.forEach(meterId -> {
+            txFacade
+                    .addDeleteToTxChain(
+                            LogicalDatastoreType.OPERATIONAL,
+                            instanceIdentifier.child(Meter.class, new MeterKey(meterId)));
+            meterRegistry.addMark(meterId);
+        });
     }
 
     public static void deleteAllKnownGroups(final TxFacade txFacade,
                                             final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
                                             final DeviceGroupRegistry groupRegistry) {
-        groupRegistry.forEach(groupId -> txFacade
-                .addDeleteToTxChain(
-                        LogicalDatastoreType.OPERATIONAL,
-                        instanceIdentifier.child(Group.class, new GroupKey(groupId))));
+        LOG.debug("deleteAllKnownGroups on device targetType {}", instanceIdentifier.getTargetType());
+        groupRegistry.forEach(groupId -> {
+            txFacade
+                    .addDeleteToTxChain(
+                            LogicalDatastoreType.OPERATIONAL,
+                            instanceIdentifier.child(Group.class, new GroupKey(groupId)));
+            groupRegistry.addMark(groupId);
+        });
     }
 
     /**
@@ -306,7 +256,8 @@ public final class StatisticsGatheringUtils {
      * @param txFacade tx manager
      * @param succeeded     outcome of currently finished gathering
      */
-    static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo, final TxFacade txFacade, final boolean succeeded) {
+    static void markDeviceStateSnapshotEnd(final DeviceInfo deviceInfo,
+                                           final TxFacade txFacade, final boolean succeeded) {
         final InstanceIdentifier<SnapshotGatheringStatusEnd> statusEndPath = deviceInfo
                 .getNodeInstanceIdentifier().augmentation(FlowCapableStatisticsGatheringStatus.class)
                 .child(SnapshotGatheringStatusEnd.class);