package org.opendaylight.lispflowmapping.southbound;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
+import java.util.concurrent.ThreadFactory;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
-import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
-import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
-import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
private static Object startLock = new Object();
- private LispIoThread lispThread;
- private LispIoThread xtrThread;
- private LispSouthboundService lispSouthboundService;
- private LispXtrSouthboundService lispXtrSouthboundService;
+ private LispSouthboundHandler lispSouthboundHandler;
+ private LispXtrSouthboundHandler lispXtrSouthboundHandler;
private NotificationPublishService notificationPublishService;
private RpcProviderRegistry rpcRegistry;
private BindingAwareBroker broker;
- private volatile DatagramSocket socket = null;
- private volatile String bindingAddress = null;
+ private NioDatagramChannel channel;
+ private volatile String bindingAddress = "0.0.0.0";
private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
private volatile boolean listenOnXtrPort = false;
private BindingAwareBroker.RpcRegistration<OdlLispSbService> sbRpcRegistration;
- private DatagramSocket xtrSocket;
+ private NioDatagramChannel xtrChannel;
private LispSouthboundStats statistics = new LispSouthboundStats();
+ private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
+ private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
+
public void init() {
- LOG.info("LISP (RFC6830) southbound plugin is initializing...");
+ LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this);
sbRpcRegistration = rpcRegistry.addRpcImplementation(OdlLispSbService.class, sbRpcHandler);
broker.registerProvider(this);
synchronized (startLock) {
- lispSouthboundService = new LispSouthboundService(this);
- lispXtrSouthboundService = new LispXtrSouthboundService();
- lispSouthboundService.setNotificationProvider(this.notificationPublishService);
- lispXtrSouthboundService.setNotificationProvider(this.notificationPublishService);
- if (bindingAddress == null) {
- setLispAddress("0.0.0.0");
- }
- LOG.info("LISP (RFC6830) southbound plugin is up!");
- }
- }
+ lispSouthboundHandler = new LispSouthboundHandler(this);
+ lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
+ lispSouthboundHandler.setNotificationProvider(this.notificationPublishService);
+ lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService);
- public void setNotificationPublishService(NotificationPublishService notificationService) {
- this.notificationPublishService = notificationService;
- }
+ start();
+ startXtr();
- public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
- this.rpcRegistry = rpcRegistry;
- }
-
- public void setBindingAwareBroker(BindingAwareBroker broker) {
- this.broker = broker;
+ LOG.info("LISP (RFC6830) Southbound Plugin is up!");
+ }
}
- private void unloadActions() {
- if (lispThread != null) {
- lispThread.stopRunning();
- }
- lispSouthboundService = null;
- lispXtrSouthboundService = null;
- lispThread = null;
- xtrThread = null;
- bindingAddress = null;
- LOG.info("LISP (RFC6830) southbound plugin is down!");
+ private void start() {
try {
- Thread.sleep(1100);
- } catch (InterruptedException e) {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(eventLoopGroup);
+ bootstrap.channel(NioDatagramChannel.class);
+ bootstrap.handler(lispSouthboundHandler);
+ channel = (NioDatagramChannel) bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+ } catch (Exception e) {
+ LOG.error("Failed to open main socket ", e);
}
}
- private class LispIoThread extends Thread {
- private volatile boolean shouldRun;
- private volatile DatagramSocket threadSocket = null;
- private volatile ILispSouthboundService service;
- private volatile boolean running;
-
- public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
- super("Lisp Thread");
- this.threadSocket = socket;
- this.service = service;
- shouldRun = true;
- }
-
- @Override
- public void run() {
- running = true;
-
- int lispReceiveTimeout = 1000;
-
- LOG.info("LISP (RFC6830) southbound plugin is running and listening on address: " + bindingAddress
- + " port: " + threadSocket.getLocalPort());
+ private void startXtr() {
+ if (listenOnXtrPort) {
try {
-
- threadSocket.setSoTimeout(lispReceiveTimeout);
- } catch (SocketException e) {
- LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
- return;
+ Bootstrap xtrBootstrap = new Bootstrap();
+ xtrBootstrap.group(eventLoopGroup);
+ xtrBootstrap.channel(NioDatagramChannel.class);
+ xtrBootstrap.handler(lispXtrSouthboundHandler);
+ xtrChannel = (NioDatagramChannel) xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
+ } catch (Exception e) {
+ LOG.error("Failed to open xTR socket ", e);
}
+ }
+ }
- while (shouldRun) {
- byte[] buffer = new byte[4096];
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- try {
- threadSocket.receive(packet);
- LOG.trace("Received a packet!");
- } catch (SocketTimeoutException ste) {
- LOG.trace("Timed out waiting on socket", ste);
- continue;
- } catch (IOException e) {
- LOG.warn("IO Exception while trying to recieve packet", e);
- }
- LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress()
- .getHostAddress(), packet.getPort(), packet.getLength()));
+ private void stop() {
+ try {
+ channel.close().sync();
+ channel = null;
+ } catch (Exception e) {
+ LOG.error("Failed to close main socket ", e);
+ }
+ }
- try {
- this.service.handlePacket(packet);
- } catch (Exception e) {
- LOG.warn("Error while handling packet", e);
- }
+ private void stopXtr() {
+ if (listenOnXtrPort) {
+ try {
+ xtrChannel.close().sync();
+ xtrChannel = null;
+ } catch (Exception e) {
+ LOG.error("Failed to close xTR socket ", e);
}
-
- threadSocket.close();
- LOG.trace("Socket closed");
- running = false;
}
+ }
- public void stopRunning() {
- shouldRun = false;
- }
+ private void restart() {
+ LOG.info("Reloading");
+ stop();
+ start();
+ }
- public boolean isRunning() {
- return running;
- }
+ private void restartXtr() {
+ LOG.info("Reloading xTR");
+ stopXtr();
+ startXtr();
}
- public static String intToIpv4(int address) {
- return ((address >> 24) & 0xff) + "." + //
- ((address >> 16) & 0xff) + "." + //
- ((address >> 8) & 0xff) + "." + //
- ((address >> 0) & 0xff);
+ public void setNotificationPublishService(NotificationPublishService notificationService) {
+ this.notificationPublishService = notificationService;
}
- private void startIOThread() {
- if (socket != null) {
- while (!socket.isClosed()) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- }
- }
- try {
- socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
- lispThread = new LispIoThread(socket, lispSouthboundService);
- lispThread.start();
- LOG.info("LISP (RFC6830) southbound plugin is listening for control packets!");
- if (listenOnXtrPort) {
- restartXtrThread();
- }
- } catch (SocketException e) {
- LOG.error("couldn't start socket: {}", ExceptionUtils.getStackTrace(e));
- }
+ public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
+ this.rpcRegistry = rpcRegistry;
}
- private void restartXtrThread() {
- try {
- stopXtrThread();
- xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
- xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
- xtrThread.start();
- LOG.info("xTR southbound plugin is up!");
- } catch (SocketException e) {
- LOG.warn("failed to start xtr thread: {}", ExceptionUtils.getStackTrace(e));
- }
+ public void setBindingAwareBroker(BindingAwareBroker broker) {
+ this.broker = broker;
}
- public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, MessageType packetType) {
- DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
- packet.setPort(address.getPort().getValue());
+ private void unloadActions() {
+ lispSouthboundHandler = null;
+ lispXtrSouthboundHandler = null;
+ bindingAddress = "0.0.0.0";
+
+ stop();
+ stopXtr();
+
+ LOG.info("LISP (RFC6830) Southbound Plugin is down!");
+ }
+
+ public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
+ final MessageType packetType) {
InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
- packet.setAddress(ip);
- try {
- if (LOG.isDebugEnabled()) {
- LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
- }
- socket.send(packet);
- this.statistics.incrementTx(packetType.getIntValue());
- } catch (IOException e) {
- LOG.warn("Failed to send " + packetType, e);
- this.statistics.incrementTxErrors();
+ InetSocketAddress recipient = new InetSocketAddress(ip, address.getPort().getValue());
+ // the wrappedBuffer() method doesn't copy data, so this conversion shouldn't hurt performance
+ ByteBuf data = wrappedBuffer(outBuffer.array());
+ DatagramPacket packet = new DatagramPacket(data, recipient);
+ LOG.debug("Sending {} on port {} to address: {}", packetType, address.getPort().getValue(), ip);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
}
+ channel.write(packet).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ LOG.trace("Success");
+ statistics.incrementTx(packetType.getIntValue());
+ } else {
+ LOG.warn("Failed to send packet");
+ statistics.incrementTxErrors();
+ }
+ }
+ });
+ channel.flush();
}
public LispSouthboundStats getStats() {
return statistics;
}
+ @Override
public void setLispAddress(String address) {
synchronized (startLock) {
- if (bindingAddress != null && bindingAddress.equals(address)) {
- LOG.trace("configured lisp binding address didn't change.");
+ if (bindingAddress.equals(address)) {
+ LOG.debug("Configured LISP binding address didn't change.");
} else {
- String action = (bindingAddress == null ? "Setting" : "Resetting");
- LOG.trace(action + " lisp binding address to: " + address);
+ LOG.debug("Setting LISP binding address to {}", address);
bindingAddress = address;
- if (lispThread != null) {
- lispThread.stopRunning();
- while (lispThread.isRunning()) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- }
- }
- stopXtrThread();
- startIOThread();
- }
- }
- }
-
- private void stopXtrThread() {
- if (xtrThread != null) {
- xtrThread.stopRunning();
- while (xtrThread.isRunning()) {
try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
+ restart();
+ restartXtr();
+ } catch (Exception e) {
+ LOG.error("Failed to set LISP binding address: ", e);
}
}
}
public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
listenOnXtrPort = shouldListenOnXtrPort;
if (listenOnXtrPort) {
- LOG.debug("restarting xtr thread");
- restartXtrThread();
+ restartXtr();
} else {
- LOG.debug("terminating thread");
- stopXtrThread();
+ LOG.info("Shutting down xTR");
+ stopXtr();
}
}
public void setXtrPort(int port) {
this.xtrPort = port;
if (listenOnXtrPort) {
- restartXtrThread();
+ restartXtr();
}
}
@Override
public void close() throws Exception {
unloadActions();
+ eventLoopGroup.shutdownGracefully();
sbRpcRegistration.close();
}