Merge "Refactor bad local variable name in KeepaliveSalFacade"
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / AbstractNetconfSession.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil;
9
10 import com.siemens.ct.exi.core.exceptions.EXIException;
11 import com.siemens.ct.exi.core.exceptions.UnsupportedOption;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandler;
15 import io.netty.channel.ChannelPromise;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import io.netty.handler.codec.MessageToByteEncoder;
18 import java.io.IOException;
19 import org.opendaylight.netconf.api.NetconfExiSession;
20 import org.opendaylight.netconf.api.NetconfMessage;
21 import org.opendaylight.netconf.api.NetconfSession;
22 import org.opendaylight.netconf.api.NetconfSessionListener;
23 import org.opendaylight.netconf.api.NetconfTerminationReason;
24 import org.opendaylight.netconf.api.xml.XmlElement;
25 import org.opendaylight.netconf.nettyutil.handler.NetconfEXICodec;
26 import org.opendaylight.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
27 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
28 import org.opendaylight.netconf.nettyutil.handler.exi.EXIParameters;
29 import org.opendaylight.protocol.framework.AbstractProtocolSession;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public abstract class AbstractNetconfSession<S extends NetconfSession,L extends NetconfSessionListener<S>>
34         extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
35     private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
36     private final L sessionListener;
37     private final long sessionId;
38     private boolean up = false;
39
40     private ChannelHandler delayedEncoder;
41
42     private final Channel channel;
43
44     protected AbstractNetconfSession(final L sessionListener, final Channel channel, final long sessionId) {
45         this.sessionListener = sessionListener;
46         this.channel = channel;
47         this.sessionId = sessionId;
48         LOG.debug("Session {} created", sessionId);
49     }
50
51     protected abstract S thisInstance();
52
53     @Override
54     public void close() {
55         channel.close();
56         up = false;
57         sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
58     }
59
60     @Override
61     protected void handleMessage(final NetconfMessage netconfMessage) {
62         LOG.debug("handling incoming message");
63         sessionListener.onMessage(thisInstance(), netconfMessage);
64     }
65
66     @Override
67     public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
68         // From: https://github.com/netty/netty/issues/3887
69         // Netty can provide "ordering" in the following situations:
70         // 1. You are doing all writes from the EventLoop thread; OR
71         // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
72         //
73         // Restconf writes to a netconf mountpoint execute multiple messages
74         // and one of these was executed from a restconf thread thus breaking ordering so
75         // we need to execute all messages from an EventLoop thread.
76
77         final ChannelPromise promise = channel.newPromise();
78         channel.eventLoop().execute(() -> {
79             channel.writeAndFlush(netconfMessage, promise);
80             if (delayedEncoder != null) {
81                 replaceMessageEncoder(delayedEncoder);
82                 delayedEncoder = null;
83             }
84         });
85
86         return promise;
87     }
88
89     @Override
90     protected void endOfInput() {
91         LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
92                 : "initialized");
93         if (isUp()) {
94             this.sessionListener.onSessionDown(thisInstance(),
95                     new IOException("End of input detected. Close the session."));
96         }
97     }
98
99     @Override
100     protected void sessionUp() {
101         LOG.debug("Session {} up", toString());
102         sessionListener.onSessionUp(thisInstance());
103         this.up = true;
104     }
105
106     @Override
107     public String toString() {
108         final StringBuilder sb = new StringBuilder(getClass().getSimpleName() + "{");
109         sb.append("sessionId=").append(sessionId);
110         sb.append(", channel=").append(channel);
111         sb.append('}');
112         return sb.toString();
113     }
114
115     protected final void replaceMessageDecoder(final ChannelHandler handler) {
116         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
117     }
118
119     protected final void replaceMessageEncoder(final ChannelHandler handler) {
120         replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
121     }
122
123     protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
124         this.delayedEncoder = handler;
125     }
126
127     protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
128         channel.pipeline().replace(handlerName, handlerName, handler);
129     }
130
131     @Override
132     public final void startExiCommunication(final NetconfMessage startExiMessage) {
133         final EXIParameters exiParams;
134         try {
135             exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
136         } catch (final UnsupportedOption e) {
137             LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
138             throw new IllegalArgumentException("Cannot parse options", e);
139         }
140
141         final NetconfEXICodec exiCodec = NetconfEXICodec.forParameters(exiParams);
142         final NetconfMessageToEXIEncoder exiEncoder = NetconfMessageToEXIEncoder.create(exiCodec);
143         final NetconfEXIToMessageDecoder exiDecoder;
144         try {
145             exiDecoder = NetconfEXIToMessageDecoder.create(exiCodec);
146         } catch (EXIException e) {
147             LOG.warn("Failed to instantiate EXI decodeer for {} on session {}", exiCodec, this, e);
148             throw new IllegalStateException("Cannot instantiate encoder for options", e);
149         }
150
151         addExiHandlers(exiDecoder, exiEncoder);
152         LOG.debug("Session {} EXI handlers added to pipeline", this);
153     }
154
155     /**
156      * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
157      *
158      * @param decoder EXI decoder
159      * @param encoder EXI encoder
160      */
161     protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
162
163     public final boolean isUp() {
164         return up;
165     }
166
167     public final long getSessionId() {
168         return sessionId;
169     }
170 }