+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.nettyutil;
-
-import com.siemens.ct.exi.core.exceptions.EXIException;
-import com.siemens.ct.exi.core.exceptions.UnsupportedOption;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.DefaultChannelPromise;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.handler.codec.MessageToByteEncoder;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import java.io.IOException;
-import org.opendaylight.controller.config.util.xml.XmlElement;
-import org.opendaylight.netconf.api.NetconfExiSession;
-import org.opendaylight.netconf.api.NetconfMessage;
-import org.opendaylight.netconf.api.NetconfSession;
-import org.opendaylight.netconf.api.NetconfSessionListener;
-import org.opendaylight.netconf.api.NetconfTerminationReason;
-import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
-import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
-import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
-import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
- extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
- private final L sessionListener;
- private final long sessionId;
- private boolean up = false;
-
- private ChannelHandler delayedEncoder;
-
- private final Channel channel;
-
- protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
- this.sessionListener = sessionListener;
- this.channel = channel;
- this.sessionId = sessionId;
- LOG.debug("Session {} created", sessionId);
- }
-
- protected abstract S thisInstance();
-
- @Override
- public void close() {
- channel.close();
- up = false;
- sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
- }
-
- @Override
- protected void handleMessage(final NetconfMessage netconfMessage) {
- LOG.debug("handling incoming message");
- sessionListener.onMessage(thisInstance(), netconfMessage);
- }
-
- @Override
- public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
- // From: https://github.com/netty/netty/issues/3887
- // Netty can provide "ordering" in the following situations:
- // 1. You are doing all writes from the EventLoop thread; OR
- // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
- //
- // Restconf writes to a netconf mountpoint execute multiple messages
- // and one of these was executed from a restconf thread thus breaking ordering so
- // we need to execute all messages from an EventLoop thread.
- final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- final ChannelFuture future = channel.writeAndFlush(netconfMessage);
- future.addListener(new FutureListener<Void>() {
- @Override
- public void operationComplete(Future<Void> future) throws Exception {
- if (future.isSuccess()) {
- proxyFuture.setSuccess();
- } else {
- proxyFuture.setFailure(future.cause());
- }
- }
- });
- if (delayedEncoder != null) {
- replaceMessageEncoder(delayedEncoder);
- delayedEncoder = null;
- }
- }
- });
-
- return proxyFuture;
- }
-
- @Override
- protected void endOfInput() {
- LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
- : "initialized");
- if (isUp()) {
- this.sessionListener.onSessionDown(thisInstance(),
- new IOException("End of input detected. Close the session."));
- }
- }
-
- @Override
- protected void sessionUp() {
- LOG.debug("Session {} up", toString());
- sessionListener.onSessionUp(thisInstance());
- this.up = true;
- }
-
- @Override
- public String toString() {
- final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
- sb.append("sessionId=").append(sessionId);
- sb.append(", channel=").append(channel);
- sb.append('}');
- return sb.toString();
- }
-
- protected final void replaceMessageDecoder(final ChannelHandler handler) {
- replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
- }
-
- protected final void replaceMessageEncoder(final ChannelHandler handler) {
- replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
- }
-
- protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
- this.delayedEncoder = handler;
- }
-
- protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
- channel.pipeline().replace(handlerName, handlerName, handler);
- }
-
- @Override
- public final void startExiCommunication(final NetconfMessage startExiMessage) {
- final EXIParameters exiParams;
- try {
- exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
- } catch (final UnsupportedOption e) {
- LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
- throw new IllegalArgumentException("Cannot parse options", e);
- }
-
- final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
- final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
- final NetconfEXIToMessageDecoder exiDecoder;
- try {
- exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
- } catch (EXIException e) {
- LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
- throw new IllegalStateException("Cannot instantiate encoder for options", e);
- }
-
- addExiHandlers(exiDecoder, exiEncoder);
- LOG.debug("Session {} EXI handlers added to pipeline", this);
- }
-
- /**
- * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
- *
- * @param decoder EXI decoder
- * @param encoder EXI encoder
- */
- protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
-
- public final boolean isUp() {
- return up;
- }
-
- public final long getSessionId() {
- return sessionId;
- }
-}