Fix NetconfMessage not sent on EmbeddedChannel
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
index 045b6d6d3e3d5b881ffa430530668a1d187ceb25..3eaf83156f3494677196a20b808ae36a0e2588b4 100644 (file)
@@ -7,20 +7,24 @@
  */
 package org.opendaylight.netconf.nettyutil;
 
+import static java.util.Objects.requireNonNull;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
-import java.io.IOException;
+import java.io.EOFException;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.netconf.api.NetconfExiSession;
-import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.api.messages.NetconfMessage;
 import org.opendaylight.netconf.api.xml.XmlElement;
 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
@@ -28,25 +32,25 @@ import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
 import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
 import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
+public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>>
         extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
-
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
+
+    private final @NonNull SessionIdType sessionId;
     private final L sessionListener;
-    private final long sessionId;
-    private boolean up = false;
+    private final Channel channel;
 
     private ChannelHandler delayedEncoder;
+    private boolean up;
 
-    private final Channel channel;
-
-    protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
+    protected AbstractNetconfSession(final L sessionListener, final Channel channel, final SessionIdType sessionId) {
         this.sessionListener = sessionListener;
         this.channel = channel;
-        this.sessionId = sessionId;
+        this.sessionId = requireNonNull(sessionId);
         LOG.debug("Session {} created", sessionId);
     }
 
@@ -54,8 +58,8 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
 
     @Override
     public void close() {
-        channel.close();
         up = false;
+        channel.close();
         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
     }
 
@@ -64,6 +68,11 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
         sessionListener.onMessage(thisInstance(), netconfMessage);
     }
 
+    protected void handleError(final Exception failure) {
+        LOG.debug("handling incoming error");
+        sessionListener.onError(thisInstance(), failure);
+    }
+
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
         // From: https://github.com/netty/netty/issues/3887
@@ -84,28 +93,34 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
             }
         });
 
+        // FIXME: NETCONF-1106: this is a workaround for netconf-server's NetconfSubsystem using EmbeddedChannel instead
+        //                      of correctly integrating with the underlying transport channel
+        if (channel instanceof EmbeddedChannel embeddedChannel) {
+            // Embedded event loop implementation has no executor, it requires explicit invocation to process
+            synchronized (channel) {
+                embeddedChannel.runPendingTasks();
+            }
+        }
         return promise;
     }
 
     protected void endOfInput() {
-        LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
-                : "initialized");
-        if (isUp()) {
-            this.sessionListener.onSessionDown(thisInstance(),
-                    new IOException("End of input detected. Close the session."));
+        LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized");
+        if (up) {
+            sessionListener.onSessionDown(thisInstance(), new EOFException("End of input"));
         }
     }
 
     protected void sessionUp() {
-        LOG.debug("Session {} up", toString());
+        LOG.debug("Session {} up", this);
         sessionListener.onSessionUp(thisInstance());
-        this.up = true;
+        up = true;
     }
 
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
-        sb.append("sessionId=").append(sessionId);
+        sb.append("sessionId=").append(sessionId.getValue());
         sb.append(", channel=").append(channel);
         sb.append('}');
         return sb.toString();
@@ -120,7 +135,7 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
     }
 
     protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
-        this.delayedEncoder = handler;
+        delayedEncoder = handler;
     }
 
     protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
@@ -163,7 +178,7 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
         return up;
     }
 
-    public final long getSessionId() {
+    public final @NonNull SessionIdType sessionId() {
         return sessionId;
     }
 
@@ -177,14 +192,20 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
             // channel handler of reconnect promise
             super.channelInactive(ctx);
         } catch (final Exception e) {
-            throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+            throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
         }
     }
 
     @Override
     protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
         LOG.debug("Message was received: {}", msg);
-        handleMessage((NetconfMessage) msg);
+        if (msg instanceof NetconfMessage message) {
+            handleMessage(message);
+        } else if (msg instanceof Exception failure) {
+            handleError(failure);
+        } else {
+            LOG.warn("Ignoring unexpected message {}", msg);
+        }
     }
 
     @Override