Fix statistics race condition on big flows
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / multilayer / MultiLayerFlowMultipartRequestOnTheFlyCallback.java
index 982bb6212839b0b2bb91860fdc2afb593e4c5c7b..3c3c3f7c4fcbfc9b2b28dd4944cdaed77ecb1477 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Optional;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
 import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
@@ -32,6 +33,7 @@ public class MultiLayerFlowMultipartRequestOnTheFlyCallback<T extends OfHeader>
 
     private final ConvertorExecutor convertorExecutor;
     private final DeviceInfo deviceInfo;
+    private final DeviceFlowRegistry deviceFlowRegistry;
     private boolean virgin = true;
 
     public MultiLayerFlowMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context,
@@ -43,6 +45,7 @@ public class MultiLayerFlowMultipartRequestOnTheFlyCallback<T extends OfHeader>
         super(context, requestType, deviceContext, eventIdentifier, statisticsWriterProvider);
         this.convertorExecutor = convertorExecutor;
         deviceInfo = deviceContext.getDeviceInfo();
+        deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
     }
 
     @Override
@@ -61,6 +64,11 @@ public class MultiLayerFlowMultipartRequestOnTheFlyCallback<T extends OfHeader>
         return MultipartType.OFPMPFLOW;
     }
 
+    @Override
+    protected void onFinishedCollecting() {
+        deviceFlowRegistry.processMarks();
+    }
+
     @Override
     protected ListenableFuture<Optional<? extends MultipartReplyBody>> processStatistics(T result) {
         final ListenableFuture<Optional<? extends MultipartReplyBody>> future = Futures.transform(