X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=bgp%2Frib-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fbgp%2Frib%2Fimpl%2FBGPSessionImpl.java;h=1bd5a3a2b7ffa6cbc24326f6df57fdace5a7bdde;hb=804cd520199b1e0299952787027febef349d5980;hp=3a68dcecc06ea4d70655a006e6c86dddc5ba9fec;hpb=db472cfbd54adf41841a0bb88cb645dc71e2a5ff;p=bgpcep.git diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java index 3a68dcecc0..1bd5a3a2b7 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java @@ -7,305 +7,497 @@ */ package org.opendaylight.protocol.bgp.rib.impl; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.netty.channel.Channel; -import io.netty.util.Timeout; -import io.netty.util.Timer; -import io.netty.util.TimerTask; - +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; import java.io.IOException; +import java.nio.channels.NonWritableChannelException; import java.util.Date; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; - import javax.annotation.concurrent.GuardedBy; - +import org.opendaylight.controller.config.yang.bgp.rib.impl.BgpSessionState; +import org.opendaylight.protocol.bgp.parser.AsNumberUtil; +import org.opendaylight.protocol.bgp.parser.BGPDocumentedException; import org.opendaylight.protocol.bgp.parser.BGPError; -import org.opendaylight.protocol.bgp.parser.BGPSession; -import org.opendaylight.protocol.bgp.parser.BGPSessionListener; -import org.opendaylight.protocol.bgp.parser.BGPTerminationReason; +import org.opendaylight.protocol.bgp.parser.BgpExtendedMessageUtil; import org.opendaylight.protocol.bgp.parser.BgpTableTypeImpl; -import org.opendaylight.protocol.framework.AbstractProtocolSession; +import org.opendaylight.protocol.bgp.parser.spi.MultiPathSupport; +import org.opendaylight.protocol.bgp.parser.spi.pojo.MultiPathSupportImpl; +import org.opendaylight.protocol.bgp.rib.impl.spi.BGPPeerRegistry; +import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; +import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionStatistics; +import org.opendaylight.protocol.bgp.rib.spi.BGPSession; +import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener; +import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.AsNumber; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.KeepaliveBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Notify; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.NotifyBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Open; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.BgpParameters; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.bgp.parameters.CParameters; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Update; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.OptionalCapabilities; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.bgp.parameters.optional.capabilities.CParameters; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.BgpTableType; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.open.bgp.parameters.c.parameters.MultiprotocolCase; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.RouteRefresh; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.AddPathCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.add.path.capability.AddressFamilies; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.base.Objects.ToStringHelper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; - @VisibleForTesting -public class BGPSessionImpl extends AbstractProtocolSession implements BGPSession { - - private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class); - - /* - * 240 - */ - private static final int DEFAULT_HOLD_TIMER_VALUE = 15; - - private static final Notification KEEP_ALIVE = new KeepaliveBuilder().build(); - - private static int holdTimerValue = DEFAULT_HOLD_TIMER_VALUE; - - /** - * Internal session state. - */ - public enum State { - /** - * The session object is created by the negotiator in OpenConfirm state. While in this state, the session object - * is half-alive, e.g. the timers are running, but the session is not completely up, e.g. it has not been - * announced to the listener. If the session is torn down in this state, we do not inform the listener. - */ - OpenConfirm, - /** - * The session has been completely established. - */ - Up, - /** - * The session has been closed. It will not be resurrected. - */ - Idle, - } - - /** - * System.nanoTime value about when was sent the last message Protected to be updated also in tests. - */ - @VisibleForTesting - protected long lastMessageSentAt; - - /** - * System.nanoTime value about when was received the last message - */ - private long lastMessageReceivedAt; - - private final BGPSessionListener listener; - - /** - * Timer object grouping FSM Timers - */ - private final Timer stateTimer; - - private final BGPSynchronization sync; - - private int kaCounter = 0; - - private final Channel channel; - - @GuardedBy("this") - private State state = State.OpenConfirm; - - private final int keepAlive; - - private final Set tableTypes; - - BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final Open remoteOpen) { - this.listener = Preconditions.checkNotNull(listener); - this.stateTimer = Preconditions.checkNotNull(timer); - this.channel = Preconditions.checkNotNull(channel); - this.keepAlive = remoteOpen.getHoldTimer() / 3; - holdTimerValue = remoteOpen.getHoldTimer(); - - final Set tts = Sets.newHashSet(); - final Set tats = Sets.newHashSet(); - if (remoteOpen.getBgpParameters() != null) { - for (final BgpParameters param : remoteOpen.getBgpParameters()) { - if (param instanceof CParameters) { - final CParameters cp = (CParameters) param; - final TablesKey tt = new TablesKey(((MultiprotocolCase) cp).getMultiprotocolCapability().getAfi(), ((MultiprotocolCase) cp).getMultiprotocolCapability().getSafi()); - tts.add(tt); - tats.add(new BgpTableTypeImpl(tt.getAfi(), tt.getSafi())); - } - } - } - - this.sync = new BGPSynchronization(this, this.listener, tts); - this.tableTypes = tats; - - if (remoteOpen.getHoldTimer() != 0) { - this.stateTimer.newTimeout(new TimerTask() { - - @Override - public void run(final Timeout timeout) throws Exception { - handleHoldTimer(); - } - }, remoteOpen.getHoldTimer(), TimeUnit.SECONDS); - - this.stateTimer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) throws Exception { - handleKeepaliveTimer(); - } - }, this.keepAlive, TimeUnit.SECONDS); - } - } - - @Override - public synchronized void close() { - LOG.debug("Closing session: {}", this); - if (this.state != State.Idle) { - this.sendMessage(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).build()); - this.channel.close(); - this.state = State.Idle; - } - } - - /** - * Handles incoming message based on their type. - * - * @param msg incoming message - */ - @Override - public void handleMessage(final Notification msg) { - // Update last reception time - this.lastMessageReceivedAt = System.nanoTime(); - - if (msg instanceof Open) { - // Open messages should not be present here - this.terminate(BGPError.FSM_ERROR); - } else if (msg instanceof Notify) { - // Notifications are handled internally - LOG.info("Session closed because Notification message received: {} / {}", ((Notify) msg).getErrorCode(), - ((Notify) msg).getErrorSubcode()); - this.closeWithoutMessage(); - this.listener.onSessionTerminated(this, - new BGPTerminationReason(BGPError.forValue(((Notify) msg).getErrorCode(), ((Notify) msg).getErrorSubcode()))); - } else if (msg instanceof Keepalive) { - // Keepalives are handled internally - LOG.debug("Received KeepAlive messsage."); - this.kaCounter++; - if (this.kaCounter >= 2) { - this.sync.kaReceived(); - } - } else { - // All others are passed up - this.listener.onMessage(this, msg); - } - } - - @Override - public synchronized void endOfInput() { - if (this.state == State.Up) { - this.listener.onSessionDown(this, new IOException("End of input detected. Close the session.")); - } - } - - void sendMessage(final Notification msg) { - try { - this.channel.writeAndFlush(msg); - this.lastMessageSentAt = System.nanoTime(); - LOG.debug("Sent message: {}", msg); - } catch (final Exception e) { - LOG.warn("Message {} was not sent.", msg, e); - } - } - - private synchronized void closeWithoutMessage() { - LOG.debug("Closing session: {}", this); - this.channel.close(); - this.state = State.Idle; - } - - /** - * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be - * modified, because he initiated the closing. (To prevent concurrent modification exception). - * - * @param closeObject - */ - private void terminate(final BGPError error) { - this.sendMessage(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build()); - this.closeWithoutMessage(); - - this.listener.onSessionTerminated(this, new BGPTerminationReason(error)); - } - - /** - * If HoldTimer expires, the session ends. If a message (whichever) was received during this period, the HoldTimer - * will be rescheduled by HOLD_TIMER_VALUE + the time that has passed from the start of the HoldTimer to the time at - * which the message was received. If the session was closed by the time this method starts to execute (the session - * state will become IDLE), then rescheduling won't occur. - */ - private synchronized void handleHoldTimer() { - if (this.state == State.Idle) { - return; - } - - final long ct = System.nanoTime(); - final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(holdTimerValue); - - if (ct >= nextHold) { - LOG.debug("HoldTimer expired. " + new Date()); - this.terminate(BGPError.HOLD_TIMER_EXPIRED); - } else { - this.stateTimer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) throws Exception { - handleHoldTimer(); - } - }, nextHold - ct, TimeUnit.NANOSECONDS); - } - } - - /** - * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the - * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the - * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method - * starts to execute (the session state will become IDLE), that rescheduling won't occur. - */ - private synchronized void handleKeepaliveTimer() { - if (this.state == State.Idle) { - return; - } - - final long ct = System.nanoTime(); - long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive); - - if (ct >= nextKeepalive) { - this.sendMessage(KEEP_ALIVE); - nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive); - } - this.stateTimer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) throws Exception { - handleKeepaliveTimer(); - } - }, nextKeepalive - ct, TimeUnit.NANOSECONDS); - } - - @Override - public final String toString() { - return addToStringAttributes(Objects.toStringHelper(this)).toString(); - } - - protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { - toStringHelper.add("channel", this.channel); - toStringHelper.add("state", this.state); - return toStringHelper; - } - - @Override - public Set getAdvertisedTableTypes() { - return this.tableTypes; - } - - @Override - protected synchronized void sessionUp() { - this.state = State.Up; - this.listener.onSessionUp(this); - } - - public synchronized State getState() { - return this.state; - } +public class BGPSessionImpl extends SimpleChannelInboundHandler implements BGPSession, BGPSessionStatistics, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class); + + private static final Notification KEEP_ALIVE = new KeepaliveBuilder().build(); + + private static final int KA_TO_DEADTIMER_RATIO = 3; + + private static final String EXTENDED_MSG_DECODER = "EXTENDED_MSG_DECODER"; + + static final String END_OF_INPUT = "End of input detected. Close the session."; + + /** + * Internal session state. + */ + public enum State { + /** + * The session object is created by the negotiator in OpenConfirm state. While in this state, the session object + * is half-alive, e.g. the timers are running, but the session is not completely up, e.g. it has not been + * announced to the listener. If the session is torn down in this state, we do not inform the listener. + */ + OPEN_CONFIRM, + /** + * The session has been completely established. + */ + UP, + /** + * The session has been closed. It will not be resurrected. + */ + IDLE, + } + + /** + * System.nanoTime value about when was sent the last message. + */ + @VisibleForTesting + private long lastMessageSentAt; + + /** + * System.nanoTime value about when was received the last message + */ + private long lastMessageReceivedAt; + + private final BGPSessionListener listener; + + private final BGPSynchronization sync; + + private int kaCounter = 0; + + private final Channel channel; + + @GuardedBy("this") + private State state = State.OPEN_CONFIRM; + + private final Set tableTypes; + private final List addPathTypes; + private final int holdTimerValue; + private final int keepAlive; + private final AsNumber asNumber; + private final Ipv4Address bgpId; + private final BGPPeerRegistry peerRegistry; + private final ChannelOutputLimiter limiter; + + private BGPSessionStats sessionStats; + + public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen, final BGPSessionPreferences localPreferences, + final BGPPeerRegistry peerRegistry) { + this(listener, channel, remoteOpen, localPreferences.getHoldTime(), peerRegistry); + this.sessionStats = new BGPSessionStats(remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.of(localPreferences), this.tableTypes, this.addPathTypes); + } + + public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen, final int localHoldTimer, + final BGPPeerRegistry peerRegistry) { + this.listener = Preconditions.checkNotNull(listener); + this.channel = Preconditions.checkNotNull(channel); + this.limiter = new ChannelOutputLimiter(this); + this.channel.pipeline().addLast(this.limiter); + this.holdTimerValue = (remoteOpen.getHoldTimer() < localHoldTimer) ? remoteOpen.getHoldTimer() : localHoldTimer; + LOG.info("BGP HoldTimer new value: {}", this.holdTimerValue); + this.keepAlive = this.holdTimerValue / KA_TO_DEADTIMER_RATIO; + this.asNumber = AsNumberUtil.advertizedAsNumber(remoteOpen); + this.peerRegistry = peerRegistry; + final boolean enableExMess = BgpExtendedMessageUtil.advertizedBgpExtendedMessageCapability(remoteOpen); + if (enableExMess) { + this.channel.pipeline().replace(BGPMessageHeaderDecoder.class, EXTENDED_MSG_DECODER, BGPMessageHeaderDecoder.getExtendedBGPMessageHeaderDecoder()); + } + + final Set tts = Sets.newHashSet(); + final Set tats = Sets.newHashSet(); + final List addPathCapabilitiesList = Lists.newArrayList(); + if (remoteOpen.getBgpParameters() != null) { + for (final BgpParameters param : remoteOpen.getBgpParameters()) { + for (final OptionalCapabilities optCapa : param.getOptionalCapabilities()) { + final CParameters cParam = optCapa.getCParameters(); + if ( cParam.getAugmentation(CParameters1.class) == null) { + continue; + } + if(cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability() != null) { + final MultiprotocolCapability multi = cParam.getAugmentation(CParameters1.class).getMultiprotocolCapability(); + final TablesKey tt = new TablesKey(multi.getAfi(), multi.getSafi()); + LOG.trace("Added table type to sync {}", tt); + tts.add(tt); + tats.add(new BgpTableTypeImpl(tt.getAfi(), tt.getSafi())); + } else if (cParam.getAugmentation(CParameters1.class).getAddPathCapability() != null) { + final AddPathCapability addPathCap = cParam.getAugmentation(CParameters1.class).getAddPathCapability(); + addPathCapabilitiesList.addAll(addPathCap.getAddressFamilies()); + } + } + } + } + + this.sync = new BGPSynchronization(this.listener, tts); + this.tableTypes = tats; + this.addPathTypes = addPathCapabilitiesList; + + if (! this.addPathTypes.isEmpty()) { + final ChannelPipeline pipeline = this.channel.pipeline(); + final BGPByteToMessageDecoder decoder = pipeline.get(BGPByteToMessageDecoder.class); + decoder.addDecoderConstraint(MultiPathSupport.class, + MultiPathSupportImpl.createParserMultiPathSupport(this.addPathTypes)); + } + + if (this.holdTimerValue != 0) { + channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + handleHoldTimer(); + } + }, this.holdTimerValue, TimeUnit.SECONDS); + + channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + handleKeepaliveTimer(); + } + }, this.keepAlive, TimeUnit.SECONDS); + } + this.bgpId = remoteOpen.getBgpIdentifier(); + this.sessionStats = new BGPSessionStats(remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.absent(), + this.tableTypes, this.addPathTypes); + } + + @Override + public synchronized void close() { + if (this.state != State.IDLE) { + this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(BGPError.CEASE.getSubcode()).build()); + } + this.closeWithoutMessage(); + } + + /** + * Handles incoming message based on their type. + * + * @param msg incoming message + */ + synchronized void handleMessage(final Notification msg) throws BGPDocumentedException { + // Update last reception time + this.lastMessageReceivedAt = System.nanoTime(); + this.sessionStats.updateReceivedMsgTotal(); + + if (msg instanceof Open) { + // Open messages should not be present here + this.terminate(new BGPDocumentedException(null, BGPError.FSM_ERROR)); + } else if (msg instanceof Notify) { + // Notifications are handled internally + LOG.info("Session closed because Notification message received: {} / {}", ((Notify) msg).getErrorCode(), + ((Notify) msg).getErrorSubcode()); + this.closeWithoutMessage(); + this.listener.onSessionTerminated(this, new BGPTerminationReason( + BGPError.forValue(((Notify) msg).getErrorCode(), ((Notify) msg).getErrorSubcode()))); + this.sessionStats.updateReceivedMsgErr((Notify) msg); + } else if (msg instanceof Keepalive) { + // Keepalives are handled internally + LOG.trace("Received KeepAlive messsage."); + this.kaCounter++; + this.sessionStats.updateReceivedMsgKA(); + if (this.kaCounter >= 2) { + this.sync.kaReceived(); + } + } else if (msg instanceof RouteRefresh) { + this.listener.onMessage(this, msg); + this.sessionStats.updateReceivedMsgRR(); + } else if (msg instanceof Update) { + this.listener.onMessage(this, msg); + this.sync.updReceived((Update) msg); + this.sessionStats.updateReceivedMsgUpd(); + } else { + LOG.warn("Ignoring unhandled message: {}.", msg.getClass()); + } + } + + synchronized void endOfInput() { + if (this.state == State.UP) { + LOG.info(END_OF_INPUT); + this.listener.onSessionDown(this, new IOException(END_OF_INPUT)); + } + } + + @GuardedBy("this") + private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification msg) { + future.addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture f) { + if (!f.isSuccess()) { + LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause()); + } else { + LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel); + } + } + }); + this.lastMessageSentAt = System.nanoTime(); + this.sessionStats.updateSentMsgTotal(); + if (msg instanceof Update) { + this.sessionStats.updateSentMsgUpd(); + } else if (msg instanceof Notify) { + this.sessionStats.updateSentMsgErr((Notify) msg); + } + return future; + } + + void flush() { + this.channel.flush(); + } + + synchronized void write(final Notification msg) { + try { + writeEpilogue(this.channel.write(msg), msg); + } catch (final Exception e) { + LOG.warn("Message {} was not sent.", msg, e); + } + } + + synchronized ChannelFuture writeAndFlush(final Notification msg) { + if (isWritable()) { + return writeEpilogue(this.channel.writeAndFlush(msg), msg); + } + return this.channel.newFailedFuture(new NonWritableChannelException()); + } + + private synchronized void closeWithoutMessage() { + LOG.info("Closing session: {}", this); + removePeerSession(); + this.channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()); + } + }); + this.state = State.IDLE; + } + + /** + * Closes BGP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be + * modified, because he initiated the closing. (To prevent concurrent modification exception). + * + * @param e BGPDocumentedException + */ + private synchronized void terminate(final BGPDocumentedException e) { + final BGPError error = e.getError(); + final byte[] data = e.getData(); + final NotifyBuilder builder = new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()); + if (data != null && data.length != 0) { + builder.setData(data); + } + this.writeAndFlush(builder.build()); + this.closeWithoutMessage(); + + this.listener.onSessionTerminated(this, new BGPTerminationReason(error)); + } + + private void removePeerSession() { + if (this.peerRegistry != null) { + this.peerRegistry.removePeerSession(StrictBGPPeerRegistry.getIpAddress(this.channel.remoteAddress())); + } + } + + /** + * If HoldTimer expires, the session ends. If a message (whichever) was received during this period, the HoldTimer + * will be rescheduled by HOLD_TIMER_VALUE + the time that has passed from the start of the HoldTimer to the time at + * which the message was received. If the session was closed by the time this method starts to execute (the session + * state will become IDLE), then rescheduling won't occur. + */ + private synchronized void handleHoldTimer() { + if (this.state == State.IDLE) { + return; + } + + final long ct = System.nanoTime(); + final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(this.holdTimerValue); + + if (ct >= nextHold) { + LOG.debug("HoldTimer expired. {}", new Date()); + this.terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED)); + } else { + this.channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + handleHoldTimer(); + } + }, nextHold - ct, TimeUnit.NANOSECONDS); + } + } + + /** + * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the + * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the + * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method + * starts to execute (the session state will become IDLE), that rescheduling won't occur. + */ + private synchronized void handleKeepaliveTimer() { + if (this.state == State.IDLE) { + return; + } + + final long ct = System.nanoTime(); + long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive); + + if (ct >= nextKeepalive) { + this.writeAndFlush(KEEP_ALIVE); + nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive); + this.sessionStats.updateSentMsgKA(); + } + this.channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + handleKeepaliveTimer(); + } + }, nextKeepalive - ct, TimeUnit.NANOSECONDS); + } + + @Override + public final String toString() { + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); + } + + protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + toStringHelper.add("channel", this.channel); + toStringHelper.add("state", this.getState()); + return toStringHelper; + } + + @Override + public Set getAdvertisedTableTypes() { + return this.tableTypes; + } + + @Override + public List getAdvertisedAddPathTableTypes() { + return this.addPathTypes; + } + + protected synchronized void sessionUp() { + this.sessionStats.startSessionStopwatch(); + this.state = State.UP; + this.listener.onSessionUp(this); + } + + public synchronized State getState() { + return this.state; + } + + @Override + public final Ipv4Address getBgpId() { + return this.bgpId; + } + + @Override + public final AsNumber getAsNumber() { + return this.asNumber; + } + + synchronized boolean isWritable() { + return this.channel != null && this.channel.isWritable(); + } + + void schedule(final Runnable task) { + Preconditions.checkState(this.channel != null); + this.channel.eventLoop().submit(task); + } + + @VisibleForTesting + protected synchronized void setLastMessageSentAt(final long lastMessageSentAt) { + this.lastMessageSentAt = lastMessageSentAt; + } + + @Override + public synchronized BgpSessionState getBgpSessionState() { + return this.sessionStats.getBgpSessionState(this.state); + } + + @Override + public synchronized void resetSessionStats() { + this.sessionStats.resetStats(); + } + + public ChannelOutputLimiter getLimiter() { + return this.limiter; + } + + @Override + public final void channelInactive(final ChannelHandlerContext ctx) { + LOG.debug("Channel {} inactive.", ctx.channel()); + this.endOfInput(); + + try { + super.channelInactive(ctx); + } catch (final Exception e) { + throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e); + } + } + + @Override + protected final void channelRead0(final ChannelHandlerContext ctx, final Notification msg) { + LOG.debug("Message was received: {}", msg); + try { + this.handleMessage(msg); + } catch (final BGPDocumentedException e) { + this.terminate(e); + } + } + + @Override + public final void handlerAdded(final ChannelHandlerContext ctx) { + this.sessionUp(); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.warn("BGP session encountered error", cause); + if (cause.getCause() instanceof BGPDocumentedException) { + this.terminate((BGPDocumentedException) cause.getCause()); + } else { + this.close(); + } + } }