Convert to using requireNonNull()
[netconf.git] / netconf / callhome-protocol / src / main / java / org / opendaylight / netconf / callhome / protocol / MinaSshNettyChannel.java
1 /*
2  * Copyright (c) 2016 Brocade Communication Systems and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.callhome.protocol;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.AbstractServerChannel;
14 import io.netty.channel.ChannelConfig;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelMetadata;
17 import io.netty.channel.ChannelOutboundBuffer;
18 import io.netty.channel.ChannelOutboundHandlerAdapter;
19 import io.netty.channel.ChannelPromise;
20 import io.netty.channel.DefaultChannelConfig;
21 import io.netty.channel.EventLoop;
22 import java.net.SocketAddress;
23 import org.apache.sshd.client.channel.ClientChannel;
24 import org.apache.sshd.client.session.ClientSession;
25 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerReader;
26 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerReader.ReadMsgHandler;
27 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 class MinaSshNettyChannel extends AbstractServerChannel {
32     private static final Logger LOG = LoggerFactory.getLogger(MinaSshNettyChannel.class);
33     private static final ChannelMetadata METADATA = new ChannelMetadata(false);
34
35     private final ChannelConfig config = new DefaultChannelConfig(this);
36     private final CallHomeSessionContext context;
37     private final ClientSession session;
38     private final ClientChannel sshChannel;
39     private final AsyncSshHandlerReader sshReadHandler;
40     private final AsyncSshHandlerWriter sshWriteAsyncHandler;
41
42     private volatile boolean nettyClosed = false;
43
44     MinaSshNettyChannel(final CallHomeSessionContext context, final ClientSession session,
45         final ClientChannel sshChannel) {
46         this.context = requireNonNull(context);
47         this.session = requireNonNull(session);
48         this.sshChannel = requireNonNull(sshChannel);
49         this.sshReadHandler = new AsyncSshHandlerReader(
50             new ConnectionClosedDuringRead(), new FireReadMessage(), "netconf", sshChannel.getAsyncOut());
51         this.sshWriteAsyncHandler = new AsyncSshHandlerWriter(sshChannel.getAsyncIn());
52         pipeline().addFirst(createChannelAdapter());
53     }
54
55     private ChannelOutboundHandlerAdapter createChannelAdapter() {
56         return new ChannelOutboundHandlerAdapter() {
57             @Override
58             public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
59                 sshWriteAsyncHandler.write(ctx, msg, promise);
60             }
61         };
62     }
63
64     @Override
65     public ChannelConfig config() {
66         return config;
67     }
68
69     private static boolean notClosing(final org.apache.sshd.common.Closeable sshCloseable) {
70         return !sshCloseable.isClosing() && !sshCloseable.isClosed();
71     }
72
73     @Override
74     public boolean isOpen() {
75         return notClosing(session);
76     }
77
78     @Override
79     public boolean isActive() {
80         return notClosing(session);
81     }
82
83     @Override
84     public ChannelMetadata metadata() {
85         return METADATA;
86     }
87
88     @Override
89     protected AbstractUnsafe newUnsafe() {
90         return new SshUnsafe();
91     }
92
93     @Override
94     protected boolean isCompatible(final EventLoop loop) {
95         return true;
96     }
97
98     @Override
99     protected SocketAddress localAddress0() {
100         return session.getIoSession().getLocalAddress();
101     }
102
103     @Override
104     protected SocketAddress remoteAddress0() {
105         return context.getRemoteAddress();
106     }
107
108     @Override
109     protected void doBind(final SocketAddress localAddress) {
110         throw new UnsupportedOperationException("Bind not supported.");
111     }
112
113     void doMinaDisconnect(final boolean blocking) {
114         if (notClosing(session)) {
115             sshChannel.close(blocking);
116             session.close(blocking);
117         }
118     }
119
120     void doNettyDisconnect() {
121         if (!nettyClosed) {
122             nettyClosed = true;
123             pipeline().fireChannelInactive();
124             sshReadHandler.close();
125             sshWriteAsyncHandler.close();
126         }
127     }
128
129     @Override
130     protected void doDisconnect() {
131         LOG.info("Disconnect invoked");
132         doNettyDisconnect();
133         doMinaDisconnect(false);
134     }
135
136     @Override
137     protected void doClose() {
138         context.removeSelf();
139         if (notClosing(session)) {
140             session.close(true);
141             sshChannel.close(true);
142         }
143     }
144
145     @Override
146     protected void doBeginRead() {
147         // Intentional NOOP - read is started by AsyncSshHandlerReader
148     }
149
150     @Override
151     protected void doWrite(final ChannelOutboundBuffer in) {
152         throw new IllegalStateException("Outbound writes to SSH should be done by SSH Write handler");
153     }
154
155     private final class FireReadMessage implements ReadMsgHandler {
156         @Override
157         public void onMessageRead(final ByteBuf msg) {
158             pipeline().fireChannelRead(msg);
159         }
160     }
161
162     private final class ConnectionClosedDuringRead implements AutoCloseable {
163
164         /**
165          * Invoked when SSH session dropped during read using {@link AsyncSshHandlerReader}.
166          */
167         @Override
168         public void close() {
169             doNettyDisconnect();
170         }
171     }
172
173     private class SshUnsafe extends AbstractUnsafe {
174         @Override
175         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
176                 final ChannelPromise promise) {
177             throw new UnsupportedOperationException("Unsafe is not supported.");
178         }
179     }
180 }