Sequence state transitions during negotiation start 50/102750/5
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 18 Oct 2022 19:27:51 +0000 (21:27 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 19 Oct 2022 00:31:42 +0000 (02:31 +0200)
If we fail to write out the initial proposal, we will end up running
listener actions before other negotiation start transitions -- hence
messing up our state.

Inline sendMessage() and dispatch the listener as a last operation of
start().

JIRA: NETCONF-905
Change-Id: I06b3b266ed2c37ec388aff4abb988f84e6fa3863
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java

index 13b4d32edcf7775616b06aa2eec739dd043f1577..88b280d1a87d366acc09a431ecc8115fbe0e8826 100644 (file)
@@ -19,6 +19,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.index.qual.NonNegative;
@@ -145,9 +146,10 @@ public abstract class AbstractNetconfSessionNegotiator<S extends AbstractNetconf
     private void start() {
         LOG.debug("Session negotiation started with hello message {} on channel {}", localHello, channel);
 
-        channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
+        // Send the message out, but to not run listeners just yet, as we have some more state transitions to go through
+        final var helloFuture = channel.writeAndFlush(localHello);
 
-        sendMessage(localHello);
+        channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
 
         replaceHelloMessageOutboundHandler();
 
@@ -158,6 +160,19 @@ public abstract class AbstractNetconfSessionNegotiator<S extends AbstractNetconf
             timeoutTask = timer.newTimeout(unused -> channel.eventLoop().execute(this::timeoutExpired),
                 connectionTimeoutMillis, TimeUnit.MILLISECONDS);
         }
+
+        // State transition completed, now run any additional processing
+        helloFuture.addListener(this::onHelloWriteComplete);
+    }
+
+    private void onHelloWriteComplete(final Future<?> future) {
+        final var cause = future.cause();
+        if (cause != null) {
+            LOG.info("Failed to send message {} on channel {}", localHello, channel, cause);
+            negotiationFailed(cause);
+        } else {
+            LOG.trace("Message {} sent to socket on channel {}", localHello, channel);
+        }
     }
 
     private synchronized void timeoutExpired() {
@@ -328,24 +343,6 @@ public abstract class AbstractNetconfSessionNegotiator<S extends AbstractNetconf
         promise.setFailure(cause);
     }
 
-    /**
-     * Send a message to peer and fail negotiation if it does not reach
-     * the peer.
-     *
-     * @param msg Message which should be sent.
-     */
-    protected void sendMessage(final NetconfMessage msg) {
-        channel.writeAndFlush(msg).addListener(f -> {
-            final var cause = f.cause();
-            if (cause != null) {
-                LOG.info("Failed to send message {} on channel {}", msg, channel, cause);
-                negotiationFailed(cause);
-            } else {
-                LOG.trace("Message {} sent to socket on channel {}", msg, channel);
-            }
-        });
-    }
-
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
     public final void channelActive(final ChannelHandlerContext ctx) {