Bug 8746: Multi-threading improvements
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import static io.netty.buffer.Unpooled.wrappedBuffer;
12
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.bootstrap.Bootstrap;
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.PooledByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.epoll.Epoll;
26 import io.netty.channel.epoll.EpollChannelOption;
27 import io.netty.channel.epoll.EpollDatagramChannel;
28 import io.netty.channel.epoll.EpollEventLoopGroup;
29 import io.netty.channel.nio.NioEventLoopGroup;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.channel.socket.nio.NioDatagramChannel;
32 import io.netty.util.concurrent.DefaultThreadFactory;
33 import java.net.InetAddress;
34 import java.net.InetSocketAddress;
35 import java.net.UnknownHostException;
36 import java.nio.ByteBuffer;
37 import java.util.List;
38 import java.util.concurrent.ThreadFactory;
39 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
40 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
41 import org.opendaylight.lispflowmapping.dsbackend.DataStoreBackEnd;
42 import org.opendaylight.lispflowmapping.inmemorydb.HashMapDb;
43 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
44 import org.opendaylight.lispflowmapping.lisp.util.LispAddressStringifier;
45 import org.opendaylight.lispflowmapping.mapcache.AuthKeyDb;
46 import org.opendaylight.lispflowmapping.southbound.lisp.AuthenticationKeyDataListener;
47 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
48 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
49 import org.opendaylight.lispflowmapping.southbound.lisp.cache.MapRegisterCache;
50 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
52 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
53 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.authkey.container.MappingAuthkey;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.db.instance.AuthenticationKey;
60 import org.opendaylight.yangtools.yang.binding.Notification;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterSingletonService {
65     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
66     public static final String LISPFLOWMAPPING_ENTITY_NAME = "lispflowmapping";
67     public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create(
68             LISPFLOWMAPPING_ENTITY_NAME);
69
70     private volatile boolean isMaster = false;
71     private volatile String bindingAddress;
72     private AuthKeyDb akdb;
73     private MapRegisterCache mapRegisterCache = new MapRegisterCache();
74     private boolean mapRegisterCacheEnabled;
75     private long mapRegisterCacheTimeout;
76
77     private static Object startLock = new Object();
78     private final ClusterSingletonServiceProvider clusterSingletonService;
79     private LispSouthboundHandler lispSouthboundHandler;
80     private LispXtrSouthboundHandler lispXtrSouthboundHandler;
81     private NotificationPublishService notificationPublishService;
82     private int numChannels = 1;
83     private Channel[] channel;
84     private Channel xtrChannel;
85     private Class channelType;
86     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
87     private volatile boolean listenOnXtrPort = false;
88     private ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
89     private Bootstrap bootstrap = new Bootstrap();
90     private Bootstrap xtrBootstrap = new Bootstrap();
91     private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
92     private EventLoopGroup eventLoopGroup;
93     private DataBroker dataBroker;
94     private AuthenticationKeyDataListener authenticationKeyDataListener;
95     private DataStoreBackEnd dsbe;
96
97     public LispSouthboundPlugin(final DataBroker dataBroker,
98             final NotificationPublishService notificationPublishService,
99             final ClusterSingletonServiceProvider clusterSingletonService) {
100         this.dataBroker = dataBroker;
101         this.notificationPublishService = notificationPublishService;
102         this.clusterSingletonService = clusterSingletonService;
103         if (Epoll.isAvailable()) {
104             // When lispflowmapping is under heavy load, there are usually two threads nearing 100% CPU core
105             // utilization. In order to have some headroom, we reserve 3 cores for "other" tasks, and allow the
106             // rest to be used for southbound packet processing, which is the most CPU intensive work done in lfm
107             numChannels = Math.max(1, Runtime.getRuntime().availableProcessors() - 3);
108         }
109         channel = new Channel[numChannels];
110     }
111
112     public void init() {
113         LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
114         synchronized (startLock) {
115             this.akdb = new AuthKeyDb(new HashMapDb());
116             this.authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb);
117             this.dsbe = new DataStoreBackEnd(dataBroker);
118             restoreDaoFromDatastore();
119
120             LispSouthboundHandler lispSouthboundHandler = new LispSouthboundHandler(this);
121             this.lispSouthboundHandler = lispSouthboundHandler;
122
123             LispXtrSouthboundHandler lispXtrSouthboundHandler = new LispXtrSouthboundHandler(this);
124             this.lispXtrSouthboundHandler = lispXtrSouthboundHandler;
125
126             if (Epoll.isAvailable()) {
127                 eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
128                 channelType = EpollDatagramChannel.class;
129                 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
130                 bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
131                 LOG.debug("Using Netty Epoll for UDP sockets");
132             } else {
133                 eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
134                 channelType = NioDatagramChannel.class;
135                 LOG.debug("Using Netty I/O (non-Epoll) for UDP sockets");
136             }
137
138             bootstrap.group(eventLoopGroup);
139             bootstrap.channel(channelType);
140             bootstrap.handler(lispSouthboundHandler);
141
142             xtrBootstrap.group(eventLoopGroup);
143             xtrBootstrap.channel(channelType);
144             xtrBootstrap.handler(lispXtrSouthboundHandler);
145
146             start();
147             startXtr();
148
149             clusterSingletonService.registerClusterSingletonService(this);
150             LOG.info("LISP (RFC6830) Southbound Plugin is up!");
151         }
152     }
153
154     @SuppressWarnings("checkstyle:IllegalCatch")
155     private void start() {
156         try {
157             for (int i = 0; i < numChannels; ++i) {
158                 channel[i] = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
159             }
160             LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
161         } catch (Exception e) {
162             LOG.error("Failed to open main socket ", e);
163         }
164     }
165
166     @SuppressWarnings("checkstyle:IllegalCatch")
167     private void startXtr() {
168         if (listenOnXtrPort) {
169             try {
170                 xtrChannel = xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
171                 LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
172             } catch (Exception e) {
173                 LOG.error("Failed to open xTR socket ", e);
174             }
175         }
176     }
177
178     @SuppressWarnings("checkstyle:IllegalCatch")
179     private void stop() {
180         try {
181             for (int i = 0; i < numChannels; ++i) {
182                 channel[i].close().sync();
183                 channel[i] = null;
184             }
185         } catch (Exception e) {
186             LOG.error("Failed to close main socket ", e);
187         }
188     }
189
190     @SuppressWarnings("checkstyle:IllegalCatch")
191     private void stopXtr() {
192         if (listenOnXtrPort) {
193             try {
194                 xtrChannel.close().sync();
195                 xtrChannel = null;
196             } catch (Exception e) {
197                 LOG.error("Failed to close xTR socket ", e);
198             }
199         }
200     }
201
202     private void restart() {
203         LOG.info("Reloading");
204         stop();
205         start();
206     }
207
208     private void restartXtr() {
209         LOG.info("Reloading xTR");
210         stopXtr();
211         startXtr();
212     }
213
214     private void unloadActions() {
215         lispSouthboundHandler = null;
216         lispXtrSouthboundHandler = null;
217
218         stop();
219         stopXtr();
220
221         LOG.info("LISP (RFC6830) Southbound Plugin is down!");
222     }
223
224     /**
225      * Restore all keys from MDSAL datastore.
226      */
227     public void restoreDaoFromDatastore() {
228         final List<AuthenticationKey> authKeys = dsbe.getAllAuthenticationKeys();
229         LOG.info("Restoring {} keys from datastore into southbound DAO", authKeys.size());
230
231         for (AuthenticationKey authKey : authKeys) {
232             final Eid key = authKey.getEid();
233             final MappingAuthkey mappingAuthkey = authKey.getMappingAuthkey();
234             LOG.debug("Adding authentication key '{}' with key-ID {} for {}", mappingAuthkey.getKeyString(),
235                     mappingAuthkey.getKeyType(),
236                     LispAddressStringifier.getString(key));
237             akdb.addAuthenticationKey(key, mappingAuthkey);
238         }
239     }
240
241     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
242                                            final MessageType packetType) {
243         InetAddress ip = getInetAddress(address);
244         handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue(), null);
245     }
246
247     public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
248             final MessageType packetType, final int portNumber, Channel senderChannel) {
249         if (senderChannel == null) {
250             senderChannel = this.channel[0];
251         }
252         InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
253         outBuffer.position(0);
254         ByteBuf data = wrappedBuffer(outBuffer);
255         DatagramPacket packet = new DatagramPacket(data, recipient);
256         LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
257         if (LOG.isTraceEnabled()) {
258             LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
259         }
260         senderChannel.write(packet).addListener(new ChannelFutureListener() {
261             @Override
262             public void operationComplete(ChannelFuture future) {
263                 if (future.isSuccess()) {
264                     LOG.trace("Success");
265                     statistics.incrementTx(packetType.getIntValue());
266                 } else {
267                     LOG.warn("Failed to send packet");
268                     statistics.incrementTxErrors();
269                 }
270             }
271         });
272         senderChannel.flush();
273     }
274
275     private InetAddress getInetAddress(TransportAddress address) {
276         Preconditions.checkNotNull(address, "TransportAddress must not be null");
277         IpAddressBinary ip = address.getIpAddress();
278         try {
279             if (ip.getIpv4AddressBinary() != null) {
280                 return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
281             } else if (ip.getIpv6AddressBinary() != null) {
282                 return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
283             }
284         } catch (UnknownHostException e) {
285             LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
286         }
287         return null;
288     }
289
290     @Override
291     @SuppressWarnings("checkstyle:IllegalCatch")
292     public void setLispAddress(String address) {
293         synchronized (startLock) {
294             if (bindingAddress.equals(address)) {
295                 LOG.debug("Configured LISP binding address didn't change.");
296             } else {
297                 LOG.debug("Setting LISP binding address to {}", address);
298                 bindingAddress = address;
299                 if (channel != null) {
300                     try {
301                         restart();
302                         restartXtr();
303                     } catch (Exception e) {
304                         LOG.error("Failed to set LISP binding address: ", e);
305                     }
306                 }
307             }
308         }
309     }
310
311     @Override
312     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
313         listenOnXtrPort = shouldListenOnXtrPort;
314         if (listenOnXtrPort) {
315             restartXtr();
316         } else {
317             LOG.info("Shutting down xTR");
318             stopXtr();
319         }
320     }
321
322     @Override
323     public void setXtrPort(int port) {
324         this.xtrPort = port;
325         if (listenOnXtrPort) {
326             restartXtr();
327         }
328     }
329
330     public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
331         this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
332         if (mapRegisterCacheEnabled) {
333             LOG.info("Enabling Map-Register cache");
334         } else {
335             LOG.info("Disabling Map-Register cache");
336         }
337     }
338
339     public void setMapRegisterCacheTimeout(long mapRegisterCacheTimeout) {
340         this.mapRegisterCacheTimeout = mapRegisterCacheTimeout;
341     }
342
343     public void setBindingAddress(String bindingAddress) {
344         this.bindingAddress = bindingAddress;
345     }
346
347     @Override
348     public void close() throws Exception {
349         eventLoopGroup.shutdownGracefully();
350         lispSouthboundHandler.close();
351         unloadActions();
352         clusterSingletonService.close();
353         dsbe.closeTransactionChain();
354     }
355
356     @Override
357     public void instantiateServiceInstance() {
358         this.isMaster = true;
359     }
360
361     @Override
362     public ListenableFuture<Void> closeServiceInstance() {
363         this.isMaster = false;
364         return Futures.<Void>immediateFuture(null);
365     }
366
367     @Override
368     public ServiceGroupIdentifier getIdentifier() {
369         return SERVICE_GROUP_IDENTIFIER;
370     }
371
372     public synchronized void sendNotificationIfPossible(final Notification notification) throws InterruptedException {
373         if (isMaster && notificationPublishService != null) {
374             notificationPublishService.putNotification(notification);
375             LOG.trace("Publishing notification: {}", notification);
376         } else if (notificationPublishService == null) {
377             LOG.warn("Can't publish notification because no reference to publication service exists!");
378         }
379     }
380
381     public AuthKeyDb getAkdb() {
382         return akdb;
383     }
384
385     public ConcurrentLispSouthboundStats getStats() {
386         return statistics;
387     }
388
389     public DataBroker getDataBroker() {
390         return dataBroker;
391     }
392
393     public AuthenticationKeyDataListener getAuthenticationKeyDataListener() {
394         return authenticationKeyDataListener;
395     }
396
397     public MapRegisterCache getMapRegisterCache() {
398         return mapRegisterCache;
399     }
400
401     public boolean isMapRegisterCacheEnabled() {
402         return mapRegisterCacheEnabled;
403     }
404
405     public long getMapRegisterCacheTimeout() {
406         return mapRegisterCacheTimeout;
407     }
408 }