Bump upstreams to SNAPSHOTs
[netconf.git] / netconf / callhome-protocol / src / main / java / org / opendaylight / netconf / callhome / protocol / CallHomeSessionContext.java
1 /*
2  * Copyright (c) 2016 Brocade Communication Systems and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.callhome.protocol;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.util.concurrent.GlobalEventExecutor;
16 import io.netty.util.concurrent.Promise;
17 import java.io.IOException;
18 import java.net.InetSocketAddress;
19 import java.net.SocketAddress;
20 import java.security.PublicKey;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.netconf.client.NetconfClientSession;
25 import org.opendaylight.netconf.client.NetconfClientSessionListener;
26 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
27 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
28 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
29 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
30 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
31 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
32 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 // Non-final for testing
37 class CallHomeSessionContext implements CallHomeProtocolSessionContext {
38
39     private static final Logger LOG = LoggerFactory.getLogger(CallHomeSessionContext.class);
40     static final Session.AttributeKey<CallHomeSessionContext> SESSION_KEY = new Session.AttributeKey<>();
41
42     private static final String NETCONF = "netconf";
43
44     private final ClientSession sshSession;
45     private final CallHomeAuthorization authorization;
46     private final Factory factory;
47
48     private volatile boolean activated;
49
50     private final InetSocketAddress remoteAddress;
51     private final PublicKey serverKey;
52
53     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Passing 'this' around")
54     CallHomeSessionContext(final ClientSession sshSession, final CallHomeAuthorization authorization,
55                            final SocketAddress remoteAddress, final Factory factory) {
56         this.authorization = requireNonNull(authorization, "authorization");
57         checkArgument(this.authorization.isServerAllowed(), "Server was not allowed.");
58         this.factory = requireNonNull(factory);
59         this.sshSession = requireNonNull(sshSession);
60         this.sshSession.setAttribute(SESSION_KEY, this);
61         this.remoteAddress = (InetSocketAddress) this.sshSession.getIoSession().getRemoteAddress();
62         serverKey = this.sshSession.getServerKey();
63     }
64
65     static CallHomeSessionContext getFrom(final ClientSession sshSession) {
66         return sshSession.getAttribute(SESSION_KEY);
67     }
68
69     AuthFuture authorize() throws IOException {
70         authorization.applyTo(sshSession);
71         return sshSession.auth();
72     }
73
74     void openNetconfChannel() {
75         LOG.debug("Opening NETCONF Subsystem on {}", sshSession);
76         try {
77             final ClientChannel netconfChannel = sshSession.createSubsystemChannel(NETCONF);
78             netconfChannel.setStreaming(ClientChannel.Streaming.Async);
79             netconfChannel.open().addListener(newSshFutureListener(netconfChannel));
80         } catch (IOException e) {
81             throw new IllegalStateException(e);
82         }
83     }
84
85     SshFutureListener<OpenFuture> newSshFutureListener(final ClientChannel netconfChannel) {
86         return future -> {
87             if (future.isOpened()) {
88                 factory.getChannelOpenListener().onNetconfSubsystemOpened(this,
89                     listener -> doActivate(netconfChannel, listener));
90             } else {
91                 channelOpenFailed(future.getException());
92             }
93         };
94     }
95
96     @Override
97     public void terminate() {
98         sshSession.close(false);
99         removeSelf();
100     }
101
102     @Override
103     public TransportType getTransportType() {
104         return TransportType.SSH;
105     }
106
107     private void channelOpenFailed(final Throwable throwable) {
108         LOG.error("Unable to open netconf subsystem, disconnecting.", throwable);
109         sshSession.close(false);
110     }
111
112     private synchronized Promise<NetconfClientSession> doActivate(final ClientChannel netconfChannel,
113             final NetconfClientSessionListener listener) {
114         if (activated) {
115             return newSessionPromise().setFailure(new IllegalStateException("Session already activated."));
116         }
117
118         activated = true;
119         LOG.info("Activating Netconf channel for {} with {}", getRemoteAddress(), listener);
120         Promise<NetconfClientSession> activationPromise = newSessionPromise();
121         final MinaSshNettyChannel nettyChannel = newMinaSshNettyChannel(netconfChannel);
122         factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
123         factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
124         return activationPromise;
125     }
126
127     protected MinaSshNettyChannel newMinaSshNettyChannel(final ClientChannel netconfChannel) {
128         return new MinaSshNettyChannel(this, sshSession, netconfChannel);
129     }
130
131     private static Promise<NetconfClientSession> newSessionPromise() {
132         return GlobalEventExecutor.INSTANCE.newPromise();
133     }
134
135     @Override
136     public PublicKey getRemoteServerKey() {
137         return serverKey;
138     }
139
140     @Override
141     public InetSocketAddress getRemoteAddress() {
142         return remoteAddress;
143     }
144
145     @Override
146     public String getSessionId() {
147         return authorization.getSessionName();
148     }
149
150     void removeSelf() {
151         factory.remove(this);
152     }
153
154     static class Factory {
155
156         private final EventLoopGroup nettyGroup;
157         private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
158         private final CallHomeNetconfSubsystemListener subsystemListener;
159         private final ConcurrentMap<String, CallHomeSessionContext> sessions = new ConcurrentHashMap<>();
160
161         Factory(final EventLoopGroup nettyGroup, final NetconfClientSessionNegotiatorFactory negotiatorFactory,
162                 final CallHomeNetconfSubsystemListener subsystemListener) {
163             this.nettyGroup = requireNonNull(nettyGroup, "nettyGroup");
164             this.negotiatorFactory = requireNonNull(negotiatorFactory, "negotiatorFactory");
165             this.subsystemListener = requireNonNull(subsystemListener);
166         }
167
168         void remove(final CallHomeSessionContext session) {
169             sessions.remove(session.getSessionId(), session);
170         }
171
172         ReverseSshChannelInitializer getChannelInitializer(final NetconfClientSessionListener listener) {
173             return ReverseSshChannelInitializer.create(negotiatorFactory, listener);
174         }
175
176         CallHomeNetconfSubsystemListener getChannelOpenListener() {
177             return subsystemListener;
178         }
179
180         @Nullable CallHomeSessionContext createIfNotExists(final ClientSession sshSession,
181                 final CallHomeAuthorization authorization, final SocketAddress remoteAddress) {
182             CallHomeSessionContext session = new CallHomeSessionContext(sshSession, authorization,
183                     remoteAddress, this);
184             CallHomeSessionContext preexisting = sessions.putIfAbsent(session.getSessionId(), session);
185             // If preexisting is null - session does not exist, so we can safely create new one, otherwise we return
186             // null and incoming connection will be rejected.
187             return preexisting == null ? session : null;
188         }
189
190         EventLoopGroup getNettyGroup() {
191             return nettyGroup;
192         }
193     }
194 }