Remove opendaylight directory
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil;
9
10 import io.netty.channel.Channel;
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelHandler;
13 import io.netty.channel.DefaultChannelPromise;
14 import io.netty.handler.codec.ByteToMessageDecoder;
15 import io.netty.handler.codec.MessageToByteEncoder;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.FutureListener;
18 import java.io.IOException;
19 import org.opendaylight.controller.config.util.xml.XmlElement;
20 import org.opendaylight.netconf.api.NetconfExiSession;
21 import org.opendaylight.netconf.api.NetconfMessage;
22 import org.opendaylight.netconf.api.NetconfSession;
23 import org.opendaylight.netconf.api.NetconfSessionListener;
24 import org.opendaylight.netconf.api.NetconfTerminationReason;
25 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
26 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
27 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
28 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
29 import org.opendaylight.protocol.framework.AbstractProtocolSession;
30 import org.openexi.proc.common.EXIOptionsException;
31 import org.openexi.sax.TransmogrifierException;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>> extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
37     private final L sessionListener;
38     private final long sessionId;
39     private boolean up = false;
40
41     private ChannelHandler delayedEncoder;
42
43     private final Channel channel;
44
45     protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
46         this.sessionListener = sessionListener;
47         this.channel = channel;
48         this.sessionId = sessionId;
49         LOG.debug("Session {} created", sessionId);
50     }
51
52     protected abstract S thisInstance();
53
54     @Override
55     public void close() {
56         channel.close();
57         up = false;
58         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
59     }
60
61     @Override
62     protected void handleMessage(final NetconfMessage netconfMessage) {
63         LOG.debug("handling incoming message");
64         sessionListener.onMessage(thisInstance(), netconfMessage);
65     }
66
67     @Override
68     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
69         // From: https://github.com/netty/netty/issues/3887
70         // Netty can provide "ordering" in the following situations:
71         // 1. You are doing all writes from the EventLoop thread; OR
72         // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
73         //
74         // Restconf writes to a netconf mountpoint execute multiple messages
75         // and one of these was executed from a restconf thread thus breaking ordering so
76         // we need to execute all messages from an EventLoop thread.
77         final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
78         channel.eventLoop().execute(new Runnable() {
79             @Override
80             public void run() {
81                 final ChannelFuture future = channel.writeAndFlush(netconfMessage);
82                 future.addListener(new FutureListener<Void>() {
83                     @Override
84                     public void operationComplete(Future<Void> future) throws Exception {
85                         if (future.isSuccess()) {
86                             proxyFuture.setSuccess();
87                         } else {
88                             proxyFuture.setFailure(future.cause());
89                         }
90                     }
91                 });
92                 if (delayedEncoder != null) {
93                     replaceMessageEncoder(delayedEncoder);
94                     delayedEncoder = null;
95                 }
96             }
97         });
98
99         return proxyFuture;
100     }
101
102     @Override
103     protected void endOfInput() {
104         LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
105                 : "initialized");
106         if (isUp()) {
107             this.sessionListener.onSessionDown(thisInstance(), new IOException("End of input detected. Close the session."));
108         }
109     }
110
111     @Override
112     protected void sessionUp() {
113         LOG.debug("Session {} up", toString());
114         sessionListener.onSessionUp(thisInstance());
115         this.up = true;
116     }
117
118     @Override
119     public String toString() {
120         final StringBuffer sb = new StringBuffer(getClass().getSimpleName() + "{");
121         sb.append("sessionId=").append(sessionId);
122         sb.append(", channel=").append(channel);
123         sb.append('}');
124         return sb.toString();
125     }
126
127     protected final void replaceMessageDecoder(final ChannelHandler handler) {
128         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
129     }
130
131     protected final void replaceMessageEncoder(final ChannelHandler handler) {
132         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
133     }
134
135     protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
136         this.delayedEncoder = handler;
137     }
138
139     protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
140         channel.pipeline().replace(handlerName, handlerName, handler);
141     }
142
143     @Override
144     public final void startExiCommunication(final NetconfMessage startExiMessage) {
145         final EXIParameters exiParams;
146         try {
147             exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
148         } catch (final EXIOptionsException e) {
149             LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
150             throw new IllegalArgumentException("Cannot parse options", e);
151         }
152
153         final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
154         final NetconfMessageToEXIEncoder exiEncoder;
155         try {
156             exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
157         } catch (EXIOptionsException | TransmogrifierException e) {
158             LOG.warn("Failed to instantiate EXI encoder for {} on session {}", exiCodec, this, e);
159             throw new IllegalStateException("Cannot instantiate encoder for options", e);
160         }
161
162         final NetconfEXIToMessageDecoder exiDecoder;
163         try {
164             exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
165         } catch (EXIOptionsException e) {
166             LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
167             throw new IllegalStateException("Cannot instantiate encoder for options", e);
168         }
169
170         addExiHandlers(exiDecoder, exiEncoder);
171         LOG.debug("Session {} EXI handlers added to pipeline", this);
172     }
173
174     /**
175      * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
176      *
177      * @param decoder EXI decoder
178      * @param encoder EXI encoder
179      */
180     protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
181
182     public final boolean isUp() {
183         return up;
184     }
185
186     public final long getSessionId() {
187         return sessionId;
188     }
189 }