X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Fhoneynode%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSession.java;fp=tests%2Fhoneynode%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSession.java;h=49f7b7e7014f7a5ab078afc7290734d3b1a937b7;hb=73d276ca887159c41a0877c2250d54b42e7fd64c;hp=0000000000000000000000000000000000000000;hpb=57e20793c55e43ed6bbec42e3304b9a37a1ff2f5;p=transportpce.git diff --git a/tests/honeynode/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java b/tests/honeynode/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java new file mode 100644 index 000000000..49f7b7e70 --- /dev/null +++ b/tests/honeynode/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.nettyutil; + +import com.siemens.ct.exi.core.exceptions.EXIException; +import com.siemens.ct.exi.core.exceptions.UnsupportedOption; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.DefaultChannelPromise; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import java.io.IOException; +import org.opendaylight.controller.config.util.xml.XmlElement; +import org.opendaylight.netconf.api.NetconfExiSession; +import org.opendaylight.netconf.api.NetconfMessage; +import org.opendaylight.netconf.api.NetconfSession; +import org.opendaylight.netconf.api.NetconfSessionListener; +import org.opendaylight.netconf.api.NetconfTerminationReason; +import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec; +import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder; +import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder; +import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters; +import org.opendaylight.protocol.framework.AbstractProtocolSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractNetconfSession> + extends AbstractProtocolSession implements NetconfSession, NetconfExiSession { + private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class); + private final L sessionListener; + private final long sessionId; + private boolean up = false; + + private ChannelHandler delayedEncoder; + + private final Channel channel; + + protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) { + this.sessionListener = sessionListener; + this.channel = channel; + this.sessionId = sessionId; + LOG.debug("Session {} created", sessionId); + } + + protected abstract S thisInstance(); + + @Override + public void close() { + channel.close(); + up = false; + sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed")); + } + + @Override + protected void handleMessage(final NetconfMessage netconfMessage) { + LOG.debug("handling incoming message"); + sessionListener.onMessage(thisInstance(), netconfMessage); + } + + @Override + public ChannelFuture sendMessage(final NetconfMessage netconfMessage) { + // From: https://github.com/netty/netty/issues/3887 + // Netty can provide "ordering" in the following situations: + // 1. You are doing all writes from the EventLoop thread; OR + // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)). + // + // Restconf writes to a netconf mountpoint execute multiple messages + // and one of these was executed from a restconf thread thus breaking ordering so + // we need to execute all messages from an EventLoop thread. + final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel); + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + final ChannelFuture future = channel.writeAndFlush(netconfMessage); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + proxyFuture.setSuccess(); + } else { + proxyFuture.setFailure(future.cause()); + } + } + }); + if (delayedEncoder != null) { + replaceMessageEncoder(delayedEncoder); + delayedEncoder = null; + } + } + }); + + return proxyFuture; + } + + @Override + protected void endOfInput() { + LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up" + : "initialized"); + if (isUp()) { + this.sessionListener.onSessionDown(thisInstance(), + new IOException("End of input detected. Close the session.")); + } + } + + @Override + protected void sessionUp() { + LOG.debug("Session {} up", toString()); + sessionListener.onSessionUp(thisInstance()); + this.up = true; + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{"); + sb.append("sessionId=").append(sessionId); + sb.append(", channel=").append(channel); + sb.append('}'); + return sb.toString(); + } + + protected final void replaceMessageDecoder(final ChannelHandler handler) { + replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler); + } + + protected final void replaceMessageEncoder(final ChannelHandler handler) { + replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler); + } + + protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) { + this.delayedEncoder = handler; + } + + protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) { + channel.pipeline().replace(handlerName, handlerName, handler); + } + + @Override + public final void startExiCommunication(final NetconfMessage startExiMessage) { + final EXIParameters exiParams; + try { + exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument())); + } catch (final UnsupportedOption e) { + LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e); + throw new IllegalArgumentException("Cannot parse options", e); + } + + final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams); + final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec); + final NetconfEXIToMessageDecoder exiDecoder; + try { + exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec); + } catch (EXIException e) { + LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e); + throw new IllegalStateException("Cannot instantiate encoder for options", e); + } + + addExiHandlers(exiDecoder, exiEncoder); + LOG.debug("Session {} EXI handlers added to pipeline", this); + } + + /** + * Add a set encoder/decoder tuple into the channel pipeline as appropriate. + * + * @param decoder EXI decoder + * @param encoder EXI encoder + */ + protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder encoder); + + public final boolean isUp() { + return up; + } + + public final long getSessionId() { + return sessionId; + } +}