2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil;
10 import static java.util.Objects.requireNonNull;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelPromise;
17 import io.netty.channel.SimpleChannelInboundHandler;
18 import io.netty.handler.codec.ByteToMessageDecoder;
19 import io.netty.handler.codec.MessageToByteEncoder;
20 import java.io.EOFException;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.opendaylight.netconf.api.NetconfExiSession;
23 import org.opendaylight.netconf.api.NetconfSession;
24 import org.opendaylight.netconf.api.NetconfSessionListener;
25 import org.opendaylight.netconf.api.NetconfTerminationReason;
26 import org.opendaylight.netconf.api.messages.NetconfMessage;
27 import org.opendaylight.netconf.api.xml.XmlElement;
28 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
29 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
30 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
31 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
32 import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
33 import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
/**
 * Base class for NETCONF sessions that sit in a Netty pipeline as an inbound handler and relay
 * received messages, errors and session lifecycle events to a {@link NetconfSessionListener}.
 *
 * @param <S> concrete session type handed to the listener callbacks
 * @param <L> listener type notified by this session
 */
38 public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>>
39 extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
40 private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
// Receives the message, error and session lifecycle callbacks issued by this class.
42 private final L sessionListener;
// Identifier of this session; never null (checked in the constructor).
43 private final @NonNull SessionIdType sessionId;
// Session state flag; logged as "up" versus "initialized" when end of input is detected.
44 private boolean up = false;
// Encoder queued to replace the current message encoder right after the next outbound write; null when none is queued.
46 private ChannelHandler delayedEncoder;
// Netty channel this session writes to and whose pipeline handlers it swaps.
48 private final Channel channel;
/**
 * Creates a session bound to the supplied channel and listener.
 *
 * @param sessionListener listener that will receive this session's callbacks
 * @param channel channel used for writes and pipeline manipulation
 * @param sessionId identifier of the session, must not be null
 * @throws NullPointerException if {@code sessionId} is null
 */
protected AbstractNetconfSession(final L sessionListener, final Channel channel, final SessionIdType sessionId) {
    // Validate first: a null identifier aborts construction before any other field is populated.
    this.sessionId = requireNonNull(sessionId);
    this.channel = channel;
    this.sessionListener = sessionListener;
    LOG.debug("Session {} created", this.sessionId);
}
/**
 * Supplies the session object that is handed to the listener callbacks as their first argument.
 * NOTE(review): presumably implementations return {@code this} typed as S; confirm against subclasses.
 *
 * @return the session instance passed to the listener
 */
57 protected abstract S thisInstance();
63 sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
66 protected void handleMessage(final NetconfMessage netconfMessage) {
67 LOG.debug("handling incoming message");
68 sessionListener.onMessage(thisInstance(), netconfMessage);
71 protected void handleError(final Exception failure) {
72 LOG.debug("handling incoming error");
73 sessionListener.onError(thisInstance(), failure);
77 public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
78 // From: https://github.com/netty/netty/issues/3887
79 // Netty can provide "ordering" in the following situations:
80 // 1. You are doing all writes from the EventLoop thread; OR
81 // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
83 // Restconf writes to a netconf mountpoint execute multiple messages
84 // and one of these was executed from a restconf thread thus breaking ordering so
85 // we need to execute all messages from an EventLoop thread.
87 final ChannelPromise promise = channel.newPromise();
88 channel.eventLoop().execute(() -> {
89 channel.writeAndFlush(netconfMessage, promise);
90 if (delayedEncoder != null) {
91 replaceMessageEncoder(delayedEncoder);
92 delayedEncoder = null;
/**
 * Logs that end of input was detected, together with the current session state, and reports
 * the session as down to the listener with an {@link EOFException} as the cause.
 */
99 protected void endOfInput() {
100 LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized");
// NOTE(review): original line 101 is absent from this extract; it may guard the notification below (for example on `up`). Confirm against the full file.
102 sessionListener.onSessionDown(thisInstance(), new EOFException("End of input"));
/**
 * Logs that the session is up and notifies the listener through {@code onSessionUp}.
 */
106 protected void sessionUp() {
107 LOG.debug("Session {} up", this);
108 sessionListener.onSessionUp(thisInstance());
// NOTE(review): the rest of this method (original lines 109 onward) is absent from this extract; presumably the `up` flag is updated here. Confirm against the full file.
113 public String toString() {
114 final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
115 sb.append("sessionId=").append(sessionId.getValue());
116 sb.append(", channel=").append(channel);
118 return sb.toString();
121 protected final void replaceMessageDecoder(final ChannelHandler handler) {
122 replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
125 protected final void replaceMessageEncoder(final ChannelHandler handler) {
126 replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
129 protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
130 delayedEncoder = handler;
133 protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
134 channel.pipeline().replace(handlerName, handlerName, handler);
138 public final void startExiCommunication(final NetconfMessage startExiMessage) {
139 final EXIParameters exiParams;
141 exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
142 } catch (final UnsupportedOption e) {
143 LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
144 throw new IllegalArgumentException("Cannot parse options", e);
147 final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
148 final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
149 final NetconfEXIToMessageDecoder exiDecoder;
151 exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
152 } catch (EXIException e) {
153 LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
154 throw new IllegalStateException("Cannot instantiate encoder for options", e);
157 addExiHandlers(exiDecoder, exiEncoder);
158 LOG.debug("Session {} EXI handlers added to pipeline", this);
/**
 * Add an encoder/decoder pair to the channel pipeline as appropriate.
 *
 * @param decoder EXI decoder
 * @param encoder EXI encoder
 */
167 protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
169 public final boolean isUp() {
173 public final @NonNull SessionIdType sessionId() {
/**
 * Handles the channel becoming inactive: logs the event and forwards it to the superclass so
 * the event continues to be processed. A failure while forwarding is rethrown wrapped in an
 * {@link IllegalStateException}.
 *
 * @param ctx context of this handler; its channel is included in the log and error messages
 */
178 @SuppressWarnings("checkstyle:illegalCatch")
179 public final void channelInactive(final ChannelHandlerContext ctx) {
180 LOG.debug("Channel {} inactive.", ctx.channel());
// NOTE(review): original lines 181-182 are absent from this extract; 182 must open the try block, and 181 may hold a further statement. Confirm against the full file.
183 // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
184 // channel handler of reconnect promise
185 super.channelInactive(ctx);
186 } catch (final Exception e) {
// Broad catch is deliberate (hence the checkstyle suppression); the cause is preserved in the rethrown exception.
187 throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
192 protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
193 LOG.debug("Message was received: {}", msg);
194 if (msg instanceof NetconfMessage message) {
195 handleMessage(message);
196 } else if (msg instanceof Exception failure) {
197 handleError(failure);
199 LOG.warn("Ignoring unexpected message {}", msg);
204 public final void handlerAdded(final ChannelHandlerContext ctx) {