X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPSessionImpl.java;h=1bd5a3a2b7ffa6cbc24326f6df57fdace5a7bdde;hb=804cd520199b1e0299952787027febef349d5980;hp=31ba665d1b75ecd1a7df21a58a675a8c29230869;hpb=73aabdae878ddaafc0cc8da602d2146fef263d9b;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java index 31ba665d1b..1bd5a3a2b7 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import java.io.IOException; +import java.nio.channels.NonWritableChannelException; import java.util.Date; import java.util.List; import java.util.Set; @@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.config.yang.bgp.rib.impl.BgpSessionState; import org.opendaylight.protocol.bgp.parser.AsNumberUtil; +import org.opendaylight.protocol.bgp.parser.BGPDocumentedException; import org.opendaylight.protocol.bgp.parser.BGPError; import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil; import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl; @@ -39,8 +41,8 @@ import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionStatistics; import org.opendaylight.protocol.bgp.rib.spi.BGPSession; import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener; import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.KeepaliveBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Notify; @@ -206,9 +208,8 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im @Override public synchronized void close() { - if (this.state != State.IDLE && this.channel.isActive()) { - this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode( - BGPError.CEASE.getSubcode()).build()); + if (this.state != State.IDLE) { + this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(BGPError.CEASE.getSubcode()).build()); } this.closeWithoutMessage(); } @@ -218,21 +219,21 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im * * @param msg incoming message */ - public synchronized void handleMessage(final Notification msg) { + synchronized void handleMessage(final Notification msg) throws BGPDocumentedException { // Update last reception time this.lastMessageReceivedAt = System.nanoTime(); this.sessionStats.updateReceivedMsgTotal(); if (msg instanceof Open) { // Open messages should not be present here - this.terminate(BGPError.FSM_ERROR); + this.terminate(new BGPDocumentedException(null, BGPError.FSM_ERROR)); } else if (msg instanceof Notify) { // Notifications are handled internally LOG.info("Session closed because Notification message received: {} / {}", ((Notify) msg).getErrorCode(), ((Notify) msg).getErrorSubcode()); this.closeWithoutMessage(); - this.listener.onSessionTerminated(this, new BGPTerminationReason(BGPError.forValue(((Notify) msg).getErrorCode(), - ((Notify) msg).getErrorSubcode()))); + this.listener.onSessionTerminated(this, new BGPTerminationReason( + BGPError.forValue(((Notify) msg).getErrorCode(), ((Notify) msg).getErrorSubcode()))); this.sessionStats.updateReceivedMsgErr((Notify) msg); } else if (msg instanceof Keepalive) { // Keepalives are handled internally @@ -254,7 +255,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } } - public synchronized void endOfInput() { + synchronized void endOfInput() { if (this.state == State.UP) { LOG.info(END_OF_INPUT); this.listener.onSessionDown(this, new IOException(END_OF_INPUT)); @@ -264,16 +265,16 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im @GuardedBy("this") private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification msg) { future.addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture f) { - if (!f.isSuccess()) { - LOG.warn("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel); - } else { - LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); - } + new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture f) { + if (!f.isSuccess()) { + LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause()); + } else { + LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); } - }); + } + }); this.lastMessageSentAt = System.nanoTime(); this.sessionStats.updateSentMsgTotal(); if (msg instanceof Update) { @@ -297,24 +298,38 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } synchronized ChannelFuture writeAndFlush(final Notification msg) { - return writeEpilogue(this.channel.writeAndFlush(msg), msg); + if (isWritable()) { + return writeEpilogue(this.channel.writeAndFlush(msg), msg); + } + return this.channel.newFailedFuture(new NonWritableChannelException()); } private synchronized void closeWithoutMessage() { LOG.info("Closing session: {}", this); removePeerSession(); - this.channel.close(); + this.channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()); + } + }); this.state = State.IDLE; } /** - * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be + * Closes BGP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be * modified, because he initiated the closing. (To prevent concurrent modification exception). * - * @param error + * @param e BGPDocumentedException */ - private void terminate(final BGPError error) { - this.writeAndFlush(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build()); + private synchronized void terminate(final BGPDocumentedException e) { + final BGPError error = e.getError(); + final byte[] data = e.getData(); + final NotifyBuilder builder = new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()); + if (data != null && data.length != 0) { + builder.setData(data); + } + this.writeAndFlush(builder.build()); this.closeWithoutMessage(); this.listener.onSessionTerminated(this, new BGPTerminationReason(error)); @@ -342,7 +357,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im if (ct >= nextHold) { LOG.debug("HoldTimer expired. {}", new Date()); - this.terminate(BGPError.HOLD_TIMER_EXPIRED); + this.terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED)); } else { this.channel.eventLoop().schedule(new Runnable() { @Override @@ -436,7 +451,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } @Override - public synchronized BgpSessionState getBgpSesionState() { + public synchronized BgpSessionState getBgpSessionState() { return this.sessionStats.getBgpSessionState(this.state); } @@ -445,7 +460,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im this.sessionStats.resetStats(); } - ChannelOutputLimiter getLimiter() { + public ChannelOutputLimiter getLimiter() { return this.limiter; } @@ -464,11 +479,25 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im @Override protected final void channelRead0(final ChannelHandlerContext ctx, final Notification msg) { LOG.debug("Message was received: {}", msg); - this.handleMessage(msg); + try { + this.handleMessage(msg); + } catch (final BGPDocumentedException e) { + this.terminate(e); + } } @Override public final void handlerAdded(final ChannelHandlerContext ctx) { this.sessionUp(); } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.warn("BGP session encountered error", cause); + if (cause.getCause() instanceof BGPDocumentedException) { + this.terminate((BGPDocumentedException) cause.getCause()); + } else { + this.close(); + } + } }