From 9e9d31be714733168d07b7af9a80de457fe0b1ef Mon Sep 17 00:00:00 2001 From: "Claudio D. Gasparini" Date: Sat, 18 Jun 2016 23:24:53 +0200 Subject: [PATCH] BUG-6038: Fix race condition when Open message... Fix race condition when Open message was sent by the peer before startNegotiation Fix by start Negotiation and then handle Open message Change-Id: If5aa3cb024eb5cdbf8995d27445735318cb17759 Signed-off-by: Claudio D. Gasparini --- .../impl/AbstractBGPSessionNegotiator.java | 87 ++++++++------- .../protocol/bgp/rib/impl/BGPSessionImpl.java | 40 ++++--- .../bgp/rib/impl/BGPDispatcherImplTest.java | 100 ++++++++++-------- .../bgp/rib/impl/BGPSessionImplTest.java | 15 ++- .../bgp/rib/impl/SimpleSessionListener.java | 43 ++++---- 5 files changed, 149 insertions(+), 136 deletions(-) diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPSessionNegotiator.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPSessionNegotiator.java index 4f00c9af2e..413285ce9c 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPSessionNegotiator.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AbstractBGPSessionNegotiator.java @@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory; * Bgp Session negotiator. Common for local-to-remote and remote-to-local connections. * One difference is session validation performed by injected BGPSessionValidator when OPEN message is received. */ -public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandlerAdapter implements SessionNegotiator { +abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandlerAdapter implements SessionNegotiator { // 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2 private static final int INITIAL_HOLDTIMER = 4; @@ -81,7 +81,7 @@ public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandler @GuardedBy("this") private BGPSessionImpl session; - public AbstractBGPSessionNegotiator(final Promise promise, final Channel channel, + AbstractBGPSessionNegotiator(final Promise promise, final Channel channel, final BGPPeerRegistry registry) { this.promise = Preconditions.checkNotNull(promise); this.channel = Preconditions.checkNotNull(channel); @@ -89,40 +89,45 @@ public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandler } private synchronized void startNegotiation() { + if (!(this.state == State.IDLE || this.state == State.OPEN_CONFIRM)) { + return; + } // Open can be sent first either from ODL (IDLE) or from peer (OPEN_CONFIRM) - Preconditions.checkState(this.state == State.IDLE || this.state == State.OPEN_CONFIRM); final IpAddress remoteIp = getRemoteIp(); + try { + // Check if peer is configured in registry before retrieving preferences + if (!this.registry.isPeerConfigured(remoteIp)) { + final BGPDocumentedException cause = new BGPDocumentedException( + String.format("BGP peer with ip: %s not configured, check configured peers in : %s", remoteIp, this.registry), BGPError.CONNECTION_REJECTED); + negotiationFailed(cause); + return; + } - // Check if peer is configured in registry before retrieving preferences - if (!this.registry.isPeerConfigured(remoteIp)) { - final BGPDocumentedException cause = new BGPDocumentedException( - String.format("BGP peer with ip: %s not configured, check configured peers in : %s", remoteIp, this.registry), BGPError.CONNECTION_REJECTED); - negotiationFailed(cause); - return; - } + final BGPSessionPreferences preferences = this.registry.getPeerPreferences(remoteIp); - final BGPSessionPreferences preferences = this.registry.getPeerPreferences(remoteIp); - - int as = preferences.getMyAs().getValue().intValue(); - // Set as AS_TRANS if the value is bigger than 2B - if (as > Values.UNSIGNED_SHORT_MAX_VALUE) { - as = AS_TRANS; - } - sendMessage(new OpenBuilder().setMyAsNumber(as).setHoldTimer(preferences.getHoldTime()).setBgpIdentifier( + int as = preferences.getMyAs().getValue().intValue(); + // Set as AS_TRANS if the value is bigger than 2B + if (as > Values.UNSIGNED_SHORT_MAX_VALUE) { + as = AS_TRANS; + } + sendMessage(new OpenBuilder().setMyAsNumber(as).setHoldTimer(preferences.getHoldTime()).setBgpIdentifier( preferences.getBgpId()).setBgpParameters(preferences.getParams()).build()); - if (this.state != State.FINISHED) { - this.state = State.OPEN_SENT; - - this.channel.eventLoop().schedule(new Runnable() { - @Override - public void run() { - if (AbstractBGPSessionNegotiator.this.state != State.FINISHED) { - AbstractBGPSessionNegotiator.this.sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED)); - negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR)); - AbstractBGPSessionNegotiator.this.state = State.FINISHED; + if (this.state != State.FINISHED) { + this.state = State.OPEN_SENT; + this.channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (AbstractBGPSessionNegotiator.this.state != State.FINISHED) { + AbstractBGPSessionNegotiator.this.sendMessage(buildErrorNotify(BGPError.HOLD_TIMER_EXPIRED)); + negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR)); + AbstractBGPSessionNegotiator.this.state = State.FINISHED; + } } - } - }, INITIAL_HOLDTIMER, TimeUnit.MINUTES); + }, INITIAL_HOLDTIMER, TimeUnit.MINUTES); + } + } catch (final Exception e) { + LOG.warn("Unexpected negotiation failure", e); + negotiationFailedCloseChannel(e); } } @@ -139,10 +144,11 @@ public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandler return; case IDLE: // to avoid race condition when Open message was sent by the peer before startNegotiation could be executed - if (msg instanceof Open) { - handleOpen((Open) msg); - return; - } + if (msg instanceof Open) { + startNegotiation(); + handleOpen((Open) msg); + return; + } sendMessage(buildErrorNotify(BGPError.FSM_ERROR)); return; case OPEN_CONFIRM: @@ -168,7 +174,7 @@ public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandler // Catch-all for unexpected message LOG.warn("Channel {} state {} unexpected message {}", this.channel, this.state, msg); sendMessage(buildErrorNotify(BGPError.FSM_ERROR)); - negotiationFailed(new BGPDocumentedException("Unexpected message", BGPError.FSM_ERROR)); + negotiationFailed(new BGPDocumentedException("Unexpected message channel: " + this.channel + ", state: " + this.state + ", message: " + msg, BGPError.FSM_ERROR)); this.state = State.FINISHED; } @@ -253,7 +259,6 @@ public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandler } else { LOG.trace("Message {} sent to socket", msg); } - } }); } @@ -261,20 +266,12 @@ public abstract class AbstractBGPSessionNegotiator extends ChannelInboundHandler @Override public final void channelActive(final ChannelHandlerContext ctx) { LOG.debug("Starting session negotiation on channel {}", this.channel); - - try { - startNegotiation(); - } catch (final Exception e) { - LOG.warn("Unexpected negotiation failure", e); - negotiationFailedCloseChannel(e); - } - + startNegotiation(); } @Override public final void channelRead(final ChannelHandlerContext ctx, final Object msg) { LOG.debug("Negotiation read invoked on channel {}", this.channel); - try { handleMessage((Notification) msg); } catch (final Exception e) { diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java index 686a3a197b..f708e2de7a 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import java.io.IOException; +import java.nio.channels.NonWritableChannelException; import java.util.Date; import java.util.List; import java.util.Set; @@ -207,9 +208,8 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im @Override public synchronized void close() { - if (this.state != State.IDLE && this.channel.isActive()) { - this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode( - BGPError.CEASE.getSubcode()).build()); + if (this.state != State.IDLE) { + this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(BGPError.CEASE.getSubcode()).build()); } this.closeWithoutMessage(); } @@ -219,7 +219,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im * * @param msg incoming message */ - public synchronized void handleMessage(final Notification msg) throws BGPDocumentedException { + synchronized void handleMessage(final Notification msg) throws BGPDocumentedException { // Update last reception time this.lastMessageReceivedAt = System.nanoTime(); this.sessionStats.updateReceivedMsgTotal(); @@ -255,7 +255,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } } - public synchronized void endOfInput() { + synchronized void endOfInput() { if (this.state == State.UP) { LOG.info(END_OF_INPUT); this.listener.onSessionDown(this, new IOException(END_OF_INPUT)); @@ -265,16 +265,16 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im @GuardedBy("this") private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification msg) { future.addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture f) { - if (!f.isSuccess()) { - LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause()); - } else { - LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); - } + new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture f) { + if (!f.isSuccess()) { + LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause()); + } else { + LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); } - }); + } + }); this.lastMessageSentAt = System.nanoTime(); this.sessionStats.updateSentMsgTotal(); if (msg instanceof Update) { @@ -298,13 +298,21 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } synchronized ChannelFuture writeAndFlush(final Notification msg) { - return writeEpilogue(this.channel.writeAndFlush(msg), msg); + if (isWritable()) { + return writeEpilogue(this.channel.writeAndFlush(msg), msg); + } + return this.channel.newFailedFuture(new NonWritableChannelException()); } private synchronized void closeWithoutMessage() { LOG.info("Closing session: {}", this); removePeerSession(); - this.channel.close(); + this.channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()); + } + }); this.state = State.IDLE; } diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImplTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImplTest.java index b24fdd7b74..7dfc6cd823 100755 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImplTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImplTest.java @@ -9,11 +9,11 @@ package org.opendaylight.protocol.bgp.rib.impl; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -27,6 +27,7 @@ import org.junit.Test; import org.opendaylight.protocol.bgp.parser.BGPDocumentedException; import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil; import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl; +import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext; import org.opendaylight.protocol.bgp.parser.spi.pojo.ServiceLoaderBGPExtensionProviderContext; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; @@ -48,75 +49,82 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.type public class BGPDispatcherImplTest { - private static final InetSocketAddress ADDRESS = new InetSocketAddress("127.0.10.0", 1790); - private static final InetSocketAddress CLIENT_ADDRESS = new InetSocketAddress("127.0.11.0", 1791); - private static final InetSocketAddress CLIENT_ADDRESS2 = new InetSocketAddress("127.0.12.0", 1792); private static final AsNumber AS_NUMBER = new AsNumber(30L); - private static final int TIMEOUT = 5000; private static final int RETRY_TIMER = 10; - - private final BgpTableType ipv4tt = new BgpTableTypeImpl(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class); - - private BGPDispatcherImpl dispatcher; + private static final BgpTableType IPV_4_TT = new BgpTableTypeImpl(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class); + private BGPDispatcherImpl serverDispatcher; private TestClientDispatcher clientDispatcher; - private BGPPeerRegistry registry; - - private Channel channel; + private SimpleSessionListener clientListener; + private SimpleSessionListener serverListener; @Before public void setUp() throws BGPDocumentedException { - final EventLoopGroup group = new NioEventLoopGroup(); this.registry = new StrictBGPPeerRegistry(); - this.registry.addPeer(new IpAddress(new Ipv4Address(CLIENT_ADDRESS.getAddress().getHostAddress())), - new SimpleSessionListener(), createPreferences(CLIENT_ADDRESS)); - this.registry.addPeer(new IpAddress(new Ipv4Address(ADDRESS.getAddress().getHostAddress())), - new SimpleSessionListener(), createPreferences(ADDRESS)); - this.dispatcher = new BGPDispatcherImpl(ServiceLoaderBGPExtensionProviderContext.getSingletonInstance().getMessageRegistry(), group, group); - this.clientDispatcher = new TestClientDispatcher(group, group, ServiceLoaderBGPExtensionProviderContext.getSingletonInstance().getMessageRegistry(), - CLIENT_ADDRESS); + this.clientListener = new SimpleSessionListener(); + final BGPExtensionProviderContext ctx = ServiceLoaderBGPExtensionProviderContext.getSingletonInstance(); + this.serverDispatcher = new BGPDispatcherImpl(ctx.getMessageRegistry(), new NioEventLoopGroup(), new NioEventLoopGroup()); + configureClient(ctx); + } - final ChannelFuture future = this.dispatcher.createServer(this.registry, ADDRESS); + private void configureClient(final BGPExtensionProviderContext ctx) { + final InetSocketAddress clientAddress = new InetSocketAddress("127.0.11.0", 1791); + final IpAddress clientPeerIp = new IpAddress(new Ipv4Address(clientAddress.getAddress().getHostAddress())); + this.registry.addPeer(clientPeerIp, this.clientListener, createPreferences(clientAddress)); + this.clientDispatcher = new TestClientDispatcher(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx.getMessageRegistry(), clientAddress); + } + + private Channel createServer(final InetSocketAddress serverAddress) throws InterruptedException { + this.serverListener = new SimpleSessionListener(); + this.registry.addPeer(new IpAddress(new Ipv4Address(serverAddress.getAddress().getHostAddress())), this.serverListener, createPreferences(serverAddress)); + final ChannelFuture future = this.serverDispatcher.createServer(this.registry, serverAddress); future.addListener(new GenericFutureListener>() { @Override public void operationComplete(final Future future) { - if(!future.isSuccess()) { - Assert.fail("Failed to create server."); - } + Preconditions.checkArgument(future.isSuccess(), "Unable to start bgp server on %s", future.cause()); } }); - this.channel = future.channel(); + return future.channel(); + } + + @After + public void tearDown() throws Exception { + this.serverDispatcher.close(); + this.registry.close(); } @Test public void testCreateClient() throws InterruptedException, ExecutionException { - final BGPSessionImpl session = this.clientDispatcher.createClient(ADDRESS, this.registry, - 0, Optional.absent()).get(); - Assert.assertEquals(BGPSessionImpl.State.UP, session.getState()); + final InetSocketAddress serverAddress = new InetSocketAddress("127.0.10.0", 1790); + final Channel serverChannel = createServer(serverAddress); + Thread.sleep(1000); + final BGPSessionImpl session = this.clientDispatcher.createClient(serverAddress, this.registry, 0, Optional.absent()).get(); + Thread.sleep(3000); + Assert.assertEquals(BGPSessionImpl.State.UP, this.clientListener.getState()); + Assert.assertEquals(BGPSessionImpl.State.UP, this.serverListener.getState()); Assert.assertEquals(AS_NUMBER, session.getAsNumber()); - Assert.assertEquals(Sets.newHashSet(this.ipv4tt), session.getAdvertisedTableTypes()); + Assert.assertEquals(Sets.newHashSet(IPV_4_TT), session.getAdvertisedTableTypes()); + Assert.assertTrue(serverChannel.isWritable()); session.close(); - } - @After - public void tearDown() throws Exception { - this.channel.close().get(); - this.dispatcher.close(); - this.registry.close(); + Thread.sleep(3000); + Assert.assertEquals(BGPSessionImpl.State.IDLE, this.clientListener.getState()); + Assert.assertEquals(BGPSessionImpl.State.IDLE, this.serverListener.getState()); } @Test - public void testCreateReconnectingClient() throws InterruptedException, ExecutionException { - final SimpleSessionListener listener = new SimpleSessionListener(); - this.registry.addPeer(new IpAddress(new Ipv4Address(CLIENT_ADDRESS2.getAddress().getHostAddress())), listener, createPreferences(CLIENT_ADDRESS2)); - final Future cf = this.clientDispatcher.createReconnectingClient(CLIENT_ADDRESS2, this.registry, - RETRY_TIMER, Optional.absent()); - final Channel channel2 = this.dispatcher.createServer(this.registry, CLIENT_ADDRESS2).channel(); + public void testCreateReconnectingClient() throws Exception { + final InetSocketAddress serverAddress = new InetSocketAddress("127.0.20.0", 1792); + final Channel serverChannel = createServer(serverAddress); Thread.sleep(1000); - Assert.assertTrue(listener.up); - Assert.assertTrue(channel2.isActive()); - cf.cancel(true); - listener.releaseConnection(); + final Future future = this.clientDispatcher.createReconnectingClient(serverAddress, this.registry, RETRY_TIMER, Optional.absent()); + Thread.sleep(3000); + Assert.assertEquals(BGPSessionImpl.State.UP, this.serverListener.getState()); + Assert.assertTrue(serverChannel.isWritable()); + future.cancel(true); + this.serverListener.releaseConnection(); + Thread.sleep(3000); + Assert.assertEquals(BGPSessionImpl.State.IDLE, this.serverListener.getState()); } private BGPSessionPreferences createPreferences(final InetSocketAddress socketAddress) { @@ -124,7 +132,7 @@ public class BGPDispatcherImplTest { final List capas = Lists.newArrayList(); capas.add(new OptionalCapabilitiesBuilder().setCParameters(new CParametersBuilder().addAugmentation( CParameters1.class, new CParameters1Builder().setMultiprotocolCapability(new MultiprotocolCapabilityBuilder() - .setAfi(this.ipv4tt.getAfi()).setSafi(this.ipv4tt.getSafi()).build()).build()) + .setAfi(IPV_4_TT.getAfi()).setSafi(IPV_4_TT.getSafi()).build()).build()) .setAs4BytesCapability(new As4BytesCapabilityBuilder().setAsNumber(new AsNumber(30L)).build()) .build()).build()); capas.add(new OptionalCapabilitiesBuilder().setCParameters(BgpExtendedMessageUtil.EXTENDED_MESSAGE_CAPABILITY).build()); diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java index 411f1465e1..e694f03b37 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java @@ -136,13 +136,16 @@ public class BGPSessionImplTest { } }).when(this.eventLoop).schedule(Mockito.any(Runnable.class), Mockito.any(long.class), Mockito.any(TimeUnit.class)); doReturn("TestingChannel").when(this.speakerListener).toString(); + doReturn(true).when(this.speakerListener).isWritable(); doReturn(new InetSocketAddress(InetAddress.getByName(BGP_ID.getValue()), 179)).when(this.speakerListener).remoteAddress(); doReturn(new InetSocketAddress(InetAddress.getByName(LOCAL_IP), LOCAL_PORT)).when(this.speakerListener).localAddress(); doReturn(this.pipeline).when(this.speakerListener).pipeline(); doReturn(this.pipeline).when(this.pipeline).replace(Mockito.any(ChannelHandler.class), Mockito.any(String.class), Mockito.any(ChannelHandler.class)); doReturn(null).when(this.pipeline).replace(Matchers.>any(), Mockito.any(String.class), Mockito.any(ChannelHandler.class)); doReturn(this.pipeline).when(this.pipeline).addLast(Mockito.any(ChannelHandler.class)); - doReturn(mock(ChannelFuture.class)).when(this.speakerListener).close(); + final ChannelFuture futureChannel = mock(ChannelFuture.class); + doReturn(null).when(futureChannel).addListener(Mockito.>>any()); + doReturn(futureChannel).when(this.speakerListener).close(); this.listener = new SimpleSessionListener(); this.bgpSession = new BGPSessionImpl(this.listener, this.speakerListener, this.classicOpen, this.classicOpen.getHoldTimer(), null); } @@ -154,7 +157,8 @@ public class BGPSessionImplTest { assertEquals(AS_NUMBER, this.bgpSession.getAsNumber()); assertEquals(BGP_ID, this.bgpSession.getBgpId()); assertEquals(1, this.bgpSession.getAdvertisedTableTypes().size()); - assertTrue(this.listener.up); + Assert.assertEquals(BGPSessionImpl.State.UP, this.listener.getState()); + //test stats final BgpSessionState state = this.bgpSession.getBgpSessionState(); assertEquals(HOLD_TIMER, state.getHoldtimeCurrent().intValue()); @@ -232,11 +236,12 @@ public class BGPSessionImplTest { } @Test - public void testEndOfInput() { + public void testEndOfInput() throws InterruptedException { this.bgpSession.sessionUp(); - Assert.assertFalse(this.listener.down); + Assert.assertEquals(BGPSessionImpl.State.UP, this.listener.getState()); this.bgpSession.endOfInput(); - Assert.assertTrue(this.listener.down); + Thread.sleep(3000); + Assert.assertEquals(BGPSessionImpl.State.IDLE, this.listener.getState()); } @Test diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/SimpleSessionListener.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/SimpleSessionListener.java index c7e3d3aaca..d860d72a3b 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/SimpleSessionListener.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/SimpleSessionListener.java @@ -20,42 +20,38 @@ import org.slf4j.LoggerFactory; /** * Listener for the client. */ -public class SimpleSessionListener implements BGPSessionListener { - - private final List listMsg = Lists.newArrayList(); - - public boolean up = false; +public final class SimpleSessionListener implements BGPSessionListener { private static final Logger LOG = LoggerFactory.getLogger(SimpleSessionListener.class); - - public boolean down = false; - + private final List listMsg = Lists.newArrayList(); private BGPSession session; - public SimpleSessionListener() { + SimpleSessionListener() { } - public List getListMsg() { + List getListMsg() { return this.listMsg; } @Override - public void onMessage(final BGPSession session, final Notification message) { - this.listMsg.add(message); - LOG.debug("Message received: {}", message); + public boolean isSessionActive() { + return ((BGPSessionImpl) this.session).isWritable(); + } + + @Override + public void markUptodate(final TablesKey tablesKey) { + LOG.debug("Table marked as up-to-date {}", tablesKey); } @Override public void onSessionUp(final BGPSession session) { LOG.debug("Session Up"); this.session = session; - this.up = true; } @Override public void onSessionDown(final BGPSession session, final Exception e) { LOG.debug("Session Down", e); - this.down = true; } @Override @@ -63,6 +59,12 @@ public class SimpleSessionListener implements BGPSessionListener { LOG.debug("Session terminated. Cause : {}", cause.toString()); } + @Override + public void onMessage(final BGPSession session, final Notification message) { + this.listMsg.add(message); + LOG.debug("Message received: {}", message); + } + @Override public void releaseConnection() { LOG.debug("Releasing connection"); @@ -72,17 +74,10 @@ public class SimpleSessionListener implements BGPSessionListener { } catch (final Exception e) { LOG.warn("Error closing session", e); } - this.session = null; } } - @Override - public boolean isSessionActive() { - return true; - } - - @Override - public void markUptodate(final TablesKey tablesKey) { - LOG.debug("Table marked as up-to-date {}", tablesKey); + BGPSessionImpl.State getState() { + return ((BGPSessionImpl) this.session).getState(); } } -- 2.36.6