X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPSessionImpl.java;h=1bd5a3a2b7ffa6cbc24326f6df57fdace5a7bdde;hb=804cd520199b1e0299952787027febef349d5980;hp=9f42cae002448e189c9deb0a2cb425add83ff61d;hpb=b15edb0dead94af60b59a62699dad44903862bad;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java index 9f42cae002..1bd5a3a2b7 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java @@ -18,8 +18,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import java.io.IOException; +import java.nio.channels.NonWritableChannelException; import java.util.Date; import java.util.List; import java.util.Set; @@ -27,17 +29,20 @@ import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.config.yang.bgp.rib.impl.BgpSessionState; import org.opendaylight.protocol.bgp.parser.AsNumberUtil; +import org.opendaylight.protocol.bgp.parser.BGPDocumentedException; import org.opendaylight.protocol.bgp.parser.BGPError; import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil; import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl; +import org.opendaylight.protocol.bgp.parser.spi.MultiPathSupport; +import org.opendaylight.protocol.bgp.parser.spi.pojo.MultiPathSupportImpl; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionStatistics; import org.opendaylight.protocol.bgp.rib.spi.BGPSession; import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener; import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.KeepaliveBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Notify; @@ -50,9 +55,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.mess import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.RouteRefresh; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.open.bgp.parameters.optional.capabilities.c.parameters.AddPathCapability; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.open.bgp.parameters.optional.capabilities.c.parameters.MultiprotocolCapability; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.open.bgp.parameters.optional.capabilities.c.parameters.add.path.capability.AddressFamilies; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.AddPathCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.add.path.capability.AddressFamilies; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; @@ -174,6 +179,13 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im this.tableTypes = tats; this.addPathTypes = addPathCapabilitiesList; + if (! this.addPathTypes.isEmpty()) { + final ChannelPipeline pipeline = this.channel.pipeline(); + final BGPByteToMessageDecoder decoder = pipeline.get(BGPByteToMessageDecoder.class); + decoder.addDecoderConstraint(MultiPathSupport.class, + MultiPathSupportImpl.createParserMultiPathSupport(this.addPathTypes)); + } + if (this.holdTimerValue != 0) { channel.eventLoop().schedule(new Runnable() { @Override @@ -196,9 +208,8 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im @Override public synchronized void close() { - if (this.state != State.IDLE && this.channel.isActive()) { - this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode( - BGPError.CEASE.getSubcode()).build()); + if (this.state != State.IDLE) { + this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(BGPError.CEASE.getSubcode()).build()); } this.closeWithoutMessage(); } @@ -208,21 +219,21 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im * * @param msg incoming message */ - public synchronized void handleMessage(final Notification msg) { + synchronized void handleMessage(final Notification msg) throws BGPDocumentedException { // Update last reception time this.lastMessageReceivedAt = System.nanoTime(); this.sessionStats.updateReceivedMsgTotal(); if (msg instanceof Open) { // Open messages should not be present here - this.terminate(BGPError.FSM_ERROR); + this.terminate(new BGPDocumentedException(null, BGPError.FSM_ERROR)); } else if (msg instanceof Notify) { // Notifications are handled internally LOG.info("Session closed because Notification message received: {} / {}", ((Notify) msg).getErrorCode(), ((Notify) msg).getErrorSubcode()); this.closeWithoutMessage(); - this.listener.onSessionTerminated(this, new BGPTerminationReason(BGPError.forValue(((Notify) msg).getErrorCode(), - ((Notify) msg).getErrorSubcode()))); + this.listener.onSessionTerminated(this, new BGPTerminationReason( + BGPError.forValue(((Notify) msg).getErrorCode(), ((Notify) msg).getErrorSubcode()))); this.sessionStats.updateReceivedMsgErr((Notify) msg); } else if (msg instanceof Keepalive) { // Keepalives are handled internally @@ -235,15 +246,16 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } else if (msg instanceof RouteRefresh) { this.listener.onMessage(this, msg); this.sessionStats.updateReceivedMsgRR(); - } else { - // All others are passed up + } else if (msg instanceof Update) { this.listener.onMessage(this, msg); this.sync.updReceived((Update) msg); this.sessionStats.updateReceivedMsgUpd(); + } else { + LOG.warn("Ignoring unhandled message: {}.", msg.getClass()); } } - public synchronized void endOfInput() { + synchronized void endOfInput() { if (this.state == State.UP) { LOG.info(END_OF_INPUT); this.listener.onSessionDown(this, new IOException(END_OF_INPUT)); @@ -251,18 +263,18 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } @GuardedBy("this") - private void writeEpilogue(final ChannelFuture future, final Notification msg) { + private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification msg) { future.addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture f) { - if (!f.isSuccess()) { - LOG.warn("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel); - } else { - LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); - } + new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture f) { + if (!f.isSuccess()) { + LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause()); + } else { + LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); } - }); + } + }); this.lastMessageSentAt = System.nanoTime(); this.sessionStats.updateSentMsgTotal(); if (msg instanceof Update) { @@ -270,6 +282,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } else if (msg instanceof Notify) { this.sessionStats.updateSentMsgErr((Notify) msg); } + return future; } void flush() { @@ -284,25 +297,39 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } } - synchronized void writeAndFlush(final Notification msg) { - writeEpilogue(this.channel.writeAndFlush(msg), msg); + synchronized ChannelFuture writeAndFlush(final Notification msg) { + if (isWritable()) { + return writeEpilogue(this.channel.writeAndFlush(msg), msg); + } + return this.channel.newFailedFuture(new NonWritableChannelException()); } private synchronized void closeWithoutMessage() { LOG.info("Closing session: {}", this); removePeerSession(); - this.channel.close(); + this.channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()); + } + }); this.state = State.IDLE; } /** - * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be + * Closes BGP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be * modified, because he initiated the closing. (To prevent concurrent modification exception). * - * @param error + * @param e BGPDocumentedException */ - private void terminate(final BGPError error) { - this.writeAndFlush(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build()); + private synchronized void terminate(final BGPDocumentedException e) { + final BGPError error = e.getError(); + final byte[] data = e.getData(); + final NotifyBuilder builder = new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()); + if (data != null && data.length != 0) { + builder.setData(data); + } + this.writeAndFlush(builder.build()); this.closeWithoutMessage(); this.listener.onSessionTerminated(this, new BGPTerminationReason(error)); @@ -330,7 +357,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im if (ct >= nextHold) { LOG.debug("HoldTimer expired. {}", new Date()); - this.terminate(BGPError.HOLD_TIMER_EXPIRED); + this.terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED)); } else { this.channel.eventLoop().schedule(new Runnable() { @Override @@ -424,7 +451,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } @Override - public synchronized BgpSessionState getBgpSesionState() { + public synchronized BgpSessionState getBgpSessionState() { return this.sessionStats.getBgpSessionState(this.state); } @@ -433,7 +460,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im this.sessionStats.resetStats(); } - ChannelOutputLimiter getLimiter() { + public ChannelOutputLimiter getLimiter() { return this.limiter; } @@ -452,11 +479,25 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im @Override protected final void channelRead0(final ChannelHandlerContext ctx, final Notification msg) { LOG.debug("Message was received: {}", msg); - this.handleMessage(msg); + try { + this.handleMessage(msg); + } catch (final BGPDocumentedException e) { + this.terminate(e); + } } @Override public final void handlerAdded(final ChannelHandlerContext ctx) { this.sessionUp(); } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.warn("BGP session encountered error", cause); + if (cause.getCause() instanceof BGPDocumentedException) { + this.terminate((BGPDocumentedException) cause.getCause()); + } else { + this.close(); + } + } }