Bug 3230 - Attempt to use Epoll native transport if available
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / UdpHandler.java
1 /*
2  * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core;
10
11 import io.netty.bootstrap.Bootstrap;
12 import io.netty.channel.ChannelFuture;
13 import io.netty.channel.ChannelOption;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.epoll.EpollDatagramChannel;
16 import io.netty.channel.epoll.EpollEventLoopGroup;
17 import io.netty.channel.nio.NioEventLoopGroup;
18 import io.netty.channel.socket.DatagramChannel;
19 import io.netty.channel.socket.nio.NioDatagramChannel;
20 import io.netty.util.concurrent.GenericFutureListener;
21
22 import java.net.InetAddress;
23 import java.net.InetSocketAddress;
24
25 import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.SettableFuture;
31
32 /**
33  * Class implementing server over UDP for handling incoming connections.
34  *
35  * @author michal.polkorab
36  */
37 public final class UdpHandler implements ServerFacade {
38
39     private static final Logger LOGGER = LoggerFactory
40             .getLogger(UdpHandler.class);
41     private int port;
42     private EventLoopGroup group;
43     private final InetAddress startupAddress;
44     private final SettableFuture<Boolean> isOnlineFuture;
45     private UdpChannelInitializer channelInitializer;
46     private ThreadConfiguration threadConfig;
47     private Class<? extends DatagramChannel> datagramChannelClass;
48
49     /**
50      * Constructor of UdpHandler that listens on selected port.
51      *
52      * @param port listening port of UdpHandler server
53      */
54     public UdpHandler(final int port) {
55         this(null, port);
56     }
57
58     /**
59      * Constructor of UdpHandler that listens on selected address and port.
60      * @param address listening address of UdpHandler server
61      * @param port listening port of UdpHandler server
62      */
63     public UdpHandler(final InetAddress address, final int port) {
64         this.port = port;
65         this.startupAddress = address;
66         isOnlineFuture = SettableFuture.create();
67     }
68
69     @Override
70     public void run() {
71         final ChannelFuture f;
72         try {
73             Bootstrap b = new Bootstrap();
74             b.group(group)
75              .channel(datagramChannelClass)
76              .option(ChannelOption.SO_BROADCAST, false)
77              .handler(channelInitializer);
78
79             if (startupAddress != null) {
80                 f = b.bind(startupAddress.getHostAddress(), port).sync();
81             } else {
82                 f = b.bind(port).sync();
83             }
84         } catch (InterruptedException e) {
85             LOGGER.error("Interrupted while binding port {}", port, e);
86             return;
87         }
88
89         try {
90             InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
91             String address = isa.getHostString();
92
93             // Update port, as it may have been specified as 0
94             this.port = isa.getPort();
95
96             LOGGER.debug("Address from udpHandler: {}", address);
97             isOnlineFuture.set(true);
98             LOGGER.info("Switch listener started and ready to accept incoming udp connections on port: {}", port);
99             f.channel().closeFuture().sync();
100         } catch (InterruptedException e) {
101             LOGGER.error("Interrupted while waiting for port {} shutdown", port, e);
102         } finally {
103             shutdown();
104         }
105     }
106
107     @Override
108     public ListenableFuture<Boolean> shutdown() {
109         final SettableFuture<Boolean> result = SettableFuture.create();
110         group.shutdownGracefully().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Object>>() {
111
112             @Override
113             public void operationComplete(
114                     final io.netty.util.concurrent.Future<Object> downResult) throws Exception {
115                 result.set(downResult.isSuccess());
116                 if (downResult.cause() != null) {
117                     result.setException(downResult.cause());
118                 }
119             }
120
121         });
122         return result;
123     }
124
125     @Override
126     public ListenableFuture<Boolean> getIsOnlineFuture() {
127         return isOnlineFuture;
128     }
129
130     /**
131      * @return the port
132      */
133     public int getPort() {
134         return port;
135     }
136
137     /**
138      * @param channelInitializer
139      */
140     public void setChannelInitializer(UdpChannelInitializer channelInitializer) {
141         this.channelInitializer = channelInitializer;
142     }
143
144     @Override
145     public void setThreadConfig(ThreadConfiguration threadConfig) {
146         this.threadConfig = threadConfig;
147     }
148
149     /**
150      * Initiate event loop groups
151      * @param threadConfiguration number of threads to be created, if not specified in threadConfig
152      */
153     public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
154
155         if(isEpollEnabled) {
156             initiateEpollEventLoopGroups(threadConfiguration);
157         } else {
158             initiateNioEventLoopGroups(threadConfiguration);
159         }
160     }
161
162     /**
163      * Initiate Nio event loop groups
164      * @param threadConfiguration number of threads to be created, if not specified in threadConfig
165      */
166     public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
167         datagramChannelClass = NioDatagramChannel.class;
168         if (threadConfiguration != null) {
169             group = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
170         } else {
171             group = new NioEventLoopGroup();
172         }
173     }
174
175     /**
176      * Initiate Epoll event loop groups with Nio as fall back
177      * @param threadConfiguration
178      */
179     protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
180         try {
181             datagramChannelClass = EpollDatagramChannel.class;
182             if (threadConfiguration != null) {
183                 group = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
184             } else {
185                 group = new EpollEventLoopGroup();
186             }
187             return;
188         } catch (Throwable ex) {
189             LOGGER.debug("Epoll initiation failed");
190         }
191
192         //Fallback mechanism
193         initiateNioEventLoopGroups(threadConfiguration);
194     }
195 }