*/
package org.opendaylight.netconf.nettyutil;
-import com.siemens.ct.exi.core.exceptions.EXIException;
-import com.siemens.ct.exi.core.exceptions.UnsupportedOption;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
-import java.io.IOException;
+import java.io.EOFException;
import org.opendaylight.netconf.api.NetconfExiSession;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSession;
import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
- extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
+public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>>
+ extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
+
+
private final L sessionListener;
private final long sessionId;
private boolean up = false;
sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
}
- @Override
protected void handleMessage(final NetconfMessage netconfMessage) {
LOG.debug("handling incoming message");
sessionListener.onMessage(thisInstance(), netconfMessage);
return promise;
}
- @Override
protected void endOfInput() {
- LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
- : "initialized");
- if (isUp()) {
- this.sessionListener.onSessionDown(thisInstance(),
- new IOException("End of input detected. Close the session."));
+ LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized");
+ if (up) {
+ this.sessionListener.onSessionDown(thisInstance(), new EOFException("End of input"));
}
}
- @Override
protected void sessionUp() {
- LOG.debug("Session {} up", toString());
+ LOG.debug("Session {} up", this);
sessionListener.onSessionUp(thisInstance());
this.up = true;
}
public final long getSessionId() {
return sessionId;
}
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelInactive(final ChannelHandlerContext ctx) {
+ LOG.debug("Channel {} inactive.", ctx.channel());
+ endOfInput();
+ try {
+ // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
+ // channel handler of reconnect promise
+ super.channelInactive(ctx);
+ } catch (final Exception e) {
+ throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+ }
+ }
+
+ @Override
+ protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Message was received: {}", msg);
+ handleMessage((NetconfMessage) msg);
+ }
+
+ @Override
+ public final void handlerAdded(final ChannelHandlerContext ctx) {
+ sessionUp();
+ }
}