HoneyNode Java 11 support for 221 devices
[transportpce.git] / tests / honeynode / 2.1 / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil;
9
10 import com.siemens.ct.exi.core.exceptions.EXIException;
11 import com.siemens.ct.exi.core.exceptions.UnsupportedOption;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.DefaultChannelPromise;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import io.netty.handler.codec.MessageToByteEncoder;
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.FutureListener;
20 import java.io.IOException;
21 import org.opendaylight.controller.config.util.xml.XmlElement;
22 import org.opendaylight.netconf.api.NetconfExiSession;
23 import org.opendaylight.netconf.api.NetconfMessage;
24 import org.opendaylight.netconf.api.NetconfSession;
25 import org.opendaylight.netconf.api.NetconfSessionListener;
26 import org.opendaylight.netconf.api.NetconfTerminationReason;
27 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
28 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
29 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
30 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
31 import org.opendaylight.protocol.framework.AbstractProtocolSession;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
36         extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
37     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
38     private final L sessionListener;
39     private final long sessionId;
40     private boolean up = false;
41
42     private ChannelHandler delayedEncoder;
43
44     private final Channel channel;
45
46     protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
47         this.sessionListener = sessionListener;
48         this.channel = channel;
49         this.sessionId = sessionId;
50         LOG.debug("Session {} created", sessionId);
51     }
52
53     protected abstract S thisInstance();
54
55     @Override
56     public void close() {
57         channel.close();
58         up = false;
59         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
60     }
61
62     @Override
63     protected void handleMessage(final NetconfMessage netconfMessage) {
64         LOG.debug("handling incoming message");
65         sessionListener.onMessage(thisInstance(), netconfMessage);
66     }
67
68     @Override
69     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
70         // From: https://github.com/netty/netty/issues/3887
71         // Netty can provide "ordering" in the following situations:
72         // 1. You are doing all writes from the EventLoop thread; OR
73         // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
74         //
75         // Restconf writes to a netconf mountpoint execute multiple messages
76         // and one of these was executed from a restconf thread thus breaking ordering so
77         // we need to execute all messages from an EventLoop thread.
78         final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
79         channel.eventLoop().execute(new Runnable() {
80             @Override
81             public void run() {
82                 final ChannelFuture future = channel.writeAndFlush(netconfMessage);
83                 future.addListener(new FutureListener<Void>() {
84                     @Override
85                     public void operationComplete(Future<Void> future) throws Exception {
86                         if (future.isSuccess()) {
87                             proxyFuture.setSuccess();
88                         } else {
89                             proxyFuture.setFailure(future.cause());
90                         }
91                     }
92                 });
93                 if (delayedEncoder != null) {
94                     replaceMessageEncoder(delayedEncoder);
95                     delayedEncoder = null;
96                 }
97             }
98         });
99
100         return proxyFuture;
101     }
102
103     @Override
104     protected void endOfInput() {
105         LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
106                 : "initialized");
107         if (isUp()) {
108             this.sessionListener.onSessionDown(thisInstance(),
109                     new IOException("End of input detected. Close the session."));
110         }
111     }
112
113     @Override
114     protected void sessionUp() {
115         LOG.debug("Session {} up", toString());
116         sessionListener.onSessionUp(thisInstance());
117         this.up = true;
118     }
119
120     @Override
121     public String toString() {
122         final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
123         sb.append("sessionId=").append(sessionId);
124         sb.append(", channel=").append(channel);
125         sb.append('}');
126         return sb.toString();
127     }
128
129     protected final void replaceMessageDecoder(final ChannelHandler handler) {
130         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
131     }
132
133     protected final void replaceMessageEncoder(final ChannelHandler handler) {
134         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
135     }
136
137     protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
138         this.delayedEncoder = handler;
139     }
140
141     protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
142         channel.pipeline().replace(handlerName, handlerName, handler);
143     }
144
145     @Override
146     public final void startExiCommunication(final NetconfMessage startExiMessage) {
147         final EXIParameters exiParams;
148         try {
149             exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
150         } catch (final UnsupportedOption e) {
151             LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
152             throw new IllegalArgumentException("Cannot parse options", e);
153         }
154
155         final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
156         final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
157         final NetconfEXIToMessageDecoder exiDecoder;
158         try {
159             exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
160         } catch (EXIException e) {
161             LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
162             throw new IllegalStateException("Cannot instantiate encoder for options", e);
163         }
164
165         addExiHandlers(exiDecoder, exiEncoder);
166         LOG.debug("Session {} EXI handlers added to pipeline", this);
167     }
168
169     /**
170      * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
171      *
172      * @param decoder EXI decoder
173      * @param encoder EXI encoder
174      */
175     protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
176
177     public final boolean isUp() {
178         return up;
179     }
180
181     public final long getSessionId() {
182         return sessionId;
183     }
184 }