cf064152d1fc78bc3d13172e20743f7e5097a675
[netconf.git] / netconf / callhome-protocol / src / main / java / org / opendaylight / netconf / callhome / protocol / CallHomeSessionContext.java
1 /*
2  * Copyright (c) 2016 Brocade Communication Systems and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.callhome.protocol;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import io.netty.channel.EventLoopGroup;
14 import io.netty.util.concurrent.GlobalEventExecutor;
15 import io.netty.util.concurrent.Promise;
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.SocketAddress;
19 import java.security.PublicKey;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import org.apache.sshd.client.channel.ClientChannel;
23 import org.apache.sshd.client.future.AuthFuture;
24 import org.apache.sshd.client.future.OpenFuture;
25 import org.apache.sshd.client.session.ClientSession;
26 import org.apache.sshd.client.session.ClientSessionImpl;
27 import org.apache.sshd.common.future.SshFutureListener;
28 import org.apache.sshd.common.session.Session;
29 import org.checkerframework.checker.lock.qual.Holding;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.opendaylight.netconf.client.NetconfClientSession;
32 import org.opendaylight.netconf.client.NetconfClientSessionListener;
33 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 class CallHomeSessionContext implements CallHomeProtocolSessionContext {
38
39     private static final Logger LOG = LoggerFactory.getLogger(CallHomeSessionContext.class);
40     static final Session.AttributeKey<CallHomeSessionContext> SESSION_KEY = new Session.AttributeKey<>();
41
42     private static final String NETCONF = "netconf";
43
44     private final ClientSessionImpl sshSession;
45     private final CallHomeAuthorization authorization;
46     private final Factory factory;
47
48     private volatile MinaSshNettyChannel nettyChannel = null;
49     private volatile boolean activated;
50
51     private final InetSocketAddress remoteAddress;
52     private final PublicKey serverKey;
53
54     CallHomeSessionContext(final ClientSession sshSession, final CallHomeAuthorization authorization,
55                            final SocketAddress remoteAddress, final Factory factory) {
56         this.authorization = requireNonNull(authorization, "authorization");
57         checkArgument(this.authorization.isServerAllowed(), "Server was not allowed.");
58         checkArgument(sshSession instanceof ClientSessionImpl,
59                 "sshSession must implement ClientSessionImpl");
60         this.factory = requireNonNull(factory, "factory");
61         this.sshSession = (ClientSessionImpl) sshSession;
62         this.sshSession.setAttribute(SESSION_KEY, this);
63         this.remoteAddress = (InetSocketAddress) this.sshSession.getIoSession().getRemoteAddress();
64         this.serverKey = this.sshSession.getKex().getServerKey();
65     }
66
67     static CallHomeSessionContext getFrom(final ClientSession sshSession) {
68         return sshSession.getAttribute(SESSION_KEY);
69     }
70
71     AuthFuture authorize() throws IOException {
72         authorization.applyTo(sshSession);
73         return sshSession.auth();
74     }
75
76     void openNetconfChannel() {
77         LOG.debug("Opening NETCONF Subsystem on {}", sshSession);
78         try {
79             final ClientChannel netconfChannel = sshSession.createSubsystemChannel(NETCONF);
80             netconfChannel.setStreaming(ClientChannel.Streaming.Async);
81             netconfChannel.open().addListener(newSshFutureListener(netconfChannel));
82         } catch (IOException e) {
83             throw new IllegalStateException(e);
84         }
85     }
86
87     SshFutureListener<OpenFuture> newSshFutureListener(final ClientChannel netconfChannel) {
88         return future -> {
89             if (future.isOpened()) {
90                 netconfChannelOpened(netconfChannel);
91             } else {
92                 channelOpenFailed(future.getException());
93             }
94         };
95     }
96
97     @Override
98     public void terminate() {
99         sshSession.close(false);
100         removeSelf();
101     }
102
103     private void channelOpenFailed(final Throwable throwable) {
104         LOG.error("Unable to open netconf subsystem, disconnecting.", throwable);
105         sshSession.close(false);
106     }
107
108     private void netconfChannelOpened(final ClientChannel netconfChannel) {
109         nettyChannel = newMinaSshNettyChannel(netconfChannel);
110         factory.getChannelOpenListener().onNetconfSubsystemOpened(
111             CallHomeSessionContext.this, this::doActivate);
112     }
113
114     // FIXME: this does not look right
115     @Holding("this")
116     private synchronized Promise<NetconfClientSession> doActivate(final NetconfClientSessionListener listener) {
117         if (activated) {
118             return newSessionPromise().setFailure(new IllegalStateException("Session already activated."));
119         }
120         activated = true;
121         LOG.info("Activating Netconf channel for {} with {}", getRemoteAddress(), listener);
122         Promise<NetconfClientSession> activationPromise = newSessionPromise();
123         factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
124         factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
125         return activationPromise;
126     }
127
128     protected MinaSshNettyChannel newMinaSshNettyChannel(final ClientChannel netconfChannel) {
129         return new MinaSshNettyChannel(this, sshSession, netconfChannel);
130     }
131
132     private static Promise<NetconfClientSession> newSessionPromise() {
133         return GlobalEventExecutor.INSTANCE.newPromise();
134     }
135
136     @Override
137     public PublicKey getRemoteServerKey() {
138         return serverKey;
139     }
140
141     @Override
142     public String getRemoteServerVersion() {
143         return sshSession.getServerVersion();
144     }
145
146     @Override
147     public InetSocketAddress getRemoteAddress() {
148         return remoteAddress;
149     }
150
151     @Override
152     public String getSessionName() {
153         return authorization.getSessionName();
154     }
155
156     void removeSelf() {
157         factory.remove(this);
158     }
159
160     static class Factory {
161
162         private final EventLoopGroup nettyGroup;
163         private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
164         private final CallHomeNetconfSubsystemListener subsystemListener;
165         private final ConcurrentMap<String, CallHomeSessionContext> sessions = new ConcurrentHashMap<>();
166
167         Factory(final EventLoopGroup nettyGroup, final NetconfClientSessionNegotiatorFactory negotiatorFactory,
168                 final CallHomeNetconfSubsystemListener subsystemListener) {
169             this.nettyGroup = requireNonNull(nettyGroup, "nettyGroup");
170             this.negotiatorFactory = requireNonNull(negotiatorFactory, "negotiatorFactory");
171             this.subsystemListener = requireNonNull(subsystemListener);
172         }
173
174         void remove(final CallHomeSessionContext session) {
175             sessions.remove(session.getSessionName(), session);
176         }
177
178         ReverseSshChannelInitializer getChannelInitializer(final NetconfClientSessionListener listener) {
179             return ReverseSshChannelInitializer.create(negotiatorFactory, listener);
180         }
181
182         CallHomeNetconfSubsystemListener getChannelOpenListener() {
183             return this.subsystemListener;
184         }
185
186         @Nullable CallHomeSessionContext createIfNotExists(final ClientSession sshSession,
187                 final CallHomeAuthorization authorization, final SocketAddress remoteAddress) {
188             CallHomeSessionContext session = new CallHomeSessionContext(sshSession, authorization,
189                     remoteAddress, this);
190             CallHomeSessionContext preexisting = sessions.putIfAbsent(session.getSessionName(), session);
191             // If preexisting is null - session does not exist, so we can safely create new one, otherwise we return
192             // null and incoming connection will be rejected.
193             return preexisting == null ? session : null;
194         }
195
196         EventLoopGroup getNettyGroup() {
197             return nettyGroup;
198         }
199     }
200 }