e481fc0f9a0ae4f36044fb4dedba2ff7de2bf062
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.lispflowmapping.southbound;
9
10 import static io.netty.buffer.Unpooled.wrappedBuffer;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import io.netty.bootstrap.Bootstrap;
16 import io.netty.buffer.ByteBuf;
17 import io.netty.buffer.ByteBufUtil;
18 import io.netty.buffer.PooledByteBufAllocator;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelOption;
21 import io.netty.channel.EventLoopGroup;
22 import io.netty.channel.epoll.Epoll;
23 import io.netty.channel.epoll.EpollChannelOption;
24 import io.netty.channel.epoll.EpollDatagramChannel;
25 import io.netty.channel.epoll.EpollEventLoopGroup;
26 import io.netty.channel.nio.NioEventLoopGroup;
27 import io.netty.channel.socket.DatagramChannel;
28 import io.netty.channel.socket.DatagramPacket;
29 import io.netty.channel.socket.nio.NioDatagramChannel;
30 import io.netty.util.concurrent.DefaultThreadFactory;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.UnknownHostException;
34 import java.nio.ByteBuffer;
35 import java.util.List;
36 import java.util.concurrent.ThreadFactory;
37 import javax.annotation.PostConstruct;
38 import javax.annotation.PreDestroy;
39 import javax.inject.Inject;
40 import javax.inject.Singleton;
41 import org.opendaylight.lispflowmapping.dsbackend.DataStoreBackEnd;
42 import org.opendaylight.lispflowmapping.inmemorydb.HashMapDb;
43 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
44 import org.opendaylight.lispflowmapping.lisp.util.LispAddressStringifier;
45 import org.opendaylight.lispflowmapping.mapcache.AuthKeyDb;
46 import org.opendaylight.lispflowmapping.southbound.lisp.AuthenticationKeyDataListener;
47 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
48 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
49 import org.opendaylight.lispflowmapping.southbound.lisp.cache.MapRegisterCache;
50 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
51 import org.opendaylight.mdsal.binding.api.DataBroker;
52 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
53 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
54 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
55 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.authkey.container.MappingAuthkey;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.db.instance.AuthenticationKey;
62 import org.opendaylight.yangtools.concepts.Registration;
63 import org.opendaylight.yangtools.yang.binding.Notification;
64 import org.osgi.service.component.annotations.Activate;
65 import org.osgi.service.component.annotations.Component;
66 import org.osgi.service.component.annotations.Deactivate;
67 import org.osgi.service.component.annotations.Reference;
68 import org.osgi.service.metatype.annotations.AttributeDefinition;
69 import org.osgi.service.metatype.annotations.Designate;
70 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 @Singleton
75 @Component(immediate = true, property = "type=default", configurationPid = "org.opendaylight.lispflowmapping",
76            service = { IConfigLispSouthboundPlugin.class, LispSouthboundPlugin.class })
77 @Designate(ocd = LispSouthboundPlugin.Configuration.class)
78 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterSingletonService {
79     @ObjectClassDefinition
80     public @interface Configuration {
81         @AttributeDefinition()
82         String bindingAddress() default DEFAULT_BINDING_ADDRESS;
83
84         @AttributeDefinition()
85         boolean mapRegisterCacheEnabled() default true;
86
87         @AttributeDefinition()
88         long mapRegisterCacheTimeout() default DEFAULT_MAP_REGISTER_CACHE_TIMEOUT;
89     }
90
91     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
92     public static final String LISPFLOWMAPPING_ENTITY_NAME = "lispflowmapping";
93     public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER =
94         ServiceGroupIdentifier.create(LISPFLOWMAPPING_ENTITY_NAME);
95
96     private static final String DEFAULT_BINDING_ADDRESS = "0.0.0.0";
97     private static final long DEFAULT_MAP_REGISTER_CACHE_TIMEOUT = 90000;
98
99     private volatile boolean isMaster = false;
100     private volatile String bindingAddress;
101     private AuthKeyDb akdb;
102     private final MapRegisterCache mapRegisterCache = new MapRegisterCache();
103     private final boolean mapRegisterCacheEnabled;
104     private final long mapRegisterCacheTimeout;
105
106     private static Object startLock = new Object();
107
108     private final DataBroker dataBroker;
109     private final NotificationPublishService notificationPublishService;
110     private final ClusterSingletonServiceProvider clusterSingletonService;
111
112     private LispSouthboundHandler lispSouthboundHandler;
113     private int numChannels = 1;
114     private final Channel[] channel;
115     private Channel xtrChannel;
116     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
117     private volatile boolean listenOnXtrPort = false;
118     private final ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats();
119     private final Bootstrap bootstrap = new Bootstrap();
120     private final Bootstrap xtrBootstrap = new Bootstrap();
121     private final ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
122     private EventLoopGroup eventLoopGroup;
123     private AuthenticationKeyDataListener authenticationKeyDataListener;
124     private DataStoreBackEnd dsbe;
125     private Registration cssReg;
126
127     @Inject
128     public LispSouthboundPlugin(final DataBroker dataBroker,
129             final NotificationPublishService notificationPublishService,
130             final ClusterSingletonServiceProvider clusterSingletonService) {
131         this(dataBroker, notificationPublishService, clusterSingletonService, DEFAULT_BINDING_ADDRESS, true,
132             DEFAULT_MAP_REGISTER_CACHE_TIMEOUT);
133     }
134
135     @Activate
136     public LispSouthboundPlugin(@Reference final DataBroker dataBroker,
137             @Reference final NotificationPublishService notificationPublishService,
138             @Reference final ClusterSingletonServiceProvider clusterSingletonService,
139             final Configuration configuration) {
140         this(dataBroker, notificationPublishService, clusterSingletonService, configuration.bindingAddress(),
141             configuration.mapRegisterCacheEnabled(), configuration.mapRegisterCacheTimeout());
142         init();
143     }
144
145     public LispSouthboundPlugin(final DataBroker dataBroker,
146             final NotificationPublishService notificationPublishService,
147             final ClusterSingletonServiceProvider clusterSingletonService,
148             final String bindingAddress, final boolean mapRegisterCacheEnabled, final long mapRegisterCacheTimeout) {
149         LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
150         this.dataBroker = dataBroker;
151         this.notificationPublishService = notificationPublishService;
152         this.clusterSingletonService = clusterSingletonService;
153         this.bindingAddress = bindingAddress;
154         this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
155         this.mapRegisterCacheTimeout = mapRegisterCacheTimeout;
156
157         if (Epoll.isAvailable()) {
158             // When lispflowmapping is under heavy load, there are usually two threads nearing 100% CPU core
159             // utilization. In order to have some headroom, we reserve 3 cores for "other" tasks, and allow the
160             // rest to be used for southbound packet processing, which is the most CPU intensive work done in lfm
161             numChannels = Math.max(1, Runtime.getRuntime().availableProcessors() - 3);
162         }
163         channel = new Channel[numChannels];
164     }
165
166     @PostConstruct
167     public void init() {
168         synchronized (startLock) {
169             akdb = new AuthKeyDb(new HashMapDb());
170             authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb);
171             dsbe = new DataStoreBackEnd(dataBroker);
172             restoreDaoFromDatastore();
173
174             final Class<? extends DatagramChannel> channelType;
175             if (Epoll.isAvailable()) {
176                 eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory);
177                 channelType = EpollDatagramChannel.class;
178                 bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
179                 bootstrap.option(EpollChannelOption.SO_REUSEPORT, true);
180                 LOG.debug("Using Netty Epoll for UDP sockets");
181             } else {
182                 eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
183                 channelType = NioDatagramChannel.class;
184                 LOG.debug("Using Netty I/O (non-Epoll) for UDP sockets");
185             }
186
187             bootstrap.group(eventLoopGroup);
188             bootstrap.channel(channelType);
189             lispSouthboundHandler = new LispSouthboundHandler(this);
190             bootstrap.handler(lispSouthboundHandler);
191
192             xtrBootstrap.group(eventLoopGroup);
193             xtrBootstrap.channel(channelType);
194             xtrBootstrap.handler(new LispXtrSouthboundHandler(this));
195
196             start();
197             startXtr();
198
199             cssReg = clusterSingletonService.registerClusterSingletonService(this);
200         }
201
202         LOG.info("LISP (RFC6830) Southbound Plugin is up!");
203     }
204
205     @SuppressWarnings("checkstyle:IllegalCatch")
206     private void start() {
207         try {
208             for (int i = 0; i < numChannels; ++i) {
209                 channel[i] = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
210             }
211             LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
212         } catch (Exception e) {
213             LOG.error("Failed to open main socket ", e);
214         }
215     }
216
217     @SuppressWarnings("checkstyle:IllegalCatch")
218     private void startXtr() {
219         if (listenOnXtrPort) {
220             try {
221                 xtrChannel = xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
222                 LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
223             } catch (Exception e) {
224                 LOG.error("Failed to open xTR socket ", e);
225             }
226         }
227     }
228
229     @SuppressWarnings("checkstyle:IllegalCatch")
230     private void stop() {
231         try {
232             for (int i = 0; i < numChannels; ++i) {
233                 channel[i].close().sync();
234                 channel[i] = null;
235             }
236         } catch (Exception e) {
237             LOG.error("Failed to close main socket ", e);
238         }
239     }
240
241     @SuppressWarnings("checkstyle:IllegalCatch")
242     private void stopXtr() {
243         if (listenOnXtrPort) {
244             try {
245                 xtrChannel.close().sync();
246                 xtrChannel = null;
247             } catch (Exception e) {
248                 LOG.error("Failed to close xTR socket ", e);
249             }
250         }
251     }
252
253     private void restart() {
254         LOG.info("Reloading");
255         stop();
256         start();
257     }
258
259     private void restartXtr() {
260         LOG.info("Reloading xTR");
261         stopXtr();
262         startXtr();
263     }
264
265     private void unloadActions() {
266         lispSouthboundHandler = null;
267
268         stop();
269         stopXtr();
270
271         LOG.info("LISP (RFC6830) Southbound Plugin is down!");
272     }
273
274     /**
275      * Restore all keys from MDSAL datastore.
276      */
277     public void restoreDaoFromDatastore() {
278         final List<AuthenticationKey> authKeys = dsbe.getAllAuthenticationKeys();
279         LOG.info("Restoring {} keys from datastore into southbound DAO", authKeys.size());
280
281         for (AuthenticationKey authKey : authKeys) {
282             final Eid key = authKey.getEid();
283             final MappingAuthkey mappingAuthkey = authKey.getMappingAuthkey();
284             LOG.debug("Adding authentication key '{}' with key-ID {} for {}", mappingAuthkey.getKeyString(),
285                     mappingAuthkey.getKeyType(),
286                     LispAddressStringifier.getString(key));
287             akdb.addAuthenticationKey(key, mappingAuthkey);
288         }
289     }
290
291     public void handleSerializedLispBuffer(final TransportAddress address, final ByteBuffer outBuffer,
292                                            final MessageType packetType) {
293         InetAddress ip = getInetAddress(address);
294         handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue().toJava(), null);
295     }
296
297     public void handleSerializedLispBuffer(final InetAddress address, final ByteBuffer outBuffer,
298             final MessageType packetType, final int portNumber, Channel senderChannel) {
299         if (senderChannel == null) {
300             senderChannel = channel[0];
301         }
302         InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
303         outBuffer.position(0);
304         ByteBuf data = wrappedBuffer(outBuffer);
305         DatagramPacket packet = new DatagramPacket(data, recipient);
306         LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
307         if (LOG.isTraceEnabled()) {
308             LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
309         }
310         senderChannel.write(packet).addListener(future -> {
311             if (future.isSuccess()) {
312                 LOG.trace("Success");
313                 statistics.incrementTx(packetType.getIntValue());
314             } else {
315                 LOG.warn("Failed to send packet");
316                 statistics.incrementTxErrors();
317             }
318         });
319         senderChannel.flush();
320     }
321
322     private static InetAddress getInetAddress(final TransportAddress address) {
323         requireNonNull(address, "TransportAddress must not be null");
324         IpAddressBinary ip = address.getIpAddress();
325         try {
326             if (ip.getIpv4AddressBinary() != null) {
327                 return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
328             } else if (ip.getIpv6AddressBinary() != null) {
329                 return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
330             }
331         } catch (UnknownHostException e) {
332             LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
333         }
334         return null;
335     }
336
337     @Override
338     @SuppressWarnings("checkstyle:IllegalCatch")
339     public void setLispAddress(final String address) {
340         synchronized (startLock) {
341             if (bindingAddress.equals(address)) {
342                 LOG.debug("Configured LISP binding address didn't change.");
343             } else {
344                 LOG.debug("Setting LISP binding address to {}", address);
345                 bindingAddress = address;
346                 if (channel != null) {
347                     try {
348                         restart();
349                         restartXtr();
350                     } catch (Exception e) {
351                         LOG.error("Failed to set LISP binding address: ", e);
352                     }
353                 }
354             }
355         }
356     }
357
358     @Override
359     public void shouldListenOnXtrPort(final boolean shouldListenOnXtrPort) {
360         listenOnXtrPort = shouldListenOnXtrPort;
361         if (listenOnXtrPort) {
362             restartXtr();
363         } else {
364             LOG.info("Shutting down xTR");
365             stopXtr();
366         }
367     }
368
369     @Override
370     public void setXtrPort(final int port) {
371         xtrPort = port;
372         if (listenOnXtrPort) {
373             restartXtr();
374         }
375     }
376
377     @Deactivate
378     @PreDestroy
379     @Override
380     public void close() throws Exception {
381         eventLoopGroup.shutdownGracefully();
382         lispSouthboundHandler.close();
383         unloadActions();
384         if (cssReg != null) {
385             cssReg.close();
386         }
387         dsbe.closeTransactionChain();
388     }
389
390     @Override
391     public void instantiateServiceInstance() {
392         isMaster = true;
393     }
394
395     @Override
396     public ListenableFuture<Void> closeServiceInstance() {
397         isMaster = false;
398         return Futures.<Void>immediateFuture(null);
399     }
400
401     @Override
402     public ServiceGroupIdentifier getIdentifier() {
403         return SERVICE_GROUP_IDENTIFIER;
404     }
405
406     public synchronized void sendNotificationIfPossible(final Notification notification) throws InterruptedException {
407         if (isMaster && notificationPublishService != null) {
408             notificationPublishService.putNotification(notification);
409             LOG.trace("Publishing notification: {}", notification);
410         } else if (notificationPublishService == null) {
411             LOG.warn("Can't publish notification because no reference to publication service exists!");
412         }
413     }
414
415     public AuthKeyDb getAkdb() {
416         return akdb;
417     }
418
419     public ConcurrentLispSouthboundStats getStats() {
420         return statistics;
421     }
422
423     public DataBroker getDataBroker() {
424         return dataBroker;
425     }
426
427     public AuthenticationKeyDataListener getAuthenticationKeyDataListener() {
428         return authenticationKeyDataListener;
429     }
430
431     public MapRegisterCache getMapRegisterCache() {
432         return mapRegisterCache;
433     }
434
435     public boolean isMapRegisterCacheEnabled() {
436         return mapRegisterCacheEnabled;
437     }
438
439     public long getMapRegisterCacheTimeout() {
440         return mapRegisterCacheTimeout;
441     }
442 }