clean test environment
[transportpce.git] / tests / honeynode / 2.2.1 / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
diff --git a/tests/honeynode/2.2.1/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java b/tests/honeynode/2.2.1/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java
new file mode 100644 (file)
index 0000000..49f7b7e
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil;
+
+import com.siemens.ct.exi.core.exceptions.EXIException;
+import com.siemens.ct.exi.core.exceptions.UnsupportedOption;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import java.io.IOException;
+import org.opendaylight.controller.config.util.xml.XmlElement;
+import org.opendaylight.netconf.api.NetconfExiSession;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.api.NetconfSession;
+import org.opendaylight.netconf.api.NetconfSessionListener;
+import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
+import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
+import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
+import org.opendaylight.protocol.framework.AbstractProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
+        extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
+    private final L sessionListener;
+    private final long sessionId;
+    private boolean up = false;
+
+    private ChannelHandler delayedEncoder;
+
+    private final Channel channel;
+
+    protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
+        this.sessionListener = sessionListener;
+        this.channel = channel;
+        this.sessionId = sessionId;
+        LOG.debug("Session {} created", sessionId);
+    }
+
+    protected abstract S thisInstance();
+
+    @Override
+    public void close() {
+        channel.close();
+        up = false;
+        sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
+    }
+
+    @Override
+    protected void handleMessage(final NetconfMessage netconfMessage) {
+        LOG.debug("handling incoming message");
+        sessionListener.onMessage(thisInstance(), netconfMessage);
+    }
+
+    @Override
+    public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
+        // From: https://github.com/netty/netty/issues/3887
+        // Netty can provide "ordering" in the following situations:
+        // 1. You are doing all writes from the EventLoop thread; OR
+        // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
+        //
+        // Restconf writes to a netconf mountpoint execute multiple messages
+        // and one of these was executed from a restconf thread thus breaking ordering so
+        // we need to execute all messages from an EventLoop thread.
+        final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
+        channel.eventLoop().execute(new Runnable() {
+            @Override
+            public void run() {
+                final ChannelFuture future = channel.writeAndFlush(netconfMessage);
+                future.addListener(new FutureListener<Void>() {
+                    @Override
+                    public void operationComplete(Future<Void> future) throws Exception {
+                        if (future.isSuccess()) {
+                            proxyFuture.setSuccess();
+                        } else {
+                            proxyFuture.setFailure(future.cause());
+                        }
+                    }
+                });
+                if (delayedEncoder != null) {
+                    replaceMessageEncoder(delayedEncoder);
+                    delayedEncoder = null;
+                }
+            }
+        });
+
+        return proxyFuture;
+    }
+
+    @Override
+    protected void endOfInput() {
+        LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
+                : "initialized");
+        if (isUp()) {
+            this.sessionListener.onSessionDown(thisInstance(),
+                    new IOException("End of input detected. Close the session."));
+        }
+    }
+
+    @Override
+    protected void sessionUp() {
+        LOG.debug("Session {} up", toString());
+        sessionListener.onSessionUp(thisInstance());
+        this.up = true;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
+        sb.append("sessionId=").append(sessionId);
+        sb.append(", channel=").append(channel);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    protected final void replaceMessageDecoder(final ChannelHandler handler) {
+        replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
+    }
+
+    protected final void replaceMessageEncoder(final ChannelHandler handler) {
+        replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
+    }
+
+    protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
+        this.delayedEncoder = handler;
+    }
+
+    protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
+        channel.pipeline().replace(handlerName, handlerName, handler);
+    }
+
+    @Override
+    public final void startExiCommunication(final NetconfMessage startExiMessage) {
+        final EXIParameters exiParams;
+        try {
+            exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
+        } catch (final UnsupportedOption e) {
+            LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
+            throw new IllegalArgumentException("Cannot parse options", e);
+        }
+
+        final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
+        final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
+        final NetconfEXIToMessageDecoder exiDecoder;
+        try {
+            exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
+        } catch (EXIException e) {
+            LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
+            throw new IllegalStateException("Cannot instantiate encoder for options", e);
+        }
+
+        addExiHandlers(exiDecoder, exiEncoder);
+        LOG.debug("Session {} EXI handlers added to pipeline", this);
+    }
+
+    /**
+     * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
+     *
+     * @param decoder EXI decoder
+     * @param encoder EXI encoder
+     */
+    protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
+
+    public final boolean isUp() {
+        return up;
+    }
+
+    public final long getSessionId() {
+        return sessionId;
+    }
+}