*/
package org.opendaylight.protocol.pcep.impl;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.protocol.concepts.KeyMapping;
import org.opendaylight.protocol.pcep.PCEPDispatcher;
-import org.opendaylight.protocol.pcep.PCEPPeerProposal;
-import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
+import org.opendaylight.protocol.pcep.PCEPDispatcherDependencies;
import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
import org.opendaylight.protocol.pcep.spi.MessageRegistry;
import org.slf4j.Logger;
public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PCEPDispatcherImpl.class);
private static final Integer SOCKET_BACKLOG_SIZE = 128;
- private final PCEPSessionNegotiatorFactory snf;
+ private static final long TIMEOUT = 10;
+ private final PCEPSessionNegotiatorFactory<PCEPSessionImpl> snf;
private final PCEPHandlerFactory hf;
-
-
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final EventExecutor executor;
- private Optional<KeyMapping> keys;
+ @GuardedBy("this")
+ private KeyMapping keys;
/**
* Creates an instance of PCEPDispatcherImpl, gets the default selector and opens it.
*
- * @param registry a message registry
+ * @param registry a message registry
* @param negotiatorFactory a negotiation factory
- * @param bossGroup accepts an incoming connection
- * @param workerGroup handles the traffic of accepted connection
+ * @param bossGroup accepts an incoming connection
+ * @param workerGroup handles the traffic of accepted connection
*/
- public PCEPDispatcherImpl(final MessageRegistry registry,
- final PCEPSessionNegotiatorFactory negotiatorFactory,
- final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
- this.snf = Preconditions.checkNotNull(negotiatorFactory);
+ public PCEPDispatcherImpl(final @NonNull MessageRegistry registry,
+ final @NonNull PCEPSessionNegotiatorFactory<PCEPSessionImpl> negotiatorFactory,
+ final @NonNull EventLoopGroup bossGroup, final @NonNull EventLoopGroup workerGroup) {
+ this.snf = requireNonNull(negotiatorFactory);
this.hf = new PCEPHandlerFactory(registry);
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup();
this.workerGroup = new EpollEventLoopGroup();
} else {
- this.bossGroup = Preconditions.checkNotNull(bossGroup);
- this.workerGroup = Preconditions.checkNotNull(workerGroup);
+ this.bossGroup = requireNonNull(bossGroup);
+ this.workerGroup = requireNonNull(workerGroup);
}
- this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
+ this.executor = requireNonNull(GlobalEventExecutor.INSTANCE);
}
@Override
- public synchronized ChannelFuture createServer(final InetSocketAddress address,
- final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
- return createServer(address, Optional.<KeyMapping>absent(), listenerFactory, peerProposal);
- }
-
- @Override
- public synchronized ChannelFuture createServer(final InetSocketAddress address, final Optional<KeyMapping> keys,
- final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
- this.keys = keys;
+ public final synchronized ChannelFuture createServer(final PCEPDispatcherDependencies dispatcherDependencies) {
+ this.keys = dispatcherDependencies.getKeys();
final ChannelPipelineInitializer initializer = (ch, promise) -> {
- ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getDecoders());
- ch.pipeline().addLast("negotiator", PCEPDispatcherImpl.this.snf.getSessionNegotiator(listenerFactory, ch, promise, peerProposal));
- ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getEncoders());
+ ch.pipeline().addLast(this.hf.getDecoders());
+ ch.pipeline().addLast("negotiator", this.snf
+ .getSessionNegotiator(dispatcherDependencies, ch, promise));
+ ch.pipeline().addLast(this.hf.getEncoders());
};
final ServerBootstrap b = createServerBootstrap(initializer);
+ final InetSocketAddress address = dispatcherDependencies.getAddress();
final ChannelFuture f = b.bind(address);
LOG.debug("Initiated server {} at {}.", f, address);
- this.keys = Optional.absent();
+ this.keys = KeyMapping.getKeyMapping();
return f;
}
- protected ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
+ synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
final ServerBootstrap b = new ServerBootstrap();
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel ch) {
- initializer.initializeChannel(ch, new DefaultPromise(PCEPDispatcherImpl.this.executor));
+ initializer.initializeChannel(ch, new DefaultPromise<>(PCEPDispatcherImpl.this.executor));
}
});
b.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
if (Epoll.isAvailable()) {
b.channel(EpollServerSocketChannel.class);
+ b.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
b.channel(NioServerSocketChannel.class);
}
- if (this.keys.isPresent()) {
+ if (!this.keys.isEmpty()) {
if (Epoll.isAvailable()) {
- b.option(EpollChannelOption.TCP_MD5SIG, this.keys.get());
+ b.option(EpollChannelOption.TCP_MD5SIG, this.keys);
} else {
throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
}
}
// Make sure we are doing round-robin processing
- b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+ b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1));
- if (b.group() == null) {
+ if (b.config().group() == null) {
b.group(this.bossGroup, this.workerGroup);
}
}
@Override
- public void close() {
+ public final void close() {
if (Epoll.isAvailable()) {
- this.workerGroup.shutdownGracefully().awaitUninterruptibly();
- this.bossGroup.shutdownGracefully().awaitUninterruptibly();
+ this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
+ this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
}
}
- protected interface ChannelPipelineInitializer {
- void initializeChannel(SocketChannel socketChannel, Promise<PCEPSessionImpl> promise);
- }
-
@Override
- public PCEPSessionNegotiatorFactory getPCEPSessionNegotiatorFactory() {
+ public final PCEPSessionNegotiatorFactory<PCEPSessionImpl> getPCEPSessionNegotiatorFactory() {
return this.snf;
}
+
+ protected interface ChannelPipelineInitializer {
+ void initializeChannel(SocketChannel socketChannel, Promise<PCEPSessionImpl> promise);
+ }
}