Merge "Remove SimpleMapCache#getXtrIdTable()"
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import static io.netty.buffer.Unpooled.wrappedBuffer;
12
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.bootstrap.Bootstrap;
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.PooledByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.epoll.Epoll;
26 import io.netty.channel.epoll.EpollChannelOption;
27 import io.netty.channel.epoll.EpollDatagramChannel;
28 import io.netty.channel.epoll.EpollEventLoopGroup;
29 import io.netty.channel.nio.NioEventLoopGroup;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.channel.socket.nio.NioDatagramChannel;
32 import io.netty.util.concurrent.DefaultThreadFactory;
33 import java.net.InetAddress;
34 import java.net.InetSocketAddress;
35 import java.net.UnknownHostException;
36 import java.nio.ByteBuffer;
37 import java.util.concurrent.ThreadFactory;
38 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
39 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
40 import org.opendaylight.lispflowmapping.dsbackend.DataStoreBackEnd;
41 import org.opendaylight.lispflowmapping.inmemorydb.HashMapDb;
42 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
43 import org.opendaylight.lispflowmapping.mapcache.AuthKeyDb;
44 import org.opendaylight.lispflowmapping.southbound.lisp.AuthenticationKeyDataListener;
45 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
46 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
47 import org.opendaylight.lispflowmapping.southbound.lisp.cache.MapRegisterCache;
48 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
49 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterSingletonService {
59     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
60     public static final String LISPFLOWMAPPING_ENTITY_NAME = "lispflowmapping";
61     public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create(
62             LISPFLOWMAPPING_ENTITY_NAME);
63
64     private volatile String bindingAddress;
65     private AuthKeyDb akdb;
66     private MapRegisterCache mapRegisterCache = new MapRegisterCache();
67     private boolean mapRegisterCacheEnabled;
68     private long mapRegisterCacheTimeout;
69
70     private static Object startLock = new Object();
71     private final ClusterSingletonServiceProvider clusterSingletonService;
72     private LispSouthboundHandler lispSouthboundHandler;
73     private LispXtrSouthboundHandler lispXtrSouthboundHandler;
74     private NotificationPublishService notificationPublishService;
75     private int numChannels = 1;
76     private Channel[] channel;
77     private Channel xtrChannel;
78     private Class channelType;
79     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
80     private volatile boolean listenOnXtrPort = false;
81     private ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
82     private Bootstrap bootstrap = new Bootstrap();
83     private Bootstrap xtrBootstrap = new Bootstrap();
84     private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
85     private EventLoopGroup eventLoopGroup;
86     private DataBroker dataBroker;
87     private AuthenticationKeyDataListener authenticationKeyDataListener;
88     private DataStoreBackEnd dsbe;
89
90     public LispSouthboundPlugin(final DataBroker dataBroker,
91             final NotificationPublishService notificationPublishService,
92             final ClusterSingletonServiceProvider clusterSingletonService) {
93         this.dataBroker = dataBroker;
94         this.notificationPublishService = notificationPublishService;
95         this.clusterSingletonService = clusterSingletonService;
96         this.clusterSingletonService.registerClusterSingletonService(this);
97         if (Epoll.isAvailable()) {
98             // When lispflowmapping is under heavy load, there are usually two threads nearing 100% CPU core
99             // utilization. In order to have some headroom, we reserve 3 cores for "other" tasks, and allow the
100             // rest to be used for southbound packet processing, which is the most CPU intensive work done in lfm
101             numChannels = Math.max(1, Runtime.getRuntime().availableProcessors() - 3);
102         }
103         channel = new Channel[numChannels];
104     }
105
106     public void init() {
107         LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
108         synchronized (startLock) {
109             this.akdb = new AuthKeyDb(new HashMapDb());
110             this.authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb);
111             this.dsbe = new DataStoreBackEnd(dataBroker);
112
113             lispSouthboundHandler = new LispSouthboundHandler(this);
114             lispSouthboundHandler.setDataBroker(dataBroker);
115             lispSouthboundHandler.setNotificationProvider(notificationPublishService);
116             lispSouthboundHandler.setAuthKeyDb(akdb);
117             lispSouthboundHandler.setMapRegisterCache(mapRegisterCache);
118             lispSouthboundHandler.setMapRegisterCacheTimeout(mapRegisterCacheTimeout);
119             lispSouthboundHandler.setAuthenticationKeyDataListener(authenticationKeyDataListener);
120             lispSouthboundHandler.setDataStoreBackEnd(dsbe);
121             lispSouthboundHandler.setStats(statistics);
122             lispSouthboundHandler.restoreDaoFromDatastore();
123
124             lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
125             lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
126
127             if (Epoll.isAvailable()) {
128                 eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
129                 channelType = EpollDatagramChannel.class;
130                 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
131                 bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
132                 LOG.debug("Using Netty Epoll for UDP sockets");
133             } else {
134                 eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
135                 channelType = NioDatagramChannel.class;
136                 LOG.debug("Using Netty I/O (non-Epoll) for UDP sockets");
137             }
138
139             bootstrap.group(eventLoopGroup);
140             bootstrap.channel(channelType);
141             bootstrap.handler(lispSouthboundHandler);
142
143             xtrBootstrap.group(eventLoopGroup);
144             xtrBootstrap.channel(channelType);
145             xtrBootstrap.handler(lispXtrSouthboundHandler);
146
147             start();
148             startXtr();
149
150             LOG.info("LISP (RFC6830) Southbound Plugin is up!");
151         }
152     }
153
154     @SuppressWarnings("checkstyle:IllegalCatch")
155     private void start() {
156         try {
157             for (int i = 0; i < numChannels; ++i) {
158                 channel[i] = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
159             }
160             LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
161         } catch (Exception e) {
162             LOG.error("Failed to open main socket ", e);
163         }
164     }
165
166     @SuppressWarnings("checkstyle:IllegalCatch")
167     private void startXtr() {
168         if (listenOnXtrPort) {
169             try {
170                 xtrChannel = xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
171                 LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
172             } catch (Exception e) {
173                 LOG.error("Failed to open xTR socket ", e);
174             }
175         }
176     }
177
178     @SuppressWarnings("checkstyle:IllegalCatch")
179     private void stop() {
180         try {
181             for (int i = 0; i < numChannels; ++i) {
182                 channel[i].close().sync();
183                 channel[i] = null;
184             }
185         } catch (Exception e) {
186             LOG.error("Failed to close main socket ", e);
187         }
188     }
189
190     @SuppressWarnings("checkstyle:IllegalCatch")
191     private void stopXtr() {
192         if (listenOnXtrPort) {
193             try {
194                 xtrChannel.close().sync();
195                 xtrChannel = null;
196             } catch (Exception e) {
197                 LOG.error("Failed to close xTR socket ", e);
198             }
199         }
200     }
201
202     private void restart() {
203         LOG.info("Reloading");
204         stop();
205         start();
206     }
207
208     private void restartXtr() {
209         LOG.info("Reloading xTR");
210         stopXtr();
211         startXtr();
212     }
213
214     private void unloadActions() {
215         lispSouthboundHandler = null;
216         lispXtrSouthboundHandler = null;
217
218         stop();
219         stopXtr();
220
221         LOG.info("LISP (RFC6830) Southbound Plugin is down!");
222     }
223
224     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
225                                            final MessageType packetType) {
226         InetAddress ip = getInetAddress(address);
227         handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue(), null);
228     }
229
230     public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
231             final MessageType packetType, final int portNumber, Channel senderChannel) {
232         if (senderChannel == null) {
233             senderChannel = this.channel[0];
234         }
235         InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
236         outBuffer.position(0);
237         ByteBuf data = wrappedBuffer(outBuffer);
238         DatagramPacket packet = new DatagramPacket(data, recipient);
239         LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
240         if (LOG.isTraceEnabled()) {
241             LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
242         }
243         senderChannel.write(packet).addListener(new ChannelFutureListener() {
244             @Override
245             public void operationComplete(ChannelFuture future) {
246                 if (future.isSuccess()) {
247                     LOG.trace("Success");
248                     statistics.incrementTx(packetType.getIntValue());
249                 } else {
250                     LOG.warn("Failed to send packet");
251                     statistics.incrementTxErrors();
252                 }
253             }
254         });
255         senderChannel.flush();
256     }
257
258     private InetAddress getInetAddress(TransportAddress address) {
259         Preconditions.checkNotNull(address, "TransportAddress must not be null");
260         IpAddressBinary ip = address.getIpAddress();
261         try {
262             if (ip.getIpv4AddressBinary() != null) {
263                 return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
264             } else if (ip.getIpv6AddressBinary() != null) {
265                 return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
266             }
267         } catch (UnknownHostException e) {
268             LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
269         }
270         return null;
271     }
272
273     public ConcurrentLispSouthboundStats getStats() {
274         return statistics;
275     }
276
277     @Override
278     @SuppressWarnings("checkstyle:IllegalCatch")
279     public void setLispAddress(String address) {
280         synchronized (startLock) {
281             if (bindingAddress.equals(address)) {
282                 LOG.debug("Configured LISP binding address didn't change.");
283             } else {
284                 LOG.debug("Setting LISP binding address to {}", address);
285                 bindingAddress = address;
286                 if (channel != null) {
287                     try {
288                         restart();
289                         restartXtr();
290                     } catch (Exception e) {
291                         LOG.error("Failed to set LISP binding address: ", e);
292                     }
293                 }
294             }
295         }
296     }
297
298     @Override
299     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
300         listenOnXtrPort = shouldListenOnXtrPort;
301         if (listenOnXtrPort) {
302             restartXtr();
303         } else {
304             LOG.info("Shutting down xTR");
305             stopXtr();
306         }
307     }
308
309     @Override
310     public void setXtrPort(int port) {
311         this.xtrPort = port;
312         if (listenOnXtrPort) {
313             restartXtr();
314         }
315     }
316
317     public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
318         this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
319         if (mapRegisterCacheEnabled) {
320             LOG.info("Enabling Map-Register cache");
321         } else {
322             LOG.info("Disabling Map-Register cache");
323         }
324     }
325
326     public void setMapRegisterCacheTimeout(long mapRegisterCacheTimeout) {
327         this.mapRegisterCacheTimeout = mapRegisterCacheTimeout;
328     }
329
330     public void setBindingAddress(String bindingAddress) {
331         this.bindingAddress = bindingAddress;
332     }
333
334     @Override
335     public void close() throws Exception {
336         eventLoopGroup.shutdownGracefully();
337         lispSouthboundHandler.close();
338         unloadActions();
339         clusterSingletonService.close();
340     }
341
342     @Override
343     public void instantiateServiceInstance() {
344         if (lispSouthboundHandler != null) {
345             lispSouthboundHandler.setNotificationProvider(notificationPublishService);
346             lispSouthboundHandler.restoreDaoFromDatastore();
347             lispSouthboundHandler.setIsMaster(true);
348         }
349         if (lispXtrSouthboundHandler != null) {
350             lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
351         }
352     }
353
354     @Override
355     public ListenableFuture<Void> closeServiceInstance() {
356         if (lispSouthboundHandler != null) {
357             lispSouthboundHandler.setNotificationProvider(null);
358             lispSouthboundHandler.setIsMaster(false);
359         }
360         if (lispXtrSouthboundHandler != null) {
361             lispXtrSouthboundHandler.setNotificationProvider(null);
362         }
363         return Futures.<Void>immediateFuture(null);
364     }
365
366     @Override
367     public ServiceGroupIdentifier getIdentifier() {
368         return SERVICE_GROUP_IDENTIFIER;
369     }
370
371     public MapRegisterCache getMapRegisterCache() {
372         return mapRegisterCache;
373     }
374
375     public boolean isMapRegisterCacheEnabled() {
376         return mapRegisterCacheEnabled;
377     }
378
379     public long getMapRegisterCacheTimeout() {
380         return mapRegisterCacheTimeout;
381     }
382 }