package org.opendaylight.controller.netconf.client;
+import com.google.common.base.Optional;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
-
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
+import java.io.Closeable;
+import java.net.InetSocketAddress;
public class NetconfClientDispatcher extends AbstractDispatcher<NetconfClientSession, NetconfClientSessionListener> implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(NetconfClientDispatcher.class);
- private final NetconfClientSessionNegotiatorFactory negotatorFactory;
+ private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
private final HashedWheelTimer timer;
- public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, long clientConnectionTimeoutMillis) {
+ public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+ long clientConnectionTimeoutMillis) {
super(bossGroup, workerGroup);
timer = new HashedWheelTimer();
- this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.<String>absent(), clientConnectionTimeoutMillis);
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer,
+ Optional.<NetconfHelloMessageAdditionalHeader> absent(), clientConnectionTimeoutMillis);
}
- public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup, String additionalHeader, long connectionTimeoutMillis) {
+ public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+ NetconfHelloMessageAdditionalHeader additionalHeader, long connectionTimeoutMillis) {
super(bossGroup, workerGroup);
timer = new HashedWheelTimer();
- this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader), connectionTimeoutMillis);
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader),
+ connectionTimeoutMillis);
}
public Future<NetconfClientSession> createClient(InetSocketAddress address,
}
private void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
- new ClientChannelInitializer( negotatorFactory, sessionListener).initialize(ch, promise);
+ new ClientChannelInitializer(negotiatorFactory, sessionListener).initialize(ch, promise);
}
});
}
public Future<Void> createReconnectingClient(final InetSocketAddress address,
final NetconfClientSessionListener listener,
final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) {
- final ClientChannelInitializer init = new ClientChannelInitializer(negotatorFactory, listener);
+ final ClientChannelInitializer init = new ClientChannelInitializer(negotiatorFactory, listener);
return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy,
new PipelineInitializer<NetconfClientSession>() {
});
}
- private static class ClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
+ private static final class ClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
private final NetconfClientSessionListener sessionListener;
}
@Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<NetconfClientSession> promise) {
- ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(
- new SessionListenerFactory<NetconfClientSessionListener>() {
- @Override
- public NetconfClientSessionListener getSessionListener() {
- return sessionListener;
- }
- }, ch, promise));
+ public void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
+ super.initialize(ch,promise);
+ }
+
+ @Override
+ protected void initializeSessionNegotiator(SocketChannel ch, Promise<NetconfClientSession> promise) {
+ ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR,
+ negotiatorFactory.getSessionNegotiator(
+ new SessionListenerFactory<NetconfClientSessionListener>() {
+ @Override
+ public NetconfClientSessionListener getSessionListener() {
+ return sessionListener;
+ }
+ }, ch, promise));
}
}