Hide Netty(Pipeline)AwareChannelSubsystem 92/108392/3
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 13 Oct 2023 11:41:51 +0000 (13:41 +0200)
committerRobert Varga <nite@hq.sk>
Fri, 13 Oct 2023 12:59:07 +0000 (12:59 +0000)
Just promise a ChannelSubsystem in API and let the implementation detail
with the details.

JIRA: NETCONF-1108
Change-Id: I21d663d4c2d92a5f087579ec9e29804da3bab569
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/callhome-protocol/src/main/java/org/opendaylight/netconf/callhome/protocol/CallHomeSessionContext.java
netconf/callhome-protocol/src/test/java/org/opendaylight/netconf/callhome/protocol/CallHomeSessionContextTest.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfClientSessionImpl.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareChannelSubsystem.java [deleted file]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareClientSession.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyChannelSubsystem.java [moved from netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AbstractNettyChannelSubsystem.java with 79% similarity]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyPipelineAwareChannelSubsystem.java [deleted file]
netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java

index 61b2ccb3c1e679adafe624d170104cfd849b1865..7cdb7bd2696a286c3878824d6e39bd58ceeb0c48 100644 (file)
@@ -29,7 +29,7 @@ import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandlerWriter;
 import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfClientSessionImpl;
-import org.opendaylight.netconf.nettyutil.handler.ssh.client.NettyPipelineAwareChannelSubsystem;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
@@ -133,7 +133,7 @@ class CallHomeSessionContext implements CallHomeProtocolSessionContext {
         LOG.info("Activating Netconf channel for {} with {}", getRemoteAddress(), listener);
         final Promise<NetconfClientSession> activationPromise = newSessionPromise();
         factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
-        ((NettyPipelineAwareChannelSubsystem)netconfChannel).onClose(nettyChannel::doNettyDisconnect);
+        ((ChannelSubsystem) netconfChannel).onClose(nettyChannel::doNettyDisconnect);
         factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
         return activationPromise;
     }
@@ -216,7 +216,7 @@ class CallHomeSessionContext implements CallHomeProtocolSessionContext {
 
         SshWriteAsyncHandlerAdapter(final ClientChannel sshChannel) {
             this.sshChannel = sshChannel;
-            this.sshWriteAsyncHandler = new AsyncSshHandlerWriter(sshChannel.getAsyncIn());
+            sshWriteAsyncHandler = new AsyncSshHandlerWriter(sshChannel.getAsyncIn());
         }
 
         @Override
index f5178b7ba847bc24462ae40f4ba413779b12298c..a70471176020d450f17b7d1cdd6c3555582c7154 100644 (file)
@@ -32,7 +32,7 @@ import org.opendaylight.netconf.callhome.protocol.CallHomeSessionContext.Factory
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
 import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfClientSessionImpl;
-import org.opendaylight.netconf.nettyutil.handler.ssh.client.NettyPipelineAwareChannelSubsystem;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
 import org.opendaylight.netconf.shaded.sshd.common.AttributeRepository.AttributeKey;
@@ -118,7 +118,7 @@ public class CallHomeSessionContextTest {
     public void creatingAChannelSuccessfullyShouldResultInAnAttachedListener() throws Exception {
         // given
         final var mockFuture = mock(OpenFuture.class);
-        final var mockChannelSubsystem = mock(NettyPipelineAwareChannelSubsystem.class);
+        final var mockChannelSubsystem = mock(ChannelSubsystem.class);
         doReturn(mockFuture).when(mockChannelSubsystem).open();
         doReturn(mockChannelSubsystem).when(mockSession).createSubsystemChannel(anyString(),
             any(DefaultChannelPipeline.class));
index d30ec087beadd7a71e32b7ed173156d95fc3c334..f6956d7c5d4e3dc39940bb96d035839c5360254b 100644 (file)
@@ -25,6 +25,7 @@ import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
 import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
@@ -80,7 +81,7 @@ public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private ChannelPromise connectPromise;
 
     private AsyncSshHandlerWriter sshWriteAsyncHandler;
-    private NettyAwareChannelSubsystem channel;
+    private ChannelSubsystem channel;
     private ClientSession session;
     private FutureListener<Object> negotiationFutureListener;
 
@@ -312,9 +313,7 @@ public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         }
 
         if (channel != null) {
-            //TODO: see if calling just close() is sufficient
-            //channel.close(false);
-            channel.close();
+            channel.close(false);
             channel = null;
         }
         promise.setSuccess();
index 1a465d888765608fe7a4f8823b6ad8d430ad7ff0..0f8435235a0506172a56ea8e6bf4d4b1ade08bef 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.Beta;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
@@ -27,18 +29,30 @@ public final class NetconfClientSessionImpl extends ClientSessionImpl implements
     }
 
     @Override
-    public NettyAwareChannelSubsystem createSubsystemChannel(final String subsystem, final ChannelHandlerContext ctx)
+    public ChannelSubsystem createSubsystemChannel(final String subsystem, final ChannelHandlerContext ctx)
             throws IOException {
-        return registerSubsystem(new NettyAwareChannelSubsystem(subsystem, ctx));
+        requireNonNull(ctx);
+        return registerSubsystem(new NettyChannelSubsystem(subsystem) {
+            @Override
+            ChannelHandlerContext context() {
+                return ctx;
+            }
+        });
     }
 
     @Override
-    public NettyPipelineAwareChannelSubsystem createSubsystemChannel(final String subsystem,
+    public ChannelSubsystem createSubsystemChannel(final String subsystem,
             final ChannelPipeline pipeline) throws IOException {
-        return registerSubsystem(new NettyPipelineAwareChannelSubsystem(subsystem, pipeline));
+        requireNonNull(pipeline);
+        return registerSubsystem(new NettyChannelSubsystem(subsystem) {
+            @Override
+            ChannelHandlerContext context() {
+                return pipeline.firstContext();
+            }
+        });
     }
 
-    private <T extends ChannelSubsystem> T registerSubsystem(final T subsystem) throws IOException {
+    private ChannelSubsystem registerSubsystem(final ChannelSubsystem subsystem) throws IOException {
         final var service = getConnectionService();
         final var id = service.registerChannel(subsystem);
         if (log.isDebugEnabled()) {
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareChannelSubsystem.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareChannelSubsystem.java
deleted file mode 100644 (file)
index 56b4442..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.nettyutil.handler.ssh.client;
-
-import static java.util.Objects.requireNonNull;
-
-import io.netty.channel.ChannelHandlerContext;
-
-/**
- * An {@link AbstractNettyChannelSubsystem} for subsystem which routes incoming data to a particular
- * {@link ChannelHandlerContext}.
- */
-// Non-final for testing
-public non-sealed class NettyAwareChannelSubsystem extends AbstractNettyChannelSubsystem {
-    private final ChannelHandlerContext ctx;
-
-    NettyAwareChannelSubsystem(final String subsystem, final ChannelHandlerContext ctx) {
-        super(subsystem);
-        this.ctx = requireNonNull(ctx);
-    }
-
-    @Override
-    final ChannelHandlerContext context() {
-        return ctx;
-    }
-}
index 754c132afcbc89a420eb6d82a522e9936fb8263f..ce89f0f39668806f5efb424d234871ecb19d08f5 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.annotations.Beta;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
 
 /**
@@ -25,10 +26,10 @@ public interface NettyAwareClientSession extends ClientSession {
      *
      * @param subsystem The subsystem name
      * @param ctx Context to which to route data to
-     * @return The created {@link NettyAwareChannelSubsystem}
+     * @return The created {@link ChannelSubsystem}
      * @throws IOException If failed to create the requested channel
      */
-    NettyAwareChannelSubsystem createSubsystemChannel(String subsystem, ChannelHandlerContext ctx) throws IOException;
+    ChannelSubsystem createSubsystemChannel(String subsystem, ChannelHandlerContext ctx) throws IOException;
 
     /**
      * Allocate a channel to the specified subsystem. Incoming data on the channel will be routed to the
@@ -36,9 +37,8 @@ public interface NettyAwareClientSession extends ClientSession {
      *
      * @param subsystem The subsystem name
      * @param pipeline ChannelPipeline to which to route data to
-     * @return The created {@link NettyPipelineAwareChannelSubsystem}
+     * @return The created {@link ChannelSubsystem}
      * @throws IOException If failed to create the requested channel
      */
-    NettyPipelineAwareChannelSubsystem createSubsystemChannel(String subsystem, ChannelPipeline pipeline)
-            throws IOException;
+    ChannelSubsystem createSubsystemChannel(String subsystem, ChannelPipeline pipeline) throws IOException;
 }
@@ -15,13 +15,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstract base class for {@link NettyAwareChannelSubsystem} and {@link NettyPipelineAwareChannelSubsystem}.
+ * Abstract base class for {@link ChannelSubsystem}s backed by a Netty {@link ChannelHandlerContext}.
  */
-abstract sealed class AbstractNettyChannelSubsystem extends ChannelSubsystem
-        permits NettyAwareChannelSubsystem, NettyPipelineAwareChannelSubsystem {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractNettyChannelSubsystem.class);
+abstract class NettyChannelSubsystem extends ChannelSubsystem {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyChannelSubsystem.class);
 
-    AbstractNettyChannelSubsystem(final String subsystem) {
+    NettyChannelSubsystem(final String subsystem) {
         super(subsystem);
     }
 
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyPipelineAwareChannelSubsystem.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyPipelineAwareChannelSubsystem.java
deleted file mode 100644 (file)
index c4af659..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.nettyutil.handler.ssh.client;
-
-import static java.util.Objects.requireNonNull;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-
-/**
- * A {@link AbstractNettyChannelSubsystem} for subsystem which routes incoming data to a particular
- * {@link ChannelPipeline}.
- */
-// Non-final for testing
-public non-sealed class NettyPipelineAwareChannelSubsystem extends AbstractNettyChannelSubsystem {
-    private final ChannelPipeline pipeline;
-
-    NettyPipelineAwareChannelSubsystem(final String subsystem, final ChannelPipeline pipeline) {
-        super(subsystem);
-        this.pipeline = requireNonNull(pipeline);
-    }
-
-    @Override
-    final ChannelHandlerContext context() {
-        return pipeline.firstContext();
-    }
-}
index ff34dfdfc180f2ccfde72fcf801f8bb19cad5f2b..0342f2d1957aaf80982901ee5f5b8ecc47d7c2df 100644 (file)
@@ -44,6 +44,7 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
 import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
@@ -170,7 +171,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -192,7 +193,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -223,7 +224,7 @@ public class AsyncSshHandlerTest {
             }
         }, MoreExecutors.directExecutor());
 
-        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -245,7 +246,7 @@ public class AsyncSshHandlerTest {
         final IoOutputStream asyncIn = getMockedIoOutputStream();
         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
 
-        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -292,7 +293,7 @@ public class AsyncSshHandlerTest {
         final IoOutputStream asyncIn = getMockedIoOutputStream();
         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
 
-        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -324,7 +325,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -360,7 +361,7 @@ public class AsyncSshHandlerTest {
         return connectFuture;
     }
 
-    private static NettyAwareClientSession getMockedSshSession(final NettyAwareChannelSubsystem subsystemChannel)
+    private static NettyAwareClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel)
             throws IOException {
         final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
 
@@ -368,7 +369,7 @@ public class AsyncSshHandlerTest {
         doReturn(false).when(sshSession).isClosed();
         doReturn(false).when(sshSession).isClosing();
         final CloseFuture closeFuture = mock(CloseFuture.class);
-        Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
+        Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<>() {
             @Override
             public void onSuccess(final SshFutureListener<CloseFuture> result) {
                 doReturn(true).when(closeFuture).isClosed();
@@ -383,9 +384,9 @@ public class AsyncSshHandlerTest {
         return sshSession;
     }
 
-    private NettyAwareChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
+    private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
                                                        final IoOutputStream asyncIn) throws IOException {
-        final NettyAwareChannelSubsystem subsystemChannel = mock(NettyAwareChannelSubsystem.class);
+        final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
 
         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
         final OpenFuture openFuture = mock(OpenFuture.class);
@@ -400,7 +401,7 @@ public class AsyncSshHandlerTest {
         doReturn(openFuture).when(subsystemChannel).open();
         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
         doNothing().when(subsystemChannel).onClose(any());
-        doNothing().when(subsystemChannel).close();
+        doReturn(null).when(subsystemChannel).close(false);
         return subsystemChannel;
     }
 
@@ -441,7 +442,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);