import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
-import org.opendaylight.controller.config.util.xml.XmlElement;
import org.opendaylight.netconf.api.NetconfExiSession;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSession;
import org.opendaylight.netconf.api.NetconfSessionListener;
import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.api.xml.XmlElement;
import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
-import org.openexi.proc.common.EXIOptionsException;
-import org.openexi.sax.TransmogrifierException;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
- extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
+ extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
private final L sessionListener;
private final long sessionId;
sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
}
- @Override
protected void handleMessage(final NetconfMessage netconfMessage) {
LOG.debug("handling incoming message");
sessionListener.onMessage(thisInstance(), netconfMessage);
// Restconf writes to a netconf mountpoint execute multiple messages
// and one of these was executed from a restconf thread thus breaking ordering so
// we need to execute all messages from an EventLoop thread.
- final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- final ChannelFuture future = channel.writeAndFlush(netconfMessage);
- future.addListener(new FutureListener<Void>() {
- @Override
- public void operationComplete(Future<Void> future) throws Exception {
- if (future.isSuccess()) {
- proxyFuture.setSuccess();
- } else {
- proxyFuture.setFailure(future.cause());
- }
- }
- });
- if (delayedEncoder != null) {
- replaceMessageEncoder(delayedEncoder);
- delayedEncoder = null;
- }
+
+ final ChannelPromise promise = channel.newPromise();
+ channel.eventLoop().execute(() -> {
+ channel.writeAndFlush(netconfMessage, promise);
+ if (delayedEncoder != null) {
+ replaceMessageEncoder(delayedEncoder);
+ delayedEncoder = null;
}
});
- return proxyFuture;
+ return promise;
}
- @Override
protected void endOfInput() {
LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
: "initialized");
}
}
- @Override
protected void sessionUp() {
LOG.debug("Session {} up", toString());
sessionListener.onSessionUp(thisInstance());
@Override
public String toString() {
- final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
+ final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
sb.append("sessionId=").append(sessionId);
sb.append(", channel=").append(channel);
sb.append('}');
final EXIParameters exiParams;
try {
exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
- } catch (final EXIOptionsException e) {
+ } catch (final UnsupportedOption e) {
LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
throw new IllegalArgumentException("Cannot parse options", e);
}
- final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
- final NetconfMessageToEXIEncoder exiEncoder;
- try {
- exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
- } catch (EXIOptionsException | TransmogrifierException e) {
- LOG.warn("Failed to instantiate EXI encoder for {} on session {}", exiCodec, this, e);
- throw new IllegalStateException("Cannot instantiate encoder for options", e);
- }
-
+ final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
+ final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
final NetconfEXIToMessageDecoder exiDecoder;
try {
exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
- } catch (EXIOptionsException e) {
+ } catch (EXIException e) {
LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
throw new IllegalStateException("Cannot instantiate encoder for options", e);
}
public final long getSessionId() {
return sessionId;
}
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelInactive(final ChannelHandlerContext ctx) {
+ LOG.debug("Channel {} inactive.", ctx.channel());
+ endOfInput();
+ try {
+ // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
+ // channel handler of reconnect promise
+ super.channelInactive(ctx);
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+ }
+ }
+
+ @Override
+ protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Message was received: {}", msg);
+ handleMessage((NetconfMessage) msg);
+ }
+
+ @Override
+ public final void handlerAdded(final ChannelHandlerContext ctx) {
+ sessionUp();
+ }
}