X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fclient%2FNetconfClientDispatcher.java;h=c7c723cb27b552fa51b7a7a287f66306649f35ca;hp=bff2a54c58926e9e445ac13878950384f1b9160d;hb=84248dac9ed8aa37e996e39429c8aa8ece473eaf;hpb=e7291930e152c05262b85f0acdf8c3907f89ec7e diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java index bff2a54c58..c7c723cb27 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java @@ -8,16 +8,14 @@ package org.opendaylight.controller.netconf.client; +import com.google.common.base.Optional; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; - -import java.io.Closeable; -import java.net.InetSocketAddress; - import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; import org.opendaylight.protocol.framework.AbstractDispatcher; import org.opendaylight.protocol.framework.ReconnectStrategy; import org.opendaylight.protocol.framework.ReconnectStrategyFactory; @@ -25,25 +23,30 @@ import org.opendaylight.protocol.framework.SessionListenerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; +import java.io.Closeable; +import java.net.InetSocketAddress; public class NetconfClientDispatcher extends AbstractDispatcher implements Closeable { private static final Logger logger = LoggerFactory.getLogger(NetconfClientDispatcher.class); - private final NetconfClientSessionNegotiatorFactory negotatorFactory; + private final NetconfClientSessionNegotiatorFactory negotiatorFactory; private final HashedWheelTimer timer; - public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, long clientConnectionTimeoutMillis) { + public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, + long clientConnectionTimeoutMillis) { super(bossGroup, workerGroup); timer = new HashedWheelTimer(); - this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.absent(), clientConnectionTimeoutMillis); + this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, + Optional. absent(), clientConnectionTimeoutMillis); } - public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, String additionalHeader, long connectionTimeoutMillis) { + public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, + NetconfHelloMessageAdditionalHeader additionalHeader, long connectionTimeoutMillis) { super(bossGroup, workerGroup); timer = new HashedWheelTimer(); - this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader), connectionTimeoutMillis); + this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader), + connectionTimeoutMillis); } public Future createClient(InetSocketAddress address, @@ -57,7 +60,7 @@ public class NetconfClientDispatcher extends AbstractDispatcher promise) { - new ClientChannelInitializer( negotatorFactory, sessionListener).initialize(ch, promise); + new ClientChannelInitializer(negotiatorFactory, sessionListener).initialize(ch, promise); } }); } @@ -65,7 +68,7 @@ public class NetconfClientDispatcher extends AbstractDispatcher createReconnectingClient(final InetSocketAddress address, final NetconfClientSessionListener listener, final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) { - final ClientChannelInitializer init = new ClientChannelInitializer(negotatorFactory, listener); + final ClientChannelInitializer init = new ClientChannelInitializer(negotiatorFactory, listener); return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy, new PipelineInitializer() { @@ -76,7 +79,7 @@ public class NetconfClientDispatcher extends AbstractDispatcher { + private static final class ClientChannelInitializer extends AbstractChannelInitializer { private final NetconfClientSessionNegotiatorFactory negotiatorFactory; private final NetconfClientSessionListener sessionListener; @@ -88,14 +91,20 @@ public class NetconfClientDispatcher extends AbstractDispatcher promise) { - ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator( - new SessionListenerFactory() { - @Override - public NetconfClientSessionListener getSessionListener() { - return sessionListener; - } - }, ch, promise)); + public void initialize(SocketChannel ch, Promise promise) { + super.initialize(ch,promise); + } + + @Override + protected void initializeSessionNegotiator(SocketChannel ch, Promise promise) { + ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR, + negotiatorFactory.getSessionNegotiator( + new SessionListenerFactory() { + @Override + public NetconfClientSessionListener getSessionListener() { + return sessionListener; + } + }, ch, promise)); } }