Migrate netconf-server/mapping tests to JUnit5
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelPromise;
17 import io.netty.channel.SimpleChannelInboundHandler;
18 import io.netty.handler.codec.ByteToMessageDecoder;
19 import io.netty.handler.codec.MessageToByteEncoder;
20 import java.io.EOFException;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.opendaylight.netconf.api.NetconfExiSession;
23 import org.opendaylight.netconf.api.NetconfSession;
24 import org.opendaylight.netconf.api.NetconfSessionListener;
25 import org.opendaylight.netconf.api.NetconfTerminationReason;
26 import org.opendaylight.netconf.api.messages.NetconfMessage;
27 import org.opendaylight.netconf.api.xml.XmlElement;
28 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
29 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
30 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
31 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
32 import org.opendaylight.netconf.shaded.exificient.core.exceptions.EXIException;
33 import org.opendaylight.netconf.shaded.exificient.core.exceptions.UnsupportedOption;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.SessionIdType;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>>
39         extends SimpleChannelInboundHandler<Object> implements NetconfSession, NetconfExiSession {
40     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
41
42     private final @NonNull SessionIdType sessionId;
43     private final L sessionListener;
44     private final Channel channel;
45
46     private ChannelHandler delayedEncoder;
47     private boolean up;
48
49     protected AbstractNetconfSession(final L sessionListener, final Channel channel, final SessionIdType sessionId) {
50         this.sessionListener = sessionListener;
51         this.channel = channel;
52         this.sessionId = requireNonNull(sessionId);
53         LOG.debug("Session {} created", sessionId);
54     }
55
56     protected abstract S thisInstance();
57
58     @Override
59     public void close() {
60         up = false;
61         channel.close();
62         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
63     }
64
65     protected void handleMessage(final NetconfMessage netconfMessage) {
66         LOG.debug("handling incoming message");
67         sessionListener.onMessage(thisInstance(), netconfMessage);
68     }
69
70     protected void handleError(final Exception failure) {
71         LOG.debug("handling incoming error");
72         sessionListener.onError(thisInstance(), failure);
73     }
74
75     @Override
76     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
77         // From: https://github.com/netty/netty/issues/3887
78         // Netty can provide "ordering" in the following situations:
79         // 1. You are doing all writes from the EventLoop thread; OR
80         // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
81         //
82         // Restconf writes to a netconf mountpoint execute multiple messages
83         // and one of these was executed from a restconf thread thus breaking ordering so
84         // we need to execute all messages from an EventLoop thread.
85
86         final ChannelPromise promise = channel.newPromise();
87         channel.eventLoop().execute(() -> {
88             channel.writeAndFlush(netconfMessage, promise);
89             if (delayedEncoder != null) {
90                 replaceMessageEncoder(delayedEncoder);
91                 delayedEncoder = null;
92             }
93         });
94
95         return promise;
96     }
97
98     protected void endOfInput() {
99         LOG.debug("Session {} end of input detected while session was in state {}", this, up ? "up" : "initialized");
100         if (up) {
101             sessionListener.onSessionDown(thisInstance(), new EOFException("End of input"));
102         }
103     }
104
105     protected void sessionUp() {
106         LOG.debug("Session {} up", this);
107         sessionListener.onSessionUp(thisInstance());
108         up = true;
109     }
110
111     @Override
112     public String toString() {
113         final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
114         sb.append("sessionId=").append(sessionId.getValue());
115         sb.append(", channel=").append(channel);
116         sb.append('}');
117         return sb.toString();
118     }
119
120     protected final void replaceMessageDecoder(final ChannelHandler handler) {
121         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
122     }
123
124     protected final void replaceMessageEncoder(final ChannelHandler handler) {
125         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
126     }
127
128     protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
129         delayedEncoder = handler;
130     }
131
132     protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
133         channel.pipeline().replace(handlerName, handlerName, handler);
134     }
135
136     @Override
137     public final void startExiCommunication(final NetconfMessage startExiMessage) {
138         final EXIParameters exiParams;
139         try {
140             exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
141         } catch (final UnsupportedOption e) {
142             LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
143             throw new IllegalArgumentException("Cannot parse options", e);
144         }
145
146         final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
147         final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
148         final NetconfEXIToMessageDecoder exiDecoder;
149         try {
150             exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
151         } catch (EXIException e) {
152             LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
153             throw new IllegalStateException("Cannot instantiate encoder for options", e);
154         }
155
156         addExiHandlers(exiDecoder, exiEncoder);
157         LOG.debug("Session {} EXI handlers added to pipeline", this);
158     }
159
160     /**
161      * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
162      *
163      * @param decoder EXI decoder
164      * @param encoder EXI encoder
165      */
166     protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
167
168     public final boolean isUp() {
169         return up;
170     }
171
172     public final @NonNull SessionIdType sessionId() {
173         return sessionId;
174     }
175
176     @Override
177     @SuppressWarnings("checkstyle:illegalCatch")
178     public final void channelInactive(final ChannelHandlerContext ctx) {
179         LOG.debug("Channel {} inactive.", ctx.channel());
180         endOfInput();
181         try {
182             // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close
183             // channel handler of reconnect promise
184             super.channelInactive(ctx);
185         } catch (final Exception e) {
186             throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
187         }
188     }
189
190     @Override
191     protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
192         LOG.debug("Message was received: {}", msg);
193         if (msg instanceof NetconfMessage message) {
194             handleMessage(message);
195         } else if (msg instanceof Exception failure) {
196             handleError(failure);
197         } else {
198             LOG.warn("Ignoring unexpected message {}", msg);
199         }
200     }
201
202     @Override
203     public final void handlerAdded(final ChannelHandlerContext ctx) {
204         sessionUp();
205     }
206 }