Hide Netty(Pipeline)AwareChannelSubsystem
[netconf.git] / netconf / callhome-protocol / src / main / java / org / opendaylight / netconf / callhome / protocol / CallHomeSessionContext.java
1 /*
2  * Copyright (c) 2016 Brocade Communication Systems and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.callhome.protocol;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelOutboundHandlerAdapter;
17 import io.netty.channel.ChannelPromise;
18 import io.netty.channel.EventLoopGroup;
19 import io.netty.util.concurrent.GlobalEventExecutor;
20 import io.netty.util.concurrent.Promise;
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.security.PublicKey;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.netconf.client.NetconfClientSession;
28 import org.opendaylight.netconf.client.NetconfClientSessionListener;
29 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
30 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter;
31 import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfClientSessionImpl;
32 import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
33 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
34 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
35 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
36 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
37 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
38 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.parameters.Protocol.Name;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 // Non-final for testing
44 class CallHomeSessionContext implements CallHomeProtocolSessionContext {
45
46     private static final Logger LOG = LoggerFactory.getLogger(CallHomeSessionContext.class);
47     private static final String NETCONF = "netconf";
48
49     @VisibleForTesting
50     static final Session.AttributeKey<CallHomeSessionContext> SESSION_KEY = new Session.AttributeKey<>();
51
52     private final ClientSession sshSession;
53     private final CallHomeAuthorization authorization;
54     private final Factory factory;
55
56     private volatile boolean activated;
57
58     private final InetSocketAddress remoteAddress;
59     private final PublicKey serverKey;
60
61     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Passing 'this' around")
62     CallHomeSessionContext(final ClientSession sshSession, final CallHomeAuthorization authorization,
63                            final Factory factory) {
64         this.authorization = requireNonNull(authorization, "authorization");
65         checkArgument(this.authorization.isServerAllowed(), "Server was not allowed.");
66         this.factory = requireNonNull(factory);
67         this.sshSession = requireNonNull(sshSession);
68         remoteAddress = (InetSocketAddress) this.sshSession.getIoSession().getRemoteAddress();
69         serverKey = this.sshSession.getServerKey();
70     }
71
72     final void associate() {
73         sshSession.setAttribute(SESSION_KEY, this);
74     }
75
76     static CallHomeSessionContext getFrom(final ClientSession sshSession) {
77         return sshSession.getAttribute(SESSION_KEY);
78     }
79
80     AuthFuture authorize() throws IOException {
81         authorization.applyTo(sshSession);
82         return sshSession.auth();
83     }
84
85     void openNetconfChannel() {
86         LOG.debug("Opening NETCONF Subsystem on {}", sshSession);
87         try {
88             final MinaSshNettyChannel nettyChannel = newMinaSshNettyChannel();
89             final ClientChannel netconfChannel =
90                     ((NetconfClientSessionImpl) sshSession).createSubsystemChannel(NETCONF, nettyChannel.pipeline());
91             netconfChannel.setStreaming(ClientChannel.Streaming.Async);
92             netconfChannel.open().addListener(newSshFutureListener(netconfChannel, nettyChannel));
93         } catch (IOException e) {
94             throw new IllegalStateException(e);
95         }
96     }
97
98     SshFutureListener<OpenFuture> newSshFutureListener(final ClientChannel netconfChannel,
99             final MinaSshNettyChannel nettyChannel) {
100         return future -> {
101             if (future.isOpened()) {
102                 factory.getChannelOpenListener().onNetconfSubsystemOpened(this,
103                     listener -> doActivate(netconfChannel, listener, nettyChannel));
104             } else {
105                 channelOpenFailed(future.getException());
106             }
107         };
108     }
109
110     @Override
111     public void terminate() {
112         sshSession.close(false);
113         removeSelf();
114     }
115
116     @Override
117     public Name getTransportType() {
118         return Name.SSH;
119     }
120
121     private void channelOpenFailed(final Throwable throwable) {
122         LOG.error("Unable to open netconf subsystem, disconnecting.", throwable);
123         sshSession.close(false);
124     }
125
126     private synchronized Promise<NetconfClientSession> doActivate(final ClientChannel netconfChannel,
127             final NetconfClientSessionListener listener, final MinaSshNettyChannel nettyChannel) {
128         if (activated) {
129             return newSessionPromise().setFailure(new IllegalStateException("Session already activated."));
130         }
131         activated = true;
132         nettyChannel.pipeline().addFirst(new SshWriteAsyncHandlerAdapter(netconfChannel));
133         LOG.info("Activating Netconf channel for {} with {}", getRemoteAddress(), listener);
134         final Promise<NetconfClientSession> activationPromise = newSessionPromise();
135         factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
136         ((ChannelSubsystem) netconfChannel).onClose(nettyChannel::doNettyDisconnect);
137         factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
138         return activationPromise;
139     }
140
141     protected MinaSshNettyChannel newMinaSshNettyChannel() {
142         return new MinaSshNettyChannel(this, sshSession);
143     }
144
145     private static Promise<NetconfClientSession> newSessionPromise() {
146         return GlobalEventExecutor.INSTANCE.newPromise();
147     }
148
149     @Override
150     public PublicKey getRemoteServerKey() {
151         return serverKey;
152     }
153
154     @Override
155     public InetSocketAddress getRemoteAddress() {
156         return remoteAddress;
157     }
158
159     @Override
160     public String getSessionId() {
161         return authorization.getSessionName();
162     }
163
164     void removeSelf() {
165         factory.remove(this);
166     }
167
168     static class Factory {
169         private final ConcurrentMap<String, CallHomeSessionContext> sessions = new ConcurrentHashMap<>();
170         private final EventLoopGroup nettyGroup;
171         private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
172         private final CallHomeNetconfSubsystemListener subsystemListener;
173
174         Factory(final EventLoopGroup nettyGroup, final NetconfClientSessionNegotiatorFactory negotiatorFactory,
175                 final CallHomeNetconfSubsystemListener subsystemListener) {
176             this.nettyGroup = requireNonNull(nettyGroup);
177             this.negotiatorFactory = requireNonNull(negotiatorFactory);
178             this.subsystemListener = requireNonNull(subsystemListener);
179         }
180
181         ReverseSshChannelInitializer getChannelInitializer(final NetconfClientSessionListener listener) {
182             return ReverseSshChannelInitializer.create(negotiatorFactory, listener);
183         }
184
185         CallHomeNetconfSubsystemListener getChannelOpenListener() {
186             return subsystemListener;
187         }
188
189         EventLoopGroup getNettyGroup() {
190             return nettyGroup;
191         }
192
193         @Nullable CallHomeSessionContext createIfNotExists(final ClientSession sshSession,
194                                                            final CallHomeAuthorization authorization) {
195             final var newSession = new CallHomeSessionContext(sshSession, authorization, this);
196             final var existing = sessions.putIfAbsent(newSession.getSessionId(), newSession);
197             if (existing == null) {
198                 // There was no mapping, but now there is. Associate the context with the session.
199                 newSession.associate();
200                 return newSession;
201             }
202
203             // We already have a mapping, do not create a new one. But also check if the current session matches
204             // the one stored in the session. This can happen during re-keying.
205             return existing == CallHomeSessionContext.getFrom(sshSession) ? existing : null;
206         }
207
208         void remove(final CallHomeSessionContext session) {
209             sessions.remove(session.getSessionId(), session);
210         }
211     }
212
213     static class SshWriteAsyncHandlerAdapter extends ChannelOutboundHandlerAdapter {
214         private final AsyncSshHandlerWriter sshWriteAsyncHandler;
215         private final ClientChannel sshChannel;
216
217         SshWriteAsyncHandlerAdapter(final ClientChannel sshChannel) {
218             this.sshChannel = sshChannel;
219             sshWriteAsyncHandler = new AsyncSshHandlerWriter(sshChannel.getAsyncIn());
220         }
221
222         @Override
223         public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
224             sshWriteAsyncHandler.write(ctx, msg, promise);
225         }
226
227         public ClientChannel getSshChannel() {
228             return sshChannel;
229         }
230     }
231 }