2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.protocol.pcep.pcc.mock.protocol;
11 import io.netty.bootstrap.Bootstrap;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import io.netty.channel.ChannelInitializer;
15 import io.netty.channel.ChannelOption;
16 import io.netty.channel.EventLoopGroup;
17 import io.netty.channel.epoll.Epoll;
18 import io.netty.channel.epoll.EpollChannelOption;
19 import io.netty.channel.epoll.EpollEventLoopGroup;
20 import io.netty.channel.epoll.EpollMode;
21 import io.netty.channel.epoll.EpollSocketChannel;
22 import io.netty.channel.nio.NioEventLoopGroup;
23 import io.netty.channel.socket.SocketChannel;
24 import io.netty.channel.socket.nio.NioSocketChannel;
25 import io.netty.util.concurrent.Future;
26 import java.math.BigInteger;
27 import java.net.InetSocketAddress;
28 import java.util.concurrent.ExecutionException;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.protocol.concepts.KeyMapping;
31 import org.opendaylight.protocol.pcep.PCEPPeerProposal;
32 import org.opendaylight.protocol.pcep.PCEPSession;
33 import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
34 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
35 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactoryDependencies;
36 import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
37 import org.opendaylight.protocol.pcep.pcc.mock.api.PCCDispatcher;
38 import org.opendaylight.protocol.pcep.spi.MessageRegistry;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
/**
 * {@link PCCDispatcher} implementation that opens PCEP client connections through Netty.
 *
 * <p>One event loop group is created per dispatcher instance (epoll-based when
 * {@code Epoll.isAvailable()}, NIO-based otherwise) and is shut down by {@code close()}.
 */
42 public final class PCCDispatcherImpl implements PCCDispatcher, AutoCloseable {
// Class-wide SLF4J logger.
44 private static final Logger LOG = LoggerFactory.getLogger(PCCDispatcherImpl.class);
// Connect timeout handed to every PCCReconnectPromise; presumably milliseconds - TODO confirm against PCCReconnectPromise.
46 private static final int CONNECT_TIMEOUT = 2000;
// Supplies the decoder and encoder handlers that are added to each channel pipeline.
48 private final PCEPHandlerFactory factory;
// Event loop group used by every Bootstrap this dispatcher creates.
49 private final EventLoopGroup workerGroup;
/**
 * Creates a dispatcher with its own worker event loop group.
 *
 * <p>The native epoll transport is chosen when Netty reports it as available; otherwise the
 * portable NIO transport is used.
 *
 * @param registry message registry handed to the {@link PCEPHandlerFactory} that builds the
 *                 pipeline decoders and encoders
 */
public PCCDispatcherImpl(@Nonnull final MessageRegistry registry) {
    // Pick the transport once; setChannelFactory() makes the matching channel-class choice.
    this.workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
    this.factory = new PCEPHandlerFactory(registry);
}
/**
 * Convenience overload that delegates to the seven-argument {@code createClient}.
 *
 * <p>All six parameters are forwarded unchanged. The delegating call takes one further trailing
 * argument that is not visible in this excerpt (presumably the initial {@code dbVersion} -
 * TODO confirm).
 *
 * @param remoteAddress     address of the peer to connect to
 * @param reconnectTime     reconnect interval; the seven-argument overload maps {@code -1} to {@code 0}
 * @param listenerFactory   factory exposed to the session negotiator through its dependencies
 * @param negotiatorFactory factory producing the {@code "negotiator"} pipeline handler
 * @param keys              key mapping; when non-empty it is applied as the TCP MD5 signature option
 * @param localAddress      local address the bootstrap is bound to
 * @return the future produced by the seven-argument overload
 */
61 public Future<PCEPSession> createClient(final InetSocketAddress remoteAddress, final long reconnectTime,
62 final PCEPSessionListenerFactory listenerFactory,
63 final PCEPSessionNegotiatorFactory<? extends PCEPSession> negotiatorFactory, final KeyMapping keys,
64 final InetSocketAddress localAddress) {
65 return createClient(remoteAddress, reconnectTime, listenerFactory, negotiatorFactory, keys, localAddress,
/**
 * Configures a Netty {@link Bootstrap} for a PCEP client connection to {@code remoteAddress}.
 *
 * <p>The bootstrap uses this dispatcher's worker group, is bound to {@code localAddress}, and gets
 * its channel class (and MD5 keys, if any) from {@code setChannelFactory}. A
 * {@code PCCReconnectPromise} is created for the connection, and a channel initializer is
 * installed that adds, in order: the decoders, the {@code "negotiator"} handler, the encoders and
 * an inbound handler reacting to {@code channelInactive}.
 *
 * <p>NOTE(review): the tail of this method (connection start and return statement) is not visible
 * in this excerpt; presumably the promise is what gets returned - TODO confirm.
 *
 * @param remoteAddress     address of the peer to connect to
 * @param reconnectTime     reconnect interval; {@code -1} is replaced by {@code 0}
 * @param listenerFactory   returned from the negotiator dependencies' {@code getListenerFactory()}
 * @param negotiatorFactory raw-typed factory producing the session negotiator handler
 * @param keys              key mapping passed to {@code setChannelFactory}
 * @param localAddress      local address the bootstrap is bound to
 * @param dbVersion         value wrapped in the {@code PCCPeerProposal} offered to the negotiator
 */
70 @SuppressWarnings("unchecked")
71 public Future<PCEPSession> createClient(final InetSocketAddress remoteAddress, final long reconnectTime,
72 final PCEPSessionListenerFactory listenerFactory, final PCEPSessionNegotiatorFactory negotiatorFactory,
73 final KeyMapping keys, final InetSocketAddress localAddress, final BigInteger dbVersion) {
74 final Bootstrap b = new Bootstrap();
75 b.group(this.workerGroup);
76 b.localAddress(localAddress);
// Selects the epoll or NIO channel class and, for non-empty keys, the TCP MD5 signature option.
77 setChannelFactory(b, keys);
78 b.option(ChannelOption.SO_KEEPALIVE, true);
79 b.option(ChannelOption.SO_REUSEADDR, true);
// NOTE(review): the allocator is constructed with argument 1; confirm that such a small fixed
// receive buffer is intended rather than a per-read message limit.
80 b.option(ChannelOption.RCVBUF_ALLOCATOR, new io.netty.channel.FixedRecvByteBufAllocator(1));
// -1 is normalised to 0 before being handed to the reconnect promise.
81 final long retryTimer = reconnectTime == -1 ? 0 : reconnectTime;
// NOTE(review): the (int) cast silently truncates reconnectTime values above Integer.MAX_VALUE.
82 final PCCReconnectPromise promise =
83 new PCCReconnectPromise(remoteAddress, (int) retryTimer, CONNECT_TIMEOUT, b);
84 final ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
86 protected void initChannel(final SocketChannel ch) {
87 ch.pipeline().addLast(PCCDispatcherImpl.this.factory.getDecoders());
88 ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(
89 new PCEPSessionNegotiatorFactoryDependencies() {
91 public PCEPSessionListenerFactory getListenerFactory() {
92 return listenerFactory;
// A fresh proposal carrying dbVersion is built on every call.
96 public PCEPPeerProposal getPeerProposal() {
97 return new PCCPeerProposal(dbVersion);
100 ch.pipeline().addLast(PCCDispatcherImpl.this.factory.getEncoders());
// Last handler in the pipeline: reacts to the channel going inactive.
101 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
103 public void channelInactive(final ChannelHandlerContext ctx) {
// NOTE(review): the bodies of the two checks below are partly missing from this excerpt;
// presumably each returns early - TODO confirm.
104 if (promise.isCancelled()) {
108 if (!promise.isInitialConnectFinished()) {
109 LOG.debug("Connection to {} was dropped during negotiation, reattempting", remoteAddress);
112 LOG.debug("Reconnecting after connection to {} was dropped", remoteAddress);
// Reconnects through createClient; the arguments are not visible in this excerpt.
113 PCCDispatcherImpl.this.createClient(
125 b.handler(channelInitializer);
130 private static void setChannelFactory(final Bootstrap bootstrap, final KeyMapping keys) {
131 if (Epoll.isAvailable()) {
132 bootstrap.channel(EpollSocketChannel.class);
133 bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
135 bootstrap.channel(NioSocketChannel.class);
137 if (!keys.isEmpty()) {
138 if (Epoll.isAvailable()) {
139 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys);
141 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
147 public void close() {
149 this.workerGroup.shutdownGracefully().get();
150 } catch (final InterruptedException | ExecutionException e) {
151 LOG.warn("Failed to properly close dispatcher.", e);