Promote MessageRegistry to pcep-api
[bgpcep.git] / pcep / pcc-mock / src / main / java / org / opendaylight / protocol / pcep / pcc / mock / protocol / PCCDispatcherImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.pcep.pcc.mock.protocol;
10
11 import io.netty.bootstrap.Bootstrap;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import io.netty.channel.ChannelInitializer;
15 import io.netty.channel.ChannelOption;
16 import io.netty.channel.EventLoopGroup;
17 import io.netty.channel.epoll.Epoll;
18 import io.netty.channel.epoll.EpollChannelOption;
19 import io.netty.channel.epoll.EpollEventLoopGroup;
20 import io.netty.channel.epoll.EpollMode;
21 import io.netty.channel.epoll.EpollSocketChannel;
22 import io.netty.channel.nio.NioEventLoopGroup;
23 import io.netty.channel.socket.SocketChannel;
24 import io.netty.channel.socket.nio.NioSocketChannel;
25 import io.netty.util.concurrent.Future;
26 import java.net.InetSocketAddress;
27 import java.util.concurrent.ExecutionException;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.protocol.concepts.KeyMapping;
30 import org.opendaylight.protocol.pcep.MessageRegistry;
31 import org.opendaylight.protocol.pcep.PCEPPeerProposal;
32 import org.opendaylight.protocol.pcep.PCEPSession;
33 import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
34 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
35 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactoryDependencies;
36 import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
37 import org.opendaylight.protocol.pcep.pcc.mock.api.PCCDispatcher;
38 import org.opendaylight.yangtools.yang.common.Uint64;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public final class PCCDispatcherImpl implements PCCDispatcher, AutoCloseable {
43
44     private static final Logger LOG = LoggerFactory.getLogger(PCCDispatcherImpl.class);
45
46     private static final int CONNECT_TIMEOUT = 2000;
47
48     private final PCEPHandlerFactory factory;
49     private final EventLoopGroup workerGroup;
50
51     public PCCDispatcherImpl(final @NonNull MessageRegistry registry) {
52         if (Epoll.isAvailable()) {
53             workerGroup = new EpollEventLoopGroup();
54         } else {
55             workerGroup = new NioEventLoopGroup();
56         }
57         factory = new PCEPHandlerFactory(registry);
58     }
59
60     @Override
61     public Future<PCEPSession> createClient(final InetSocketAddress remoteAddress, final long reconnectTime,
62             final PCEPSessionListenerFactory listenerFactory, final PCEPSessionNegotiatorFactory negotiatorFactory,
63             final KeyMapping keys, final InetSocketAddress localAddress) {
64         return createClient(remoteAddress, reconnectTime, listenerFactory, negotiatorFactory, keys, localAddress,
65             Uint64.ONE);
66     }
67
68     @Override
69     @SuppressWarnings("unchecked")
70     public Future<PCEPSession> createClient(final InetSocketAddress remoteAddress, final long reconnectTime,
71             final PCEPSessionListenerFactory listenerFactory, final PCEPSessionNegotiatorFactory negotiatorFactory,
72             final KeyMapping keys, final InetSocketAddress localAddress, final Uint64 dbVersion) {
73         final Bootstrap b = new Bootstrap();
74         b.group(workerGroup);
75         b.localAddress(localAddress);
76         setChannelFactory(b, keys);
77         b.option(ChannelOption.SO_KEEPALIVE, true);
78         b.option(ChannelOption.SO_REUSEADDR, true);
79         b.option(ChannelOption.RCVBUF_ALLOCATOR, new io.netty.channel.FixedRecvByteBufAllocator(1));
80         final long retryTimer = reconnectTime == -1 ? 0 : reconnectTime;
81         final PCCReconnectPromise promise =
82                 new PCCReconnectPromise(remoteAddress, (int) retryTimer, CONNECT_TIMEOUT, b);
83         final ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<>() {
84             @Override
85             protected void initChannel(final SocketChannel ch) {
86                 ch.pipeline().addLast(factory.getDecoders());
87                 ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(
88                         new PCEPSessionNegotiatorFactoryDependencies() {
89                             @Override
90                             public PCEPSessionListenerFactory getListenerFactory() {
91                                 return listenerFactory;
92                             }
93
94                             @Override
95                             public PCEPPeerProposal getPeerProposal() {
96                                 return new PCCPeerProposal(dbVersion);
97                             }
98                         }, ch, promise));
99                 ch.pipeline().addLast(factory.getEncoders());
100                 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
101                     @Override
102                     public void channelInactive(final ChannelHandlerContext ctx) {
103                         if (promise.isCancelled()) {
104                             return;
105                         }
106
107                         if (!promise.isInitialConnectFinished()) {
108                             LOG.debug("Connection to {} was dropped during negotiation, reattempting", remoteAddress);
109                             return;
110                         }
111                         LOG.debug("Reconnecting after connection to {} was dropped", remoteAddress);
112                         PCCDispatcherImpl.this.createClient(
113                                 remoteAddress,
114                                 reconnectTime,
115                                 listenerFactory,
116                                 negotiatorFactory,
117                                 keys,
118                                 localAddress,
119                                 dbVersion);
120                     }
121                 });
122             }
123         };
124         b.handler(channelInitializer);
125         promise.connect();
126         return promise;
127     }
128
129     private static void setChannelFactory(final Bootstrap bootstrap, final KeyMapping keys) {
130         if (Epoll.isAvailable()) {
131             bootstrap.channel(EpollSocketChannel.class);
132             bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
133         } else {
134             bootstrap.channel(NioSocketChannel.class);
135         }
136         if (!keys.isEmpty()) {
137             if (Epoll.isAvailable()) {
138                 bootstrap.option(EpollChannelOption.TCP_MD5SIG, keys.asMap());
139             } else {
140                 throw new UnsupportedOperationException(Epoll.unavailabilityCause().getCause());
141             }
142         }
143     }
144
145     @Override
146     public void close() {
147         try {
148             workerGroup.shutdownGracefully().get();
149         } catch (final InterruptedException | ExecutionException e) {
150             LOG.warn("Failed to properly close dispatcher.", e);
151         }
152     }
153 }