Bug 8248: Don't access DSBE from different threads
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import static io.netty.buffer.Unpooled.wrappedBuffer;
12
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.bootstrap.Bootstrap;
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.PooledByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.epoll.Epoll;
26 import io.netty.channel.epoll.EpollChannelOption;
27 import io.netty.channel.epoll.EpollDatagramChannel;
28 import io.netty.channel.epoll.EpollEventLoopGroup;
29 import io.netty.channel.nio.NioEventLoopGroup;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.channel.socket.nio.NioDatagramChannel;
32 import io.netty.util.concurrent.DefaultThreadFactory;
33 import java.net.InetAddress;
34 import java.net.InetSocketAddress;
35 import java.net.UnknownHostException;
36 import java.nio.ByteBuffer;
37 import java.util.List;
38 import java.util.concurrent.ThreadFactory;
39 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
40 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
41 import org.opendaylight.lispflowmapping.dsbackend.DataStoreBackEnd;
42 import org.opendaylight.lispflowmapping.inmemorydb.HashMapDb;
43 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
44 import org.opendaylight.lispflowmapping.lisp.util.LispAddressStringifier;
45 import org.opendaylight.lispflowmapping.mapcache.AuthKeyDb;
46 import org.opendaylight.lispflowmapping.southbound.lisp.AuthenticationKeyDataListener;
47 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
48 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
49 import org.opendaylight.lispflowmapping.southbound.lisp.cache.MapRegisterCache;
50 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
52 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
53 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.authkey.container.MappingAuthkey;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.db.instance.AuthenticationKey;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterSingletonService {
64     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
65     public static final String LISPFLOWMAPPING_ENTITY_NAME = "lispflowmapping";
66     public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create(
67             LISPFLOWMAPPING_ENTITY_NAME);
68
69     private volatile String bindingAddress;
70     private AuthKeyDb akdb;
71     private MapRegisterCache mapRegisterCache = new MapRegisterCache();
72     private boolean mapRegisterCacheEnabled;
73     private long mapRegisterCacheTimeout;
74
75     private static Object startLock = new Object();
76     private final ClusterSingletonServiceProvider clusterSingletonService;
77     private LispSouthboundHandler lispSouthboundHandler;
78     private LispXtrSouthboundHandler lispXtrSouthboundHandler;
79     private NotificationPublishService notificationPublishService;
80     private int numChannels = 1;
81     private Channel[] channel;
82     private Channel xtrChannel;
83     private Class channelType;
84     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
85     private volatile boolean listenOnXtrPort = false;
86     private ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
87     private Bootstrap bootstrap = new Bootstrap();
88     private Bootstrap xtrBootstrap = new Bootstrap();
89     private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
90     private EventLoopGroup eventLoopGroup;
91     private DataBroker dataBroker;
92     private AuthenticationKeyDataListener authenticationKeyDataListener;
93     private DataStoreBackEnd dsbe;
94
95     public LispSouthboundPlugin(final DataBroker dataBroker,
96             final NotificationPublishService notificationPublishService,
97             final ClusterSingletonServiceProvider clusterSingletonService) {
98         this.dataBroker = dataBroker;
99         this.notificationPublishService = notificationPublishService;
100         this.clusterSingletonService = clusterSingletonService;
101         if (Epoll.isAvailable()) {
102             // When lispflowmapping is under heavy load, there are usually two threads nearing 100% CPU core
103             // utilization. In order to have some headroom, we reserve 3 cores for "other" tasks, and allow the
104             // rest to be used for southbound packet processing, which is the most CPU intensive work done in lfm
105             numChannels = Math.max(1, Runtime.getRuntime().availableProcessors() - 3);
106         }
107         channel = new Channel[numChannels];
108     }
109
110     public void init() {
111         LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
112         synchronized (startLock) {
113             this.akdb = new AuthKeyDb(new HashMapDb());
114             this.authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb);
115             this.dsbe = new DataStoreBackEnd(dataBroker);
116             restoreDaoFromDatastore();
117
118             LispSouthboundHandler lispSouthboundHandler = new LispSouthboundHandler(this);
119             lispSouthboundHandler.setDataBroker(dataBroker);
120             lispSouthboundHandler.setNotificationProvider(notificationPublishService);
121             lispSouthboundHandler.setAuthKeyDb(akdb);
122             lispSouthboundHandler.setMapRegisterCache(mapRegisterCache);
123             lispSouthboundHandler.setMapRegisterCacheTimeout(mapRegisterCacheTimeout);
124             lispSouthboundHandler.setAuthenticationKeyDataListener(authenticationKeyDataListener);
125             lispSouthboundHandler.setStats(statistics);
126             this.lispSouthboundHandler = lispSouthboundHandler;
127
128             LispXtrSouthboundHandler lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
129             lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
130             this.lispXtrSouthboundHandler = lispXtrSouthboundHandler;
131
132             if (Epoll.isAvailable()) {
133                 eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
134                 channelType = EpollDatagramChannel.class;
135                 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
136                 bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
137                 LOG.debug("Using Netty Epoll for UDP sockets");
138             } else {
139                 eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
140                 channelType = NioDatagramChannel.class;
141                 LOG.debug("Using Netty I/O (non-Epoll) for UDP sockets");
142             }
143
144             bootstrap.group(eventLoopGroup);
145             bootstrap.channel(channelType);
146             bootstrap.handler(lispSouthboundHandler);
147
148             xtrBootstrap.group(eventLoopGroup);
149             xtrBootstrap.channel(channelType);
150             xtrBootstrap.handler(lispXtrSouthboundHandler);
151
152             start();
153             startXtr();
154
155             clusterSingletonService.registerClusterSingletonService(this);
156             LOG.info("LISP (RFC6830) Southbound Plugin is up!");
157         }
158     }
159
160     @SuppressWarnings("checkstyle:IllegalCatch")
161     private void start() {
162         try {
163             for (int i = 0; i < numChannels; ++i) {
164                 channel[i] = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
165             }
166             LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
167         } catch (Exception e) {
168             LOG.error("Failed to open main socket ", e);
169         }
170     }
171
172     @SuppressWarnings("checkstyle:IllegalCatch")
173     private void startXtr() {
174         if (listenOnXtrPort) {
175             try {
176                 xtrChannel = xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
177                 LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
178             } catch (Exception e) {
179                 LOG.error("Failed to open xTR socket ", e);
180             }
181         }
182     }
183
184     @SuppressWarnings("checkstyle:IllegalCatch")
185     private void stop() {
186         try {
187             for (int i = 0; i < numChannels; ++i) {
188                 channel[i].close().sync();
189                 channel[i] = null;
190             }
191         } catch (Exception e) {
192             LOG.error("Failed to close main socket ", e);
193         }
194     }
195
196     @SuppressWarnings("checkstyle:IllegalCatch")
197     private void stopXtr() {
198         if (listenOnXtrPort) {
199             try {
200                 xtrChannel.close().sync();
201                 xtrChannel = null;
202             } catch (Exception e) {
203                 LOG.error("Failed to close xTR socket ", e);
204             }
205         }
206     }
207
208     private void restart() {
209         LOG.info("Reloading");
210         stop();
211         start();
212     }
213
214     private void restartXtr() {
215         LOG.info("Reloading xTR");
216         stopXtr();
217         startXtr();
218     }
219
220     private void unloadActions() {
221         lispSouthboundHandler = null;
222         lispXtrSouthboundHandler = null;
223
224         stop();
225         stopXtr();
226
227         LOG.info("LISP (RFC6830) Southbound Plugin is down!");
228     }
229
230     /**
231      * Restore all keys from MDSAL datastore.
232      */
233     public void restoreDaoFromDatastore() {
234         final List<AuthenticationKey> authKeys = dsbe.getAllAuthenticationKeys();
235         LOG.info("Restoring {} keys from datastore into southbound DAO", authKeys.size());
236
237         for (AuthenticationKey authKey : authKeys) {
238             final Eid key = authKey.getEid();
239             final MappingAuthkey mappingAuthkey = authKey.getMappingAuthkey();
240             LOG.debug("Adding authentication key '{}' with key-ID {} for {}", mappingAuthkey.getKeyString(),
241                     mappingAuthkey.getKeyType(),
242                     LispAddressStringifier.getString(key));
243             akdb.addAuthenticationKey(key, mappingAuthkey);
244         }
245     }
246
247     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
248                                            final MessageType packetType) {
249         InetAddress ip = getInetAddress(address);
250         handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue(), null);
251     }
252
253     public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
254             final MessageType packetType, final int portNumber, Channel senderChannel) {
255         if (senderChannel == null) {
256             senderChannel = this.channel[0];
257         }
258         InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
259         outBuffer.position(0);
260         ByteBuf data = wrappedBuffer(outBuffer);
261         DatagramPacket packet = new DatagramPacket(data, recipient);
262         LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
263         if (LOG.isTraceEnabled()) {
264             LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
265         }
266         senderChannel.write(packet).addListener(new ChannelFutureListener() {
267             @Override
268             public void operationComplete(ChannelFuture future) {
269                 if (future.isSuccess()) {
270                     LOG.trace("Success");
271                     statistics.incrementTx(packetType.getIntValue());
272                 } else {
273                     LOG.warn("Failed to send packet");
274                     statistics.incrementTxErrors();
275                 }
276             }
277         });
278         senderChannel.flush();
279     }
280
281     private InetAddress getInetAddress(TransportAddress address) {
282         Preconditions.checkNotNull(address, "TransportAddress must not be null");
283         IpAddressBinary ip = address.getIpAddress();
284         try {
285             if (ip.getIpv4AddressBinary() != null) {
286                 return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
287             } else if (ip.getIpv6AddressBinary() != null) {
288                 return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
289             }
290         } catch (UnknownHostException e) {
291             LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
292         }
293         return null;
294     }
295
296     public ConcurrentLispSouthboundStats getStats() {
297         return statistics;
298     }
299
300     @Override
301     @SuppressWarnings("checkstyle:IllegalCatch")
302     public void setLispAddress(String address) {
303         synchronized (startLock) {
304             if (bindingAddress.equals(address)) {
305                 LOG.debug("Configured LISP binding address didn't change.");
306             } else {
307                 LOG.debug("Setting LISP binding address to {}", address);
308                 bindingAddress = address;
309                 if (channel != null) {
310                     try {
311                         restart();
312                         restartXtr();
313                     } catch (Exception e) {
314                         LOG.error("Failed to set LISP binding address: ", e);
315                     }
316                 }
317             }
318         }
319     }
320
321     @Override
322     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
323         listenOnXtrPort = shouldListenOnXtrPort;
324         if (listenOnXtrPort) {
325             restartXtr();
326         } else {
327             LOG.info("Shutting down xTR");
328             stopXtr();
329         }
330     }
331
332     @Override
333     public void setXtrPort(int port) {
334         this.xtrPort = port;
335         if (listenOnXtrPort) {
336             restartXtr();
337         }
338     }
339
340     public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
341         this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
342         if (mapRegisterCacheEnabled) {
343             LOG.info("Enabling Map-Register cache");
344         } else {
345             LOG.info("Disabling Map-Register cache");
346         }
347     }
348
349     public void setMapRegisterCacheTimeout(long mapRegisterCacheTimeout) {
350         this.mapRegisterCacheTimeout = mapRegisterCacheTimeout;
351     }
352
353     public void setBindingAddress(String bindingAddress) {
354         this.bindingAddress = bindingAddress;
355     }
356
357     @Override
358     public void close() throws Exception {
359         eventLoopGroup.shutdownGracefully();
360         lispSouthboundHandler.close();
361         unloadActions();
362         clusterSingletonService.close();
363         dsbe.closeTransactionChain();
364     }
365
366     @Override
367     public void instantiateServiceInstance() {
368         if (lispSouthboundHandler != null) {
369             lispSouthboundHandler.setNotificationProvider(notificationPublishService);
370             lispSouthboundHandler.setIsMaster(true);
371         }
372         if (lispXtrSouthboundHandler != null) {
373             lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
374         }
375     }
376
377     @Override
378     public ListenableFuture<Void> closeServiceInstance() {
379         if (lispSouthboundHandler != null) {
380             lispSouthboundHandler.setNotificationProvider(null);
381             lispSouthboundHandler.setIsMaster(false);
382         }
383         if (lispXtrSouthboundHandler != null) {
384             lispXtrSouthboundHandler.setNotificationProvider(null);
385         }
386         return Futures.<Void>immediateFuture(null);
387     }
388
389     @Override
390     public ServiceGroupIdentifier getIdentifier() {
391         return SERVICE_GROUP_IDENTIFIER;
392     }
393
394     public MapRegisterCache getMapRegisterCache() {
395         return mapRegisterCache;
396     }
397
398     public boolean isMapRegisterCacheEnabled() {
399         return mapRegisterCacheEnabled;
400     }
401
402     public long getMapRegisterCacheTimeout() {
403         return mapRegisterCacheTimeout;
404     }
405 }