Do not use toString() in looging messages
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
index e4c03ed50f43b518622957e179715e87f8f36814..aab5b6d0c0320ae2e686db53bb3edd7455ceb090 100644 (file)
@@ -10,29 +10,30 @@ package org.opendaylight.netconf.nettyutil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import java.io.IOException;
-import org.opendaylight.controller.config.util.xml.XmlElement;
 import org.opendaylight.netconf.api.NetconfExiSession;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.api.xml.XmlElement;
 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
-import org.openexi.proc.common.EXIOptionsException;
-import org.openexi.sax.TransmogrifierException;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
+import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>> extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
+public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
+        extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
+
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
     private final L sessionListener;
     private final long sessionId;
@@ -58,7 +59,6 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
     }
 
-    @Override
     protected void handleMessage(final NetconfMessage netconfMessage) {
         LOG.debug("handling incoming message");
         sessionListener.onMessage(thisInstance(), netconfMessage);
@@ -74,50 +74,37 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
         // Restconf writes to a netconf mountpoint execute multiple messages
         // and one of these was executed from a restconf thread thus breaking ordering so
         // we need to execute all messages from an EventLoop thread.
-        final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
-        channel.eventLoop().execute(new Runnable() {
-            @Override
-            public void run() {
-                final ChannelFuture future = channel.writeAndFlush(netconfMessage);
-                future.addListener(new FutureListener<Void>() {
-                    @Override
-                    public void operationComplete(Future<Void> future) throws Exception {
-                        if (future.isSuccess()) {
-                            proxyFuture.setSuccess();
-                        } else {
-                            proxyFuture.setFailure(future.cause());
-                        }
-                    }
-                });
-                if (delayedEncoder != null) {
-                    replaceMessageEncoder(delayedEncoder);
-                    delayedEncoder = null;
-                }
+
+        final ChannelPromise promise = channel.newPromise();
+        channel.eventLoop().execute(() -> {
+            channel.writeAndFlush(netconfMessage, promise);
+            if (delayedEncoder != null) {
+                replaceMessageEncoder(delayedEncoder);
+                delayedEncoder = null;
             }
         });
 
-        return proxyFuture;
+        return promise;
     }
 
-    @Override
     protected void endOfInput() {
-        LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
+        LOG.debug("Session {} end of input detected while session was in state {}", this, isUp() ? "up"
                 : "initialized");
         if (isUp()) {
-            this.sessionListener.onSessionDown(thisInstance(), new IOException("End of input detected. Close the session."));
+            this.sessionListener.onSessionDown(thisInstance(),
+                    new IOException("End of input detected. Close the session."));
         }
     }
 
-    @Override
     protected void sessionUp() {
-        LOG.debug("Session {} up", toString());
+        LOG.debug("Session {} up", this);
         sessionListener.onSessionUp(thisInstance());
         this.up = true;
     }
 
     @Override
     public String toString() {
-        final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
+        final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
         sb.append("sessionId=").append(sessionId);
         sb.append(", channel=").append(channel);
         sb.append('}');
@@ -145,24 +132,17 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
         final EXIParameters exiParams;
         try {
             exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
-        } catch (final EXIOptionsException e) {
+        } catch (final UnsupportedOption e) {
             LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
             throw new IllegalArgumentException("Cannot parse options", e);
         }
 
-        final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
-        final NetconfMessageToEXIEncoder exiEncoder;
-        try {
-            exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
-        } catch (EXIOptionsException | TransmogrifierException e) {
-            LOG.warn("Failed to instantiate EXI encoder for {} on session {}", exiCodec, this, e);
-            throw new IllegalStateException("Cannot instantiate encoder for options", e);
-        }
-
+        final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
+        final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
         final NetconfEXIToMessageDecoder exiDecoder;
         try {
             exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
-        } catch (EXIOptionsException e) {
+        } catch (EXIException e) {
             LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
             throw new IllegalStateException("Cannot instantiate encoder for options", e);
         }
@@ -186,4 +166,29 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
     public final long getSessionId() {
         return sessionId;
     }
+
+    @Override
+    @SuppressWarnings("checkstyle:illegalCatch")
+    public final void channelInactive(final ChannelHandlerContext ctx) {
+        LOG.debug("Channel {} inactive.", ctx.channel());
+        endOfInput();
+        try {
+            // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
+            // channel handler of reconnect promise
+            super.channelInactive(ctx);
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+        }
+    }
+
+    @Override
+    protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
+        LOG.debug("Message was received: {}", msg);
+        handleMessage((NetconfMessage) msg);
+    }
+
+    @Override
+    public final void handlerAdded(final ChannelHandlerContext ctx) {
+        sessionUp();
+    }
 }