/*
- * Copyright (c) 2013 Contextream, Inc. and others. All rights reserved.
+ * Copyright (c) 2014 Contextream, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
-import java.util.concurrent.Future;
-import org.eclipse.osgi.framework.console.CommandProvider;
-import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
-import org.opendaylight.lispflowmapping.implementation.serializer.MapNotifySerializer;
-import org.opendaylight.lispflowmapping.implementation.serializer.MapReplySerializer;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
+import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
-import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispPlugin;
-import org.opendaylight.lispflowmapping.type.sbplugin.ILispSouthboundPlugin;
-import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.MapNotify;
-import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.MapReply;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
+import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transportaddress.TransportAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.sb.rev150904.LispSbService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LispSouthboundPlugin extends AbstractBindingAwareProvider implements ILispSouthboundPlugin, IConfigLispPlugin, CommandProvider {
- protected static final Logger logger = LoggerFactory.getLogger(LispSouthboundPlugin.class);
+import com.google.common.net.InetAddresses;
+
+public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, BindingAwareProvider {
+ protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
private static Object startLock = new Object();
- private LispIoThread thread;
+ private LispIoThread lispThread;
+ private LispIoThread xtrThread;
private LispSouthboundService lispSouthboundService;
+ private LispXtrSouthboundService lispXtrSouthboundService;
+ private NotificationPublishService notificationPublishService;
+ private RpcProviderRegistry rpcRegistry;
+ private BindingAwareBroker broker;
private volatile DatagramSocket socket = null;
- private final String MAP_NOTIFY = "MapNotify";
- private final String MAP_REPlY = "MapReply";
private volatile String bindingAddress = null;
- private volatile boolean stillRunning = false;
- private volatile boolean alreadyInit = false;
+ private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
+ private volatile boolean listenOnXtrPort = false;
+ private BindingAwareBroker.RpcRegistration<LispSbService> sbRpcRegistration;
+ private DatagramSocket xtrSocket;
+ private LispSouthboundStats statistics = new LispSouthboundStats();
+
+ public void init() {
+ LOG.info("LISP (RFC6830) southbound plugin is initializing...");
+ final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this);
- private void registerWithOSGIConsole() {
- BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
- bundleContext.registerService(CommandProvider.class.getName(), this, null);
- bundleContext.registerService(IConfigLispPlugin.class.getName(), this, null);
+ sbRpcRegistration = rpcRegistry.addRpcImplementation(LispSbService.class, sbRpcHandler);
+ broker.registerProvider(this);
+
+ synchronized (startLock) {
+ lispSouthboundService = new LispSouthboundService(this);
+ lispXtrSouthboundService = new LispXtrSouthboundService();
+ lispSouthboundService.setNotificationProvider(this.notificationPublishService);
+ lispXtrSouthboundService.setNotificationProvider(this.notificationPublishService);
+ if (bindingAddress == null) {
+ setLispAddress("0.0.0.0");
+ }
+ LOG.info("LISP (RFC6830) southbound plugin is up!");
+ }
}
- protected void stopImpl(BundleContext context) {
- unloadActions();
+ public void setNotificationPublishService(NotificationPublishService notificationService) {
+ this.notificationPublishService = notificationService;
+ }
+
+ public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
+ this.rpcRegistry = rpcRegistry;
+ }
+
+ public void setBindingAwareBroker(BindingAwareBroker broker) {
+ this.broker = broker;
}
private void unloadActions() {
- if (thread != null) {
- thread.stopRunning();
+ if (lispThread != null) {
+ lispThread.stopRunning();
}
lispSouthboundService = null;
- thread = null;
- logger.info("LISP (RFC6830) Mapping Service is down!");
+ lispXtrSouthboundService = null;
+ lispThread = null;
+ xtrThread = null;
+ bindingAddress = null;
+ LOG.info("LISP (RFC6830) southbound plugin is down!");
try {
Thread.sleep(1100);
} catch (InterruptedException e) {
}
}
- public void destroy() {
- unloadActions();
- }
-
private class LispIoThread extends Thread {
+ private volatile boolean shouldRun;
+ private volatile DatagramSocket threadSocket = null;
+ private volatile ILispSouthboundService service;
private volatile boolean running;
- public LispIoThread() {
+ public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
super("Lisp Thread");
- running = true;
+ this.threadSocket = socket;
+ this.service = service;
+ shouldRun = true;
}
@Override
public void run() {
- stillRunning = true;
+ running = true;
- int lispPortNumber = LispMessage.PORT_NUM;
int lispReceiveTimeout = 1000;
- logger.info("LISP (RFC6830) Mapping Service is running and listening on " + bindingAddress);
+ LOG.info("LISP (RFC6830) southbound plugin is running and listening on address: " + bindingAddress
+ + " port: " + threadSocket.getLocalPort());
try {
- socket = new DatagramSocket(new InetSocketAddress(bindingAddress, lispPortNumber));
- socket.setSoTimeout(lispReceiveTimeout);
+
+ threadSocket.setSoTimeout(lispReceiveTimeout);
} catch (SocketException e) {
- logger.warn("Cannot open socket on UDP port " + lispPortNumber, e);
+ LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
return;
}
- while (running) {
+ while (shouldRun) {
byte[] buffer = new byte[4096];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
- socket.receive(packet);
- logger.debug("Received a packet!");
+ threadSocket.receive(packet);
+ LOG.trace("Received a packet!");
} catch (SocketTimeoutException ste) {
continue;
} catch (IOException e) {
- logger.error("IO Exception while trying to recieve packet", e);
+ LOG.warn("IO Exception while trying to recieve packet", e);
}
- logger.debug("Handling packet from {}:{} (len={})", packet.getAddress().getHostAddress(), packet.getPort(), packet.getLength());
+ LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress()
+ .getHostAddress(), packet.getPort(), packet.getLength()));
try {
- lispSouthboundService.handlePacket(packet);
- } catch (Throwable t) {
- logger.error("Error while handling packet", t);
+ this.service.handlePacket(packet);
+ } catch (Exception e) {
+ LOG.warn("Error while handling packet", e);
}
}
- socket.close();
- logger.info("Socket closed");
- stillRunning = false;
+ threadSocket.close();
+ LOG.trace("Socket closed");
+ running = false;
}
public void stopRunning() {
- running = false;
+ shouldRun = false;
+ }
+
+ public boolean isRunning() {
+ return running;
}
}
((address >> 0) & 0xff);
}
- public String getHelp() {
- StringBuffer help = new StringBuffer();
- help.append("---LISP Southbound Plugin---\n");
- return help.toString();
- }
-
private void startIOThread() {
- thread = new LispIoThread();
- logger.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
- thread.start();
- }
-
- public void onSessionInitiated(ProviderContext session) {
- logger.info("LISP (RFC6830) Mapping Service is up!");
- synchronized (startLock) {
- if (!alreadyInit) {
- alreadyInit = true;
- lispSouthboundService = new LispSouthboundService();
- registerWithOSGIConsole();
- registerRPCs(session);
- logger.debug("Provider Session initialized");
- if (bindingAddress == null) {
- setLispAddress("0.0.0.0");
+ if (socket != null) {
+ while (!socket.isClosed()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
}
}
-
}
- }
-
- private void registerRPCs(ProviderContext session) {
try {
- lispSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
- session.addRpcImplementation(ILispSouthboundPlugin.class, this);
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
+ socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
+ lispThread = new LispIoThread(socket, lispSouthboundService);
+ lispThread.start();
+ LOG.info("LISP (RFC6830) southbound plugin is listening for control packets!");
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ }
+ } catch (SocketException e) {
+ LOG.error("couldn't start socket: {}", ExceptionUtils.getStackTrace(e));
}
}
- public Future<RpcResult<Void>> handleMapNotify(MapNotify mapNotify, InetAddress address) {
- logger.trace("handleMapNotify called!!");
- if (mapNotify != null) {
- ByteBuffer outBuffer = MapNotifySerializer.getInstance().serialize(mapNotify);
- handleSerializedLispBuffer(address, outBuffer, MAP_NOTIFY);
- } else {
- logger.debug("MapNotify was null");
+ private void restartXtrThread() {
+ try {
+ stopXtrThread();
+ xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
+ xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
+ xtrThread.start();
+ LOG.info("xTR southbound plugin is up!");
+ } catch (SocketException e) {
+ LOG.warn("failed to start xtr thread: {}", ExceptionUtils.getStackTrace(e));
}
- return null;
}
- private void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer, String packetType) {
+ public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, MessageType packetType) {
DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
- packet.setPort(LispMessage.PORT_NUM);
- packet.setAddress(address);
+ packet.setPort(address.getPort().getValue());
+ InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
+ packet.setAddress(ip);
try {
- if (logger.isDebugEnabled()) {
- logger.debug("Sending " + packetType + " on port " + LispMessage.PORT_NUM + " to address: " + address);
+ if (LOG.isDebugEnabled()) {
+ LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
}
socket.send(packet);
+ this.statistics.incrementTx(packetType.getIntValue());
} catch (IOException e) {
- logger.error("Failed to send " + packetType, e);
+ LOG.warn("Failed to send " + packetType, e);
+ this.statistics.incrementTxErrors();
}
}
- public Future<RpcResult<Void>> handleMapReply(MapReply mapReply, InetAddress address) {
- logger.trace("handleMapReply called!!");
- if (mapReply != null) {
- ByteBuffer outBuffer = MapReplySerializer.getInstance().serialize(mapReply);
- handleSerializedLispBuffer(address, outBuffer, MAP_REPlY);
- } else {
- logger.debug("MapReply was null");
- }
- return null;
+ public LispSouthboundStats getStats() {
+ return statistics;
}
public void setLispAddress(String address) {
synchronized (startLock) {
if (bindingAddress != null && bindingAddress.equals(address)) {
- logger.debug("configured lisp binding address didn't change.");
+ LOG.trace("configured lisp binding address didn't change.");
} else {
String action = (bindingAddress == null ? "Setting" : "Resetting");
- logger.info(action + " lisp binding address to: " + address);
+ LOG.trace(action + " lisp binding address to: " + address);
bindingAddress = address;
- if (thread != null) {
- thread.stopRunning();
- while (stillRunning) {
+ if (lispThread != null) {
+ lispThread.stopRunning();
+ while (lispThread.isRunning()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
- e.printStackTrace();
}
}
}
+ stopXtrThread();
startIOThread();
}
}
}
+
+ private void stopXtrThread() {
+ if (xtrThread != null) {
+ xtrThread.stopRunning();
+ while (xtrThread.isRunning()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
+ listenOnXtrPort = shouldListenOnXtrPort;
+ if (listenOnXtrPort) {
+ LOG.debug("restarting xtr thread");
+ restartXtrThread();
+ } else {
+ LOG.debug("terminating thread");
+ stopXtrThread();
+ }
+ }
+
+ @Override
+ public void setXtrPort(int port) {
+ this.xtrPort = port;
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ unloadActions();
+ sbRpcRegistration.close();
+ }
+
+ @Override
+ public void onSessionInitiated(ProviderContext session) {
+ LOG.debug("LispSouthboundPlugin Provider Session Initiated");
+ }
}