+
+ private void negotiationSuccessful(final BGPSessionImpl session) {
+ LOG.debug("Negotiation on channel {} successful with session {}", this.channel, session);
+ this.channel.pipeline().replace(this, "session", session);
+ this.promise.setSuccess(session);
+ }
+
+ private void negotiationFailedCloseChannel(final Throwable cause) {
+ LOG.debug("Negotiation on channel {} failed", this.channel, cause);
+ this.channel.close();
+ synchronized (AbstractBGPSessionNegotiator.this) {
+ if (this.pending != null && this.pending.isCancellable()) {
+ this.pending.cancel(true);
+ this.pending = null;
+ }
+ }
+ }
+
+ private void sendMessage(final Notification msg) {
+ this.channel.writeAndFlush(msg).addListener((ChannelFutureListener) f -> {
+ if (!f.isSuccess()) {
+ LOG.warn("Failed to send message {} to channel {}", msg, AbstractBGPSessionNegotiator.this.channel, f.cause());
+ negotiationFailedCloseChannel(f.cause());
+ } else {
+ LOG.trace("Message {} sent to channel {}", msg, AbstractBGPSessionNegotiator.this.channel);
+ }
+ });
+ }
+
+ @Override
+ public final void channelActive(final ChannelHandlerContext ctx) {
+ LOG.debug("Starting session negotiation on channel {}", this.channel);
+ startNegotiation();
+ }
+
+ @Override
+ public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Negotiation read invoked on channel {}", this.channel);
+ try {
+ handleMessage((Notification) msg);
+ } catch (final Exception e) {
+ LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
+ negotiationFailedCloseChannel(e);
+ }
+
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.info("Unexpected error during negotiation", cause);
+ negotiationFailedCloseChannel(cause);
+ }