X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=mappingservice%2Fsouthbound%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Flispflowmapping%2Fsouthbound%2FLispSouthboundPlugin.java;h=6bc70e716799155b0cab62e0cdde2e3880974fc7;hb=4a99f1d24037d5b7947658b7360a8c53baeedbf6;hp=4adde9f59465e08fd48ecf6de7ccdc148adc24ed;hpb=3eec3f69a81c5c09f840aba6ccd88b5a0ca59f94;p=lispflowmapping.git diff --git a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java index 4adde9f59..6bc70e716 100644 --- a/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java +++ b/mappingservice/southbound/src/main/java/org/opendaylight/lispflowmapping/southbound/LispSouthboundPlugin.java @@ -8,255 +8,297 @@ package org.opendaylight.lispflowmapping.southbound; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; +import static io.netty.buffer.Unpooled.wrappedBuffer; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketException; -import java.net.SocketTimeoutException; +import java.net.UnknownHostException; import java.nio.ByteBuffer; - -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; -import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage; -import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService; -import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService; -import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService; +import java.util.List; +import java.util.concurrent.ThreadFactory; +import org.opendaylight.lispflowmapping.dsbackend.DataStoreBackEnd; +import org.opendaylight.lispflowmapping.inmemorydb.HashMapDb; +import org.opendaylight.lispflowmapping.lisp.type.LispMessage; +import org.opendaylight.lispflowmapping.lisp.util.LispAddressStringifier; +import org.opendaylight.lispflowmapping.mapcache.AuthKeyDb; +import org.opendaylight.lispflowmapping.southbound.lisp.AuthenticationKeyDataListener; +import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler; +import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler; +import org.opendaylight.lispflowmapping.southbound.lisp.cache.MapRegisterCache; import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin; -import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.LfmControlPlaneService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.transportaddress.TransportAddress; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary; +import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid; +import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.mapping.authkey.container.MappingAuthkey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress; +import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.mappingservice.rev150906.db.instance.AuthenticationKey; +import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.net.InetAddresses; - -public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, BindingAwareProvider { +public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterSingletonService { protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class); + public static final String LISPFLOWMAPPING_ENTITY_NAME = "lispflowmapping"; + public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create( + LISPFLOWMAPPING_ENTITY_NAME); + + private volatile boolean isMaster = false; + private volatile String bindingAddress; + private AuthKeyDb akdb; + private final MapRegisterCache mapRegisterCache = new MapRegisterCache(); + private boolean mapRegisterCacheEnabled; + private long mapRegisterCacheTimeout; private static Object startLock = new Object(); - private LispIoThread lispThread; - private LispIoThread xtrThread; - private LispSouthboundService lispSouthboundService; - private LispXtrSouthboundService lispXtrSouthboundService; - private NotificationProviderService notificationService; - private RpcProviderRegistry rpcRegistry; - private BindingAwareBroker broker; - private volatile DatagramSocket socket = null; - private volatile String bindingAddress = null; + private final ClusterSingletonServiceProvider clusterSingletonService; + private LispSouthboundHandler lispSouthboundHandler; + private LispXtrSouthboundHandler lispXtrSouthboundHandler; + private final NotificationPublishService notificationPublishService; + private int numChannels = 1; + private final Channel[] channel; + private Channel xtrChannel; + private Class channelType; private volatile int xtrPort = LispMessage.XTR_PORT_NUM; private volatile boolean listenOnXtrPort = false; - private BindingAwareBroker.RpcRegistration controlPlaneRpc; - private DatagramSocket xtrSocket; + private final ConcurrentLispSouthboundStats statistics = new ConcurrentLispSouthboundStats(); + private final Bootstrap bootstrap = new Bootstrap(); + private final Bootstrap xtrBootstrap = new Bootstrap(); + private final ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb"); + private EventLoopGroup eventLoopGroup; + private final DataBroker dataBroker; + private AuthenticationKeyDataListener authenticationKeyDataListener; + private DataStoreBackEnd dsbe; + + public LispSouthboundPlugin(final DataBroker dataBroker, + final NotificationPublishService notificationPublishService, + final ClusterSingletonServiceProvider clusterSingletonService) { + this.dataBroker = dataBroker; + this.notificationPublishService = notificationPublishService; + this.clusterSingletonService = clusterSingletonService; + if (Epoll.isAvailable()) { + // When lispflowmapping is under heavy load, there are usually two threads nearing 100% CPU core + // utilization. In order to have some headroom, we reserve 3 cores for "other" tasks, and allow the + // rest to be used for southbound packet processing, which is the most CPU intensive work done in lfm + numChannels = Math.max(1, Runtime.getRuntime().availableProcessors() - 3); + } + channel = new Channel[numChannels]; + } public void init() { - LOG.info("LISP (RFC6830) Mapping Service is up!"); - final LfmControlPlaneRpc lfmCpRpc = new LfmControlPlaneRpc(this); - - controlPlaneRpc = rpcRegistry.addRpcImplementation(LfmControlPlaneService.class, lfmCpRpc); - broker.registerProvider(this); - + LOG.info("LISP (RFC6830) Southbound Plugin is initializing..."); synchronized (startLock) { - lispSouthboundService = new LispSouthboundService(); - lispXtrSouthboundService = new LispXtrSouthboundService(); - lispSouthboundService.setNotificationProvider(this.notificationService); - lispXtrSouthboundService.setNotificationProvider(this.notificationService); - LOG.trace("Provider Session initialized"); - if (bindingAddress == null) { - setLispAddress("0.0.0.0"); + this.akdb = new AuthKeyDb(new HashMapDb()); + this.authenticationKeyDataListener = new AuthenticationKeyDataListener(dataBroker, akdb); + this.dsbe = new DataStoreBackEnd(dataBroker); + restoreDaoFromDatastore(); + + LispSouthboundHandler lsbh = new LispSouthboundHandler(this); + this.lispSouthboundHandler = lsbh; + + LispXtrSouthboundHandler lxsbh = new LispXtrSouthboundHandler(this); + this.lispXtrSouthboundHandler = lxsbh; + + if (Epoll.isAvailable()) { + eventLoopGroup = new EpollEventLoopGroup(numChannels, threadFactory); + channelType = EpollDatagramChannel.class; + bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.option(EpollChannelOption.SO_REUSEPORT, true); + LOG.debug("Using Netty Epoll for UDP sockets"); + } else { + eventLoopGroup = new NioEventLoopGroup(0, threadFactory); + channelType = NioDatagramChannel.class; + LOG.debug("Using Netty I/O (non-Epoll) for UDP sockets"); } - LOG.info("LISP (RFC6830) Mapping Service is up!"); - } - } - public void setNotificationProviderService(NotificationProviderService notificationService) { - this.notificationService = notificationService; - } + bootstrap.group(eventLoopGroup); + bootstrap.channel(channelType); + bootstrap.handler(lsbh); - public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) { - this.rpcRegistry = rpcRegistry; - } + xtrBootstrap.group(eventLoopGroup); + xtrBootstrap.channel(channelType); + xtrBootstrap.handler(lxsbh); - public void setBindingAwareBroker(BindingAwareBroker broker) { - this.broker = broker; - } + start(); + startXtr(); - private void unloadActions() { - if (lispThread != null) { - lispThread.stopRunning(); - } - lispSouthboundService = null; - lispXtrSouthboundService = null; - lispThread = null; - xtrThread = null; - bindingAddress = null; - LOG.info("LISP (RFC6830) Mapping Service is down!"); - try { - Thread.sleep(1100); - } catch (InterruptedException e) { + clusterSingletonService.registerClusterSingletonService(this); + LOG.info("LISP (RFC6830) Southbound Plugin is up!"); } } - private class LispIoThread extends Thread { - private volatile boolean shouldRun; - private volatile DatagramSocket threadSocket = null; - private volatile ILispSouthboundService service; - private volatile boolean running; - - public LispIoThread(DatagramSocket socket, ILispSouthboundService service) { - super("Lisp Thread"); - this.threadSocket = socket; - this.service = service; - shouldRun = true; + @SuppressWarnings("checkstyle:IllegalCatch") + private void start() { + try { + for (int i = 0; i < numChannels; ++i) { + channel[i] = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel(); + } + LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM); + } catch (Exception e) { + LOG.error("Failed to open main socket ", e); } + } - @Override - public void run() { - running = true; - - int lispReceiveTimeout = 1000; - - LOG.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress - + " port: " + threadSocket.getLocalPort()); + @SuppressWarnings("checkstyle:IllegalCatch") + private void startXtr() { + if (listenOnXtrPort) { try { - - threadSocket.setSoTimeout(lispReceiveTimeout); - } catch (SocketException e) { - LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e); - return; + xtrChannel = xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel(); + LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort); + } catch (Exception e) { + LOG.error("Failed to open xTR socket ", e); } + } + } - while (shouldRun) { - byte[] buffer = new byte[4096]; - DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - try { - threadSocket.receive(packet); - LOG.trace("Received a packet!"); - } catch (SocketTimeoutException ste) { - continue; - } catch (IOException e) { - LOG.warn("IO Exception while trying to recieve packet", e); - } - LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress() - .getHostAddress(), packet.getPort(), packet.getLength())); - - try { - this.service.handlePacket(packet); - } catch (Exception e) { - LOG.warn("Error while handling packet", e); - } + @SuppressWarnings("checkstyle:IllegalCatch") + private void stop() { + try { + for (int i = 0; i < numChannels; ++i) { + channel[i].close().sync(); + channel[i] = null; } - - threadSocket.close(); - LOG.trace("Socket closed"); - running = false; + } catch (Exception e) { + LOG.error("Failed to close main socket ", e); } + } - public void stopRunning() { - shouldRun = false; + @SuppressWarnings("checkstyle:IllegalCatch") + private void stopXtr() { + if (listenOnXtrPort) { + try { + xtrChannel.close().sync(); + xtrChannel = null; + } catch (Exception e) { + LOG.error("Failed to close xTR socket ", e); + } } + } - public boolean isRunning() { - return running; - } + private void restart() { + LOG.info("Reloading"); + stop(); + start(); } - public static String intToIpv4(int address) { - return ((address >> 24) & 0xff) + "." + // - ((address >> 16) & 0xff) + "." + // - ((address >> 8) & 0xff) + "." + // - ((address >> 0) & 0xff); + private void restartXtr() { + LOG.info("Reloading xTR"); + stopXtr(); + startXtr(); } - public String getHelp() { - StringBuffer help = new StringBuffer(); - help.append("---LISP Southbound Plugin---\n"); - return help.toString(); + private void unloadActions() { + lispSouthboundHandler = null; + lispXtrSouthboundHandler = null; + + stop(); + stopXtr(); + + LOG.info("LISP (RFC6830) Southbound Plugin is down!"); } - private void startIOThread() { - if (socket != null) { - while (!socket.isClosed()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - } - } - try { - socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM)); - lispThread = new LispIoThread(socket, lispSouthboundService); - lispThread.start(); - LOG.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!"); - if (listenOnXtrPort) { - restartXtrThread(); - } - } catch (SocketException e) { - LOG.error("couldn't start socket {}", e.getMessage()); + /** + * Restore all keys from MDSAL datastore. + */ + public void restoreDaoFromDatastore() { + final List authKeys = dsbe.getAllAuthenticationKeys(); + LOG.info("Restoring {} keys from datastore into southbound DAO", authKeys.size()); + + for (AuthenticationKey authKey : authKeys) { + final Eid key = authKey.getEid(); + final MappingAuthkey mappingAuthkey = authKey.getMappingAuthkey(); + LOG.debug("Adding authentication key '{}' with key-ID {} for {}", mappingAuthkey.getKeyString(), + mappingAuthkey.getKeyType(), + LispAddressStringifier.getString(key)); + akdb.addAuthenticationKey(key, mappingAuthkey); } } - private void restartXtrThread() { - try { - stopXtrThread(); - xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort)); - xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService); - xtrThread.start(); - LOG.info("xTR Southbound Plugin is up!"); - } catch (SocketException e) { - LOG.warn("failed to start xtr thread: {}", e.getMessage()); + public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, + final MessageType packetType) { + InetAddress ip = getInetAddress(address); + handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue().toJava(), null); + } + + public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer, + final MessageType packetType, final int portNumber, Channel senderChannel) { + if (senderChannel == null) { + senderChannel = this.channel[0]; } + InetSocketAddress recipient = new InetSocketAddress(address, portNumber); + outBuffer.position(0); + ByteBuf data = wrappedBuffer(outBuffer); + DatagramPacket packet = new DatagramPacket(data, recipient); + LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address); + if (LOG.isTraceEnabled()) { + LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data)); + } + senderChannel.write(packet).addListener(future -> { + if (future.isSuccess()) { + LOG.trace("Success"); + statistics.incrementTx(packetType.getIntValue()); + } else { + LOG.warn("Failed to send packet"); + statistics.incrementTxErrors(); + } + }); + senderChannel.flush(); } - public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, String packetType) { - DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit()); - packet.setPort(address.getPort().getValue()); - InetAddress ip = InetAddresses.forString(address.getIpAddress().getIpv4Address().getValue()); - packet.setAddress(ip); + private InetAddress getInetAddress(TransportAddress address) { + Preconditions.checkNotNull(address, "TransportAddress must not be null"); + IpAddressBinary ip = address.getIpAddress(); try { - if (LOG.isDebugEnabled()) { - LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip); + if (ip.getIpv4AddressBinary() != null) { + return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue()); + } else if (ip.getIpv6AddressBinary() != null) { + return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue()); } - socket.send(packet); - } catch (IOException e) { - LOG.warn("Failed to send " + packetType, e); + } catch (UnknownHostException e) { + LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e); } + return null; } + @Override + @SuppressWarnings("checkstyle:IllegalCatch") public void setLispAddress(String address) { synchronized (startLock) { - if (bindingAddress != null && bindingAddress.equals(address)) { - LOG.trace("configured lisp binding address didn't change."); + if (bindingAddress.equals(address)) { + LOG.debug("Configured LISP binding address didn't change."); } else { - String action = (bindingAddress == null ? "Setting" : "Resetting"); - LOG.trace(action + " lisp binding address to: " + address); + LOG.debug("Setting LISP binding address to {}", address); bindingAddress = address; - if (lispThread != null) { - lispThread.stopRunning(); - while (lispThread.isRunning()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } + if (channel != null) { + try { + restart(); + restartXtr(); + } catch (Exception e) { + LOG.error("Failed to set LISP binding address: ", e); } } - stopXtrThread(); - startIOThread(); - } - } - } - - private void stopXtrThread() { - if (xtrThread != null) { - xtrThread.stopRunning(); - while (xtrThread.isRunning()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } } } } @@ -265,11 +307,10 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) { listenOnXtrPort = shouldListenOnXtrPort; if (listenOnXtrPort) { - LOG.debug("restarting xtr thread"); - restartXtrThread(); + restartXtr(); } else { - LOG.debug("terminating thread"); - stopXtrThread(); + LOG.info("Shutting down xTR"); + stopXtr(); } } @@ -277,18 +318,86 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl public void setXtrPort(int port) { this.xtrPort = port; if (listenOnXtrPort) { - restartXtrThread(); + restartXtr(); + } + } + + public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) { + this.mapRegisterCacheEnabled = mapRegisterCacheEnabled; + if (mapRegisterCacheEnabled) { + LOG.info("Enabling Map-Register cache"); + } else { + LOG.info("Disabling Map-Register cache"); } } + public void setMapRegisterCacheTimeout(long mapRegisterCacheTimeout) { + this.mapRegisterCacheTimeout = mapRegisterCacheTimeout; + } + + public void setBindingAddress(String bindingAddress) { + this.bindingAddress = bindingAddress; + } + @Override public void close() throws Exception { + eventLoopGroup.shutdownGracefully(); + lispSouthboundHandler.close(); unloadActions(); - controlPlaneRpc.close(); + clusterSingletonService.close(); + dsbe.closeTransactionChain(); } @Override - public void onSessionInitiated(ProviderContext session) { - LOG.debug("LispSouthboundPlugin Provider Session Initiated"); + public void instantiateServiceInstance() { + this.isMaster = true; + } + + @Override + public ListenableFuture closeServiceInstance() { + this.isMaster = false; + return Futures.immediateFuture(null); + } + + @Override + public ServiceGroupIdentifier getIdentifier() { + return SERVICE_GROUP_IDENTIFIER; + } + + public synchronized void sendNotificationIfPossible(final Notification notification) throws InterruptedException { + if (isMaster && notificationPublishService != null) { + notificationPublishService.putNotification(notification); + LOG.trace("Publishing notification: {}", notification); + } else if (notificationPublishService == null) { + LOG.warn("Can't publish notification because no reference to publication service exists!"); + } + } + + public AuthKeyDb getAkdb() { + return akdb; + } + + public ConcurrentLispSouthboundStats getStats() { + return statistics; + } + + public DataBroker getDataBroker() { + return dataBroker; + } + + public AuthenticationKeyDataListener getAuthenticationKeyDataListener() { + return authenticationKeyDataListener; + } + + public MapRegisterCache getMapRegisterCache() { + return mapRegisterCache; + } + + public boolean isMapRegisterCacheEnabled() { + return mapRegisterCacheEnabled; + } + + public long getMapRegisterCacheTimeout() { + return mapRegisterCacheTimeout; } }