X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fclient%2FNetconfSshClientDispatcher.java;h=0737279b042718309f5954205651d9da2dc151d4;hp=ce0f4274757ac37a1791569b29b95906ec26cd6b;hb=950be361f30f0dc9df109c1c649deb92d9cebb05;hpb=7021b10a1222828229378743de6d3286741adc3c diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java index ce0f427475..0737279b04 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java @@ -9,10 +9,108 @@ package org.opendaylight.controller.netconf.client; import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; +import org.opendaylight.controller.netconf.util.handler.ssh.SshHandler; +import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler; +import org.opendaylight.controller.netconf.util.handler.ssh.client.Invoker; +import org.opendaylight.protocol.framework.ReconnectStrategy; +import org.opendaylight.protocol.framework.ReconnectStrategyFactory; +import org.opendaylight.protocol.framework.SessionListenerFactory; + +import com.google.common.base.Optional; public class NetconfSshClientDispatcher extends NetconfClientDispatcher { - public NetconfSshClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - super(null, bossGroup, workerGroup); + private final AuthenticationHandler authHandler; + private final HashedWheelTimer timer; + private final NetconfClientSessionNegotiatorFactory negotatorFactory; + + public NetconfSshClientDispatcher(AuthenticationHandler authHandler, EventLoopGroup bossGroup, + EventLoopGroup workerGroup, long connectionTimeoutMillis) { + super(bossGroup, workerGroup, connectionTimeoutMillis); + this.authHandler = authHandler; + this.timer = new HashedWheelTimer(); + this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.absent(), connectionTimeoutMillis); + } + + public NetconfSshClientDispatcher(AuthenticationHandler authHandler, EventLoopGroup bossGroup, + EventLoopGroup workerGroup, String additionalHeader, long socketTimeoutMillis) { + super(bossGroup, workerGroup, additionalHeader, socketTimeoutMillis); + this.authHandler = authHandler; + this.timer = new HashedWheelTimer(); + this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader), socketTimeoutMillis); + } + + @Override + public Future createClient(InetSocketAddress address, + final NetconfClientSessionListener sessionListener, ReconnectStrategy strat) { + return super.createClient(address, strat, new PipelineInitializer() { + + @Override + public void initializeChannel(SocketChannel arg0, Promise arg1) { + new NetconfSshClientInitializer(authHandler, negotatorFactory, sessionListener).initialize(arg0, arg1); + } + + }); + } + + @Override + public Future createReconnectingClient(final InetSocketAddress address, + final NetconfClientSessionListener listener, + final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) { + final NetconfSshClientInitializer init = new NetconfSshClientInitializer(authHandler, negotatorFactory, listener); + + return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy, + new PipelineInitializer() { + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise) { + init.initialize(ch, promise); + } + }); + } + + private static final class NetconfSshClientInitializer extends AbstractChannelInitializer { + + private final AuthenticationHandler authenticationHandler; + private final NetconfClientSessionNegotiatorFactory negotiatorFactory; + private final NetconfClientSessionListener sessionListener; + + public NetconfSshClientInitializer(AuthenticationHandler authHandler, + NetconfClientSessionNegotiatorFactory negotiatorFactory, + final NetconfClientSessionListener sessionListener) { + this.authenticationHandler = authHandler; + this.negotiatorFactory = negotiatorFactory; + this.sessionListener = sessionListener; + } + + @Override + public void initialize(SocketChannel ch, Promise promise) { + try { + Invoker invoker = Invoker.subsystem("netconf"); + ch.pipeline().addFirst(new SshHandler(authenticationHandler, invoker)); + super.initialize(ch,promise); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { + ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() { + @Override + public NetconfClientSessionListener getSessionListener() { + return sessionListener; + } + }, ch, promise)); + + } } }