+
+ private void negotiationSuccessful(final BGPSessionImpl session) {
+ LOG.debug("Negotiation on channel {} successful with session {}", this.channel, session);
+ this.channel.pipeline().replace(this, "session", session);
+ this.promise.setSuccess(session);
+ }
+
+ private void negotiationFailedCloseChannel(final Throwable cause) {
+ LOG.debug("Negotiation on channel {} failed", this.channel, cause);
+ this.channel.close();
+ this.promise.setFailure(cause);
+ }
+
+ private void sendMessage(final Notification msg) {
+ this.channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture f) {
+ if (!f.isSuccess()) {
+ LOG.warn("Failed to send message {}", msg, f.cause());
+ negotiationFailedCloseChannel(f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket", msg);
+ }
+ }
+ });
+ }
+
+ @Override
+ public final void channelActive(final ChannelHandlerContext ctx) {
+ LOG.debug("Starting session negotiation on channel {}", this.channel);
+ startNegotiation();
+ }
+
+ @Override
+ public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Negotiation read invoked on channel {}", this.channel);
+ try {
+ handleMessage((Notification) msg);
+ } catch (final Exception e) {
+ LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
+ negotiationFailedCloseChannel(e);
+ }
+
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.info("Unexpected error during negotiation", cause);
+ negotiationFailedCloseChannel(cause);
+ }