Promote MessageRegistry to pcep-api
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPDispatcherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.bootstrap.ServerBootstrap;
13 import io.netty.buffer.PooledByteBufAllocator;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelInitializer;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.FixedRecvByteBufAllocator;
19 import io.netty.channel.epoll.Epoll;
20 import io.netty.channel.epoll.EpollChannelOption;
21 import io.netty.channel.epoll.EpollEventLoopGroup;
22 import io.netty.channel.epoll.EpollMode;
23 import io.netty.channel.epoll.EpollServerSocketChannel;
24 import io.netty.channel.socket.SocketChannel;
25 import io.netty.channel.socket.nio.NioServerSocketChannel;
26 import io.netty.util.concurrent.DefaultPromise;
27 import io.netty.util.concurrent.EventExecutor;
28 import io.netty.util.concurrent.GlobalEventExecutor;
29 import io.netty.util.concurrent.Promise;
30 import java.io.Closeable;
31 import java.net.InetSocketAddress;
32 import java.util.concurrent.TimeUnit;
33 import org.checkerframework.checker.lock.qual.GuardedBy;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.opendaylight.protocol.concepts.KeyMapping;
36 import org.opendaylight.protocol.pcep.MessageRegistry;
37 import org.opendaylight.protocol.pcep.PCEPDispatcher;
38 import org.opendaylight.protocol.pcep.PCEPDispatcherDependencies;
39 import org.opendaylight.protocol.pcep.PCEPSession;
40 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Implementation of PCEPDispatcher.
46  */
47 public class PCEPDispatcherImpl implements PCEPDispatcher, Closeable {
48     private static final Logger LOG = LoggerFactory.getLogger(PCEPDispatcherImpl.class);
49     private static final Integer SOCKET_BACKLOG_SIZE = 128;
50     private static final long TIMEOUT = 10;
51     private final PCEPSessionNegotiatorFactory snf;
52     private final PCEPHandlerFactory hf;
53     private final EventLoopGroup bossGroup;
54     private final EventLoopGroup workerGroup;
55     private final EventExecutor executor;
56     @GuardedBy("this")
57     private KeyMapping keys;
58
59     /**
60      * Creates an instance of PCEPDispatcherImpl, gets the default selector and opens it.
61      *
62      * @param registry          a message registry
63      * @param negotiatorFactory a negotiation factory
64      * @param bossGroup         accepts an incoming connection
65      * @param workerGroup       handles the traffic of accepted connection
66      */
67     public PCEPDispatcherImpl(final @NonNull MessageRegistry registry,
68             final @NonNull PCEPSessionNegotiatorFactory negotiatorFactory,
69             final @NonNull EventLoopGroup bossGroup, final @NonNull EventLoopGroup workerGroup) {
70         snf = requireNonNull(negotiatorFactory);
71         hf = new PCEPHandlerFactory(registry);
72         if (Epoll.isAvailable()) {
73             this.bossGroup = new EpollEventLoopGroup();
74             this.workerGroup = new EpollEventLoopGroup();
75         } else {
76             this.bossGroup = requireNonNull(bossGroup);
77             this.workerGroup = requireNonNull(workerGroup);
78         }
79         executor = requireNonNull(GlobalEventExecutor.INSTANCE);
80     }
81
82     @Override
83     public final synchronized ChannelFuture createServer(final PCEPDispatcherDependencies dispatcherDependencies) {
84         keys = dispatcherDependencies.getKeys();
85
86         final ChannelPipelineInitializer initializer = (ch, promise) -> {
87             ch.pipeline().addLast(hf.getDecoders());
88             ch.pipeline().addLast("negotiator", snf
89                     .getSessionNegotiator(dispatcherDependencies, ch, promise));
90             ch.pipeline().addLast(hf.getEncoders());
91         };
92
93         final ServerBootstrap b = createServerBootstrap(initializer);
94         final InetSocketAddress address = dispatcherDependencies.getAddress();
95         final ChannelFuture f = b.bind(address);
96         LOG.debug("Initiated server {} at {}.", f, address);
97
98         // FIXME: err, why are we resetting this?
99         keys = KeyMapping.of();
100         return f;
101     }
102
103     synchronized ServerBootstrap createServerBootstrap(final ChannelPipelineInitializer initializer) {
104         final ServerBootstrap b = new ServerBootstrap();
105         b.childHandler(new ChannelInitializer<SocketChannel>() {
106             @Override
107             protected void initChannel(final SocketChannel ch) {
108                 initializer.initializeChannel(ch, new DefaultPromise<>(executor));
109             }
110         });
111         b.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
112
113         b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
114
115         if (Epoll.isAvailable()) {
116             b.channel(EpollServerSocketChannel.class);
117             b.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
118         } else {
119             b.channel(NioServerSocketChannel.class);
120         }
121         if (!keys.isEmpty()) {
122             if (Epoll.isAvailable()) {
123                 b.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
124             } else {
125                 throw new UnsupportedOperationException("Setting TCP-MD5 signatures is not supported",
126                         Epoll.unavailabilityCause().getCause());
127             }
128         }
129
130         // Make sure we are doing round-robin processing
131         b.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1));
132
133         if (b.config().group() == null) {
134             b.group(bossGroup, workerGroup);
135         }
136
137         return b;
138     }
139
140     @Override
141     public final void close() {
142         if (Epoll.isAvailable()) {
143             workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
144             bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
145         }
146     }
147
148     @Override
149     public final PCEPSessionNegotiatorFactory getPCEPSessionNegotiatorFactory() {
150         return snf;
151     }
152
153     protected interface ChannelPipelineInitializer {
154         void initializeChannel(SocketChannel socketChannel, Promise<PCEPSession> promise);
155     }
156 }