public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> implements BGPDispatcher {
private final Timer timer = new HashedWheelTimer();
- private BGPSessionNegotiatorFactory snf;
-
private final BGPHandlerFactory hf;
public BGPDispatcherImpl(final BGPMessageFactory parser) {
@Override
public Future<? extends BGPSession> createClient(final InetSocketAddress address, final BGPSessionPreferences preferences,
final BGPSessionListener listener, final ReconnectStrategy strategy) {
- this.snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
+ final BGPSessionNegotiatorFactory snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
final SessionListenerFactory<BGPSessionListener> slf = new SessionListenerFactory<BGPSessionListener>() {
-
@Override
public BGPSessionListener getSessionListener() {
return listener;
}
};
- return super.createClient(address, strategy, slf);
- }
-
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise,
- final SessionListenerFactory<BGPSessionListener> slf) {
- ch.pipeline().addLast(this.hf.getDecoders());
- ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(slf, ch, promise));
- ch.pipeline().addLast(this.hf.getEncoders());
+ return super.createClient(address, strategy, new PipelineInitializer<BGPSessionImpl>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise) {
+ ch.pipeline().addLast(hf.getDecoders());
+ ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(slf, ch, promise));
+ ch.pipeline().addLast(hf.getEncoders());
+ }
+ });
}
}
this.factory = Preconditions.checkNotNull(factory);
}
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
- ch.pipeline().addLast(this.factory.getDecoders());
- ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
- ch.pipeline().addLast(this.factory.getEncoders());
+ public void createServer(final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory) {
+ super.createServer(address, new PipelineInitializer<S>() {
+
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
+ ch.pipeline().addLast(factory.getDecoders());
+ ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+ ch.pipeline().addLast(factory.getEncoders());
+ }
+ });
}
public static void main(final String[] args) throws IOException {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
* start method that will handle sockets in different thread.
*/
public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
+ protected interface PipelineInitializer<S extends ProtocolSession<?>> {
+ /**
+ * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+ * method needs to be implemented in protocol specific Dispatchers.
+ *
+ * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+ * @param promise to be passed to {@link SessionNegotiatorFactory}
+ */
+ public void initializeChannel(SocketChannel channel, Promise<S> promise);
+ }
+
+
private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
private final EventLoopGroup bossGroup;
this.workerGroup = new NioEventLoopGroup();
}
- /**
- * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
- * method needs to be implemented in protocol specific Dispatchers.
- *
- * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
- * @param promise to be passed to {@link SessionNegotiatorFactory}
- */
- public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
-
/**
* Creates server. Each server needs factories to pass their instances to client sessions.
*
* @param address address to which the server should be bound
+ * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
*
* @return ChannelFuture representing the binding process
*/
- @VisibleForTesting
- public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
+ protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
final ServerBootstrap b = new ServerBootstrap();
b.group(this.bossGroup, this.workerGroup);
b.channel(NioServerSocketChannel.class);
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
- initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+ initializer.initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE));
}
});
b.childOption(ChannelOption.SO_KEEPALIVE, true);
* @return Future representing the connection process. Its result represents the combined success of TCP connection
* as well as session negotiation.
*/
- @VisibleForTesting
- public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
- final SessionListenerFactory<L> lfactory) {
+ protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
final Bootstrap b = new Bootstrap();
final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
- initializeChannel(ch, p, lfactory);
+ initializer.initializeChannel(ch, p);
}
});
p.connect();
* success if it indicates no further attempts should be made and failure if it reports an error
*/
protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
+ final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
- final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
+ final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, initializer);
p.connect();
return p;
import java.net.InetSocketAddress;
+import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
+
import com.google.common.base.Preconditions;
final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
private final InetSocketAddress address;
private final ReconnectStrategyFactory strategyFactory;
private final ReconnectStrategy strategy;
+ private final PipelineInitializer<S> initializer;
private Future<?> pending;
- private final SessionListenerFactory<L> lfactory;
public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
- final SessionListenerFactory<L> lfactory) {
+ final PipelineInitializer<S> initializer) {
this.dispatcher = Preconditions.checkNotNull(dispatcher);
this.address = Preconditions.checkNotNull(address);
this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
this.strategy = Preconditions.checkNotNull(reestablishStrategy);
- this.lfactory = Preconditions.checkNotNull(lfactory);
+ this.initializer = Preconditions.checkNotNull(initializer);
}
synchronized void connect() {
}
};
- final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.lfactory);
+ final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.initializer);
final Object lock = this;
this.pending = cf;
public class ServerTest {
public static final int PORT = 18080;
- AbstractDispatcher<?, SimpleSessionListener> clientDispatcher, dispatcher;
+ SimpleDispatcher clientDispatcher, dispatcher;
final SimpleSessionListener pce = new SimpleSessionListener();
public void testConnectionEstablished() throws Exception {
final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
- this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+ this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
@Override
public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
this.server.get();
- this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+ this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
@Override
public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
final Channel channel, final Promise<SimpleSession> promise) {
this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- }).get(6, TimeUnit.SECONDS);
+ @Override
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get(6, TimeUnit.SECONDS);
assertEquals(true, p.get(3, TimeUnit.SECONDS));
}
public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException {
final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
- this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+ this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
@Override
public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
this.server.get();
- this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+ this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
@Override
public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
final Channel channel, final Promise<SimpleSession> promise) {
this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- }).get(6, TimeUnit.SECONDS);
+ @Override
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get(6, TimeUnit.SECONDS);
final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
- @Override
- public SimpleSessionListener getSessionListener() {
- return new SimpleSessionListener();
- }
- });
+ @Override
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ });
assertFalse(session.isSuccess());
}
package org.opendaylight.protocol.framework;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-public class SimpleDispatcher<M, S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends
-AbstractDispatcher<S, L> {
+public class SimpleDispatcher extends AbstractDispatcher<SimpleSession, SimpleSessionListener> {
private static final Logger logger = LoggerFactory.getLogger(SimpleDispatcher.class);
- private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+ private final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory;
private final ProtocolHandlerFactory<?> factory;
- public SimpleDispatcher(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
- final Promise<S> promise) {
+ private final class SimplePipelineInitializer implements PipelineInitializer<SimpleSession> {
+ final SessionListenerFactory<SimpleSessionListener> listenerFactory;
+
+ SimplePipelineInitializer(final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+ this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
+ }
+
+ @Override
+ public void initializeChannel(final SocketChannel channel, final Promise<SimpleSession> promise) {
+ channel.pipeline().addLast(factory.getDecoders());
+ channel.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, channel, promise));
+ channel.pipeline().addLast(factory.getEncoders());
+ logger.debug("initialization completed for channel {}", channel);
+ }
+
+ }
+
+ public SimpleDispatcher(final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+ final Promise<SimpleSession> promise) {
this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
this.factory = Preconditions.checkNotNull(factory);
}
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> lfactory) {
- ch.pipeline().addLast(this.factory.getDecoders());
- ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(lfactory, ch, promise));
- ch.pipeline().addLast(this.factory.getEncoders());
- logger.debug("initialization completed for channel {}", ch);
+ public Future<SimpleSession> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+ return super.createClient(address, strategy, new SimplePipelineInitializer(listenerFactory));
+ }
+
+ public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+ return super.createServer(address, new SimplePipelineInitializer(listenerFactory));
}
}
import io.netty.channel.ChannelFuture;
-import java.io.IOException;
import java.net.InetSocketAddress;
import org.opendaylight.protocol.framework.SessionListenerFactory;
* @param address to be bound with the server
* @param listenerFactory to create listeners for clients
* @return instance of PCEPServer
- * @throws IOException if some IO error occurred
*/
public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory);
}
@Override
public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
- return super.createServer(address, listenerFactory);
- }
-
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise,
- final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
- ch.pipeline().addLast(this.hf.getDecoders());
- ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(listenerFactory, ch, promise));
- ch.pipeline().addLast(this.hf.getEncoders());
+ return super.createServer(address, new PipelineInitializer<PCEPSessionImpl>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise) {
+ ch.pipeline().addLast(hf.getDecoders());
+ ch.pipeline().addLast("negotiator", snf.getSessionNegotiator(listenerFactory, ch, promise));
+ ch.pipeline().addLast(hf.getEncoders());
+ }
+ });
}
}
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import org.opendaylight.protocol.framework.NeverReconnectStrategy;
import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
import org.opendaylight.protocol.framework.ProtocolSession;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.SessionListener;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
this.factory = Preconditions.checkNotNull(factory);
}
- @Override
- public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
- ch.pipeline().addLast(this.factory.getDecoders());
- ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
- ch.pipeline().addLast(this.factory.getEncoders());
+ public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final SessionListenerFactory<L> listenerFactory) {
+ return super.createClient(address, strategy, new PipelineInitializer<S>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<S> promise) {
+ ch.pipeline().addLast(factory.getDecoders());
+ ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+ ch.pipeline().addLast(factory.getEncoders());
+ }
+ });
}
public static void main(final String[] args) throws Exception {