+
+ protected final void negotiationSuccessful(final S session) {
+ LOG.debug("Negotiation on channel {} successful with session {}", channel, session);
+ channel.pipeline().replace(this, "session", session);
+ promise.setSuccess(session);
+ }
+
+ protected void negotiationFailed(final Throwable cause) {
+ LOG.debug("Negotiation on channel {} failed", channel, cause);
+ channel.close();
+ promise.setFailure(cause);
+ }
+
+ /**
+ * Send a message to peer and fail negotiation if it does not reach
+ * the peer.
+ *
+ * @param msg Message which should be sent.
+ */
+ protected final void sendMessage(final NetconfMessage msg) {
+ this.channel.writeAndFlush(msg).addListener(f -> {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause());
+ negotiationFailed(f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket on channel {}", msg, channel);
+ }
+ });
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelActive(final ChannelHandlerContext ctx) {
+ LOG.debug("Starting session negotiation on channel {}", channel);
+ try {
+ startNegotiation();
+ } catch (final Exception e) {
+ LOG.warn("Unexpected negotiation failure on channel {}", channel, e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Negotiation read invoked on channel {}", channel);
+ try {
+ handleMessage((NetconfHelloMessage) msg);
+ } catch (final Exception e) {
+ LOG.debug("Unexpected error while handling negotiation message {} on channel {}", msg, channel, e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.info("Unexpected error during negotiation on channel {}", channel, cause);
+ negotiationFailed(cause);
+ }
+
+ protected abstract void handleMessage(NetconfHelloMessage msg) throws Exception;