Migrate netconf-topology to new transport
[netconf.git] / netconf / callhome-protocol / src / main / java / org / opendaylight / netconf / callhome / protocol / CallHomeSessionContext.java
1 /*
2  * Copyright (c) 2016 Brocade Communication Systems and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.callhome.protocol;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.channel.ChannelOutboundHandlerAdapter;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.EventLoopGroup;
22 import io.netty.util.concurrent.GlobalEventExecutor;
23 import io.netty.util.concurrent.Promise;
24 import java.io.IOException;
25 import java.net.InetSocketAddress;
26 import java.security.PublicKey;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.opendaylight.netconf.client.NetconfClientSession;
31 import org.opendaylight.netconf.client.NetconfClientSessionListener;
32 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
33 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter;
34 import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfClientSessionImpl;
35 import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
36 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
37 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
38 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
39 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
40 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
41 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev231025.connection.parameters.Protocol.Name;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 // Non-final for testing
47 class CallHomeSessionContext implements CallHomeProtocolSessionContext {
48
49     private static final Logger LOG = LoggerFactory.getLogger(CallHomeSessionContext.class);
50     private static final String NETCONF = "netconf";
51
52     @VisibleForTesting
53     static final Session.AttributeKey<CallHomeSessionContext> SESSION_KEY = new Session.AttributeKey<>();
54
55     private final ClientSession sshSession;
56     private final CallHomeAuthorization authorization;
57     private final Factory factory;
58
59     private volatile boolean activated;
60
61     private final InetSocketAddress remoteAddress;
62     private final PublicKey serverKey;
63
64     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Passing 'this' around")
65     CallHomeSessionContext(final ClientSession sshSession, final CallHomeAuthorization authorization,
66                            final Factory factory) {
67         this.authorization = requireNonNull(authorization, "authorization");
68         checkArgument(this.authorization.isServerAllowed(), "Server was not allowed.");
69         this.factory = requireNonNull(factory);
70         this.sshSession = requireNonNull(sshSession);
71         remoteAddress = (InetSocketAddress) this.sshSession.getIoSession().getRemoteAddress();
72         serverKey = this.sshSession.getServerKey();
73     }
74
75     final void associate() {
76         sshSession.setAttribute(SESSION_KEY, this);
77     }
78
79     static CallHomeSessionContext getFrom(final ClientSession sshSession) {
80         return sshSession.getAttribute(SESSION_KEY);
81     }
82
83     AuthFuture authorize() throws IOException {
84         authorization.applyTo(sshSession);
85         return sshSession.auth();
86     }
87
88     void openNetconfChannel() {
89         LOG.debug("Opening NETCONF Subsystem on {}", sshSession);
90         try {
91             final MinaSshNettyChannel nettyChannel = newMinaSshNettyChannel();
92             final ClientChannel netconfChannel =
93                     ((NetconfClientSessionImpl) sshSession).createSubsystemChannel(NETCONF, nettyChannel.pipeline());
94             netconfChannel.setStreaming(ClientChannel.Streaming.Async);
95             netconfChannel.open().addListener(newSshFutureListener(netconfChannel, nettyChannel));
96         } catch (IOException e) {
97             throw new IllegalStateException(e);
98         }
99     }
100
101     SshFutureListener<OpenFuture> newSshFutureListener(final ClientChannel netconfChannel,
102             final MinaSshNettyChannel nettyChannel) {
103         return future -> {
104             if (future.isOpened()) {
105                 factory.getChannelOpenListener().onNetconfSubsystemOpened(this,
106                     listener -> doActivate(netconfChannel, listener, nettyChannel));
107             } else {
108                 channelOpenFailed(future.getException());
109             }
110         };
111     }
112
113     @Override
114     public void terminate() {
115         sshSession.close(false);
116         removeSelf();
117     }
118
119     @Override
120     public Name getTransportType() {
121         return Name.SSH;
122     }
123
124     private void channelOpenFailed(final Throwable throwable) {
125         LOG.error("Unable to open netconf subsystem, disconnecting.", throwable);
126         sshSession.close(false);
127     }
128
129     private synchronized ListenableFuture<NetconfClientSession> doActivate(final ClientChannel netconfChannel,
130             final NetconfClientSessionListener listener, final MinaSshNettyChannel nettyChannel) {
131         if (activated) {
132             return Futures.immediateFailedFuture(new IllegalStateException("Session already activated."));
133         }
134         activated = true;
135         nettyChannel.pipeline().addFirst(new SshWriteAsyncHandlerAdapter(netconfChannel));
136         LOG.info("Activating Netconf channel for {} with {}", getRemoteAddress(), listener);
137         final Promise<NetconfClientSession> activationPromise = newSessionPromise();
138         factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
139         ((ChannelSubsystem) netconfChannel).onClose(nettyChannel::doNettyDisconnect);
140         factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
141         final SettableFuture<NetconfClientSession> future = SettableFuture.create();
142         activationPromise.addListener(ignored -> {
143             final var cause = activationPromise.cause();
144             if (cause != null) {
145                 future.setException(cause);
146             } else {
147                 future.set(activationPromise.getNow());
148             }
149         });
150         return future;
151     }
152
153     @Deprecated(since = "7.0.0", forRemoval = true)
154     @VisibleForTesting
155     MinaSshNettyChannel newMinaSshNettyChannel() {
156         return new MinaSshNettyChannel(this, sshSession);
157     }
158
159     private static Promise<NetconfClientSession> newSessionPromise() {
160         return GlobalEventExecutor.INSTANCE.newPromise();
161     }
162
163     @Override
164     public PublicKey getRemoteServerKey() {
165         return serverKey;
166     }
167
168     @Override
169     public InetSocketAddress getRemoteAddress() {
170         return remoteAddress;
171     }
172
173     @Override
174     public String getSessionId() {
175         return authorization.getSessionName();
176     }
177
178     void removeSelf() {
179         factory.remove(this);
180     }
181
182     static class Factory {
183         private final ConcurrentMap<String, CallHomeSessionContext> sessions = new ConcurrentHashMap<>();
184         private final EventLoopGroup nettyGroup;
185         private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
186         private final CallHomeNetconfSubsystemListener subsystemListener;
187
188         Factory(final EventLoopGroup nettyGroup, final NetconfClientSessionNegotiatorFactory negotiatorFactory,
189                 final CallHomeNetconfSubsystemListener subsystemListener) {
190             this.nettyGroup = requireNonNull(nettyGroup);
191             this.negotiatorFactory = requireNonNull(negotiatorFactory);
192             this.subsystemListener = requireNonNull(subsystemListener);
193         }
194
195         ReverseSshChannelInitializer getChannelInitializer(final NetconfClientSessionListener listener) {
196             return ReverseSshChannelInitializer.create(negotiatorFactory, listener);
197         }
198
199         CallHomeNetconfSubsystemListener getChannelOpenListener() {
200             return subsystemListener;
201         }
202
203         EventLoopGroup getNettyGroup() {
204             return nettyGroup;
205         }
206
207         @Nullable CallHomeSessionContext createIfNotExists(final ClientSession sshSession,
208                                                            final CallHomeAuthorization authorization) {
209             final var newSession = new CallHomeSessionContext(sshSession, authorization, this);
210             final var existing = sessions.putIfAbsent(newSession.getSessionId(), newSession);
211             if (existing == null) {
212                 // There was no mapping, but now there is. Associate the context with the session.
213                 newSession.associate();
214                 return newSession;
215             }
216
217             // We already have a mapping, do not create a new one. But also check if the current session matches
218             // the one stored in the session. This can happen during re-keying.
219             return existing == CallHomeSessionContext.getFrom(sshSession) ? existing : null;
220         }
221
222         void remove(final CallHomeSessionContext session) {
223             sessions.remove(session.getSessionId(), session);
224         }
225     }
226
227     static class SshWriteAsyncHandlerAdapter extends ChannelOutboundHandlerAdapter {
228         private final AsyncSshHandlerWriter sshWriteAsyncHandler;
229         private final ClientChannel sshChannel;
230
231         SshWriteAsyncHandlerAdapter(final ClientChannel sshChannel) {
232             this.sshChannel = sshChannel;
233             sshWriteAsyncHandler = new AsyncSshHandlerWriter(sshChannel.getAsyncIn());
234         }
235
236         @Override
237         public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
238             sshWriteAsyncHandler.write(ctx, msg, promise);
239         }
240
241         public ClientChannel getSshChannel() {
242             return sshChannel;
243         }
244     }
245 }