X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPSessionImpl.java;h=e77c61eebaa2d23b3f48462202c9715fb4f2570e;hb=20f0ead22adb474614a5efc83555db5627a0c27e;hp=0a04aaa28d39efb0c011f2cd3e1ab693469b84c5;hpb=feb71f5bbccac2d766a68c28d805043e5faa046d;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java index 0a04aaa28d..e77c61eeba 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java @@ -12,30 +12,39 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import java.io.IOException; +import java.nio.channels.NonWritableChannelException; import java.util.Date; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.config.yang.bgp.rib.impl.BgpSessionState; import org.opendaylight.protocol.bgp.parser.AsNumberUtil; +import org.opendaylight.protocol.bgp.parser.BGPDocumentedException; import org.opendaylight.protocol.bgp.parser.BGPError; import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil; import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl; +import org.opendaylight.protocol.bgp.parser.spi.MultiPathSupport; +import org.opendaylight.protocol.bgp.parser.spi.pojo.MultiPathSupportImpl; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; -import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionStatistics; +import org.opendaylight.protocol.bgp.rib.impl.stats.peer.BGPSessionStats; +import org.opendaylight.protocol.bgp.rib.impl.stats.peer.BGPSessionStatsImpl; import org.opendaylight.protocol.bgp.rib.spi.BGPSession; import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener; import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.KeepaliveBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Notify; @@ -48,14 +57,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.mess import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.RouteRefresh; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.open.bgp.parameters.optional.capabilities.c.parameters.MultiprotocolCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.AddPathCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.add.path.capability.AddressFamilies; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @VisibleForTesting -public class BGPSessionImpl extends SimpleChannelInboundHandler implements BGPSession, BGPSessionStatistics, AutoCloseable { +public class BGPSessionImpl extends SimpleChannelInboundHandler implements BGPSession, BGPSessionStats, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class); @@ -110,6 +121,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im private State state = State.OPEN_CONFIRM; private final Set tableTypes; + private final List addPathTypes; private final int holdTimerValue; private final int keepAlive; private final AsNumber asNumber; @@ -117,12 +129,12 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im private final BGPPeerRegistry peerRegistry; private final ChannelOutputLimiter limiter; - private BGPSessionStats sessionStats; + private BGPSessionStatsImpl sessionStats; public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen, final BGPSessionPreferences localPreferences, final BGPPeerRegistry peerRegistry) { this(listener, channel, remoteOpen, localPreferences.getHoldTime(), peerRegistry); - this.sessionStats = new BGPSessionStats(remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.of(localPreferences), this.tableTypes); + this.sessionStats = new BGPSessionStatsImpl(this, remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.of(localPreferences), this.tableTypes, this.addPathTypes); } public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen, final int localHoldTimer, @@ -136,32 +148,41 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im this.keepAlive = this.holdTimerValue / KA_TO_DEADTIMER_RATIO; this.asNumber = AsNumberUtil.advertizedAsNumber(remoteOpen); this.peerRegistry = peerRegistry; - final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen); - if (enableExMess) { - this.channel.pipeline().replace(BGPMessageHeaderDecoder.class, EXTENDED_MSG_DECODER, BGPMessageHeaderDecoder.getExtendedBGPMessageHeaderDecoder()); - } final Set tts = Sets.newHashSet(); final Set tats = Sets.newHashSet(); + final List addPathCapabilitiesList = Lists.newArrayList(); if (remoteOpen.getBgpParameters() != null) { for (final BgpParameters param : remoteOpen.getBgpParameters()) { for (final OptionalCapabilities optCapa : param.getOptionalCapabilities()) { final CParameters cParam = optCapa.getCParameters(); - if ( cParam.getAugmentation(CParameters1.class) == null || - cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability() == null ) { + if ( cParam.getAugmentation(CParameters1.class) == null) { continue; } - final MultiprotocolCapability multi = cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability(); - final TablesKey tt = new TablesKey(multi.getAfi(), multi.getSafi()); - LOG.trace("Added table type to sync {}", tt); - tts.add(tt); - tats.add(new BgpTableTypeImpl(tt.getAfi(), tt.getSafi())); + if(cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability() != null) { + final MultiprotocolCapability multi = cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability(); + final TablesKey tt = new TablesKey(multi.getAfi(), multi.getSafi()); + LOG.trace("Added table type to sync {}", tt); + tts.add(tt); + tats.add(new BgpTableTypeImpl(tt.getAfi(), tt.getSafi())); + } else if (cParam.getAugmentation(CParameters1.class).getAddPathCapability() != null) { + final AddPathCapability addPathCap = cParam.getAugmentation(CParameters1.class).getAddPathCapability(); + addPathCapabilitiesList.addAll(addPathCap.getAddressFamilies()); + } } } } this.sync = new BGPSynchronization(this.listener, tts); this.tableTypes = tats; + this.addPathTypes = addPathCapabilitiesList; + + if (! this.addPathTypes.isEmpty()) { + final ChannelPipeline pipeline = this.channel.pipeline(); + final BGPByteToMessageDecoder decoder = pipeline.get(BGPByteToMessageDecoder.class); + decoder.addDecoderConstraint(MultiPathSupport.class, + MultiPathSupportImpl.createParserMultiPathSupport(this.addPathTypes)); + } if (this.holdTimerValue != 0) { channel.eventLoop().schedule(new Runnable() { @@ -179,17 +200,34 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im }, this.keepAlive, TimeUnit.SECONDS); } this.bgpId = remoteOpen.getBgpIdentifier(); - this.sessionStats = new BGPSessionStats(remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.absent(), - this.tableTypes); + this.sessionStats = new BGPSessionStatsImpl(this, remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.absent(), + this.tableTypes, this.addPathTypes); + } + + /** + * Set the extend message coder for current channel + * The reason for separating this part from constructor is, in #channel.pipeline().replace(..), the + * invokeChannelRead() will be invoked after the original message coder handler got removed. And there + * is chance that before the session instance is fully initiated (constructor returns), a KeepAlive + * message arrived already in the channel buffer. Thus #AbstractBGPSessionNegotiator.handleMessage(..) + * gets invoked again and a deadlock is caused. A BGP final state machine error will happen as BGP + * negotiator is still in OPEN_SENT state as the session constructor hasn't returned yet. + * + * @param remoteOpen + */ + public synchronized void setChannelExtMsgCoder(final Open remoteOpen) { + final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen); + if (enableExMess) { + this.channel.pipeline().replace(BGPMessageHeaderDecoder.class, EXTENDED_MSG_DECODER, BGPMessageHeaderDecoder.getExtendedBGPMessageHeaderDecoder()); + } } @Override public synchronized void close() { - if (this.state != State.IDLE && this.channel.isActive()) { - this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode( - BGPError.CEASE.getSubcode()).build()); + if (this.state != State.IDLE) { + this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(BGPError.CEASE.getSubcode()).build()); + this.closeWithoutMessage(); } - this.closeWithoutMessage(); } /** @@ -197,42 +235,49 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im * * @param msg incoming message */ - public synchronized void handleMessage(final Notification msg) { - // Update last reception time - this.lastMessageReceivedAt = System.nanoTime(); - this.sessionStats.updateReceivedMsgTotal(); - - if (msg instanceof Open) { - // Open messages should not be present here - this.terminate(BGPError.FSM_ERROR); - } else if (msg instanceof Notify) { - // Notifications are handled internally - LOG.info("Session closed because Notification message received: {} / {}", ((Notify) msg).getErrorCode(), - ((Notify) msg).getErrorSubcode()); - this.closeWithoutMessage(); - this.listener.onSessionTerminated(this, new BGPTerminationReason(BGPError.forValue(((Notify) msg).getErrorCode(), - ((Notify) msg).getErrorSubcode()))); - this.sessionStats.updateReceivedMsgErr((Notify) msg); - } else if (msg instanceof Keepalive) { - // Keepalives are handled internally - LOG.trace("Received KeepAlive messsage."); - this.kaCounter++; - this.sessionStats.updateReceivedMsgKA(); - if (this.kaCounter >= 2) { - this.sync.kaReceived(); + synchronized void handleMessage(final Notification msg) { + if (this.state == State.IDLE) { + return; + } + try { + // Update last reception time + this.lastMessageReceivedAt = System.nanoTime(); + + if (msg instanceof Open) { + // Open messages should not be present here + this.terminate(new BGPDocumentedException(null, BGPError.FSM_ERROR)); + } else if (msg instanceof Notify) { + final Notify notify = (Notify) msg; + // Notifications are handled internally + LOG.info("Session closed because Notification message received: {} / {}, data={}", notify.getErrorCode(), + notify.getErrorSubcode(), notify.getData() != null ? ByteBufUtil.hexDump(notify.getData()) : null); + this.closeWithoutMessage(); + this.listener.onSessionTerminated(this, new BGPTerminationReason( + BGPError.forValue(notify.getErrorCode(), notify.getErrorSubcode()))); + } else if (msg instanceof Keepalive) { + // Keepalives are handled internally + LOG.trace("Received KeepAlive message."); + this.kaCounter++; + if (this.kaCounter >= 2) { + this.sync.kaReceived(); + } + } else if (msg instanceof RouteRefresh) { + this.listener.onMessage(this, msg); + } else if (msg instanceof Update) { + this.listener.onMessage(this, msg); + this.sync.updReceived((Update) msg); + } else { + LOG.warn("Ignoring unhandled message: {}.", msg.getClass()); } - } else if (msg instanceof RouteRefresh) { - this.listener.onMessage(this, msg); - this.sessionStats.updateReceivedMsgRR(); - } else { - // All others are passed up - this.listener.onMessage(this, msg); - this.sync.updReceived((Update) msg); - this.sessionStats.updateReceivedMsgUpd(); + + this.sessionStats.updateReceivedMsg(msg); + + } catch (final BGPDocumentedException e) { + this.terminate(e); } } - public synchronized void endOfInput() { + synchronized void endOfInput() { if (this.state == State.UP) { LOG.info(END_OF_INPUT); this.listener.onSessionDown(this, new IOException(END_OF_INPUT)); @@ -240,25 +285,21 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } @GuardedBy("this") - private void writeEpilogue(final ChannelFuture future, final Notification msg) { + private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification msg) { future.addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture f) { - if (!f.isSuccess()) { - LOG.warn("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel); - } else { - LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); - } + new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture f) { + if (!f.isSuccess()) { + LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause()); + } else { + LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); } - }); + } + }); this.lastMessageSentAt = System.nanoTime(); - this.sessionStats.updateSentMsgTotal(); - if (msg instanceof Update) { - this.sessionStats.updateSentMsgUpd(); - } else if (msg instanceof Notify) { - this.sessionStats.updateSentMsgErr((Notify) msg); - } + this.sessionStats.updateSentMsg(msg); + return future; } void flush() { @@ -273,28 +314,44 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im } } - synchronized void writeAndFlush(final Notification msg) { - writeEpilogue(this.channel.writeAndFlush(msg), msg); + synchronized ChannelFuture writeAndFlush(final Notification msg) { + if (isWritable()) { + return writeEpilogue(this.channel.writeAndFlush(msg), msg); + } + return this.channel.newFailedFuture(new NonWritableChannelException()); } private synchronized void closeWithoutMessage() { + if (this.state == State.IDLE) { + return; + } LOG.info("Closing session: {}", this); - removePeerSession(); - this.channel.close(); + this.channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()); + } + }); this.state = State.IDLE; + removePeerSession(); } /** - * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be + * Closes BGP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be * modified, because he initiated the closing. (To prevent concurrent modification exception). * - * @param error + * @param e BGPDocumentedException */ - private void terminate(final BGPError error) { - this.writeAndFlush(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build()); - this.closeWithoutMessage(); - + private synchronized void terminate(final BGPDocumentedException e) { + final BGPError error = e.getError(); + final byte[] data = e.getData(); + final NotifyBuilder builder = new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()); + if (data != null && data.length != 0) { + builder.setData(data); + } + this.writeAndFlush(builder.build()); this.listener.onSessionTerminated(this, new BGPTerminationReason(error)); + this.closeWithoutMessage(); } private void removePeerSession() { @@ -319,7 +376,7 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im if (ct >= nextHold) { LOG.debug("HoldTimer expired. {}", new Date()); - this.terminate(BGPError.HOLD_TIMER_EXPIRED); + this.terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED)); } else { this.channel.eventLoop().schedule(new Runnable() { @Override @@ -373,6 +430,11 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im return this.tableTypes; } + @Override + public List getAdvertisedAddPathTableTypes() { + return this.addPathTypes; + } + protected synchronized void sessionUp() { this.sessionStats.startSessionStopwatch(); this.state = State.UP; @@ -397,27 +459,17 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im return this.channel != null && this.channel.isWritable(); } - void schedule(final Runnable task) { - Preconditions.checkState(this.channel != null); - this.channel.eventLoop().submit(task); - } - - @VisibleForTesting - protected synchronized void setLastMessageSentAt(final long lastMessageSentAt) { - this.lastMessageSentAt = lastMessageSentAt; - } - @Override - public synchronized BgpSessionState getBgpSesionState() { - return this.sessionStats.getBgpSessionState(this.state); + public synchronized BgpSessionState getBgpSessionState() { + return this.sessionStats.getBgpSessionState(); } @Override - public synchronized void resetSessionStats() { - this.sessionStats.resetStats(); + public synchronized void resetBgpSessionStats() { + this.sessionStats.resetBgpSessionStats(); } - ChannelOutputLimiter getLimiter() { + public ChannelOutputLimiter getLimiter() { return this.limiter; } @@ -443,4 +495,14 @@ public class BGPSessionImpl extends SimpleChannelInboundHandler im public final void handlerAdded(final ChannelHandlerContext ctx) { this.sessionUp(); } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.warn("BGP session encountered error", cause); + if (cause.getCause() instanceof BGPDocumentedException) { + this.terminate((BGPDocumentedException) cause.getCause()); + } else { + this.close(); + } + } }