Pass maximum chunk size to AbstractNetconfSessionNegotiator
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSessionNegotiator.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.annotations.Beta;
15 import io.netty.channel.Channel;
16 import io.netty.channel.ChannelHandler;
17 import io.netty.channel.ChannelHandlerContext;
18 import io.netty.channel.ChannelInboundHandlerAdapter;
19 import io.netty.handler.ssl.SslHandler;
20 import io.netty.util.Timeout;
21 import io.netty.util.Timer;
22 import io.netty.util.concurrent.Promise;
23 import java.util.Optional;
24 import java.util.concurrent.TimeUnit;
25 import org.checkerframework.checker.index.qual.NonNegative;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.netconf.api.NetconfDocumentedException;
29 import org.opendaylight.netconf.api.NetconfMessage;
30 import org.opendaylight.netconf.api.NetconfSessionListener;
31 import org.opendaylight.netconf.api.messages.NetconfHelloMessage;
32 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
33 import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory;
34 import org.opendaylight.netconf.nettyutil.handler.NetconfChunkAggregator;
35 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
36 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder;
37 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
38 import org.opendaylight.netconf.util.messages.FramingMechanism;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.w3c.dom.Document;
42 import org.w3c.dom.NodeList;
43
44 public abstract class AbstractNetconfSessionNegotiator<S extends AbstractNetconfSession<S, L>,
45             L extends NetconfSessionListener<S>>
46             extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator<S> {
47     /**
48      * Possible states for Finite State Machine.
49      */
50     protected enum State {
51         IDLE, OPEN_WAIT, FAILED, ESTABLISHED
52     }
53
54     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
55     private static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
56     private static final String DEFAULT_MAXIMUM_CHUNK_SIZE_PROP = "org.opendaylight.netconf.default.maximum.chunk.size";
57     private static final int DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024;
58
59     /**
60      * Default upper bound on the size of an individual chunk. This value can be controlled through
61      * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_PROP} system property and defaults to
62      * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT} bytes.
63      */
64     @Beta
65     public static final @NonNegative int DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;
66
67     static {
68         final int propValue = Integer.getInteger(DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT);
69         if (propValue <= 0) {
70             LOG.warn("Ignoring invalid {} value {}", DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, propValue);
71             DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT;
72         } else {
73             DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = propValue;
74         }
75         LOG.debug("Default maximum incoming NETCONF chunk size is {} bytes", DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE);
76     }
77
78     private final @NonNull NetconfHelloMessage localHello;
79     protected final Channel channel;
80
81     private final @NonNegative int maximumIncomingChunkSize;
82     private final long connectionTimeoutMillis;
83     private final Promise<S> promise;
84     private final L sessionListener;
85     private final Timer timer;
86
87     private Timeout timeoutTask;
88
89     @GuardedBy("this")
90     private State state = State.IDLE;
91
92     protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise<S> promise,
93                                                final Channel channel, final Timer timer, final L sessionListener,
94                                                final long connectionTimeoutMillis,
95                                                final @NonNegative int maximumIncomingChunkSize) {
96         this.localHello = requireNonNull(hello);
97         this.promise = requireNonNull(promise);
98         this.channel = requireNonNull(channel);
99         this.timer = timer;
100         this.sessionListener = sessionListener;
101         this.connectionTimeoutMillis = connectionTimeoutMillis;
102         this.maximumIncomingChunkSize = maximumIncomingChunkSize;
103         checkArgument(maximumIncomingChunkSize > 0, "Invalid maximum incoming chunk size %s", maximumIncomingChunkSize);
104     }
105
106     @Deprecated(since = "4.0.1", forRemoval = true)
107     protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise<S> promise,
108                                                final Channel channel, final Timer timer,
109                                                final L sessionListener, final long connectionTimeoutMillis) {
110         this(hello, promise, channel, timer, sessionListener, connectionTimeoutMillis,
111             DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE);
112     }
113
114     protected final @NonNull NetconfHelloMessage localHello() {
115         return localHello;
116     }
117
118     protected final void startNegotiation() {
119         if (ifNegotiatedAlready()) {
120             LOG.debug("Negotiation on channel {} already started", channel);
121         } else {
122             final Optional<SslHandler> sslHandler = getSslHandler(channel);
123             if (sslHandler.isPresent()) {
124                 sslHandler.get().handshakeFuture().addListener(future -> {
125                     checkState(future.isSuccess(), "Ssl handshake was not successful");
126                     LOG.debug("Ssl handshake complete");
127                     start();
128                 });
129             } else {
130                 start();
131             }
132         }
133     }
134
135     protected final synchronized boolean ifNegotiatedAlready() {
136         // Indicates whether negotiation already started
137         return this.state != State.IDLE;
138     }
139
140     private static Optional<SslHandler> getSslHandler(final Channel channel) {
141         return Optional.ofNullable(channel.pipeline().get(SslHandler.class));
142     }
143
144     private void start() {
145         LOG.debug("Session negotiation started with hello message {} on channel {}", localHello, channel);
146
147         channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
148
149         sendMessage(localHello);
150
151         replaceHelloMessageOutboundHandler();
152         changeState(State.OPEN_WAIT);
153
154         timeoutTask = this.timer.newTimeout(this::timeoutExpired, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
155     }
156
157     private synchronized void timeoutExpired(final Timeout timeout) {
158         if (state != State.ESTABLISHED) {
159             LOG.debug("Connection timeout after {}, session backed by channel {} is in state {}", timeout, channel,
160                 state);
161
162             // Do not fail negotiation if promise is done or canceled
163             // It would result in setting result of the promise second time and that throws exception
164             if (!promise.isDone() && !promise.isCancelled()) {
165                 LOG.warn("Netconf session backed by channel {} was not established after {}", channel,
166                     connectionTimeoutMillis);
167                 changeState(State.FAILED);
168
169                 channel.close().addListener(future -> {
170                     if (future.isSuccess()) {
171                         LOG.debug("Channel {} closed: success", channel);
172                     } else {
173                         LOG.warn("Channel {} closed: fail", channel);
174                     }
175                 });
176             }
177         } else if (channel.isOpen()) {
178             channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
179         }
180     }
181
182     private void cancelTimeout() {
183         if (timeoutTask != null) {
184             timeoutTask.cancel();
185         }
186     }
187
188     protected final S getSessionForHelloMessage(final NetconfHelloMessage netconfMessage)
189             throws NetconfDocumentedException {
190         final Document doc = netconfMessage.getDocument();
191
192         if (shouldUseChunkFraming(doc)) {
193             insertChunkFramingToPipeline();
194         }
195
196         changeState(State.ESTABLISHED);
197         return getSession(sessionListener, channel, netconfMessage);
198     }
199
200     protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message)
201         throws NetconfDocumentedException;
202
203     /**
204      * Insert chunk framing handlers into the pipeline.
205      */
206     private void insertChunkFramingToPipeline() {
207         replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
208                 FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
209         replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
210                 new NetconfChunkAggregator(maximumIncomingChunkSize));
211     }
212
213     private boolean shouldUseChunkFraming(final Document doc) {
214         return containsBase11Capability(doc) && containsBase11Capability(localHello.getDocument());
215     }
216
217     /**
218      * Remove special inbound handler for hello message. Insert regular netconf xml message (en|de)coders.
219      *
220      * <p>
221      * Inbound hello message handler should be kept until negotiation is successful
222      * It caches any non-hello messages while negotiation is still in progress
223      */
224     protected final void replaceHelloMessageInboundHandler(final S session) {
225         ChannelHandler helloMessageHandler = replaceChannelHandler(channel,
226                 AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
227
228         checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder,
229                 "Pipeline handlers misplaced on session: %s, pipeline: %s", session, channel.pipeline());
230         Iterable<NetconfMessage> netconfMessagesFromNegotiation =
231                 ((NetconfXMLToHelloMessageDecoder) helloMessageHandler).getPostHelloNetconfMessages();
232
233         // Process messages received during negotiation
234         // The hello message handler does not have to be synchronized,
235         // since it is always call from the same thread by netty.
236         // It means, we are now using the thread now
237         for (NetconfMessage message : netconfMessagesFromNegotiation) {
238             session.handleMessage(message);
239         }
240     }
241
242     /**
243      * Remove special outbound handler for hello message. Insert regular netconf xml message (en|de)coders.
244      */
245     private void replaceHelloMessageOutboundHandler() {
246         replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER,
247                 new NetconfMessageToXMLEncoder());
248     }
249
250     private static ChannelHandler replaceChannelHandler(final Channel channel, final String handlerKey,
251                                                         final ChannelHandler decoder) {
252         return channel.pipeline().replace(handlerKey, handlerKey, decoder);
253     }
254
255     private synchronized void changeState(final State newState) {
256         LOG.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
257         checkState(isStateChangePermitted(state, newState),
258                 "Cannot change state from %s to %s for channel %s", state, newState, channel);
259         this.state = newState;
260     }
261
262     private static boolean containsBase11Capability(final Document doc) {
263         final NodeList nList = doc.getElementsByTagNameNS(
264             XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
265             XmlNetconfConstants.CAPABILITY);
266         for (int i = 0; i < nList.getLength(); i++) {
267             if (nList.item(i).getTextContent().contains(XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1)) {
268                 return true;
269             }
270         }
271         return false;
272     }
273
274     private static boolean isStateChangePermitted(final State state, final State newState) {
275         if (state == State.IDLE && newState == State.OPEN_WAIT) {
276             return true;
277         }
278         if (state == State.OPEN_WAIT && newState == State.ESTABLISHED) {
279             return true;
280         }
281         if (state == State.OPEN_WAIT && newState == State.FAILED) {
282             return true;
283         }
284         LOG.debug("Transition from {} to {} is not allowed", state, newState);
285         return false;
286     }
287
288     /**
289      * Handler to catch exceptions in pipeline during negotiation.
290      */
291     private final class ExceptionHandlingInboundChannelHandler extends ChannelInboundHandlerAdapter {
292         @Override
293         public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
294             LOG.warn("An exception occurred during negotiation with {} on channel {}",
295                     channel.remoteAddress(), channel, cause);
296             // FIXME: this is quite suspect as it is competing with timeoutExpired() without synchronization
297             cancelTimeout();
298             negotiationFailed(cause);
299             changeState(State.FAILED);
300         }
301     }
302
303     protected final void negotiationSuccessful(final S session) {
304         LOG.debug("Negotiation on channel {} successful with session {}", channel, session);
305         channel.pipeline().replace(this, "session", session);
306         promise.setSuccess(session);
307     }
308
309     protected void negotiationFailed(final Throwable cause) {
310         LOG.debug("Negotiation on channel {} failed", channel, cause);
311         channel.close();
312         promise.setFailure(cause);
313     }
314
315     /**
316      * Send a message to peer and fail negotiation if it does not reach
317      * the peer.
318      *
319      * @param msg Message which should be sent.
320      */
321     protected final void sendMessage(final NetconfMessage msg) {
322         this.channel.writeAndFlush(msg).addListener(f -> {
323             if (!f.isSuccess()) {
324                 LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause());
325                 negotiationFailed(f.cause());
326             } else {
327                 LOG.trace("Message {} sent to socket on channel {}", msg, channel);
328             }
329         });
330     }
331
332     @Override
333     @SuppressWarnings("checkstyle:illegalCatch")
334     public final void channelActive(final ChannelHandlerContext ctx) {
335         LOG.debug("Starting session negotiation on channel {}", channel);
336         try {
337             startNegotiation();
338         } catch (final Exception e) {
339             LOG.warn("Unexpected negotiation failure on channel {}", channel, e);
340             negotiationFailed(e);
341         }
342     }
343
344     @Override
345     @SuppressWarnings("checkstyle:illegalCatch")
346     public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
347         LOG.debug("Negotiation read invoked on channel {}", channel);
348         try {
349             handleMessage((NetconfHelloMessage) msg);
350         } catch (final Exception e) {
351             LOG.debug("Unexpected error while handling negotiation message {} on channel {}", msg, channel, e);
352             negotiationFailed(e);
353         }
354     }
355
356     @Override
357     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
358         LOG.info("Unexpected error during negotiation on channel {}", channel, cause);
359         negotiationFailed(cause);
360     }
361
362     protected abstract void handleMessage(NetconfHelloMessage msg) throws Exception;
363 }