Do not require NetconfSessionImpl
[netconf.git] / netconf / callhome-protocol / src / main / java / org / opendaylight / netconf / callhome / protocol / CallHomeSessionContext.java
1 /*
2  * Copyright (c) 2016 Brocade Communication Systems and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.callhome.protocol;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import io.netty.channel.EventLoopGroup;
14 import io.netty.util.concurrent.GlobalEventExecutor;
15 import io.netty.util.concurrent.Promise;
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.SocketAddress;
19 import java.security.PublicKey;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import org.apache.sshd.client.channel.ClientChannel;
23 import org.apache.sshd.client.future.AuthFuture;
24 import org.apache.sshd.client.future.OpenFuture;
25 import org.apache.sshd.client.session.ClientSession;
26 import org.apache.sshd.common.future.SshFutureListener;
27 import org.apache.sshd.common.session.Session;
28 import org.checkerframework.checker.lock.qual.Holding;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.opendaylight.netconf.client.NetconfClientSession;
31 import org.opendaylight.netconf.client.NetconfClientSessionListener;
32 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 class CallHomeSessionContext implements CallHomeProtocolSessionContext {
37
38     private static final Logger LOG = LoggerFactory.getLogger(CallHomeSessionContext.class);
39     static final Session.AttributeKey<CallHomeSessionContext> SESSION_KEY = new Session.AttributeKey<>();
40
41     private static final String NETCONF = "netconf";
42
43     private final ClientSession sshSession;
44     private final CallHomeAuthorization authorization;
45     private final Factory factory;
46
47     private volatile MinaSshNettyChannel nettyChannel = null;
48     private volatile boolean activated;
49
50     private final InetSocketAddress remoteAddress;
51     private final PublicKey serverKey;
52
53     CallHomeSessionContext(final ClientSession sshSession, final CallHomeAuthorization authorization,
54                            final SocketAddress remoteAddress, final Factory factory) {
55         this.authorization = requireNonNull(authorization, "authorization");
56         checkArgument(this.authorization.isServerAllowed(), "Server was not allowed.");
57         this.factory = requireNonNull(factory, "factory");
58         this.sshSession = requireNonNull(sshSession, "sshSession");
59         this.sshSession.setAttribute(SESSION_KEY, this);
60         this.remoteAddress = (InetSocketAddress) this.sshSession.getIoSession().getRemoteAddress();
61         this.serverKey = this.sshSession.getKex().getServerKey();
62     }
63
64     static CallHomeSessionContext getFrom(final ClientSession sshSession) {
65         return sshSession.getAttribute(SESSION_KEY);
66     }
67
68     AuthFuture authorize() throws IOException {
69         authorization.applyTo(sshSession);
70         return sshSession.auth();
71     }
72
73     void openNetconfChannel() {
74         LOG.debug("Opening NETCONF Subsystem on {}", sshSession);
75         try {
76             final ClientChannel netconfChannel = sshSession.createSubsystemChannel(NETCONF);
77             netconfChannel.setStreaming(ClientChannel.Streaming.Async);
78             netconfChannel.open().addListener(newSshFutureListener(netconfChannel));
79         } catch (IOException e) {
80             throw new IllegalStateException(e);
81         }
82     }
83
84     SshFutureListener<OpenFuture> newSshFutureListener(final ClientChannel netconfChannel) {
85         return future -> {
86             if (future.isOpened()) {
87                 netconfChannelOpened(netconfChannel);
88             } else {
89                 channelOpenFailed(future.getException());
90             }
91         };
92     }
93
94     @Override
95     public void terminate() {
96         sshSession.close(false);
97         removeSelf();
98     }
99
100     private void channelOpenFailed(final Throwable throwable) {
101         LOG.error("Unable to open netconf subsystem, disconnecting.", throwable);
102         sshSession.close(false);
103     }
104
105     private void netconfChannelOpened(final ClientChannel netconfChannel) {
106         nettyChannel = newMinaSshNettyChannel(netconfChannel);
107         factory.getChannelOpenListener().onNetconfSubsystemOpened(
108             CallHomeSessionContext.this, this::doActivate);
109     }
110
111     // FIXME: this does not look right
112     @Holding("this")
113     private synchronized Promise<NetconfClientSession> doActivate(final NetconfClientSessionListener listener) {
114         if (activated) {
115             return newSessionPromise().setFailure(new IllegalStateException("Session already activated."));
116         }
117         activated = true;
118         LOG.info("Activating Netconf channel for {} with {}", getRemoteAddress(), listener);
119         Promise<NetconfClientSession> activationPromise = newSessionPromise();
120         factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
121         factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
122         return activationPromise;
123     }
124
125     protected MinaSshNettyChannel newMinaSshNettyChannel(final ClientChannel netconfChannel) {
126         return new MinaSshNettyChannel(this, sshSession, netconfChannel);
127     }
128
129     private static Promise<NetconfClientSession> newSessionPromise() {
130         return GlobalEventExecutor.INSTANCE.newPromise();
131     }
132
133     @Override
134     public PublicKey getRemoteServerKey() {
135         return serverKey;
136     }
137
138     @Override
139     public String getRemoteServerVersion() {
140         return sshSession.getServerVersion();
141     }
142
143     @Override
144     public InetSocketAddress getRemoteAddress() {
145         return remoteAddress;
146     }
147
148     @Override
149     public String getSessionName() {
150         return authorization.getSessionName();
151     }
152
153     void removeSelf() {
154         factory.remove(this);
155     }
156
157     static class Factory {
158
159         private final EventLoopGroup nettyGroup;
160         private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
161         private final CallHomeNetconfSubsystemListener subsystemListener;
162         private final ConcurrentMap<String, CallHomeSessionContext> sessions = new ConcurrentHashMap<>();
163
164         Factory(final EventLoopGroup nettyGroup, final NetconfClientSessionNegotiatorFactory negotiatorFactory,
165                 final CallHomeNetconfSubsystemListener subsystemListener) {
166             this.nettyGroup = requireNonNull(nettyGroup, "nettyGroup");
167             this.negotiatorFactory = requireNonNull(negotiatorFactory, "negotiatorFactory");
168             this.subsystemListener = requireNonNull(subsystemListener);
169         }
170
171         void remove(final CallHomeSessionContext session) {
172             sessions.remove(session.getSessionName(), session);
173         }
174
175         ReverseSshChannelInitializer getChannelInitializer(final NetconfClientSessionListener listener) {
176             return ReverseSshChannelInitializer.create(negotiatorFactory, listener);
177         }
178
179         CallHomeNetconfSubsystemListener getChannelOpenListener() {
180             return this.subsystemListener;
181         }
182
183         @Nullable CallHomeSessionContext createIfNotExists(final ClientSession sshSession,
184                 final CallHomeAuthorization authorization, final SocketAddress remoteAddress) {
185             CallHomeSessionContext session = new CallHomeSessionContext(sshSession, authorization,
186                     remoteAddress, this);
187             CallHomeSessionContext preexisting = sessions.putIfAbsent(session.getSessionName(), session);
188             // If preexisting is null - session does not exist, so we can safely create new one, otherwise we return
189             // null and incoming connection will be rejected.
190             return preexisting == null ? session : null;
191         }
192
193         EventLoopGroup getNettyGroup() {
194             return nettyGroup;
195         }
196     }
197 }