BUG-4692: Migrate TCP-MD5 support in pcc-mock package to netty's native-epoll
[bgpcep.git] / pcep / pcc-mock / src / main / java / org / opendaylight / protocol / pcep / pcc / mock / protocol / PCCDispatcherImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.pcep.pcc.mock.protocol;
10
11 import com.google.common.base.Optional;
12 import io.netty.bootstrap.Bootstrap;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelInboundHandlerAdapter;
15 import io.netty.channel.ChannelInitializer;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.epoll.Epoll;
19 import io.netty.channel.epoll.EpollChannelOption;
20 import io.netty.channel.epoll.EpollEventLoopGroup;
21 import io.netty.channel.epoll.EpollSocketChannel;
22 import io.netty.channel.nio.NioEventLoopGroup;
23 import io.netty.channel.socket.SocketChannel;
24 import io.netty.channel.socket.nio.NioSocketChannel;
25 import io.netty.util.concurrent.Future;
26 import java.math.BigInteger;
27 import java.net.InetSocketAddress;
28 import java.util.concurrent.ExecutionException;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.protocol.concepts.KeyMapping;
32 import org.opendaylight.protocol.pcep.PCEPSession;
33 import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
34 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
35 import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
36 import org.opendaylight.protocol.pcep.pcc.mock.api.PCCDispatcher;
37 import org.opendaylight.protocol.pcep.spi.MessageRegistry;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 public final class PCCDispatcherImpl implements PCCDispatcher, AutoCloseable {
42
43     private static final Logger LOG = LoggerFactory.getLogger(PCCDispatcherImpl.class);
44
45     private static final int CONNECT_TIMEOUT = 2000;
46
47     private final PCEPHandlerFactory factory;
48     private final EventLoopGroup workerGroup;
49
50     public PCCDispatcherImpl(@Nonnull final MessageRegistry registry) {
51         if(Epoll.isAvailable()){
52             this.workerGroup = new EpollEventLoopGroup();
53         } else {
54             this.workerGroup = new NioEventLoopGroup();
55         }
56         this.factory = new PCEPHandlerFactory(registry);
57     }
58
59     @Override
60     public Future<PCEPSession> createClient(@Nonnull final InetSocketAddress remoteAddress, @Nonnull final long reconnectTime,
61                                             @Nonnull final PCEPSessionListenerFactory listenerFactory, @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
62                                             @Nonnull final KeyMapping keys, @Nullable final InetSocketAddress localAddress) {
63         return createClient(remoteAddress, reconnectTime, listenerFactory, negotiatorFactory, keys, localAddress, BigInteger.ONE);
64     }
65
66     @Override
67     public Future<PCEPSession> createClient(@Nonnull final InetSocketAddress remoteAddress, @Nonnull final long reconnectTime,
68                                             @Nonnull final PCEPSessionListenerFactory listenerFactory, @Nonnull final PCEPSessionNegotiatorFactory negotiatorFactory,
69                                             @Nonnull final KeyMapping keys, @Nullable final InetSocketAddress localAddress, @Nonnull final BigInteger dbVersion) {
70         final Bootstrap b = new Bootstrap();
71         b.group(this.workerGroup);
72         b.localAddress(localAddress);
73         final Optional<KeyMapping> optionalKey = Optional.fromNullable(keys);
74         setChannelFactory(b, optionalKey);
75         b.option(ChannelOption.SO_KEEPALIVE, true);
76         b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
77         final long retryTimer = reconnectTime == -1 ? 0 : reconnectTime;
78         final PCCReconnectPromise promise = new PCCReconnectPromise(remoteAddress, (int) retryTimer, CONNECT_TIMEOUT, b);
79         final ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
80             @Override
81             protected void initChannel(final SocketChannel ch) throws Exception {
82                 ch.pipeline().addLast(PCCDispatcherImpl.this.factory.getDecoders());
83                 ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise, new PCCPeerProposal(dbVersion)));
84                 ch.pipeline().addLast(PCCDispatcherImpl.this.factory.getEncoders());
85                 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
86                     @Override
87                     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
88                         if (promise.isCancelled()) {
89                             return;
90                         }
91
92                         if (!promise.isInitialConnectFinished()) {
93                             LOG.debug("Connection to {} was dropped during negotiation, reattempting", remoteAddress);
94                             return;
95                         }
96                         LOG.debug("Reconnecting after connection to {} was dropped", remoteAddress);
97                         PCCDispatcherImpl.this.createClient(remoteAddress, reconnectTime, listenerFactory, negotiatorFactory,
98                             keys, localAddress, dbVersion);
99                     }
100                 });
101             }
102         };
103         b.handler(channelInitializer);
104         promise.connect();
105         return promise;
106     }
107
108     private void setChannelFactory(final Bootstrap bootstrap, final Optional<KeyMapping> keys) {
109         if(Epoll.isAvailable()) {
110             bootstrap.channel(EpollSocketChannel.class);
111         } else {
112             bootstrap.channel(NioSocketChannel.class);
113         }
114         if (keys.isPresent()) {
115             if (Epoll.isAvailable()) {
116                 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.get());
117             } else {
118                 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
119             }
120         }
121     }
122
123     @Override
124     public void close() {
125         try {
126             this.workerGroup.shutdownGracefully().get();
127         } catch (final InterruptedException | ExecutionException e) {
128             LOG.warn("Failed to properly close dispatcher.", e);
129         }
130     }
131 }