import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
+
import java.io.Closeable;
import java.net.InetSocketAddress;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+
public class NetconfClientDispatcher extends AbstractDispatcher<NetconfClientSession, NetconfClientSessionListener> implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(NetconfClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(NetconfClientDispatcher.class);
- private final NetconfClientSessionNegotiatorFactory negotatorFactory;
+ private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
private final HashedWheelTimer timer;
- public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+ long clientConnectionTimeoutMillis) {
+ super(bossGroup, workerGroup);
+ timer = new HashedWheelTimer();
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer,
+ Optional.<NetconfHelloMessageAdditionalHeader> absent(), clientConnectionTimeoutMillis);
+ }
+
+ public NetconfClientDispatcher(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
+ NetconfHelloMessageAdditionalHeader additionalHeader, long connectionTimeoutMillis) {
super(bossGroup, workerGroup);
timer = new HashedWheelTimer();
- this.negotatorFactory = new NetconfClientSessionNegotiatorFactory(timer);
+ this.negotiatorFactory = new NetconfClientSessionNegotiatorFactory(timer, Optional.of(additionalHeader),
+ connectionTimeoutMillis);
}
public Future<NetconfClientSession> createClient(InetSocketAddress address,
}
private void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
- new ClientChannelInitializer( negotatorFactory, sessionListener).initialize(ch, promise);
+ new ClientChannelInitializer(negotiatorFactory, sessionListener).initialize(ch, promise);
+ }
+ });
+ }
+
+ public Future<Void> createReconnectingClient(final InetSocketAddress address,
+ final NetconfClientSessionListener listener,
+ final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) {
+ final ClientChannelInitializer init = new ClientChannelInitializer(negotiatorFactory, listener);
+
+ return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy,
+ new PipelineInitializer<NetconfClientSession>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<NetconfClientSession> promise) {
+ init.initialize(ch, promise);
}
});
}
- private static class ClientChannelInitializer extends AbstractChannelInitializer {
+ private static class ClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
private final NetconfClientSessionListener sessionListener;
private ClientChannelInitializer(NetconfClientSessionNegotiatorFactory negotiatorFactory,
- NetconfClientSessionListener sessionListener) {
+ NetconfClientSessionListener sessionListener) {
this.negotiatorFactory = negotiatorFactory;
this.sessionListener = sessionListener;
}
@Override
- public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise) {
+ public void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
super.initialize(ch,promise);
}
@Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
- ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() {
- @Override
- public SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> getSessionListener() {
- return sessionListener;
- }
- }, ch, promise));
+ protected void initializeSessionNegotiator(SocketChannel ch, Promise<NetconfClientSession> promise) {
+ ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR,
+ negotiatorFactory.getSessionNegotiator(
+ new SessionListenerFactory<NetconfClientSessionListener>() {
+ @Override
+ public NetconfClientSessionListener getSessionListener() {
+ return sessionListener;
+ }
+ }, ch, promise));
}
-
}
+
@Override
public void close() {
try {