/*
- * Copyright (c) 2013 Contextream, Inc. and others. All rights reserved.
+ * Copyright (c) 2014 Contextream, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
import org.opendaylight.lispflowmapping.implementation.serializer.MapNotifySerializer;
import org.opendaylight.lispflowmapping.implementation.serializer.MapReplySerializer;
+import org.opendaylight.lispflowmapping.implementation.serializer.MapRequestSerializer;
+import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
-import org.opendaylight.lispflowmapping.type.lisp.MapNotify;
-import org.opendaylight.lispflowmapping.type.lisp.MapReply;
-import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispPlugin;
-import org.opendaylight.lispflowmapping.type.sbplugin.ILispSouthboundPlugin;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
+import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.LfmControlPlaneService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapNotifyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.transportaddress.TransportAddress;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LispSouthboundPlugin extends AbstractBindingAwareProvider implements ILispSouthboundPlugin, IConfigLispPlugin, CommandProvider {
- protected static final Logger logger = LoggerFactory.getLogger(LispSouthboundPlugin.class);
+import com.google.common.net.InetAddresses;
+
+public class LispSouthboundPlugin extends AbstractBindingAwareProvider implements IConfigLispSouthboundPlugin, CommandProvider, LfmControlPlaneService {
+ protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
private static Object startLock = new Object();
- private LispIoThread thread;
+ private LispIoThread lispThread;
+ private LispIoThread xtrThread;
private LispSouthboundService lispSouthboundService;
+ private LispXtrSouthboundService lispXtrSouthboundService;
private volatile DatagramSocket socket = null;
private final String MAP_NOTIFY = "MapNotify";
private final String MAP_REPlY = "MapReply";
+ private final String MAP_REQUEST = "MapRequest";
private volatile String bindingAddress = null;
- private volatile boolean stillRunning = false;
private volatile boolean alreadyInit = false;
+ private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
+ private volatile boolean listenOnXtrPort = false;
+
+ private DatagramSocket xtrSocket;
private void registerWithOSGIConsole() {
BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
bundleContext.registerService(CommandProvider.class.getName(), this, null);
- bundleContext.registerService(IConfigLispPlugin.class.getName(), this, null);
+ bundleContext.registerService(IConfigLispSouthboundPlugin.class.getName(), this, null);
}
protected void stopImpl(BundleContext context) {
}
private void unloadActions() {
- if (thread != null) {
- thread.stopRunning();
+ if (lispThread != null) {
+ lispThread.stopRunning();
}
lispSouthboundService = null;
- thread = null;
- logger.info("LISP (RFC6830) Mapping Service is down!");
+ lispXtrSouthboundService = null;
+ lispThread = null;
+ xtrThread = null;
+ LOG.info("LISP (RFC6830) Mapping Service is down!");
try {
Thread.sleep(1100);
} catch (InterruptedException e) {
}
private class LispIoThread extends Thread {
+ private volatile boolean shouldRun;
+ private volatile DatagramSocket threadSocket = null;
+ private volatile ILispSouthboundService service;
private volatile boolean running;
- public LispIoThread() {
+ public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
super("Lisp Thread");
- running = true;
+ this.threadSocket = socket;
+ this.service = service;
+ shouldRun = true;
}
@Override
public void run() {
- stillRunning = true;
+ running = true;
- int lispPortNumber = LispMessage.PORT_NUM;
int lispReceiveTimeout = 1000;
- logger.info("LISP (RFC6830) Mapping Service is running and listening on " + bindingAddress);
+ LOG.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress + " port: "
+ + threadSocket.getLocalPort());
try {
- socket = new DatagramSocket(new InetSocketAddress(bindingAddress, lispPortNumber));
- socket.setSoTimeout(lispReceiveTimeout);
+
+ threadSocket.setSoTimeout(lispReceiveTimeout);
} catch (SocketException e) {
- logger.warn("Cannot open socket on UDP port " + lispPortNumber, e);
+ LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
return;
}
- while (running) {
+ while (shouldRun) {
byte[] buffer = new byte[4096];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
- socket.receive(packet);
- logger.debug("Received a packet!");
+ threadSocket.receive(packet);
+ LOG.trace("Received a packet!");
} catch (SocketTimeoutException ste) {
continue;
} catch (IOException e) {
- logger.error("IO Exception while trying to recieve packet", e);
+ LOG.warn("IO Exception while trying to recieve packet", e);
}
- logger.debug("Handling packet from {}:{} (len={})", packet.getAddress().getHostAddress(), packet.getPort(), packet.getLength());
+ LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress().getHostAddress(), packet.getPort(),
+ packet.getLength()));
try {
- lispSouthboundService.handlePacket(packet);
- } catch (Throwable t) {
- logger.error("Error while handling packet", t);
+ this.service.handlePacket(packet);
+ } catch (Exception e) {
+ LOG.warn("Error while handling packet", e);
}
}
- socket.close();
- logger.info("Socket closed");
- stillRunning = false;
+ threadSocket.close();
+ LOG.trace("Socket closed");
+ running = false;
}
public void stopRunning() {
- running = false;
+ shouldRun = false;
+ }
+
+ public boolean isRunning() {
+ return running;
}
}
}
private void startIOThread() {
- thread = new LispIoThread();
- logger.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
- thread.start();
+ try {
+ socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
+ lispThread = new LispIoThread(socket, lispSouthboundService);
+ lispThread.start();
+ LOG.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ }
+ } catch (SocketException e) {
+ LOG.error("couldn't start socket {}", e.getMessage());
+ }
+ }
+
+ private void restartXtrThread() {
+ try {
+ stopXtrThread();
+ xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
+ xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
+ xtrThread.start();
+ LOG.info("xTR Southbound Plugin is up!");
+ } catch (SocketException e) {
+ LOG.warn("failed to start xtr thread: {}", e.getMessage());
+ }
}
public void onSessionInitiated(ProviderContext session) {
+ LOG.info("LISP (RFC6830) Mapping Service is up!");
synchronized (startLock) {
if (!alreadyInit) {
alreadyInit = true;
lispSouthboundService = new LispSouthboundService();
+ lispXtrSouthboundService = new LispXtrSouthboundService();
registerWithOSGIConsole();
registerRPCs(session);
- logger.debug("Provider Session initialized");
+ LOG.trace("Provider Session initialized");
if (bindingAddress == null) {
setLispAddress("0.0.0.0");
}
private void registerRPCs(ProviderContext session) {
try {
lispSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
- session.addRpcImplementation(ILispSouthboundPlugin.class, this);
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
+ lispXtrSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
+ session.addRpcImplementation(LfmControlPlaneService.class, this);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
}
}
- public Future<RpcResult<Void>> handleMapNotify(MapNotify mapNotify, InetAddress address) {
- logger.trace("handleMapNotify called!!");
- if (mapNotify != null) {
- ByteBuffer outBuffer = MapNotifySerializer.getInstance().serialize(mapNotify);
- handleSerializedLispBuffer(address, outBuffer, MAP_NOTIFY);
- } else {
- logger.debug("MapNotify was null");
- }
- return null;
- }
-
- private void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer, String packetType) {
+ private void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, String packetType) {
DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
- packet.setPort(LispMessage.PORT_NUM);
- packet.setAddress(address);
+ packet.setPort(address.getPort().getValue());
+ InetAddress ip = InetAddresses.forString(address.getIpAddress().getIpv4Address().getValue());
+ packet.setAddress(ip);
try {
- if (logger.isDebugEnabled()) {
- logger.debug("Sending " + packetType + " on port " + LispMessage.PORT_NUM + " to address: " + address);
+ if (LOG.isDebugEnabled()) {
+ LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
}
socket.send(packet);
} catch (IOException e) {
- logger.error("Failed to send " + packetType, e);
+ LOG.warn("Failed to send " + packetType, e);
}
}
- public Future<RpcResult<Void>> handleMapReply(MapReply mapReply, InetAddress address) {
- logger.trace("handleMapReply called!!");
- if (mapReply != null) {
- ByteBuffer outBuffer = MapReplySerializer.getInstance().serialize(mapReply);
- handleSerializedLispBuffer(address, outBuffer, MAP_REPlY);
- } else {
- logger.debug("MapReply was null");
- }
- return null;
- }
-
public void setLispAddress(String address) {
synchronized (startLock) {
if (bindingAddress != null && bindingAddress.equals(address)) {
- logger.debug("configured lisp binding address didn't change.");
+ LOG.trace("configured lisp binding address didn't change.");
} else {
String action = (bindingAddress == null ? "Setting" : "Resetting");
- logger.info(action + " lisp binding address to: " + address);
+ LOG.trace(action + " lisp binding address to: " + address);
bindingAddress = address;
- if (thread != null) {
- thread.stopRunning();
- while (stillRunning) {
+ if (lispThread != null) {
+ lispThread.stopRunning();
+ while (lispThread.isRunning()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
- e.printStackTrace();
}
}
}
+ stopXtrThread();
startIOThread();
}
}
}
+
+ private void stopXtrThread() {
+ if (xtrThread != null) {
+ xtrThread.stopRunning();
+ while (xtrThread.isRunning()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public Future<RpcResult<Void>> sendMapNotify(SendMapNotifyInput mapNotifyInput) {
+ LOG.trace("sendMapNotify called!!");
+ if (mapNotifyInput != null) {
+ ByteBuffer outBuffer = MapNotifySerializer.getInstance().serialize(mapNotifyInput.getMapNotify());
+ handleSerializedLispBuffer(mapNotifyInput.getTransportAddress(), outBuffer, MAP_NOTIFY);
+ } else {
+ LOG.warn("MapNotify was null");
+ }
+ return null;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> sendMapReply(SendMapReplyInput mapReplyInput) {
+ LOG.trace("sendMapReply called!!");
+ if (mapReplyInput != null) {
+ ByteBuffer outBuffer = MapReplySerializer.getInstance().serialize(mapReplyInput.getMapReply());
+ handleSerializedLispBuffer(mapReplyInput.getTransportAddress(), outBuffer, MAP_REPlY);
+ } else {
+ LOG.warn("MapReply was null");
+ }
+ return null;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> sendMapRequest(SendMapRequestInput mapRequestInput) {
+ LOG.trace("sendMapRequest called!!");
+ if (mapRequestInput != null) {
+ ByteBuffer outBuffer = MapRequestSerializer.getInstance().serialize(mapRequestInput.getMapRequest());
+ handleSerializedLispBuffer(mapRequestInput.getTransportAddress(), outBuffer, MAP_REQUEST);
+ } else {
+ LOG.debug("MapRequest was null");
+ }
+ return null;
+ }
+
+ @Override
+ public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
+ listenOnXtrPort = shouldListenOnXtrPort;
+ if (listenOnXtrPort) {
+ LOG.debug("restarting xtr thread");
+ restartXtrThread();
+ } else {
+ LOG.debug("terminating thread");
+ stopXtrThread();
+ }
+ }
+
+ @Override
+ public void setXtrPort(int port) {
+ this.xtrPort = port;
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ }
+ }
}