/*
- * Copyright (c) 2013 Contextream, Inc. and others. All rights reserved.
+ * Copyright (c) 2014 Contextream, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.lispflowmapping.southbound;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-
-import org.eclipse.osgi.framework.console.CommandProvider;
-import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
-import org.opendaylight.lispflowmapping.interfaces.lisp.IFlowMapping;
-import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadFactory;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.lispflowmapping.clustering.ClusterNodeModulSwitcherImpl;
+import org.opendaylight.lispflowmapping.clustering.api.ClusterNodeModuleSwitcher;
+import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
+import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.sb.rev150904.OdlLispSbService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.lisp.sb.config.rev150517.LispSbConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LispSouthboundPlugin implements ILispSouthboundPlugin {
- protected static final Logger logger = LoggerFactory.getLogger(LispSouthboundPlugin.class);
+public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterNodeModuleSwitcher {
+ protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
+
+ private static Object startLock = new Object();
+ private final ClusterNodeModulSwitcherImpl clusterNodeModulSwitcher;
+ private LispSouthboundHandler lispSouthboundHandler;
+ private LispXtrSouthboundHandler lispXtrSouthboundHandler;
+ private NotificationPublishService notificationPublishService;
+ private NioDatagramChannel channel;
+ private volatile String bindingAddress = "0.0.0.0";
+ private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
+ private volatile boolean listenOnXtrPort = false;
+ private boolean mapRegisterCacheEnabled = true;
+ private RpcRegistration<OdlLispSbService> sbRpcRegistration;
+ private NioDatagramChannel xtrChannel;
+ private LispSouthboundStats statistics = new LispSouthboundStats();
+ private Bootstrap bootstrap = new Bootstrap();
+ private Bootstrap xtrBootstrap = new Bootstrap();
+ private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
+ private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
+ private DataBroker dataBroker;
+
+ public LispSouthboundPlugin(final DataBroker dataBroker,
+ final NotificationPublishService notificationPublishService,
+ final LispSbConfig lispSbConfig, final EntityOwnershipService entityOwnershipService) {
+ this.dataBroker = dataBroker;
+ this.notificationPublishService = notificationPublishService;
+ this.bindingAddress = lispSbConfig.getBindAddress();
+ this.mapRegisterCacheEnabled = lispSbConfig.isMapRegisterCache();
+ clusterNodeModulSwitcher = new ClusterNodeModulSwitcherImpl(entityOwnershipService);
+ clusterNodeModulSwitcher.setModule(this);
+ }
+
+ public void init() {
+ LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
+ synchronized (startLock) {
+ lispSouthboundHandler = new LispSouthboundHandler(this);
+ lispSouthboundHandler.setDataBroker(dataBroker);
+ lispSouthboundHandler.setNotificationProvider(this.notificationPublishService);
+ lispSouthboundHandler.setMapRegisterCacheEnabled(mapRegisterCacheEnabled);
+ lispSouthboundHandler.init();
+ lispSouthboundHandler.restoreDaoFromDatastore();
- private LispIoThread thread;
- private LispSouthboundService service;
+ lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
+ lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService);
+ bootstrap.group(eventLoopGroup);
+ bootstrap.channel(NioDatagramChannel.class);
+ bootstrap.handler(lispSouthboundHandler);
- void setFlowMappingService(IFlowMapping mappingService) {
- logger.debug("FlowMapping set in LispSouthbound");
- service = new LispSouthboundService(mappingService, mappingService);
- logger.debug("Registering LispIpv4Address");
- logger.debug("Registering LispIpv6Address");
+ xtrBootstrap.group(eventLoopGroup);
+ xtrBootstrap.channel(NioDatagramChannel.class);
+ xtrBootstrap.handler(lispXtrSouthboundHandler);
+
+ start();
+ startXtr();
+
+ LOG.info("LISP (RFC6830) Southbound Plugin is up!");
+ }
+ clusterNodeModulSwitcher.switchModuleByEntityOwnership();
}
- void unsetFlowMappingService(IFlowMapping mappingService) {
- logger.debug("LispDAO was unset in LispMappingService");
- service = null;
+ private void start() {
+ try {
+ channel = (NioDatagramChannel) bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+ LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
+ } catch (Exception e) {
+ LOG.error("Failed to open main socket ", e);
+ }
}
- public void init() {
- logger.debug("LISP (RFC6830) Mapping Service is initialized!");
- thread = new LispIoThread();
+ private void startXtr() {
+ if (listenOnXtrPort) {
+ try {
+ xtrChannel = (NioDatagramChannel) xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
+ LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
+ } catch (Exception e) {
+ LOG.error("Failed to open xTR socket ", e);
+ }
+ }
}
- public void start() {
- logger.info("LISP (RFC6830) Mapping Service is up!");
- thread.start();
+ private void stop() {
+ try {
+ channel.close().sync();
+ channel = null;
+ } catch (Exception e) {
+ LOG.error("Failed to close main socket ", e);
+ }
+ }
- // OSGI console
- registerWithOSGIConsole();
+ private void stopXtr() {
+ if (listenOnXtrPort) {
+ try {
+ xtrChannel.close().sync();
+ xtrChannel = null;
+ } catch (Exception e) {
+ LOG.error("Failed to close xTR socket ", e);
+ }
+ }
}
- private void registerWithOSGIConsole() {
- BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
- bundleContext.registerService(CommandProvider.class.getName(), this, null);
+ private void restart() {
+ LOG.info("Reloading");
+ stop();
+ start();
}
- public void stop() {
- thread.stopRunning();
- logger.info("LISP (RFC6830) Mapping Service is down!");
+ private void restartXtr() {
+ LOG.info("Reloading xTR");
+ stopXtr();
+ startXtr();
}
- public void destroy() {
- logger.debug("LISP (RFC6830) Mapping Service is destroyed!");
- thread = null;
- service = null;
+ private void unloadActions() {
+ lispSouthboundHandler = null;
+ lispXtrSouthboundHandler = null;
+
+ stop();
+ stopXtr();
+
+ LOG.info("LISP (RFC6830) Southbound Plugin is down!");
}
- private class LispIoThread extends Thread {
- private boolean running;
+ public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
+ final MessageType packetType) {
+ InetAddress ip = getInetAddress(address);
+ handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue());
+ }
- public LispIoThread() {
- super("Lisp Thread");
- running = true;
+ public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
+ final MessageType packetType, final int portNumber) {
+ InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
+ outBuffer.position(0);
+ ByteBuf data = wrappedBuffer(outBuffer);
+ DatagramPacket packet = new DatagramPacket(data, recipient);
+ LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
}
-
- @Override
- public void run() {
- DatagramSocket socket;
- int lispPortNumber = LispMessage.PORT_NUM;
- int lispReceiveTimeout = 1000;
- String lispBindAddress = "0.0.0.0";
- logger.info("LISP (RFC6830) Mapping Service is running and listening on " + lispBindAddress);
- try {
- socket = new DatagramSocket(new InetSocketAddress(lispBindAddress, lispPortNumber));
- socket.setSoTimeout(lispReceiveTimeout);
- } catch (SocketException e) {
- logger.warn("Cannot open socket on UDP port " + lispPortNumber, e);
- return;
+ channel.write(packet).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ LOG.trace("Success");
+ statistics.incrementTx(packetType.getIntValue());
+ } else {
+ LOG.warn("Failed to send packet");
+ statistics.incrementTxErrors();
+ }
}
+ });
+ channel.flush();
+ }
- while (running) {
- byte[] buffer = new byte[4096];
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- try {
- socket.receive(packet);
- } catch (SocketTimeoutException ste) {
- continue;
- } catch (IOException e) {
- // TODO: log
- }
- logger.debug("Handling packet from {}:{} (len={})", packet.getAddress().getHostAddress(), packet.getPort(), packet.getLength());
- try {
- DatagramPacket reply = service.handlePacket(packet);
+ private InetAddress getInetAddress(TransportAddress address) {
+ Preconditions.checkNotNull(address, "TransportAddress must not be null");
+ IpAddressBinary ip = address.getIpAddress();
+ try {
+ if (ip.getIpv4AddressBinary() != null) {
+ return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
+ } else if (ip.getIpv6AddressBinary() != null) {
+ return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
+ }
+ } catch (UnknownHostException e) {
+ LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
+ }
+ return null;
+ }
- if (reply == null) {
- continue;
- }
+ public LispSouthboundStats getStats() {
+ return statistics;
+ }
- reply.setAddress(packet.getAddress());
+ @Override
+ public void setLispAddress(String address) {
+ synchronized (startLock) {
+ if (bindingAddress.equals(address)) {
+ LOG.debug("Configured LISP binding address didn't change.");
+ } else {
+ LOG.debug("Setting LISP binding address to {}", address);
+ bindingAddress = address;
+ if (channel != null) {
try {
- logger.debug("sending reply to {}:{} (len={})", reply.getAddress().getHostAddress(), reply.getPort(), reply.getLength());
- socket.send(reply);
- } catch (IOException e) {
- // TODO: log
+ restart();
+ restartXtr();
+ } catch (Exception e) {
+ LOG.error("Failed to set LISP binding address: ", e);
}
- } catch (RuntimeException e) {
- logger.warn("", e);
}
}
+ }
+ }
- socket.close();
+ @Override
+ public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
+ listenOnXtrPort = shouldListenOnXtrPort;
+ if (listenOnXtrPort) {
+ restartXtr();
+ } else {
+ LOG.info("Shutting down xTR");
+ stopXtr();
}
+ }
- public void stopRunning() {
- running = false;
+ @Override
+ public void setXtrPort(int port) {
+ this.xtrPort = port;
+ if (listenOnXtrPort) {
+ restartXtr();
}
}
+ public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
+ this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
+ if (mapRegisterCacheEnabled) {
+ LOG.info("Enabling Map-Register cache");
+ } else {
+ LOG.info("Disabling Map-Register cache");
+ }
+ }
- public static String intToIpv4(int address) {
- return ((address >> 24) & 0xff) + "." + //
- ((address >> 16) & 0xff) + "." + //
- ((address >> 8) & 0xff) + "." + //
- ((address >> 0) & 0xff);
+ @Override
+ public void close() throws Exception {
+ eventLoopGroup.shutdownGracefully();
+ sbRpcRegistration.close();
+ lispSouthboundHandler.close();
+ unloadActions();
}
- public String getHelp() {
- StringBuffer help = new StringBuffer();
- help.append("---LISP Southbound Plugin---\n");
- return help.toString();
+ @Override
+ public void stopModule() {
+ if (lispSouthboundHandler != null) {
+ lispSouthboundHandler.setNotificationProvider(null);
+ lispSouthboundHandler.setIsReadFromChannelEnabled(false);
+ }
+ if (lispXtrSouthboundHandler != null) {
+ lispXtrSouthboundHandler.setNotificationProvider(null);
+ }
}
+ @Override
+ public void startModule() {
+ if (lispSouthboundHandler != null) {
+ lispSouthboundHandler.setNotificationProvider(notificationPublishService);
+ lispSouthboundHandler.restoreDaoFromDatastore();
+ lispSouthboundHandler.setIsReadFromChannelEnabled(true);
+ }
+ if (lispXtrSouthboundHandler != null) {
+ lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
+ }
+ }
}