From 7fffede33481db0b1adf36974a1717c13ae7fdac Mon Sep 17 00:00:00 2001 From: Michal Rehak Date: Thu, 17 Dec 2015 19:42:06 +0100 Subject: [PATCH] BUG-3774: 100k flows initial stats fail - fix - synchronized all handshake entry points - added sweeping barrier after handshake succeeds - fixed unit test Change-Id: I651cd2244c1f58e53503e7af46e19254975d5418 Signed-off-by: Michal Rehak (cherry picked from commit d45ce11aa158cea9bff6648f529dac978758ab27) --- .../connection/ConnectionContextImpl.java | 8 ++-- .../listener/ConnectionReadyListenerImpl.java | 35 ++++++++++++++--- .../listener/HandshakeListenerImpl.java | 38 ++++++++++++++++++- .../OpenflowProtocolListenerInitialImpl.java | 32 ++++++++++------ .../connection/ConnectionManagerImplTest.java | 4 ++ .../listener/HandshakeListenerImplTest.java | 5 +++ 6 files changed, 99 insertions(+), 23 deletions(-) diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java index ce6a51e758..3fecb793c5 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; public class ConnectionContextImpl implements ConnectionContext { private final ConnectionAdapter connectionAdapter; - private CONNECTION_STATE connectionState; + private volatile CONNECTION_STATE connectionState; private FeaturesReply featuresReply; private NodeId nodeId; private DeviceDisconnectedHandler deviceDisconnectedHandler; @@ -175,17 +175,17 @@ public class ConnectionContextImpl implements ConnectionContext { } @Override - public void changeStateToHandshaking() { + public synchronized void changeStateToHandshaking() { connectionState = CONNECTION_STATE.HANDSHAKING; } @Override - public void changeStateToTimeouting() { + public synchronized void changeStateToTimeouting() { connectionState = CONNECTION_STATE.TIMEOUTING; } @Override - public void changeStateToWorking() { + public synchronized void changeStateToWorking() { connectionState = CONNECTION_STATE.WORKING; } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/ConnectionReadyListenerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/ConnectionReadyListenerImpl.java index 7c9f0eb6f2..41610e7822 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/ConnectionReadyListenerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/ConnectionReadyListenerImpl.java @@ -7,6 +7,7 @@ */ package org.opendaylight.openflowplugin.impl.connection.listener; +import java.util.concurrent.Future; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext; @@ -40,12 +41,36 @@ public class ConnectionReadyListenerImpl implements ConnectionReadyListener { connectionContext.getConnectionAdapter().getRemoteAddress()); if (connectionContext.getConnectionState() == null) { - HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( - null, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter()); - handshakeContext.getHandshakePool().execute(handshakeStepWrapper); - connectionContext.changeStateToHandshaking(); + synchronized (connectionContext) { + if (connectionContext.getConnectionState() == null) { + connectionContext.changeStateToHandshaking(); + HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( + null, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter()); + final Future handshakeResult = handshakeContext.getHandshakePool().submit(handshakeStepWrapper); + + try { + // as we run not in netty thread, need to remain in sync lock until initial handshake step processed + handshakeResult.get(); + } catch (Exception e) { + LOG.warn("failed to process onConnectionReady event on device {}", + connectionContext.getConnectionAdapter().getRemoteAddress(), + e); + connectionContext.closeConnection(false); + try { + handshakeContext.close(); + } catch (Exception e1) { + LOG.info("failed to close handshake context for device {}", + connectionContext.getConnectionAdapter().getRemoteAddress(), + e1 + ); + } + } + } else { + LOG.debug("already touched by hello message from device {}", connectionContext.getConnectionAdapter().getRemoteAddress()); + } + } } else { - LOG.debug("already touched by hello message"); + LOG.debug("already touched by hello message from device {}", connectionContext.getConnectionAdapter().getRemoteAddress()); } } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImpl.java index b17d367564..c8f0600edf 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImpl.java @@ -8,13 +8,22 @@ package org.opendaylight.openflowplugin.impl.connection.listener; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import javax.annotation.Nullable; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler; import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.SessionStatistics; import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +54,33 @@ public class HandshakeListenerImpl implements HandshakeListener { connectionContext.changeStateToWorking(); connectionContext.setFeatures(featureOutput); connectionContext.setNodeId(InventoryDataServiceUtil.nodeIdFromDatapathId(featureOutput.getDatapathId())); - deviceConnectedHandler.deviceConnected(connectionContext); - SessionStatistics.countEvent(connectionContext.getNodeId().toString(), SessionStatistics.ConnectionStatus.CONNECTION_CREATED); + + // fire barrier in order to sweep all handshake and posthandshake messages before continue + final ListenableFuture> barrier = fireBarrier(version, 0L); + Futures.addCallback(barrier, new FutureCallback>() { + @Override + public void onSuccess(@Nullable final RpcResult result) { + LOG.debug("succeeded by getting sweep barrier after posthandshake for device {}", connectionContext.getNodeId()); + deviceConnectedHandler.deviceConnected(connectionContext); + SessionStatistics.countEvent(connectionContext.getNodeId().toString(), + SessionStatistics.ConnectionStatus.CONNECTION_CREATED); + } + + @Override + public void onFailure(final Throwable t) { + LOG.info("failed to get sweep barrier after posthandshake for device {}", connectionContext.getNodeId()); + connectionContext.closeConnection(false); + } + }); + } + + protected ListenableFuture> fireBarrier(final Short version, final long xid) { + final BarrierInput barrierInput = new BarrierInputBuilder() + .setXid(xid) + .setVersion(version) + .build(); + return JdkFutureAdapters.listenInPoolThread( + connectionContext.getConnectionAdapter().barrier(barrierInput)); } @Override diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/OpenflowProtocolListenerInitialImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/OpenflowProtocolListenerInitialImpl.java index f75ee91018..585c58caec 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/OpenflowProtocolListenerInitialImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/listener/OpenflowProtocolListenerInitialImpl.java @@ -72,22 +72,30 @@ public class OpenflowProtocolListenerInitialImpl implements OpenflowProtocolList @Override public void onHelloMessage(final HelloMessage hello) { - LOG.debug("processing HELLO.xid: {}", hello.getXid()); - if (connectionContext.getConnectionState() == null) { - connectionContext.changeStateToHandshaking(); - } - - if (checkState(ConnectionContext.CONNECTION_STATE.HANDSHAKING)) { - final HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( - hello, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter()); - //handshakeContext.getHandshakePool().submit(handshakeStepWrapper); - // use up netty thread - handshakeStepWrapper.run(); + LOG.debug("processing HELLO.xid: {} from device {}", hello.getXid(), connectionContext.getConnectionAdapter().getRemoteAddress()); + final ConnectionContext.CONNECTION_STATE connectionState = connectionContext.getConnectionState(); + if (connectionState == null + || ConnectionContext.CONNECTION_STATE.HANDSHAKING.equals(connectionState)) { + synchronized (connectionContext) { + if (connectionContext.getConnectionState() == null) { + // got here before connection ready notification + connectionContext.changeStateToHandshaking(); + } + + if (checkState(ConnectionContext.CONNECTION_STATE.HANDSHAKING)) { + final HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( + hello, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter()); + // use up netty thread + handshakeStepWrapper.run(); + } else { + LOG.debug("already out of handshake phase but still received hello message from device {}", connectionContext.getConnectionAdapter().getRemoteAddress()); + } + } } else { //TODO: consider disconnecting of bad behaving device LOG.warn("Hello message received outside handshake phase: ", hello); + LOG.debug("already touched by onConnectionReady event from device {} (or finished handshake)", connectionContext.getConnectionAdapter().getRemoteAddress()); } - } @Override diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImplTest.java index 24bfad6ee0..0f228516af 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImplTest.java @@ -25,6 +25,8 @@ import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyList import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder; @@ -63,6 +65,8 @@ public class ConnectionManagerImplTest { final InetSocketAddress deviceAddress = InetSocketAddress.createUnresolved("yahoo", 42); Mockito.when(connection.getRemoteAddress()).thenReturn(deviceAddress); Mockito.when(connection.isAlive()).thenReturn(true); + Mockito.when(connection.barrier(Matchers.any())) + .thenReturn(RpcResultBuilder.success(new BarrierOutputBuilder().build()).buildFuture()); } /** diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImplTest.java index 2fb353223e..73d2a36709 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/connection/listener/HandshakeListenerImplTest.java @@ -28,8 +28,11 @@ import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler; import org.opendaylight.openflowplugin.impl.connection.ConnectionContextImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; /** * Test for {@link HandshakeListenerImpl}. @@ -55,6 +58,8 @@ public class HandshakeListenerImplTest { @Before public void setUp() throws Exception { + Mockito.when(connectionAdapter.barrier(Matchers.any())) + .thenReturn(RpcResultBuilder.success(new BarrierOutputBuilder().build()).buildFuture()); connectionContextSpy = Mockito.spy(new ConnectionContextImpl(connectionAdapter)); Mockito.when(connectionContextSpy.getConnectionAdapter()).thenReturn(connectionAdapter); Mockito.when(features.getDatapathId()).thenReturn(BigInteger.TEN); -- 2.36.6