2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil;
10 import com.siemens.ct.exi.core.exceptions.EXIException;
11 import com.siemens.ct.exi.core.exceptions.UnsupportedOption;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelPromise;
17 import io.netty.channel.SimpleChannelInboundHandler;
18 import io.netty.handler.codec.ByteToMessageDecoder;
19 import io.netty.handler.codec.MessageToByteEncoder;
20 import java.io.IOException;
21 import org.opendaylight.netconf.api.NetconfExiSession;
22 import org.opendaylight.netconf.api.NetconfMessage;
23 import org.opendaylight.netconf.api.NetconfSession;
24 import org.opendaylight.netconf.api.NetconfSessionListener;
25 import org.opendaylight.netconf.api.NetconfTerminationReason;
26 import org.opendaylight.netconf.api.xml.XmlElement;
27 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
28 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
29 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
30 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
35 extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
37 private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
38 private final L sessionListener;
39 private final long sessionId;
40 private boolean up = false;
42 private ChannelHandler delayedEncoder;
44 private final Channel channel;
46 protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
47 this.sessionListener = sessionListener;
48 this.channel = channel;
49 this.sessionId = sessionId;
50 LOG.debug("Session {} created", sessionId);
53 protected abstract S thisInstance();
59 sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
62 protected void handleMessage(final NetconfMessage netconfMessage) {
63 LOG.debug("handling incoming message");
64 sessionListener.onMessage(thisInstance(), netconfMessage);
68 public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
69 // From: https://github.com/netty/netty/issues/3887
70 // Netty can provide "ordering" in the following situations:
71 // 1. You are doing all writes from the EventLoop thread; OR
72 // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
74 // Restconf writes to a netconf mountpoint execute multiple messages
75 // and one of these was executed from a restconf thread thus breaking ordering so
76 // we need to execute all messages from an EventLoop thread.
78 final ChannelPromise promise = channel.newPromise();
79 channel.eventLoop().execute(() -> {
80 channel.writeAndFlush(netconfMessage, promise);
81 if (delayedEncoder != null) {
82 replaceMessageEncoder(delayedEncoder);
83 delayedEncoder = null;
90 protected void endOfInput() {
91 LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
94 this.sessionListener.onSessionDown(thisInstance(),
95 new IOException("End of input detected. Close the session."));
99 protected void sessionUp() {
100 LOG.debug("Session {} up", toString());
101 sessionListener.onSessionUp(thisInstance());
106 public String toString() {
107 final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
108 sb.append("sessionId=").append(sessionId);
109 sb.append(", channel=").append(channel);
111 return sb.toString();
114 protected final void replaceMessageDecoder(final ChannelHandler handler) {
115 replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
118 protected final void replaceMessageEncoder(final ChannelHandler handler) {
119 replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
122 protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
123 this.delayedEncoder = handler;
126 protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
127 channel.pipeline().replace(handlerName, handlerName, handler);
131 public final void startExiCommunication(final NetconfMessage startExiMessage) {
132 final EXIParameters exiParams;
134 exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
135 } catch (final UnsupportedOption e) {
136 LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
137 throw new IllegalArgumentException("Cannot parse options", e);
140 final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
141 final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
142 final NetconfEXIToMessageDecoder exiDecoder;
144 exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
145 } catch (EXIException e) {
146 LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
147 throw new IllegalStateException("Cannot instantiate encoder for options", e);
150 addExiHandlers(exiDecoder, exiEncoder);
151 LOG.debug("Session {} EXI handlers added to pipeline", this);
155 * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
157 * @param decoder EXI decoder
158 * @param encoder EXI encoder
160 protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
162 public final boolean isUp() {
166 public final long getSessionId() {
171 @SuppressWarnings("checkstyle:illegalCatch")
172 public final void channelInactive(final ChannelHandlerContext ctx) {
173 LOG.debug("Channel {} inactive.", ctx.channel());
176 // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
177 // channel handler of reconnect promise
178 super.channelInactive(ctx);
179 } catch (final Exception e) {
180 throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
185 protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
186 LOG.debug("Message was received: {}", msg);
187 handleMessage((NetconfMessage) msg);
191 public final void handlerAdded(final ChannelHandlerContext ctx) {