NetconfClientSessionNegotiator negotiator = createNetconfClientSessionNegotiator(promise, null);
negotiator.channelActive(null);
+ doReturn(null).when(future).cause();
negotiator.handleMessage(NetconfHelloMessage.createServerHello(Set.of("a", "b"), 10));
verify(promise).setSuccess(any());
}
doReturn(promise).when(promise).setSuccess(any());
NetconfClientSessionNegotiator negotiator = createNetconfClientSessionNegotiator(promise, null);
+ doReturn(null).when(future).cause();
negotiator.handleMessage(NetconfHelloMessage.createServerHello(Set.of("a", "b"), 10));
negotiator.channelActive(null);
verify(promise).setSuccess(any());
doReturn(promise).when(promise).setSuccess(any());
NetconfClientSessionNegotiator negotiator = createNetconfClientSessionNegotiator(promise, exiMessage);
+ doReturn(null).when(future).cause();
negotiator.channelActive(null);
doAnswer(invocationOnMock -> {
}
private void start() {
- LOG.debug("Session negotiation started with hello message {} on channel {}", localHello, channel);
+ LOG.debug("Sending negotiation proposal {} on channel {}", localHello, channel);
// Send the message out, but to not run listeners just yet, as we have some more state transitions to go through
final var helloFuture = channel.writeAndFlush(localHello);
+ // Quick check: if the future has already failed we call it quits before negotiation even started
+ final var helloCause = helloFuture.cause();
+ if (helloCause != null) {
+ LOG.warn("Failed to send negotiation proposal on channel {}", channel, helloCause);
+ failAndClose();
+ return;
+ }
+
channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
replaceHelloMessageOutboundHandler();
connectionTimeoutMillis, TimeUnit.MILLISECONDS);
}
+ LOG.debug("Session negotiation started on channel {}", channel);
+
// State transition completed, now run any additional processing
helloFuture.addListener(this::onHelloWriteComplete);
}
if (!promise.isDone() && !promise.isCancelled()) {
LOG.warn("Netconf session backed by channel {} was not established after {}", channel,
connectionTimeoutMillis);
- changeState(State.FAILED);
-
- channel.close().addListener(future -> {
- final var cause = future.cause();
- if (cause != null) {
- LOG.warn("Channel {} closed: fail", channel, cause);
- } else {
- LOG.debug("Channel {} closed: success", channel);
- }
- });
+ failAndClose();
}
} else if (channel.isOpen()) {
channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
}
}
+ private void failAndClose() {
+ changeState(State.FAILED);
+ channel.close().addListener(this::onChannelClosed);
+ }
+
+ private void onChannelClosed(final Future<?> future) {
+ final var cause = future.cause();
+ if (cause != null) {
+ LOG.warn("Channel {} closed: fail", channel, cause);
+ } else {
+ LOG.debug("Channel {} closed: success", channel);
+ }
+ }
+
private synchronized void cancelTimeout() {
if (timeoutTask != null && !timeoutTask.cancel()) {
// Late-coming cancel: make sure the task does not actually run
}
private static boolean isStateChangePermitted(final State state, final State newState) {
- if (state == State.IDLE && newState == State.OPEN_WAIT) {
+ if (state == State.IDLE && (newState == State.OPEN_WAIT || newState == State.FAILED)) {
return true;
}
if (state == State.OPEN_WAIT && (newState == State.ESTABLISHED || newState == State.FAILED)) {