package org.opendaylight.lispflowmapping.southbound;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
+import java.util.concurrent.ThreadFactory;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
-import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
-import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
-import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
private static Object startLock = new Object();
- private LispIoThread lispThread;
- private LispIoThread xtrThread;
- private LispSouthboundService lispSouthboundService;
- private LispXtrSouthboundService lispXtrSouthboundService;
+ private LispSouthboundHandler lispSouthboundHandler;
+ private LispXtrSouthboundHandler lispXtrSouthboundHandler;
private NotificationPublishService notificationPublishService;
private RpcProviderRegistry rpcRegistry;
private BindingAwareBroker broker;
- private volatile DatagramSocket socket = null;
- private volatile String bindingAddress = null;
+ private NioDatagramChannel channel;
+ private volatile String bindingAddress = "0.0.0.0";
private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
private volatile boolean listenOnXtrPort = false;
private BindingAwareBroker.RpcRegistration<OdlLispSbService> sbRpcRegistration;
- private DatagramSocket xtrSocket;
+ private NioDatagramChannel xtrChannel;
private LispSouthboundStats statistics = new LispSouthboundStats();
+ private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
+ private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
+
public void init() {
- LOG.info("LISP (RFC6830) southbound plugin is initializing...");
+ LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this);
sbRpcRegistration = rpcRegistry.addRpcImplementation(OdlLispSbService.class, sbRpcHandler);
broker.registerProvider(this);
synchronized (startLock) {
- lispSouthboundService = new LispSouthboundService(this);
- lispXtrSouthboundService = new LispXtrSouthboundService();
- lispSouthboundService.setNotificationProvider(this.notificationPublishService);
- lispXtrSouthboundService.setNotificationProvider(this.notificationPublishService);
- if (bindingAddress == null) {
- setLispAddress("0.0.0.0");
- }
- LOG.info("LISP (RFC6830) southbound plugin is up!");
- }
- }
+ lispSouthboundHandler = new LispSouthboundHandler(this);
+ lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
+ lispSouthboundHandler.setNotificationProvider(this.notificationPublishService);
+ lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService);
- public void setNotificationPublishService(NotificationPublishService notificationService) {
- this.notificationPublishService = notificationService;
- }
+ start();
+ startXtr();
- public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
- this.rpcRegistry = rpcRegistry;
- }
-
- public void setBindingAwareBroker(BindingAwareBroker broker) {
- this.broker = broker;
+ LOG.info("LISP (RFC6830) Southbound Plugin is up!");
+ }
}
- private void unloadActions() {
- if (lispThread != null) {
- lispThread.stopRunning();
- }
- lispSouthboundService = null;
- lispXtrSouthboundService = null;
- lispThread = null;
- xtrThread = null;
- bindingAddress = null;
- LOG.info("LISP (RFC6830) southbound plugin is down!");
+ private void start() {
try {
- Thread.sleep(1100);
- } catch (InterruptedException e) {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(eventLoopGroup);
+ bootstrap.channel(NioDatagramChannel.class);
+ bootstrap.handler(lispSouthboundHandler);
+ channel = (NioDatagramChannel) bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+ } catch (Exception e) {
+ LOG.error("Failed to open main socket ", e);
}
}
- private class LispIoThread extends Thread {
- private volatile boolean shouldRun;
- private volatile DatagramSocket threadSocket = null;
- private volatile ILispSouthboundService service;
- private volatile boolean running;
-
- public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
- super("Lisp Thread");
- this.threadSocket = socket;
- this.service = service;
- shouldRun = true;
- }
-
- @Override
- public void run() {
- running = true;
-
- int lispReceiveTimeout = 1000;
-
- LOG.info("LISP (RFC6830) southbound plugin is running and listening on address: " + bindingAddress
- + " port: " + threadSocket.getLocalPort());
+ private void startXtr() {
+ if (listenOnXtrPort) {
try {
-
- threadSocket.setSoTimeout(lispReceiveTimeout);
- } catch (SocketException e) {
- LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
- return;
+ Bootstrap xtrBootstrap = new Bootstrap();
+ xtrBootstrap.group(eventLoopGroup);
+ xtrBootstrap.channel(NioDatagramChannel.class);
+ xtrBootstrap.handler(lispXtrSouthboundHandler);
+ xtrChannel = (NioDatagramChannel) xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
+ } catch (Exception e) {
+ LOG.error("Failed to open xTR socket ", e);
}
+ }
+ }
- while (shouldRun) {
- byte[] buffer = new byte[4096];
- DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- try {
- threadSocket.receive(packet);
- LOG.trace("Received a packet!");
- } catch (SocketTimeoutException ste) {
- LOG.trace("Timed out waiting on socket", ste);
- continue;
- } catch (IOException e) {
- LOG.warn("IO Exception while trying to recieve packet", e);
- }
- LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress()
- .getHostAddress(), packet.getPort(), packet.getLength()));
+ private void stop() {
+ try {
+ channel.close().sync();
+ channel = null;
+ } catch (Exception e) {
+ LOG.error("Failed to close main socket ", e);
+ }
+ }
- try {
- this.service.handlePacket(packet);
- } catch (Exception e) {
- LOG.warn("Error while handling packet", e);
- }
+ private void stopXtr() {
+ if (listenOnXtrPort) {
+ try {
+ xtrChannel.close().sync();
+ xtrChannel = null;
+ } catch (Exception e) {
+ LOG.error("Failed to close xTR socket ", e);
}
-
- threadSocket.close();
- LOG.trace("Socket closed");
- running = false;
}
+ }
- public void stopRunning() {
- shouldRun = false;
- }
+ private void restart() {
+ LOG.info("Reloading");
+ stop();
+ start();
+ }
- public boolean isRunning() {
- return running;
- }
+ private void restartXtr() {
+ LOG.info("Reloading xTR");
+ stopXtr();
+ startXtr();
}
- public static String intToIpv4(int address) {
- return ((address >> 24) & 0xff) + "." + //
- ((address >> 16) & 0xff) + "." + //
- ((address >> 8) & 0xff) + "." + //
- ((address >> 0) & 0xff);
+ public void setNotificationPublishService(NotificationPublishService notificationService) {
+ this.notificationPublishService = notificationService;
}
- private void startIOThread() {
- if (socket != null) {
- while (!socket.isClosed()) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- }
- }
- try {
- socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
- lispThread = new LispIoThread(socket, lispSouthboundService);
- lispThread.start();
- LOG.info("LISP (RFC6830) southbound plugin is listening for control packets!");
- if (listenOnXtrPort) {
- restartXtrThread();
- }
- } catch (SocketException e) {
- LOG.error("couldn't start socket: {}", ExceptionUtils.getStackTrace(e));
- }
+ public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
+ this.rpcRegistry = rpcRegistry;
}
- private void restartXtrThread() {
- try {
- stopXtrThread();
- xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
- xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
- xtrThread.start();
- LOG.info("xTR southbound plugin is up!");
- } catch (SocketException e) {
- LOG.warn("failed to start xtr thread: {}", ExceptionUtils.getStackTrace(e));
- }
+ public void setBindingAwareBroker(BindingAwareBroker broker) {
+ this.broker = broker;
}
- public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, MessageType packetType) {
- DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
- packet.setPort(address.getPort().getValue());
+ private void unloadActions() {
+ lispSouthboundHandler = null;
+ lispXtrSouthboundHandler = null;
+ bindingAddress = "0.0.0.0";
+
+ stop();
+ stopXtr();
+
+ LOG.info("LISP (RFC6830) Southbound Plugin is down!");
+ }
+
+ public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
+ final MessageType packetType) {
InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
- packet.setAddress(ip);
- try {
- if (LOG.isDebugEnabled()) {
- LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
- }
- socket.send(packet);
- this.statistics.incrementTx(packetType.getIntValue());
- } catch (IOException e) {
- LOG.warn("Failed to send " + packetType, e);
- this.statistics.incrementTxErrors();
+ InetSocketAddress recipient = new InetSocketAddress(ip, address.getPort().getValue());
+ // the wrappedBuffer() method doesn't copy data, so this conversion shouldn't hurt performance
+ ByteBuf data = wrappedBuffer(outBuffer.array());
+ DatagramPacket packet = new DatagramPacket(data, recipient);
+ LOG.debug("Sending {} on port {} to address: {}", packetType, address.getPort().getValue(), ip);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
}
+ channel.write(packet).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ LOG.trace("Success");
+ statistics.incrementTx(packetType.getIntValue());
+ } else {
+ LOG.warn("Failed to send packet");
+ statistics.incrementTxErrors();
+ }
+ }
+ });
+ channel.flush();
}
public LispSouthboundStats getStats() {
return statistics;
}
+ @Override
public void setLispAddress(String address) {
synchronized (startLock) {
- if (bindingAddress != null && bindingAddress.equals(address)) {
- LOG.trace("configured lisp binding address didn't change.");
+ if (bindingAddress.equals(address)) {
+ LOG.debug("Configured LISP binding address didn't change.");
} else {
- String action = (bindingAddress == null ? "Setting" : "Resetting");
- LOG.trace(action + " lisp binding address to: " + address);
+ LOG.debug("Setting LISP binding address to {}", address);
bindingAddress = address;
- if (lispThread != null) {
- lispThread.stopRunning();
- while (lispThread.isRunning()) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- }
- }
- stopXtrThread();
- startIOThread();
- }
- }
- }
-
- private void stopXtrThread() {
- if (xtrThread != null) {
- xtrThread.stopRunning();
- while (xtrThread.isRunning()) {
try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
+ restart();
+ restartXtr();
+ } catch (Exception e) {
+ LOG.error("Failed to set LISP binding address: ", e);
}
}
}
public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
listenOnXtrPort = shouldListenOnXtrPort;
if (listenOnXtrPort) {
- LOG.debug("restarting xtr thread");
- restartXtrThread();
+ restartXtr();
} else {
- LOG.debug("terminating thread");
- stopXtrThread();
+ LOG.info("Shutting down xTR");
+ stopXtr();
}
}
public void setXtrPort(int port) {
this.xtrPort = port;
if (listenOnXtrPort) {
- restartXtrThread();
+ restartXtr();
}
}
@Override
public void close() throws Exception {
unloadActions();
+ eventLoopGroup.shutdownGracefully();
sbRpcRegistration.close();
}
package org.opendaylight.lispflowmapping.southbound.lisp;
-import java.net.DatagramPacket;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.DatagramPacket;
+
import java.net.InetAddress;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LispSouthboundService implements ILispSouthboundService {
+@ChannelHandler.Sharable
+public class LispSouthboundHandler extends SimpleChannelInboundHandler<DatagramPacket> implements ILispSouthboundService {
private NotificationPublishService notificationPublishService;
- protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundService.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundHandler.class);
private final LispSouthboundPlugin lispSbPlugin;
private LispSouthboundStats lispSbStats = null;
- public LispSouthboundService(LispSouthboundPlugin lispSbPlugin) {
+ public LispSouthboundHandler(LispSouthboundPlugin lispSbPlugin) {
this.lispSbPlugin = lispSbPlugin;
if (lispSbPlugin != null) {
this.lispSbStats = lispSbPlugin.getStats();
this.notificationPublishService = nps;
}
- public void handlePacket(DatagramPacket packet) {
- ByteBuffer inBuffer = ByteBuffer.wrap(packet.getData(), 0, packet.getLength());
+ public void handlePacket(DatagramPacket msg) {
+ ByteBuffer inBuffer = msg.content().nioBuffer();
int type = ByteUtil.getUnsignedByte(inBuffer, LispMessage.Pos.TYPE) >> 4;
handleStats(type);
Object lispType = MessageType.forValue(type);
if (lispType == MessageType.EncapsulatedControlMessage) {
LOG.trace("Received packet of type Encapsulated Control Message");
- handleEncapsulatedControlMessage(inBuffer, packet.getAddress());
+ handleEncapsulatedControlMessage(inBuffer, msg.sender().getAddress());
} else if (lispType == MessageType.MapRequest) {
LOG.trace("Received packet of type Map-Request");
- handleMapRequest(inBuffer, packet.getPort());
+ handleMapRequest(inBuffer, msg.sender().getPort());
} else if (lispType == MessageType.MapRegister) {
LOG.trace("Received packet of type Map-Register");
- handleMapRegister(inBuffer, packet.getAddress(), packet.getPort());
+ handleMapRegister(inBuffer, msg.sender().getAddress(), msg.sender().getPort());
} else if (lispType == MessageType.MapNotify) {
LOG.trace("Received packet of type Map-Notify");
- handleMapNotify(inBuffer, packet.getAddress(), packet.getPort());
+ handleMapNotify(inBuffer, msg.sender().getAddress(), msg.sender().getPort());
} else if (lispType == MessageType.MapReply) {
LOG.trace("Received packet of type Map-Reply");
- handleMapReply(inBuffer, packet.getAddress(), packet.getPort());
+ handleMapReply(inBuffer, msg.sender().getAddress(), msg.sender().getPort());
} else {
LOG.warn("Received unknown LISP control packet (type " + ((lispType != null) ? lispType : type) + ")");
- LOG.trace("Buffer: " + ByteUtil.bytesToHex(packet.getData(), packet.getLength()));
}
}
}
}
}
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received UDP packet from {}:{} with content:\n{}", msg.sender().getHostString(),
+ msg.sender().getPort(), ByteBufUtil.prettyHexDump(msg.content()));
+ }
+ handlePacket(msg);
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ ctx.flush();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ LOG.error("Error on channel: " + cause, cause);
+ }
}
package org.opendaylight.lispflowmapping.southbound.lisp;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import io.netty.channel.socket.DatagramPacket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
public class LispSouthboundServiceTest extends BaseTestCase {
- private LispSouthboundService testedLispService;
+ private LispSouthboundHandler testedLispService;
private NotificationPublishService nps;
private byte[] mapRequestPacket;
private byte[] mapRegisterPacket;
super.before();
// mapResolver = context.mock(IMapResolver.class);
// mapServer = context.mock(IMapServer.class);
- testedLispService = new LispSouthboundService(null);
+ testedLispService = new LispSouthboundHandler(null);
nps = context.mock(NotificationPublishService.class);
testedLispService.setNotificationProvider(nps);
lispNotificationSaver = new ValueSaverAction<Notification>();
System.arraycopy(mapRegisterPacket, 0, extraDataPacket, 0, mapRegisterPacket.length);
stubMapRegister(true);
- DatagramPacket dp = new DatagramPacket(extraDataPacket, extraDataPacket.length);
- dp.setLength(mapRegisterPacket.length);
+ DatagramPacket dp = new DatagramPacket(wrappedBuffer(extraDataPacket), new InetSocketAddress(0), new InetSocketAddress(0));
testedLispService.handlePacket(dp);
// Check map register fields.
// XXX: test
// byte[] notifyResult = testedLispService.handlePacket(dp).getData();
- byte[] notifyResult = lastMapNotifyPacket().getData();
+ byte[] notifyResult = lastMapNotifyPacket().content().array();
assertEquals(mapRegisterPacket.length, notifyResult.length);
}
private DatagramPacket lastMapReplyPacket() {
ByteBuffer serialize = MapReplySerializer.getInstance().serialize(mapReplyBuilder.build());
- return new DatagramPacket(serialize.array(), serialize.array().length);
+ return new DatagramPacket(wrappedBuffer(serialize), new InetSocketAddress(0), new InetSocketAddress(0));
}
private DatagramPacket lastMapNotifyPacket() {
mapNotifyBuilder.setKeyId((short) 0);
mapNotifyBuilder.setAuthenticationData(new byte[0]);
ByteBuffer serialize = MapNotifySerializer.getInstance().serialize(mapNotifyBuilder.build());
- return new DatagramPacket(serialize.array(), serialize.array().length);
+ return new DatagramPacket(wrappedBuffer(serialize), new InetSocketAddress(0), new InetSocketAddress(0));
}
@Test
stubMapRegister(true);
DatagramPacket notifyPacket = handleMapRegisterPacket(mapRegisterPacket);
- assertEquals(LispMessage.PORT_NUM, notifyPacket.getPort());
+ assertEquals(LispMessage.PORT_NUM, notifyPacket.recipient().getPort());
}
@Test
// ret(mapReply);
DatagramPacket replyPacket = handleMapRequestPacket(mapRequestPacket);
- assertEquals(4342, replyPacket.getPort());
+ assertEquals(4342, replyPacket.recipient().getPort());
}
@Test
public void mapReply__UseEncapsulatedUdpPort() throws Exception {
stubHandleRequest();
- assertEquals(LispMessage.PORT_NUM, handleMapRequestPacket(mapRequestPacket).getPort());
+ assertEquals(LispMessage.PORT_NUM, handleMapRequestPacket(mapRequestPacket).recipient().getPort());
}
@Test
private byte[] handleMapRequestAsByteArray(byte[] inPacket) {
handleMapRequestPacket(inPacket);
- return lastMapReplyPacket().getData();
+ return lastMapReplyPacket().content().array();
}
private byte[] handleMapRegisterAsByteArray(byte[] inPacket) {
handleMapRegisterPacket(inPacket);
- return lastMapNotifyPacket().getData();
+ return lastMapNotifyPacket().content().array();
}
private DatagramPacket handleMapRequestPacket(byte[] inPacket) {
- DatagramPacket dp = new DatagramPacket(inPacket, inPacket.length);
+ DatagramPacket dp = new DatagramPacket(wrappedBuffer(inPacket), new InetSocketAddress(0), new InetSocketAddress(0));
// Unless we explicitly set the source port, it will be -1, which breaks some tests
// This is till not the real port number, but it's better
- dp.setPort(LispMessage.PORT_NUM);
+ //dp.setPort(LispMessage.PORT_NUM);
testedLispService.handlePacket(dp);
return lastMapReplyPacket();
}
private DatagramPacket handleMapRegisterPacket(byte[] inPacket) {
- DatagramPacket dp = new DatagramPacket(inPacket, inPacket.length);
+ DatagramPacket dp = new DatagramPacket(wrappedBuffer(inPacket), new InetSocketAddress(0), new InetSocketAddress(0));
// Unless we explicitly set the source port, it will be -1, which breaks some tests
// This is till not the real port number, but it's better
- dp.setPort(LispMessage.PORT_NUM);
+ //dp.setPort(LispMessage.PORT_NUM);
testedLispService.handlePacket(dp);
if (mapNotifyBuilder == null) {
return null;
private DatagramPacket handlePacket(byte[] inPacket) {
// TODO get from mock
- testedLispService.handlePacket(new DatagramPacket(inPacket, inPacket.length));
+ testedLispService.handlePacket(new DatagramPacket(wrappedBuffer(inPacket), new InetSocketAddress(0), new InetSocketAddress(0)));
return null;
}