2 * Copyright (c) 2016 Brocade Communication Systems and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.callhome.protocol;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.annotations.VisibleForTesting;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelOutboundHandlerAdapter;
17 import io.netty.channel.ChannelPromise;
18 import io.netty.channel.EventLoopGroup;
19 import io.netty.util.concurrent.GlobalEventExecutor;
20 import io.netty.util.concurrent.Promise;
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.security.PublicKey;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.netconf.api.TransportConstants;
28 import org.opendaylight.netconf.client.NetconfClientSession;
29 import org.opendaylight.netconf.client.NetconfClientSessionListener;
30 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
31 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter;
32 import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfClientSessionImpl;
33 import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
34 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
35 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
36 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
37 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
38 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
39 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.parameters.Protocol.Name;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 // Non-final for testing
45 class CallHomeSessionContext implements CallHomeProtocolSessionContext {
46 private static final Logger LOG = LoggerFactory.getLogger(CallHomeSessionContext.class);
49 static final Session.AttributeKey<CallHomeSessionContext> SESSION_KEY = new Session.AttributeKey<>();
51 private final ClientSession sshSession;
52 private final CallHomeAuthorization authorization;
53 private final Factory factory;
55 private volatile boolean activated;
57 private final InetSocketAddress remoteAddress;
58 private final PublicKey serverKey;
60 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Passing 'this' around")
61 CallHomeSessionContext(final ClientSession sshSession, final CallHomeAuthorization authorization,
62 final Factory factory) {
63 this.authorization = requireNonNull(authorization, "authorization");
64 checkArgument(this.authorization.isServerAllowed(), "Server was not allowed.");
65 this.factory = requireNonNull(factory);
66 this.sshSession = requireNonNull(sshSession);
67 remoteAddress = (InetSocketAddress) this.sshSession.getIoSession().getRemoteAddress();
68 serverKey = this.sshSession.getServerKey();
71 final void associate() {
72 sshSession.setAttribute(SESSION_KEY, this);
75 static CallHomeSessionContext getFrom(final ClientSession sshSession) {
76 return sshSession.getAttribute(SESSION_KEY);
79 AuthFuture authorize() throws IOException {
80 authorization.applyTo(sshSession);
81 return sshSession.auth();
84 void openNetconfChannel() {
85 LOG.debug("Opening NETCONF Subsystem on {}", sshSession);
87 final MinaSshNettyChannel nettyChannel = newMinaSshNettyChannel();
88 final ClientChannel netconfChannel = ((NetconfClientSessionImpl) sshSession).createSubsystemChannel(
89 TransportConstants.SSH_SUBSYSTEM, nettyChannel.pipeline());
90 netconfChannel.setStreaming(ClientChannel.Streaming.Async);
91 netconfChannel.open().addListener(newSshFutureListener(netconfChannel, nettyChannel));
92 } catch (IOException e) {
93 throw new IllegalStateException(e);
97 SshFutureListener<OpenFuture> newSshFutureListener(final ClientChannel netconfChannel,
98 final MinaSshNettyChannel nettyChannel) {
100 if (future.isOpened()) {
101 factory.getChannelOpenListener().onNetconfSubsystemOpened(this,
102 listener -> doActivate(netconfChannel, listener, nettyChannel));
104 channelOpenFailed(future.getException());
110 public void terminate() {
111 sshSession.close(false);
116 public Name getTransportType() {
120 private void channelOpenFailed(final Throwable throwable) {
121 LOG.error("Unable to open netconf subsystem, disconnecting.", throwable);
122 sshSession.close(false);
125 private synchronized Promise<NetconfClientSession> doActivate(final ClientChannel netconfChannel,
126 final NetconfClientSessionListener listener, final MinaSshNettyChannel nettyChannel) {
128 return newSessionPromise().setFailure(new IllegalStateException("Session already activated."));
131 nettyChannel.pipeline().addFirst(new SshWriteAsyncHandlerAdapter(netconfChannel));
132 LOG.info("Activating Netconf channel for {} with {}", getRemoteAddress(), listener);
133 final Promise<NetconfClientSession> activationPromise = newSessionPromise();
134 factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
135 ((ChannelSubsystem) netconfChannel).onClose(nettyChannel::doNettyDisconnect);
136 factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
137 return activationPromise;
140 @Deprecated(since = "7.0.0", forRemoval = true)
141 protected MinaSshNettyChannel newMinaSshNettyChannel() {
142 return new MinaSshNettyChannel(this, sshSession);
145 private static Promise<NetconfClientSession> newSessionPromise() {
146 return GlobalEventExecutor.INSTANCE.newPromise();
150 public PublicKey getRemoteServerKey() {
155 public InetSocketAddress getRemoteAddress() {
156 return remoteAddress;
160 public String getSessionId() {
161 return authorization.getSessionName();
165 factory.remove(this);
168 static class Factory {
169 private final ConcurrentMap<String, CallHomeSessionContext> sessions = new ConcurrentHashMap<>();
170 private final EventLoopGroup nettyGroup;
171 private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
172 private final CallHomeNetconfSubsystemListener subsystemListener;
174 Factory(final EventLoopGroup nettyGroup, final NetconfClientSessionNegotiatorFactory negotiatorFactory,
175 final CallHomeNetconfSubsystemListener subsystemListener) {
176 this.nettyGroup = requireNonNull(nettyGroup);
177 this.negotiatorFactory = requireNonNull(negotiatorFactory);
178 this.subsystemListener = requireNonNull(subsystemListener);
181 ReverseSshChannelInitializer getChannelInitializer(final NetconfClientSessionListener listener) {
182 return ReverseSshChannelInitializer.create(negotiatorFactory, listener);
185 CallHomeNetconfSubsystemListener getChannelOpenListener() {
186 return subsystemListener;
189 EventLoopGroup getNettyGroup() {
193 @Nullable CallHomeSessionContext createIfNotExists(final ClientSession sshSession,
194 final CallHomeAuthorization authorization) {
195 final var newSession = new CallHomeSessionContext(sshSession, authorization, this);
196 final var existing = sessions.putIfAbsent(newSession.getSessionId(), newSession);
197 if (existing == null) {
198 // There was no mapping, but now there is. Associate the context with the session.
199 newSession.associate();
203 // We already have a mapping, do not create a new one. But also check if the current session matches
204 // the one stored in the session. This can happen during re-keying.
205 return existing == CallHomeSessionContext.getFrom(sshSession) ? existing : null;
208 void remove(final CallHomeSessionContext session) {
209 sessions.remove(session.getSessionId(), session);
213 static class SshWriteAsyncHandlerAdapter extends ChannelOutboundHandlerAdapter {
214 private final AsyncSshHandlerWriter sshWriteAsyncHandler;
215 private final ClientChannel sshChannel;
217 SshWriteAsyncHandlerAdapter(final ClientChannel sshChannel) {
218 this.sshChannel = sshChannel;
219 sshWriteAsyncHandler = new AsyncSshHandlerWriter(sshChannel.getAsyncIn());
223 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
224 sshWriteAsyncHandler.write(ctx, msg, promise);
227 public ClientChannel getSshChannel() {