*/
package org.opendaylight.netconf.nettyutil;
+import static java.util.Objects.requireNonNull;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
-import java.io.IOException;
+import java.io.EOFException;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.netconf.api.NetconfExiSession;
-import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSession;
import org.opendaylight.netconf.api.NetconfSessionListener;
import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.api.messages.NetconfMessage;
import org.opendaylight.netconf.api.xml.XmlElement;
import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
+public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>>
extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
-
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
+
+ private final @NonNull SessionIdType sessionId;
private final L sessionListener;
- private final long sessionId;
- private boolean up = false;
+ private final Channel channel;
private ChannelHandler delayedEncoder;
+ private boolean up;
- private final Channel channel;
-
- protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
+ protected AbstractNetconfSession(final L sessionListener, final Channel channel, final SessionIdType sessionId) {
this.sessionListener = sessionListener;
this.channel = channel;
- this.sessionId = sessionId;
+ this.sessionId = requireNonNull(sessionId);
LOG.debug("Session {} created", sessionId);
}
@Override
public void close() {
- channel.close();
up = false;
+ channel.close();
sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
}
sessionListener.onMessage(thisInstance(), netconfMessage);
}
+ protected void handleError(final Exception failure) {
+ LOG.debug("handling incoming error");
+ sessionListener.onError(thisInstance(), failure);
+ }
+
@Override
public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
// From: https://github.com/netty/netty/issues/3887
}
});
+ // FIXME: NETCONF-1106: this is a workaround for netconf-server's NetconfSubsystem using EmbeddedChannel instead
+ // of correctly integrating with the underlying transport channel
+ if (channel instanceof EmbeddedChannel embeddedChannel) {
+ // Embedded event loop implementation has no executor, it requires explicit invocation to process
+ synchronized (channel) {
+ embeddedChannel.runPendingTasks();
+ }
+ }
return promise;
}
protected void endOfInput() {
- LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
- : "initialized");
- if (isUp()) {
- this.sessionListener.onSessionDown(thisInstance(),
- new IOException("End of input detected. Close the session."));
+ LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized");
+ if (up) {
+ sessionListener.onSessionDown(thisInstance(), new EOFException("End of input"));
}
}
protected void sessionUp() {
- LOG.debug("Session {} up", toString());
+ LOG.debug("Session {} up", this);
sessionListener.onSessionUp(thisInstance());
- this.up = true;
+ up = true;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
- sb.append("sessionId=").append(sessionId);
+ sb.append("sessionId=").append(sessionId.getValue());
sb.append(", channel=").append(channel);
sb.append('}');
return sb.toString();
}
protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
- this.delayedEncoder = handler;
+ delayedEncoder = handler;
}
protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
return up;
}
- public final long getSessionId() {
+ public final @NonNull SessionIdType sessionId() {
return sessionId;
}
// channel handler of reconnect promise
super.channelInactive(ctx);
} catch (final Exception e) {
- throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+ throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
}
}
@Override
protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
LOG.debug("Message was received: {}", msg);
- handleMessage((NetconfMessage) msg);
+ if (msg instanceof NetconfMessage message) {
+ handleMessage(message);
+ } else if (msg instanceof Exception failure) {
+ handleError(failure);
+ } else {
+ LOG.warn("Ignoring unexpected message {}", msg);
+ }
}
@Override