2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil;
10 import io.netty.channel.Channel;
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelHandler;
13 import io.netty.channel.DefaultChannelPromise;
14 import io.netty.handler.codec.ByteToMessageDecoder;
15 import io.netty.handler.codec.MessageToByteEncoder;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.FutureListener;
18 import java.io.IOException;
19 import org.opendaylight.controller.config.util.xml.XmlElement;
20 import org.opendaylight.netconf.api.NetconfExiSession;
21 import org.opendaylight.netconf.api.NetconfMessage;
22 import org.opendaylight.netconf.api.NetconfSession;
23 import org.opendaylight.netconf.api.NetconfSessionListener;
24 import org.opendaylight.netconf.api.NetconfTerminationReason;
25 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
26 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
27 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
28 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
29 import org.opendaylight.protocol.framework.AbstractProtocolSession;
30 import org.openexi.proc.common.EXIOptionsException;
31 import org.openexi.sax.TransmogrifierException;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
36 extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
// Class-wide logger.
37 private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
// Listener that receives this session's callbacks (onMessage, onSessionUp, onSessionDown, onSessionTerminated).
38 private final L sessionListener;
// Identifier of this session; appears in the creation log line and in toString().
39 private final long sessionId;
// Session-up flag, initially false. NOTE(review): the code that sets it is not visible in this view.
40 private boolean up = false;
// Encoder to install after the next sendMessage() write; set by replaceMessageEncoderAfterNextMessage()
// and cleared again by the task sendMessage() runs on the channel's event loop.
42 private ChannelHandler delayedEncoder;
// Netty channel used for writes and for pipeline handler replacement.
44 private final Channel channel;
/**
 * Creates a session bound to the given listener, channel and identifier,
 * and logs the creation at debug level.
 *
 * @param sessionListener listener that will receive this session's callbacks
 * @param channel         channel the session writes to and whose pipeline it modifies
 * @param sessionId       identifier of this session
 */
protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
    this.sessionId = sessionId;
    this.channel = channel;
    this.sessionListener = sessionListener;
    LOG.debug("Session {} created", this.sessionId);
}
/**
 * Supplies the session object, typed as {@code S}, that this class hands to
 * the listener callbacks.
 * NOTE(review): implementations presumably return {@code this} -- confirm in subclasses.
 *
 * @return the session instance passed to the listener
 */
53 protected abstract S thisInstance();
// Notify the listener that this session has terminated, using the fixed reason "Session closed".
59 sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
63 protected void handleMessage(final NetconfMessage netconfMessage) {
64 LOG.debug("handling incoming message");
65 sessionListener.onMessage(thisInstance(), netconfMessage);
69 public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
70 // From: https://github.com/netty/netty/issues/3887
71 // Netty can provide "ordering" in the following situations:
72 // 1. You are doing all writes from the EventLoop thread; OR
73 // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
75 // Restconf writes to a netconf mountpoint execute multiple messages
76 // and one of these was executed from a restconf thread thus breaking ordering so
77 // we need to execute all messages from an EventLoop thread.
78 final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
79 channel.eventLoop().execute(new Runnable() {
82 final ChannelFuture future = channel.writeAndFlush(netconfMessage);
83 future.addListener(new FutureListener<Void>() {
85 public void operationComplete(Future<Void> future) throws Exception {
86 if (future.isSuccess()) {
87 proxyFuture.setSuccess();
89 proxyFuture.setFailure(future.cause());
93 if (delayedEncoder != null) {
94 replaceMessageEncoder(delayedEncoder);
95 delayedEncoder = null;
/**
 * Handles end of input on the session: logs the event at debug level together
 * with a label for the session state, and reports an {@link IOException}
 * ("End of input detected. Close the session.") to the listener via
 * {@code onSessionDown}.
 * NOTE(review): lines between the visible statements are missing from this view,
 * so any condition guarding the listener call is not described here.
 */
104 protected void endOfInput() {
105 LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
108 this.sessionListener.onSessionDown(thisInstance(),
109 new IOException("End of input detected. Close the session."));
/**
 * Logs at debug level that the session is up and notifies the listener via
 * {@code onSessionUp}.
 * NOTE(review): the remainder of the body is not visible in this view; confirm
 * whether further state is updated after the listener call.
 */
114 protected void sessionUp() {
115 LOG.debug("Session {} up", toString());
116 sessionListener.onSessionUp(thisInstance());
121 public String toString() {
122 final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
123 sb.append("sessionId=").append(sessionId);
124 sb.append(", channel=").append(channel);
126 return sb.toString();
129 protected final void replaceMessageDecoder(final ChannelHandler handler) {
130 replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
133 protected final void replaceMessageEncoder(final ChannelHandler handler) {
134 replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
137 protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
138 this.delayedEncoder = handler;
141 protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
142 channel.pipeline().replace(handlerName, handlerName, handler);
146 public final void startExiCommunication(final NetconfMessage startExiMessage) {
147 final EXIParameters exiParams;
149 exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
150 } catch (final EXIOptionsException e) {
151 LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
152 throw new IllegalArgumentException("Cannot parse options", e);
155 final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
156 final NetconfMessageToEXIEncoder exiEncoder;
158 exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
159 } catch (EXIOptionsException | TransmogrifierException e) {
160 LOG.warn("Failed to instantiate EXI encoder for {} on session {}", exiCodec, this, e);
161 throw new IllegalStateException("Cannot instantiate encoder for options", e);
164 final NetconfEXIToMessageDecoder exiDecoder;
166 exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
167 } catch (EXIOptionsException e) {
168 LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
169 throw new IllegalStateException("Cannot instantiate encoder for options", e);
172 addExiHandlers(exiDecoder, exiEncoder);
173 LOG.debug("Session {} EXI handlers added to pipeline", this);
177 * Add the given encoder/decoder pair to the channel pipeline as appropriate.
179 * @param decoder EXI decoder
180 * @param encoder EXI encoder
182 protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
/**
 * Reports whether the session is up; {@code endOfInput} uses the result to
 * choose the state label in its log message.
 * NOTE(review): the body is not visible in this view -- presumably it returns the {@code up} field; confirm.
 *
 * @return {@code true} if the session is up
 */
184 public final boolean isUp() {
188 public final long getSessionId() {