Add ByteBufUtils
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPDispatcherImpl.java
index fc6c6e2765147c507667b76db44b5b7c885ede31..618608b54176dfcd940504a9f7c749eb1614c3f3 100644 (file)
@@ -7,17 +7,19 @@
  */
 package org.opendaylight.protocol.pcep.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollChannelOption;
 import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollMode;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -27,10 +29,12 @@ import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.protocol.concepts.KeyMapping;
 import org.opendaylight.protocol.pcep.PCEPDispatcher;
-import org.opendaylight.protocol.pcep.PCEPPeerProposal;
-import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
+import org.opendaylight.protocol.pcep.PCEPDispatcherDependencies;
 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
 import org.opendaylight.protocol.pcep.spi.MessageRegistry;
 import org.slf4j.Logger;
@@ -42,69 +46,64 @@ import org.slf4j.LoggerFactory;
 public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(PCEPDispatcherImpl.class);
     private static final Integer SOCKET_BACKLOG_SIZE = 128;
-    private final PCEPSessionNegotiatorFactory snf;
+    private static final long TIMEOUT = 10;
+    private final PCEPSessionNegotiatorFactory<PCEPSessionImpl> snf;
     private final PCEPHandlerFactory hf;
-
-
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final EventExecutor executor;
-    private Optional<KeyMapping> keys;
+    @GuardedBy("this")
+    private KeyMapping keys;
 
     /**
      * Creates an instance of PCEPDispatcherImpl, gets the default selector and opens it.
      *
-     * @param registry a message registry
+     * @param registry          a message registry
      * @param negotiatorFactory a negotiation factory
-     * @param bossGroup accepts an incoming connection
-     * @param workerGroup handles the traffic of accepted connection
+     * @param bossGroup         accepts an incoming connection
+     * @param workerGroup       handles the traffic of accepted connection
      */
-    public PCEPDispatcherImpl(final MessageRegistry registry,
-            final PCEPSessionNegotiatorFactory negotiatorFactory,
-            final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
-        this.snf = Preconditions.checkNotNull(negotiatorFactory);
+    public PCEPDispatcherImpl(final @NonNull MessageRegistry registry,
+            final @NonNull PCEPSessionNegotiatorFactory<PCEPSessionImpl> negotiatorFactory,
+            final @NonNull EventLoopGroup bossGroup, final @NonNull EventLoopGroup workerGroup) {
+        this.snf = requireNonNull(negotiatorFactory);
         this.hf = new PCEPHandlerFactory(registry);
         if (Epoll.isAvailable()) {
             this.bossGroup = new EpollEventLoopGroup();
             this.workerGroup = new EpollEventLoopGroup();
         } else {
-            this.bossGroup = Preconditions.checkNotNull(bossGroup);
-            this.workerGroup = Preconditions.checkNotNull(workerGroup);
+            this.bossGroup = requireNonNull(bossGroup);
+            this.workerGroup = requireNonNull(workerGroup);
         }
-        this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
+        this.executor = requireNonNull(GlobalEventExecutor.INSTANCE);
     }
 
     @Override
-    public synchronized ChannelFuture createServer(final InetSocketAddress address,
-            final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
-        return createServer(address, Optional.<KeyMapping>absent(), listenerFactory, peerProposal);
-    }
-
-    @Override
-    public synchronized ChannelFuture createServer(final InetSocketAddress address, final Optional<KeyMapping> keys,
-            final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
-        this.keys = keys;
+    public final synchronized ChannelFuture createServer(final PCEPDispatcherDependencies dispatcherDependencies) {
+        this.keys = dispatcherDependencies.getKeys();
 
         final ChannelPipelineInitializer initializer = (ch, promise) -> {
-            ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getDecoders());
-            ch.pipeline().addLast("negotiator", PCEPDispatcherImpl.this.snf.getSessionNegotiator(listenerFactory, ch, promise, peerProposal));
-            ch.pipeline().addLast(PCEPDispatcherImpl.this.hf.getEncoders());
+            ch.pipeline().addLast(this.hf.getDecoders());
+            ch.pipeline().addLast("negotiator", this.snf
+                    .getSessionNegotiator(dispatcherDependencies, ch, promise));
+            ch.pipeline().addLast(this.hf.getEncoders());
         };
 
         final ServerBootstrap b = createServerBootstrap(initializer);
+        final InetSocketAddress address = dispatcherDependencies.getAddress();
         final ChannelFuture f = b.bind(address);
         LOG.debug("Initiated server {} at {}.", f, address);
 
-        this.keys = Optional.absent();
+        this.keys = KeyMapping.getKeyMapping();
         return f;
     }
 
-    protected ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
+    synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
         final ServerBootstrap b = new ServerBootstrap();
         b.childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(final SocketChannel ch) {
-                initializer.initializeChannel(ch, new DefaultPromise(PCEPDispatcherImpl.this.executor));
+                initializer.initializeChannel(ch, new DefaultPromise<>(PCEPDispatcherImpl.this.executor));
             }
         });
         b.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
@@ -113,21 +112,22 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
 
         if (Epoll.isAvailable()) {
             b.channel(EpollServerSocketChannel.class);
+            b.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
         } else {
             b.channel(NioServerSocketChannel.class);
         }
-        if (this.keys.isPresent()) {
+        if (!this.keys.isEmpty()) {
             if (Epoll.isAvailable()) {
-                b.option(EpollChannelOption.TCP_MD5SIG, this.keys.get());
+                b.option(EpollChannelOption.TCP_MD5SIG, this.keys);
             } else {
                 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
             }
         }
 
         // Make sure we are doing round-robin processing
-        b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+        b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1));
 
-        if (b.group() == null) {
+        if (b.config().group() == null) {
             b.group(this.bossGroup, this.workerGroup);
         }
 
@@ -135,19 +135,19 @@ public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
     }
 
     @Override
-    public void close() {
+    public final void close() {
         if (Epoll.isAvailable()) {
-            this.workerGroup.shutdownGracefully().awaitUninterruptibly();
-            this.bossGroup.shutdownGracefully().awaitUninterruptibly();
+            this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
+            this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
         }
     }
 
-    protected interface ChannelPipelineInitializer {
-        void initializeChannel(SocketChannel socketChannel, Promise<PCEPSessionImpl> promise);
-    }
-
     @Override
-    public PCEPSessionNegotiatorFactory getPCEPSessionNegotiatorFactory() {
+    public final PCEPSessionNegotiatorFactory<PCEPSessionImpl> getPCEPSessionNegotiatorFactory() {
         return this.snf;
     }
+
+    protected interface ChannelPipelineInitializer {
+        void initializeChannel(SocketChannel socketChannel, Promise<PCEPSessionImpl> promise);
+    }
 }