X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSession.java;h=3eaf83156f3494677196a20b808ae36a0e2588b4;hb=6aabbf6024ab3c9d3fc4562a3dea0b79e73b8142;hp=fdcd7e6479a91f8ef2b69b7ec3ec771730f4e701;hpb=647e61052064693a422d48be8046272934916ac9;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java index fdcd7e6479..3eaf83156f 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java @@ -7,46 +7,50 @@ */ package org.opendaylight.netconf.nettyutil; -import com.siemens.ct.exi.exceptions.EXIException; -import com.siemens.ct.exi.exceptions.UnsupportedOption; +import static java.util.Objects.requireNonNull; + import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; -import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToByteEncoder; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import java.io.IOException; -import org.opendaylight.controller.config.util.xml.XmlElement; +import java.io.EOFException; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.netconf.api.NetconfExiSession; -import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSession; import org.opendaylight.netconf.api.NetconfSessionListener; import org.opendaylight.netconf.api.NetconfTerminationReason; +import org.opendaylight.netconf.api.messages.NetconfMessage; +import org.opendaylight.netconf.api.xml.XmlElement; import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec; import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder; import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters; -import org.opendaylight.protocol.framework.AbstractProtocolSession; +import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException; +import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractNetconfSession> - extends AbstractProtocolSession implements NetconfSession, NetconfExiSession { +public abstract class AbstractNetconfSession> + extends SimpleChannelInboundHandler implements NetconfSession, NetconfExiSession { private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class); + + private final @NonNull SessionIdType sessionId; private final L sessionListener; - private final long sessionId; - private boolean up = false; + private final Channel channel; private ChannelHandler delayedEncoder; + private boolean up; - private final Channel channel; - - protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) { + protected AbstractNetconfSession(final L sessionListener, final Channel channel, final SessionIdType sessionId) { this.sessionListener = sessionListener; this.channel = channel; - this.sessionId = sessionId; + this.sessionId = requireNonNull(sessionId); LOG.debug("Session {} created", sessionId); } @@ -54,17 +58,21 @@ public abstract class AbstractNetconfSession() { - @Override - public void operationComplete(final Future future) throws Exception { - if (future.isSuccess()) { - proxyFuture.setSuccess(); - } else { - proxyFuture.setFailure(future.cause()); - } - } - }); - if (delayedEncoder != null) { - replaceMessageEncoder(delayedEncoder); - delayedEncoder = null; - } + + final ChannelPromise promise = channel.newPromise(); + channel.eventLoop().execute(() -> { + channel.writeAndFlush(netconfMessage, promise); + if (delayedEncoder != null) { + replaceMessageEncoder(delayedEncoder); + delayedEncoder = null; } }); - return proxyFuture; + // FIXME: NETCONF-1106: this is a workaround for netconf-server's NetconfSubsystem using EmbeddedChannel instead + // of correctly integrating with the underlying transport channel + if (channel instanceof EmbeddedChannel embeddedChannel) { + // Embedded event loop implementation has no executor, it requires explicit invocation to process + synchronized (channel) { + embeddedChannel.runPendingTasks(); + } + } + return promise; } - @Override protected void endOfInput() { - LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up" - : "initialized"); - if (isUp()) { - this.sessionListener.onSessionDown(thisInstance(), - new IOException("End of input detected. Close the session.")); + LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized"); + if (up) { + sessionListener.onSessionDown(thisInstance(), new EOFException("End of input")); } } - @Override protected void sessionUp() { - LOG.debug("Session {} up", toString()); + LOG.debug("Session {} up", this); sessionListener.onSessionUp(thisInstance()); - this.up = true; + up = true; } @Override public String toString() { - final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{"); - sb.append("sessionId=").append(sessionId); + final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{"); + sb.append("sessionId=").append(sessionId.getValue()); sb.append(", channel=").append(channel); sb.append('}'); return sb.toString(); @@ -135,7 +135,7 @@ public abstract class AbstractNetconfSession