private final ChannelOutputLimiter limiter;
private BGPSessionStatsImpl sessionStats;
+ private boolean terminationReasonNotified;
public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen, final BGPSessionPreferences localPreferences,
final BGPPeerRegistry peerRegistry) {
}
if (this.holdTimerValue != 0) {
- channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleHoldTimer();
- }
- }, this.holdTimerValue, TimeUnit.SECONDS);
-
- channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleKeepaliveTimer();
- }
- }, this.keepAlive, TimeUnit.SECONDS);
+ channel.eventLoop().schedule(this::handleHoldTimer, this.holdTimerValue, TimeUnit.SECONDS);
+ channel.eventLoop().schedule(this::handleKeepaliveTimer, this.keepAlive, TimeUnit.SECONDS);
}
this.bgpId = remoteOpen.getBgpIdentifier();
this.sessionStats = new BGPSessionStatsImpl(this, remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.<BGPSessionPreferences>absent(),
@Override
public synchronized void close() {
- if (this.state != State.IDLE) {
+ if (this.state != State.IDLE && !this.terminationReasonNotified) {
this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(BGPError.CEASE.getSubcode()).build());
this.closeWithoutMessage();
}
// Notifications are handled internally
LOG.info("Session closed because Notification message received: {} / {}, data={}", notify.getErrorCode(),
notify.getErrorSubcode(), notify.getData() != null ? ByteBufUtil.hexDump(notify.getData()) : null);
- this.closeWithoutMessage();
- this.listener.onSessionTerminated(this, new BGPTerminationReason(
- BGPError.forValue(notify.getErrorCode(), notify.getErrorSubcode())));
+ notifyTerminationReasonAndCloseWithoutMessage(notify.getErrorCode(), notify.getErrorSubcode());
} else if (msg instanceof Keepalive) {
// Keepalives are handled internally
LOG.trace("Received KeepAlive message.");
}
}
+ private synchronized void notifyTerminationReasonAndCloseWithoutMessage(final Short errorCode, final Short errorSubcode) {
+ this.terminationReasonNotified = true;
+ this.listener.onSessionTerminated(this, new BGPTerminationReason(
+ BGPError.forValue(errorCode, errorSubcode)));
+ this.closeWithoutMessage();
+ }
+
synchronized void endOfInput() {
if (this.state == State.UP) {
LOG.info(END_OF_INPUT);
@GuardedBy("this")
private ChannelFuture writeEpilogue(final ChannelFuture future, final Notification msg) {
future.addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture f) {
- if (!f.isSuccess()) {
- LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause());
- } else {
- LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
- }
+ (ChannelFutureListener) f -> {
+ if (!f.isSuccess()) {
+ LOG.warn("Failed to send message {} to socket {}", msg, BGPSessionImpl.this.channel, f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
}
});
this.lastMessageSentAt = System.nanoTime();
return;
}
LOG.info("Closing session: {}", this);
- this.channel.close().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture future) throws Exception {
- Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause());
- }
- });
+ this.channel.close().addListener((ChannelFutureListener) future -> Preconditions.checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()));
this.state = State.IDLE;
removePeerSession();
}
builder.setData(data);
}
this.writeAndFlush(builder.build());
- this.listener.onSessionTerminated(this, new BGPTerminationReason(error));
- this.closeWithoutMessage();
+ notifyTerminationReasonAndCloseWithoutMessage(error.getCode(), error.getSubcode());
}
private void removePeerSession() {
LOG.debug("HoldTimer expired. {}", new Date());
this.terminate(new BGPDocumentedException(BGPError.HOLD_TIMER_EXPIRED));
} else {
- this.channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleHoldTimer();
- }
- }, nextHold - ct, TimeUnit.NANOSECONDS);
+ this.channel.eventLoop().schedule(this::handleHoldTimer, nextHold - ct, TimeUnit.NANOSECONDS);
}
}
nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
this.sessionStats.updateSentMsgKA();
}
- this.channel.eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- handleKeepaliveTimer();
- }
- }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
+ this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
}
@Override
}
@Override
- public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ public synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
LOG.warn("BGP session encountered error", cause);
if (cause.getCause() instanceof BGPDocumentedException) {
this.terminate((BGPDocumentedException) cause.getCause());