X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fclient%2FNetconfSshClientDispatcher.java;h=5b82ff22156341b8543b59d3c95bd72e6d9b77cd;hp=ce0f4274757ac37a1791569b29b95906ec26cd6b;hb=3176d0fa83a733f1aa0810d7e99217f5aa664931;hpb=c742dea42922f3910cb178a1d8f50706e73c8aaa diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java index ce0f427475..5b82ff2215 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java @@ -9,10 +9,112 @@ package org.opendaylight.controller.netconf.client; import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; +import org.opendaylight.controller.netconf.util.handler.ssh.SshHandler; +import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler; +import org.opendaylight.controller.netconf.util.handler.ssh.client.Invoker; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.opendaylight.protocol.framework.ReconnectStrategy; +import org.opendaylight.protocol.framework.ReconnectStrategyFactory; +import org.opendaylight.protocol.framework.SessionListenerFactory; + +import com.google.common.base.Optional; public class NetconfSshClientDispatcher extends NetconfClientDispatcher { - public NetconfSshClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - super(null, bossGroup, workerGroup); + private final AuthenticationHandler authHandler; + private final HashedWheelTimer timer; + private final NetconfClientSessionNegotiatorFactory negotiatorFactory; + + public NetconfSshClientDispatcher(AuthenticationHandler authHandler, EventLoopGroup bossGroup, + EventLoopGroup workerGroup, long connectionTimeoutMillis) { + super(bossGroup, workerGroup, connectionTimeoutMillis); + this.authHandler = authHandler; + this.timer = new HashedWheelTimer(); + this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, + Optional. absent(), connectionTimeoutMillis); + } + + public NetconfSshClientDispatcher(AuthenticationHandler authHandler, EventLoopGroup bossGroup, + EventLoopGroup workerGroup, NetconfHelloMessageAdditionalHeader additionalHeader, long socketTimeoutMillis) { + super(bossGroup, workerGroup, additionalHeader, socketTimeoutMillis); + this.authHandler = authHandler; + this.timer = new HashedWheelTimer(); + this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader), + socketTimeoutMillis); + } + + @Override + public Future createClient(InetSocketAddress address, + final NetconfClientSessionListener sessionListener, ReconnectStrategy strat) { + return super.createClient(address, strat, new PipelineInitializer() { + + @Override + public void initializeChannel(SocketChannel arg0, Promise arg1) { + new NetconfSshClientInitializer(authHandler, negotiatorFactory, sessionListener).initialize(arg0, arg1); + } + + }); + } + + @Override + public Future createReconnectingClient(final InetSocketAddress address, + final NetconfClientSessionListener listener, + final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) { + final NetconfSshClientInitializer init = new NetconfSshClientInitializer(authHandler, negotiatorFactory, listener); + + return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy, + new PipelineInitializer() { + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise) { + init.initialize(ch, promise); + } + }); + } + + private static final class NetconfSshClientInitializer extends AbstractChannelInitializer { + + private final AuthenticationHandler authenticationHandler; + private final NetconfClientSessionNegotiatorFactory negotiatorFactory; + private final NetconfClientSessionListener sessionListener; + + public NetconfSshClientInitializer(AuthenticationHandler authHandler, + NetconfClientSessionNegotiatorFactory negotiatorFactory, + final NetconfClientSessionListener sessionListener) { + this.authenticationHandler = authHandler; + this.negotiatorFactory = negotiatorFactory; + this.sessionListener = sessionListener; + } + + @Override + public void initialize(SocketChannel ch, Promise promise) { + try { + Invoker invoker = Invoker.subsystem("netconf"); + ch.pipeline().addFirst(new SshHandler(authenticationHandler, invoker)); + super.initialize(ch,promise); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void initializeSessionNegotiator(SocketChannel ch, + Promise promise) { + ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR, + negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() { + @Override + public NetconfClientSessionListener getSessionListener() { + return sessionListener; + } + }, ch, promise)); + } } }