Use ConcurrentHashMap in TesttoolNegotiationFactory
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
index e74a9a10386c2c72be55737cff66d6c7f41647e6..7f1009613af2bee37144fe03a79172f7a7118505 100644 (file)
@@ -7,44 +7,49 @@
  */
 package org.opendaylight.netconf.nettyutil;
 
-import com.siemens.ct.exi.core.exceptions.EXIException;
-import com.siemens.ct.exi.core.exceptions.UnsupportedOption;
+import static java.util.Objects.requireNonNull;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
-import java.io.IOException;
+import java.io.EOFException;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.netconf.api.NetconfExiSession;
-import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.api.messages.NetconfMessage;
 import org.opendaylight.netconf.api.xml.XmlElement;
 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
-        extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
+public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>>
+        extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
+
+    private final @NonNull SessionIdType sessionId;
     private final L sessionListener;
-    private final long sessionId;
-    private boolean up = false;
+    private final Channel channel;
 
     private ChannelHandler delayedEncoder;
+    private boolean up;
 
-    private final Channel channel;
-
-    protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
+    protected AbstractNetconfSession(final L sessionListener, final Channel channel, final SessionIdType sessionId) {
         this.sessionListener = sessionListener;
         this.channel = channel;
-        this.sessionId = sessionId;
+        this.sessionId = requireNonNull(sessionId);
         LOG.debug("Session {} created", sessionId);
     }
 
@@ -52,17 +57,21 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
 
     @Override
     public void close() {
-        channel.close();
         up = false;
+        channel.close();
         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
     }
 
-    @Override
     protected void handleMessage(final NetconfMessage netconfMessage) {
         LOG.debug("handling incoming message");
         sessionListener.onMessage(thisInstance(), netconfMessage);
     }
 
+    protected void handleError(final Exception failure) {
+        LOG.debug("handling incoming error");
+        sessionListener.onError(thisInstance(), failure);
+    }
+
     @Override
     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
         // From: https://github.com/netty/netty/issues/3887
@@ -86,27 +95,23 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
         return promise;
     }
 
-    @Override
     protected void endOfInput() {
-        LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
-                : "initialized");
-        if (isUp()) {
-            this.sessionListener.onSessionDown(thisInstance(),
-                    new IOException("End of input detected. Close the session."));
+        LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized");
+        if (up) {
+            sessionListener.onSessionDown(thisInstance(), new EOFException("End of input"));
         }
     }
 
-    @Override
     protected void sessionUp() {
-        LOG.debug("Session {} up", toString());
+        LOG.debug("Session {} up", this);
         sessionListener.onSessionUp(thisInstance());
-        this.up = true;
+        up = true;
     }
 
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
-        sb.append("sessionId=").append(sessionId);
+        sb.append("sessionId=").append(sessionId.getValue());
         sb.append(", channel=").append(channel);
         sb.append('}');
         return sb.toString();
@@ -121,7 +126,7 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
     }
 
     protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
-        this.delayedEncoder = handler;
+        delayedEncoder = handler;
     }
 
     protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
@@ -164,7 +169,38 @@ public abstract class AbstractNetconfSession<S extends NetconfSession,L extends
         return up;
     }
 
-    public final long getSessionId() {
+    public final @NonNull SessionIdType sessionId() {
         return sessionId;
     }
+
+    @Override
+    @SuppressWarnings("checkstyle:illegalCatch")
+    public final void channelInactive(final ChannelHandlerContext ctx) {
+        LOG.debug("Channel {} inactive.", ctx.channel());
+        endOfInput();
+        try {
+            // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
+            // channel handler of reconnect promise
+            super.channelInactive(ctx);
+        } catch (final Exception e) {
+            throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+        }
+    }
+
+    @Override
+    protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
+        LOG.debug("Message was received: {}", msg);
+        if (msg instanceof NetconfMessage message) {
+            handleMessage(message);
+        } else if (msg instanceof Exception failure) {
+            handleError(failure);
+        } else {
+            LOG.warn("Ignoring unexpected message {}", msg);
+        }
+    }
+
+    @Override
+    public final void handlerAdded(final ChannelHandlerContext ctx) {
+        sessionUp();
+    }
 }