Fix comparison between port numbers in match
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsGatheringUtils.java
index d75d3aaf646c62ff1f98576080b3c76579822053..e0dd990709409adce8828531d92d5cf0364ed19c 100644 (file)
@@ -21,15 +21,18 @@ import java.text.SimpleDateFormat;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
 import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
@@ -37,7 +40,7 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Event
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
@@ -119,7 +122,6 @@ public final class StatisticsGatheringUtils {
     private static String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
 
     private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringUtils.class);
-    private static final SinglePurposeMultipartReplyTranslator MULTIPART_REPLY_TRANSLATOR = new SinglePurposeMultipartReplyTranslator();
     private static final String QUEUE2_REQCTX = "QUEUE2REQCTX-";
 
     private StatisticsGatheringUtils() {
@@ -131,10 +133,9 @@ public final class StatisticsGatheringUtils {
                                                       final DeviceInfo deviceInfo,
                                                       final MultipartType type,
                                                       final TxFacade txFacade,
-                                                      final DeviceFlowRegistry flowRegistry,
-                                                      final DeviceGroupRegistry groupRegistry,
-                                                      final DeviceMeterRegistry meterRegistry,
-                                                      final DeviceState deviceState) {
+                                                      final DeviceRegistry registry,
+                                                      final Boolean initial,
+                                                      final SinglePurposeMultipartReplyTranslator multipartReplyTranslator) {
         EventIdentifier wholeProcessEventIdentifier = null;
         if (MultipartType.OFPMPFLOW.equals(type)) {
             wholeProcessEventIdentifier = new EventIdentifier(type.toString(), deviceInfo.getNodeId().getValue());
@@ -144,7 +145,7 @@ public final class StatisticsGatheringUtils {
         final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture =
                 JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType(
                         ofpQueuToRequestContextEventIdentifier, type));
-        return transformAndStoreStatisticsData(statisticsDataInFuture, deviceInfo, wholeProcessEventIdentifier, type, txFacade, flowRegistry, groupRegistry, meterRegistry, deviceState);
+        return transformAndStoreStatisticsData(statisticsDataInFuture, deviceInfo, wholeProcessEventIdentifier, type, txFacade, registry, initial, multipartReplyTranslator);
     }
 
     private static ListenableFuture<Boolean> transformAndStoreStatisticsData(final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture,
@@ -152,10 +153,9 @@ public final class StatisticsGatheringUtils {
                                                                              final EventIdentifier eventIdentifier,
                                                                              final MultipartType type,
                                                                              final TxFacade txFacade,
-                                                                             final DeviceFlowRegistry flowRegistry,
-                                                                             final DeviceGroupRegistry groupRegistry,
-                                                                             final DeviceMeterRegistry meterRegistry,
-                                                                             final DeviceState deviceState) {
+                                                                             final DeviceRegistry registry,
+                                                                             final boolean initial,
+                                                                             final SinglePurposeMultipartReplyTranslator multipartReplyTranslator) {
         return Futures.transform(statisticsDataInFuture, new AsyncFunction<RpcResult<List<MultipartReply>>, Boolean>() {
             @Nullable
             @Override
@@ -173,7 +173,7 @@ public final class StatisticsGatheringUtils {
 
                         try {
                             for (final MultipartReply singleReply : rpcResult.getResult()) {
-                                final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(
+                                final List<? extends DataObject> multipartDataList = multipartReplyTranslator.translate(
                                         deviceInfo.getDatapathId(),
                                         deviceInfo.getVersion(), singleReply);
                                 multipartData = multipartDataList.get(0);
@@ -200,12 +200,12 @@ public final class StatisticsGatheringUtils {
                             } else if (multipartData instanceof FlowsStatisticsUpdate) {
                                 /* FlowStat Processing is realized by NettyThread only by initPhase, otherwise it is realized
                                  * by MD-SAL thread */
-                                return processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceInfo, txFacade, flowRegistry, deviceState, eventIdentifier);
+                                return processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceInfo, txFacade, registry.getDeviceFlowRegistry(), initial, eventIdentifier);
 
                             } else if (multipartData instanceof GroupDescStatsUpdated) {
-                                processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceInfo, txFacade, groupRegistry);
+                                processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceInfo, txFacade, registry.getDeviceGroupRegistry());
                             } else if (multipartData instanceof MeterConfigStatsUpdated) {
-                                processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceInfo, txFacade, meterRegistry);
+                                processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceInfo, txFacade, registry.getDeviceMeterRegistry());
                             } else {
                                 isMultipartProcessed = Boolean.FALSE;
                             }
@@ -256,11 +256,11 @@ public final class StatisticsGatheringUtils {
                                                                    final DeviceInfo deviceInfo,
                                                                    final TxFacade txFacade,
                                                                    final DeviceFlowRegistry flowRegistry,
-                                                                   final DeviceState deviceState,
+                                                                   final boolean initial,
                                                                    final EventIdentifier eventIdentifier) {
-        final ListenableFuture<Void> deleFuture = deleteAllKnownFlows(deviceInfo,
-                flowRegistry, txFacade, deviceState);
-        return Futures.transform(deleFuture, new Function<Void, Boolean>() {
+        final ListenableFuture<Void> deleteFuture = initial ? Futures.immediateFuture(null) : deleteAllKnownFlows(deviceInfo,
+                flowRegistry, txFacade);
+        return Futures.transform(deleteFuture, new Function<Void, Boolean>() {
 
             @Override
             public Boolean apply(final Void input) {
@@ -284,8 +284,8 @@ public final class StatisticsGatheringUtils {
                     flowBuilder.addAugmentation(FlowStatisticsData.class, refineFlowStatisticsAugmentation(flowStat).build());
 
                     final short tableId = flowStat.getTableId();
-                    final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(flowBuilder.build());
-                    final FlowId flowId = registry.storeIfNecessary(flowRegistryKey, tableId);
+                    final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceInfo.getVersion(), flowBuilder.build());
+                    final FlowId flowId = registry.storeIfNecessary(flowRegistryKey);
 
                     final FlowKey flowKey = new FlowKey(flowId);
                     flowBuilder.setKey(flowKey);
@@ -313,48 +313,41 @@ public final class StatisticsGatheringUtils {
 
     public static ListenableFuture<Void> deleteAllKnownFlows(final DeviceInfo deviceInfo,
                                                              final DeviceFlowRegistry registry,
-                                                             final TxFacade txFacade,
-                                                             final DeviceState deviceState) {
-        //TODO:Make check for phase from enum
-        /* DeviceState.deviceSynchronized is a marker for actual phase - false means initPhase, true means noInitPhase */
-        if (deviceState.deviceSynchronized()) {
-            final InstanceIdentifier<FlowCapableNode> flowCapableNodePath = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
-            final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
-            final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowCapableNodeFuture = readTx.read(
-                    LogicalDatastoreType.OPERATIONAL, flowCapableNodePath);
-
-            /* we wish to close readTx for fallBack */
-            Futures.withFallback(flowCapableNodeFuture, new FutureFallback<Optional<FlowCapableNode>>() {
-
-                @Override
-                public ListenableFuture<Optional<FlowCapableNode>> create(final Throwable t) throws Exception {
-                    readTx.close();
-                    return Futures.immediateFailedFuture(t);
-                }
-            });
-            /*
-             * we have to read actual tables with all information before we set empty Flow list, merge is expensive and
-             * not applicable for lists
-             */
-            return Futures.transform(flowCapableNodeFuture, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
-
-                @Override
-                public ListenableFuture<Void> apply(final Optional<FlowCapableNode> flowCapNodeOpt) throws Exception {
-                    if (flowCapNodeOpt.isPresent()) {
-                        for (final Table tableData : flowCapNodeOpt.get().getTable()) {
-                            final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
-                            final InstanceIdentifier<Table> iiToTable = flowCapableNodePath.child(Table.class, tableData.getKey());
-                            txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
-                        }
+                                                             final TxFacade txFacade) {
+        final InstanceIdentifier<FlowCapableNode> flowCapableNodePath = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
+        final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowCapableNodeFuture = readTx.read(
+                LogicalDatastoreType.OPERATIONAL, flowCapableNodePath);
+
+        /* we wish to close readTx for fallBack */
+        Futures.withFallback(flowCapableNodeFuture, new FutureFallback<Optional<FlowCapableNode>>() {
+
+            @Override
+            public ListenableFuture<Optional<FlowCapableNode>> create(final Throwable t) throws Exception {
+                readTx.close();
+                return Futures.immediateFailedFuture(t);
+            }
+        });
+        /*
+         * we have to read actual tables with all information before we set empty Flow list, merge is expensive and
+         * not applicable for lists
+         */
+        return Futures.transform(flowCapableNodeFuture, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+
+            @Override
+            public ListenableFuture<Void> apply(final Optional<FlowCapableNode> flowCapNodeOpt) throws Exception {
+                if (flowCapNodeOpt.isPresent()) {
+                    for (final Table tableData : flowCapNodeOpt.get().getTable()) {
+                        final Table table = new TableBuilder(tableData).setFlow(Collections.<Flow>emptyList()).build();
+                        final InstanceIdentifier<Table> iiToTable = flowCapableNodePath.child(Table.class, tableData.getKey());
+                        txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
                     }
-                    registry.removeMarked();
-                    readTx.close();
-                    return Futures.immediateFuture(null);
                 }
+                readTx.close();
+                return Futures.immediateFuture(null);
+            }
 
-            });
-        }
-        return Futures.immediateFuture(null);
+        });
     }
 
     private static void processQueueStatistics(final Iterable<QueueStatisticsUpdate> data, final DeviceInfo deviceInfo, final TxFacade txFacade) throws Exception {
@@ -443,8 +436,7 @@ public final class StatisticsGatheringUtils {
     }
 
     private static void processGroupDescStats(final Iterable<GroupDescStatsUpdated> data, final DeviceInfo deviceInfo, final TxFacade txFacade, final DeviceGroupRegistry groupRegistry) throws Exception {
-        final InstanceIdentifier<FlowCapableNode> fNodeIdent =
-                deviceInfo.getNodeInstanceIdentifier().augmentation(FlowCapableNode.class);
+        final InstanceIdentifier<FlowCapableNode> fNodeIdent = assembleFlowCapableNodeInstanceIdentifier(deviceInfo);
         deleteAllKnownGroups(txFacade, fNodeIdent, groupRegistry);
 
         for (final GroupDescStatsUpdated groupDescStatsUpdated : data) {
@@ -511,8 +503,9 @@ public final class StatisticsGatheringUtils {
                 .build();
         try {
             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
-        } catch (final Exception e) {
-            LOG.warn("Can't write to transaction: {}", e);
+        } catch (final TransactionChainClosedException e) {
+            LOG.warn("Can't write to transaction, transaction chain probably closed.");
+            LOG.trace("Write to transaction exception: ", e);
         }
 
         deviceContext.submitTransaction();
@@ -536,8 +529,9 @@ public final class StatisticsGatheringUtils {
                 .build();
         try {
             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
-        } catch (Exception e) {
-            LOG.warn("Can't write to transaction: {}", e);
+        } catch (TransactionChainClosedException e) {
+            LOG.warn("Can't write to transaction, transaction chain probably closed.");
+            LOG.trace("Write to transaction exception: ", e);
         }
 
         deviceContext.submitTransaction();