Sequence state transitions during negotiation start 67/102767/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 18 Oct 2022 19:27:51 +0000 (21:27 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 20 Oct 2022 09:00:28 +0000 (11:00 +0200)
If we fail to write out the initial proposal, we will end up running
listener actions before other negotiation start transitions -- hence
messing up our state.

Inline sendMessage() and dispatch the listener as a last operation of
start().

JIRA: NETCONF-905
Change-Id: I06b3b266ed2c37ec388aff4abb988f84e6fa3863
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit ea556da8f195af133ef9a4b9b85601af526e9f4c)

netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java

index 4705d32f6a8373805371ccf1fd0b43d8dabbfa8e..088a33ad0e3f70605adb42230bc23bb019de23fe 100644 (file)
@@ -19,6 +19,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.index.qual.NonNegative;
@@ -143,12 +144,13 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
     }
 
     private void start() {
-        final NetconfHelloMessage helloMessage = this.sessionPreferences.getHelloMessage();
-        LOG.debug("Session negotiation started with hello message {} on channel {}", helloMessage, channel);
+        final var localHello = sessionPreferences.getHelloMessage();
+        LOG.debug("Session negotiation started with hello message {} on channel {}", localHello, channel);
 
-        channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
+        // Send the message out, but to not run listeners just yet, as we have some more state transitions to go through
+        final var helloFuture = channel.writeAndFlush(localHello);
 
-        sendMessage(helloMessage);
+        channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
 
         replaceHelloMessageOutboundHandler();
 
@@ -159,6 +161,20 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
             timeoutTask = timer.newTimeout(unused -> channel.eventLoop().execute(this::timeoutExpired),
                 connectionTimeoutMillis, TimeUnit.MILLISECONDS);
         }
+
+        // State transition completed, now run any additional processing
+        helloFuture.addListener(this::onHelloWriteComplete);
+    }
+
+    private void onHelloWriteComplete(final Future<?> future) {
+        final var localHello = sessionPreferences.getHelloMessage();
+        final var cause = future.cause();
+        if (cause != null) {
+            LOG.info("Failed to send message {} on channel {}", localHello, channel, cause);
+            negotiationFailed(cause);
+        } else {
+            LOG.trace("Message {} sent to socket on channel {}", localHello, channel);
+        }
     }
 
     private synchronized void timeoutExpired() {
@@ -331,24 +347,6 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
         promise.setFailure(cause);
     }
 
-    /**
-     * Send a message to peer and fail negotiation if it does not reach
-     * the peer.
-     *
-     * @param msg Message which should be sent.
-     */
-    protected void sendMessage(final NetconfMessage msg) {
-        channel.writeAndFlush(msg).addListener(f -> {
-            final var cause = f.cause();
-            if (cause != null) {
-                LOG.info("Failed to send message {} on channel {}", msg, channel, cause);
-                negotiationFailed(cause);
-            } else {
-                LOG.trace("Message {} sent to socket on channel {}", msg, channel);
-            }
-        });
-    }
-
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
     public final void channelActive(final ChannelHandlerContext ctx) {