BUG-3774: 100k flows initial stats fail - fix
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / listener / HandshakeListenerImpl.java
index b17d3675649edacdac47c5f7847b7fa9535381c9..c8f0600edf866903a2f4bfc45545e7c8e304d53e 100644 (file)
@@ -8,13 +8,22 @@
 
 package org.opendaylight.openflowplugin.impl.connection.listener;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nullable;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
 import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.SessionStatistics;
 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +54,33 @@ public class HandshakeListenerImpl implements HandshakeListener {
         connectionContext.changeStateToWorking();
         connectionContext.setFeatures(featureOutput);
         connectionContext.setNodeId(InventoryDataServiceUtil.nodeIdFromDatapathId(featureOutput.getDatapathId()));
-        deviceConnectedHandler.deviceConnected(connectionContext);
-        SessionStatistics.countEvent(connectionContext.getNodeId().toString(), SessionStatistics.ConnectionStatus.CONNECTION_CREATED);
+
+        // fire barrier in order to sweep all handshake and posthandshake messages before continue
+        final ListenableFuture<RpcResult<BarrierOutput>> barrier = fireBarrier(version, 0L);
+        Futures.addCallback(barrier, new FutureCallback<RpcResult<BarrierOutput>>() {
+            @Override
+            public void onSuccess(@Nullable final RpcResult<BarrierOutput> result) {
+                LOG.debug("succeeded by getting sweep barrier after posthandshake for device {}", connectionContext.getNodeId());
+                deviceConnectedHandler.deviceConnected(connectionContext);
+                SessionStatistics.countEvent(connectionContext.getNodeId().toString(),
+                        SessionStatistics.ConnectionStatus.CONNECTION_CREATED);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.info("failed to get sweep barrier after posthandshake for device {}", connectionContext.getNodeId());
+                connectionContext.closeConnection(false);
+            }
+        });
+    }
+
+    protected ListenableFuture<RpcResult<BarrierOutput>> fireBarrier(final Short version, final long xid) {
+        final BarrierInput barrierInput = new BarrierInputBuilder()
+                .setXid(xid)
+                .setVersion(version)
+                .build();
+        return JdkFutureAdapters.listenInPoolThread(
+                connectionContext.getConnectionAdapter().barrier(barrierInput));
     }
 
     @Override