BUG 5523 Add onDeviceContextLevelDown for Statistics and Rpc Managers
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
index 06d8d9d5b60970d37718217d793ea45f2795bab3..02e86f33c3b80525f28107f8b97cc8077da03e1b 100644 (file)
@@ -11,15 +11,16 @@ package org.opendaylight.openflowplugin.impl.statistics;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureFallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -92,7 +93,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,60 +120,85 @@ public final class StatisticsGatheringUtils {
             wholeProcessEventIdentifier = new EventIdentifier(type.toString(), deviceId);
             EventsTimeCounter.markStart(wholeProcessEventIdentifier);
         }
-        EventIdentifier ofpQueuToRequestContextEventIdentifier = new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceId);
+        final EventIdentifier ofpQueuToRequestContextEventIdentifier = new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceId);
         final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture =
                 JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType(
                         ofpQueuToRequestContextEventIdentifier, type));
-        return transformAndStoreStatisticsData(statisticsDataInFuture, deviceContext, wholeProcessEventIdentifier);
+        return transformAndStoreStatisticsData(statisticsDataInFuture, deviceContext, wholeProcessEventIdentifier, type);
     }
 
     private static ListenableFuture<Boolean> transformAndStoreStatisticsData(final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture,
                                                                              final DeviceContext deviceContext,
-                                                                             final EventIdentifier eventIdentifier) {
-        return Futures.transform(statisticsDataInFuture, new Function<RpcResult<List<MultipartReply>>, Boolean>() {
+                                                                             final EventIdentifier eventIdentifier, final MultipartType type) {
+        return Futures.transform(statisticsDataInFuture, new AsyncFunction<RpcResult<List<MultipartReply>>, Boolean>() {
             @Nullable
             @Override
-            public Boolean apply(final RpcResult<List<MultipartReply>> rpcResult) {
+            public ListenableFuture<Boolean> apply(final RpcResult<List<MultipartReply>> rpcResult) {
+                boolean isMultipartProcessed = Boolean.TRUE;
                 if (rpcResult.isSuccessful()) {
-                    boolean isMultipartProcessed = Boolean.TRUE;
+                    LOG.debug("Stats reply successfully received for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
 
                     // TODO: in case the result value is null then multipart data probably got processed on the fly -
                     // TODO: this contract should by clearly stated and enforced - now simple true value is returned
                     if (null != rpcResult.getResult()) {
                         Iterable<? extends DataObject> allMultipartData = Collections.emptyList();
                         DataObject multipartData = null;
-                        for (final MultipartReply singleReply : rpcResult.getResult()) {
-                            final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
-                            multipartData = multipartDataList.get(0);
-                            allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+
+
+                        try {
+                            for (final MultipartReply singleReply : rpcResult.getResult()) {
+                                final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
+                                multipartData = multipartDataList.get(0);
+                                allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+                            }
+                        } catch (final Exception e) {
+                            LOG.warn("stats processing of type {} for node {} failed during transfomation step",
+                                    type, deviceContext.getDeviceState().getNodeId(), e);
+                            return Futures.immediateFailedFuture(e);
                         }
 
-                        if (multipartData instanceof GroupStatisticsUpdated) {
-                            processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof MeterStatisticsUpdated) {
-                            processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
-                            processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof FlowTableStatisticsUpdate) {
-                            processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof QueueStatisticsUpdate) {
-                            processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof FlowsStatisticsUpdate) {
-                            processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
-                            EventsTimeCounter.markEnd(eventIdentifier);
-                        } else if (multipartData instanceof GroupDescStatsUpdated) {
-                            processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof MeterConfigStatsUpdated) {
-                            processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
-                        } else {
-                            isMultipartProcessed = Boolean.FALSE;
+
+                        try {
+                            if (multipartData instanceof GroupStatisticsUpdated) {
+                                processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof MeterStatisticsUpdated) {
+                                processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
+                                processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof FlowTableStatisticsUpdate) {
+                                processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof QueueStatisticsUpdate) {
+                                processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof FlowsStatisticsUpdate) {
+                                /* FlowStat Processing is realized by NettyThread only by initPhase, otherwise it is realized
+                                 * by MD-SAL thread */
+                                return processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext, eventIdentifier);
+
+                            } else if (multipartData instanceof GroupDescStatsUpdated) {
+                                processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof MeterConfigStatsUpdated) {
+                                processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
+                            } else {
+                                isMultipartProcessed = Boolean.FALSE;
+                            }
+                        } catch (final Exception e) {
+                            LOG.warn("stats processing of type {} for node {} failed during write-to-tx step",
+                                    type, deviceContext.getDeviceState().getNodeId(), e);
+                            return Futures.immediateFailedFuture(e);
                         }
+
+                        LOG.debug("Stats reply added to transaction for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
+
                         //TODO : implement experimenter
+                    } else {
+                        LOG.debug("Stats reply was empty for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
                     }
 
-                    return isMultipartProcessed;
+                } else {
+                    LOG.debug("Stats reply FAILED for node {} of type {}: {}", deviceContext.getDeviceState().getNodeId(), type, rpcResult.getErrors());
+                    isMultipartProcessed = Boolean.FALSE;
                 }
-                return Boolean.FALSE;
+                return Futures.immediateFuture(isMultipartProcessed);
             }
         });
     }
@@ -196,13 +221,22 @@ public final class StatisticsGatheringUtils {
         deviceContext.submitTransaction();
     }
 
-    private static void processFlowStatistics(final Iterable<FlowsStatisticsUpdate> data, final DeviceContext deviceContext) {
-        deleteAllKnownFlows(deviceContext);
-        writeFlowStatistics(data, deviceContext);
-        deviceContext.submitTransaction();
+    private static ListenableFuture<Boolean> processFlowStatistics(final Iterable<FlowsStatisticsUpdate> data,
+            final DeviceContext deviceContext, final EventIdentifier eventIdentifier) {
+        final ListenableFuture<Void> deleFuture = deleteAllKnownFlows(deviceContext);
+        return Futures.transform(deleFuture, new Function<Void, Boolean>() {
+
+            @Override
+            public Boolean apply(final Void input) {
+                writeFlowStatistics(data, deviceContext);
+                deviceContext.submitTransaction();
+                EventsTimeCounter.markEnd(eventIdentifier);
+                return Boolean.TRUE;
+            }
+        });
     }
 
-    public static void writeFlowStatistics(Iterable<FlowsStatisticsUpdate> data, DeviceContext deviceContext) {
+    public static void writeFlowStatistics(final Iterable<FlowsStatisticsUpdate> data, final DeviceContext deviceContext) {
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceContext);
         for (final FlowsStatisticsUpdate flowsStatistics : data) {
             for (final FlowAndStatisticsMapList flowStat : flowsStatistics.getFlowAndStatisticsMapList()) {
@@ -234,29 +268,46 @@ public final class StatisticsGatheringUtils {
         return flowStatisticsDataBld;
     }
 
-    public static void deleteAllKnownFlows(final DeviceContext deviceContext) {
+    public static ListenableFuture<Void> deleteAllKnownFlows(final DeviceContext deviceContext) {
+        /* DeviceState.deviceSynchronized is a marker for actual phase - false means initPhase, true means noInitPhase */
         if (deviceContext.getDeviceState().deviceSynchronized()) {
-            InstanceIdentifier<FlowCapableNode> flowCapableNodePath = assembleFlowCapableNodeInstanceIdentifier(deviceContext);
-            final Short numOfTablesOnDevice = deviceContext.getDeviceState().getFeatures().getTables();
-            for (short i = 0; i < numOfTablesOnDevice; i++) {
-                final KeyedInstanceIdentifier<Table, TableKey> iiToTable = flowCapableNodePath.child(Table.class, new TableKey(i));
-                final ReadTransaction readTx = deviceContext.getReadTransaction();
-                final CheckedFuture<Optional<Table>, ReadFailedException> tableDataFuture = readTx.read(LogicalDatastoreType.OPERATIONAL, iiToTable);
-                try {
-                    final Optional<Table> tableDataOpt = tableDataFuture.get();
-                    if (tableDataOpt.isPresent()) {
-                        final Table tableData = tableDataOpt.get();
-                        final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
-                        deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+            final InstanceIdentifier<FlowCapableNode> flowCapableNodePath = assembleFlowCapableNodeInstanceIdentifier(deviceContext);
+            final ReadOnlyTransaction readTx = deviceContext.getReadTransaction();
+            final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowCapableNodeFuture = readTx.read(
+                    LogicalDatastoreType.OPERATIONAL, flowCapableNodePath);
+
+            /* we wish to close readTx for fallBack */
+            Futures.withFallback(flowCapableNodeFuture, new FutureFallback<Optional<FlowCapableNode>>() {
+
+                @Override
+                public ListenableFuture<Optional<FlowCapableNode>> create(final Throwable t) throws Exception {
+                    readTx.close();
+                    return Futures.immediateFailedFuture(t);
+                }
+            });
+            /*
+             * we have to read actual tables with all information before we set empty Flow list, merge is expensive and
+             * not applicable for lists
+             */
+            return Futures.transform(flowCapableNodeFuture, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+
+                @Override
+                public ListenableFuture<Void> apply(final Optional<FlowCapableNode> flowCapNodeOpt) throws Exception {
+                    if (flowCapNodeOpt.isPresent()) {
+                        for (final Table tableData : flowCapNodeOpt.get().getTable()) {
+                            final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow> emptyList()).build();
+                            final InstanceIdentifier<Table> iiToTable = flowCapableNodePath.child(Table.class, tableData.getKey());
+                            deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+                        }
                     }
-                } catch (final InterruptedException e) {
-                    LOG.trace("Reading of table features for table wit ID {} was interrputed.", i);
-                } catch (final ExecutionException e) {
-                    LOG.trace("Reading of table features for table wit ID {} encountered execution exception {}.", i, e);
+                    deviceContext.getDeviceFlowRegistry().removeMarked();
+                    readTx.close();
+                    return Futures.immediateFuture(null);
                 }
-            }
-            deviceContext.getDeviceFlowRegistry().removeMarked();
+
+            });
         }
+        return Futures.immediateFuture(null);
     }
 
     private static void processQueueStatistics(final Iterable<QueueStatisticsUpdate> data, final DeviceContext deviceContext) {