Fix NetconfMessage not sent on EmbeddedChannel
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelPromise;
17 import io.netty.channel.SimpleChannelInboundHandler;
18 import io.netty.channel.embedded.EmbeddedChannel;
19 import io.netty.handler.codec.ByteToMessageDecoder;
20 import io.netty.handler.codec.MessageToByteEncoder;
21 import java.io.EOFException;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.netconf.api.NetconfExiSession;
24 import org.opendaylight.netconf.api.NetconfSession;
25 import org.opendaylight.netconf.api.NetconfSessionListener;
26 import org.opendaylight.netconf.api.NetconfTerminationReason;
27 import org.opendaylight.netconf.api.messages.NetconfMessage;
28 import org.opendaylight.netconf.api.xml.XmlElement;
29 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
30 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
31 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
32 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
33 import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
34 import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>>
40         extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
41     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
42
43     private final @NonNull SessionIdType sessionId;
44     private final L sessionListener;
45     private final Channel channel;
46
47     private ChannelHandler delayedEncoder;
48     private boolean up;
49
50     protected AbstractNetconfSession(final L sessionListener, final Channel channel, final SessionIdType sessionId) {
51         this.sessionListener = sessionListener;
52         this.channel = channel;
53         this.sessionId = requireNonNull(sessionId);
54         LOG.debug("Session {} created", sessionId);
55     }
56
57     protected abstract S thisInstance();
58
59     @Override
60     public void close() {
61         up = false;
62         channel.close();
63         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
64     }
65
66     protected void handleMessage(final NetconfMessage netconfMessage) {
67         LOG.debug("handling incoming message");
68         sessionListener.onMessage(thisInstance(), netconfMessage);
69     }
70
71     protected void handleError(final Exception failure) {
72         LOG.debug("handling incoming error");
73         sessionListener.onError(thisInstance(), failure);
74     }
75
76     @Override
77     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
78         // From: https://github.com/netty/netty/issues/3887
79         // Netty can provide "ordering" in the following situations:
80         // 1. You are doing all writes from the EventLoop thread; OR
81         // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
82         //
83         // Restconf writes to a netconf mountpoint execute multiple messages
84         // and one of these was executed from a restconf thread thus breaking ordering so
85         // we need to execute all messages from an EventLoop thread.
86
87         final ChannelPromise promise = channel.newPromise();
88         channel.eventLoop().execute(() -> {
89             channel.writeAndFlush(netconfMessage, promise);
90             if (delayedEncoder != null) {
91                 replaceMessageEncoder(delayedEncoder);
92                 delayedEncoder = null;
93             }
94         });
95
96         // FIXME: NETCONF-1106: this is a workaround for netconf-server's NetconfSubsystem using EmbeddedChannel instead
97         //                      of correctly integrating with the underlying transport channel
98         if (channel instanceof EmbeddedChannel embeddedChannel) {
99             // Embedded event loop implementation has no executor, it requires explicit invocation to process
100             synchronized (channel) {
101                 embeddedChannel.runPendingTasks();
102             }
103         }
104         return promise;
105     }
106
107     protected void endOfInput() {
108         LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized");
109         if (up) {
110             sessionListener.onSessionDown(thisInstance(), new EOFException("End of input"));
111         }
112     }
113
114     protected void sessionUp() {
115         LOG.debug("Session {} up", this);
116         sessionListener.onSessionUp(thisInstance());
117         up = true;
118     }
119
120     @Override
121     public String toString() {
122         final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
123         sb.append("sessionId=").append(sessionId.getValue());
124         sb.append(", channel=").append(channel);
125         sb.append('}');
126         return sb.toString();
127     }
128
129     protected final void replaceMessageDecoder(final ChannelHandler handler) {
130         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
131     }
132
133     protected final void replaceMessageEncoder(final ChannelHandler handler) {
134         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
135     }
136
137     protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
138         delayedEncoder = handler;
139     }
140
141     protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
142         channel.pipeline().replace(handlerName, handlerName, handler);
143     }
144
145     @Override
146     public final void startExiCommunication(final NetconfMessage startExiMessage) {
147         final EXIParameters exiParams;
148         try {
149             exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
150         } catch (final UnsupportedOption e) {
151             LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
152             throw new IllegalArgumentException("Cannot parse options", e);
153         }
154
155         final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
156         final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
157         final NetconfEXIToMessageDecoder exiDecoder;
158         try {
159             exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
160         } catch (EXIException e) {
161             LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
162             throw new IllegalStateException("Cannot instantiate encoder for options", e);
163         }
164
165         addExiHandlers(exiDecoder, exiEncoder);
166         LOG.debug("Session {} EXI handlers added to pipeline", this);
167     }
168
169     /**
170      * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
171      *
172      * @param decoder EXI decoder
173      * @param encoder EXI encoder
174      */
175     protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
176
177     public final boolean isUp() {
178         return up;
179     }
180
181     public final @NonNull SessionIdType sessionId() {
182         return sessionId;
183     }
184
185     @Override
186     @SuppressWarnings("checkstyle:illegalCatch")
187     public final void channelInactive(final ChannelHandlerContext ctx) {
188         LOG.debug("Channel {} inactive.", ctx.channel());
189         endOfInput();
190         try {
191             // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
192             // channel handler of reconnect promise
193             super.channelInactive(ctx);
194         } catch (final Exception e) {
195             throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
196         }
197     }
198
199     @Override
200     protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
201         LOG.debug("Message was received: {}", msg);
202         if (msg instanceof NetconfMessage message) {
203             handleMessage(message);
204         } else if (msg instanceof Exception failure) {
205             handleError(failure);
206         } else {
207             LOG.warn("Ignoring unexpected message {}", msg);
208         }
209     }
210
211     @Override
212     public final void handlerAdded(final ChannelHandlerContext ctx) {
213         sessionUp();
214     }
215 }