public interface IFlowMapping extends IMapResolver, IMapServer {
public void clean();
+
+ public void shouldListenOnXtrPort(boolean listenOnXtrPort);
+
+ public void setXtrPort(int port);
}
void setShouldIterateMask(boolean iterateMask);
void setShouldAuthenticate(boolean shouldAuthenticate);
+
}
package org.opendaylight.lispflowmapping.implementation;
-import java.net.InetAddress;
-
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.opendaylight.lispflowmapping.implementation.dao.MappingServiceNoMaskKey;
import org.opendaylight.lispflowmapping.implementation.lisp.MapResolver;
import org.opendaylight.lispflowmapping.implementation.lisp.MapServer;
-import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
import org.opendaylight.lispflowmapping.implementation.util.LispAFIConvertor;
-import org.opendaylight.lispflowmapping.implementation.util.LispNotificationHelper;
import org.opendaylight.lispflowmapping.interfaces.dao.ILispDAO;
import org.opendaylight.lispflowmapping.interfaces.dao.ILispTypeConverter;
import org.opendaylight.lispflowmapping.interfaces.dao.IRowVisitor;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapNotifyInputBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapReplyInputBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapRequestInputBuilder;
+import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SetXtrPortInputBuilder;
+import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.ShouldListenOnXtrPortInputBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lispaddress.LispAddressContainer;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lispaddress.LispAddressContainerBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lispaddress.lispaddresscontainer.Address;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.mapreplymessage.MapReplyBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.maprequestmessage.MapRequestBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.transportaddress.TransportAddress;
-import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.transportaddress.TransportAddressBuilder;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddressBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv6Address;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
+import org.opendaylight.yangtools.yang.binding.Notification;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
private ConsumerContext session;
+ private NotificationService notificationService;
+
class LispIpv4AddressInMemoryConverter implements ILispTypeConverter<Ipv4Address, Integer> {
}
public void onSessionInitialized(ConsumerContext session) {
logger.info("Lisp Consumer session initialized!");
- NotificationService notificationService = session.getSALService(NotificationService.class);
- notificationService.registerNotificationListener(AddMapping.class, new MapRegisterNotificationHandler());
- notificationService.registerNotificationListener(RequestMapping.class, new MapRequestNotificationHandler());
+ notificationService = session.getSALService(NotificationService.class);
+ registerNotificationListener(AddMapping.class, new MapRegisterNotificationHandler());
+ registerNotificationListener(RequestMapping.class, new MapRequestNotificationHandler());
this.session = session;
}
+ public <T extends Notification> void registerNotificationListener(Class<T> notificationType, NotificationListener<T> listener) {
+ notificationService.registerNotificationListener(notificationType, listener);
+ }
+
private class MapRegisterNotificationHandler implements NotificationListener<AddMapping> {
@Override
mapServer.setOverwrite(overwrite);
}
+ @Override
+ public void shouldListenOnXtrPort(boolean listenOnXtrPort) {
+ getLispSB().shouldListenOnXtrPort(new ShouldListenOnXtrPortInputBuilder().setShouldListenOnXtrPort(listenOnXtrPort).build());
+ }
+
+ @Override
+ public void setXtrPort(int port) {
+ getLispSB().setXtrPort(new SetXtrPortInputBuilder().setXtrPort(new PortNumber(port)).build());
+ }
+
}
package org.opendaylight.lispflowmapping.implementation.serializer;
public interface LispMessage {
- int PORT_NUM = 4342;
+ int PORT_NUM = 4342;
+ int XTR_PORT_NUM = 4343;
interface Pos {
int TYPE = 0;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.lispflowmapping.clusterdao.ClusterDAOService;
+import org.opendaylight.lispflowmapping.implementation.LispMappingService;
import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
import org.opendaylight.lispflowmapping.implementation.serializer.MapNotifySerializer;
import org.opendaylight.lispflowmapping.implementation.serializer.MapRegisterSerializer;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.MapReply;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.MapRequest;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.ReencapHop;
+import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.XtrRequestMapping;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.eidrecords.EidRecord;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.eidrecords.EidRecordBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.eidtolocatorrecords.EidToLocatorRecord;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lispaddress.lispaddresscontainer.address.Mac;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lispaddress.lispaddresscontainer.address.MacBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lispaddress.lispaddresscontainer.address.NoBuilder;
-import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lispsimpleaddress.PrimitiveAddress;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.locatorrecords.LocatorRecord;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.locatorrecords.LocatorRecordBuilder;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.mapregisternotification.MapRegisterBuilder;
private byte[] mapRegisterPacketWithNotify;
private byte[] mapRegisterPacketWithoutNotify;
private IConfigLispPlugin configLispPlugin;
- int lispPortNumber = LispMessage.PORT_NUM;
String lispBindAddress = "127.0.0.1";
String ourAddress = "127.0.0.2";
private LispAFIAddress locatorEid;
public static final String YANG = "org.opendaylight.yangtools";
public static final String JERSEY = "com.sun.jersey";
private static final int MAX_SERVICE_LOAD_RETRIES = 45;
+ private static final int MAX_NOTIFICATION_RETRYS = 20;
@After
public void after() {
public void before() throws Exception {
areWeReady();
locatorEid = asIPAfiAddress("4.3.2.1");
- socket = initSocket(socket);
+ socket = initSocket(socket, LispMessage.PORT_NUM);
// SRC: 127.0.0.1:58560 to 127.0.0.1:4342
// LISP(Type = 8 - Encapsulated)
@Inject
private BundleContext bc;
private HttpURLConnection connection;
+ protected static boolean notificationCalled;
// Configure the OSGi container
@Configuration
}
@Test
- public void testNonProxy() throws Exception {
+ public void testNonProxy() throws Throwable {
testSimpleNonProxy();
testNonProxyOtherPort();
+ testRecievingNonProxyOnXtrPort();
}
// ------------------------------- Simple Tests ---------------------------
public void testNonProxyOtherPort() throws SocketTimeoutException, SocketException {
cleanUP();
String rloc = "127.0.0.3";
- int port = 4343;
+ int port = 4350;
LcafApplicationData adLcaf = new LcafApplicationDataBuilder()
.setAfi(AddressFamilyNumberEnum.LCAF.getIanaCode())
.setLcafType((short) LispCanonicalAddressFormatEnum.APPLICATION_DATA.getLispCode())
}
+ public void testRecievingNonProxyOnXtrPort() throws SocketTimeoutException, SocketException, Throwable {
+ cleanUP();
+ lms.shouldListenOnXtrPort(true);
+ notificationCalled = false;
+ final String eid = "10.10.10.10";
+ String rloc = "127.0.0.3";
+ int port = LispMessage.XTR_PORT_NUM;
+ LcafApplicationData adLcaf = new LcafApplicationDataBuilder()
+ .setAfi(AddressFamilyNumberEnum.LCAF.getIanaCode())
+ .setLcafType((short) LispCanonicalAddressFormatEnum.APPLICATION_DATA.getLispCode())
+ .setAddress(
+ new org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.lcafapplicationdataaddress.AddressBuilder().setPrimitiveAddress(
+ LispAFIConvertor.asPrimitiveIPAfiAddress(rloc)).build()).setLocalPort(new PortNumber(port)).build();
+ final MapRequest mapRequest = createNonProxyMapRequest(eid, adLcaf);
+ ((LispMappingService) lms).registerNotificationListener(XtrRequestMapping.class, new NotificationListener<XtrRequestMapping>() {
+
+ @Override
+ public void onNotification(XtrRequestMapping notification) {
+ assertEquals(((LispIpv4Address) mapRequest.getEidRecord().get(0).getLispAddressContainer().getAddress()).getIpv4Address().getValue(),
+ eid);
+ notificationCalled = true;
+ logger.warn("notification arrived");
+ }
+ });
+ sendMapRequest(mapRequest, port);
+ for (int i = 0; i < MAX_NOTIFICATION_RETRYS; i++) {
+ if (notificationCalled) {
+ return;
+ } else {
+ logger.warn("notification hasn't arrived, sleeping...");
+ Thread.sleep(500);
+ }
+ }
+
+ fail("Notification hasn't arrived");
+
+ }
+
private void sendProxyMapRequest(String rloc, int port, LispAFIAddress adLcaf) throws SocketTimeoutException, SocketException {
- String eid = "10.0.0.1";
+ String eid = "10.1.0.1";
+ MapRequest mapRequest = createNonProxyMapRequest(eid, adLcaf);
+ sendMapRequest(mapRequest);
+ DatagramSocket nonProxySocket = new DatagramSocket(new InetSocketAddress(rloc, port));
+ MapRequest recievedMapRequest = receiveMapRequest(nonProxySocket);
+ assertEquals(mapRequest.getNonce(), recievedMapRequest.getNonce());
+ assertEquals(mapRequest.getSourceEid(), recievedMapRequest.getSourceEid());
+ assertEquals(mapRequest.getItrRloc(), recievedMapRequest.getItrRloc());
+ assertEquals(mapRequest.getEidRecord(), recievedMapRequest.getEidRecord());
+ nonProxySocket.close();
+ }
+
+ private MapRequest createNonProxyMapRequest(String eid, LispAFIAddress adLcaf) throws SocketTimeoutException {
MapRegister mr = createMapRegister(LispAFIConvertor.asIPAfiAddress(eid));
LocatorRecord record = new LocatorRecordBuilder(mr.getEidToLocatorRecord().get(0).getLocatorRecord().get(0)).setLispAddressContainer(
LispAFIConvertor.toContainer(adLcaf)).build();
MapRequestBuilder builder = new MapRequestBuilder(mapRequest);
builder.setPitr(true);
mapRequest = builder.build();
- sendMapRequest(mapRequest);
- DatagramSocket nonProxySocket = new DatagramSocket(new InetSocketAddress(rloc, port));
- MapRequest recievedMapRequest = receiveMapRequest(nonProxySocket);
- assertEquals(mapRequest.getNonce(), recievedMapRequest.getNonce());
- assertEquals(mapRequest.getSourceEid(), recievedMapRequest.getSourceEid());
- assertEquals(mapRequest.getItrRloc(), recievedMapRequest.getItrRloc());
- assertEquals(mapRequest.getEidRecord(), recievedMapRequest.getEidRecord());
- nonProxySocket.close();
+ return mapRequest;
}
private void assertMapNotifyRecieved() throws SocketTimeoutException {
}
private void sendMapRequest(MapRequest mapRequest) {
- sendPacket(MapRequestSerializer.getInstance().serialize(mapRequest).array());
+ sendMapRequest(mapRequest, LispMessage.PORT_NUM);
+ }
+
+ private void sendMapRequest(MapRequest mapRequest, int port) {
+ sendPacket(MapRequestSerializer.getInstance().serialize(mapRequest).array(), port);
}
private void sendMapRegister(MapRegister mapRegister) {
}
private void sendPacket(byte[] bytesToSend) {
+ sendPacket(bytesToSend, LispMessage.PORT_NUM);
+ }
+
+ private void sendPacket(byte[] bytesToSend, int port) {
try {
DatagramPacket packet = new DatagramPacket(bytesToSend, bytesToSend.length);
- initPacketAddress(packet);
+ initPacketAddress(packet, port);
logger.trace("Sending MapRegister to LispPlugin on socket");
socket.send(packet);
} catch (Throwable t) {
}
}
- private void initPacketAddress(DatagramPacket packet) throws UnknownHostException {
+ private void initPacketAddress(DatagramPacket packet, int port) throws UnknownHostException {
packet.setAddress(InetAddress.getByName(lispBindAddress));
- packet.setPort(lispPortNumber);
+ packet.setPort(port);
}
- private DatagramSocket initSocket(DatagramSocket socket) {
+ private DatagramSocket initSocket(DatagramSocket socket, int port) {
try {
- socket = new DatagramSocket(new InetSocketAddress(ourAddress, LispMessage.PORT_NUM));
+ socket = new DatagramSocket(new InetSocketAddress(ourAddress, port));
} catch (SocketException e) {
e.printStackTrace();
fail();
private void cleanUP() {
after();
lms.clean();
- socket = initSocket(socket);
+ lms.shouldListenOnXtrPort(false);
+ socket = initSocket(socket, LispMessage.PORT_NUM);
}
import org.opendaylight.lispflowmapping.implementation.serializer.MapNotifySerializer;
import org.opendaylight.lispflowmapping.implementation.serializer.MapReplySerializer;
import org.opendaylight.lispflowmapping.implementation.serializer.MapRequestSerializer;
+import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
+import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispPlugin;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.LispflowmappingService;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapNotifyInput;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapReplyInput;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapRequestInput;
+import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SetXtrPortInput;
+import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.ShouldListenOnXtrPortInput;
import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.transportaddress.TransportAddress;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.osgi.framework.BundleContext;
protected static final Logger logger = LoggerFactory.getLogger(LispSouthboundPlugin.class);
private static Object startLock = new Object();
- private LispIoThread thread;
+ private LispIoThread lispThread;
+ private LispIoThread xtrThread;
private LispSouthboundService lispSouthboundService;
+ private LispXtrSouthboundService lispXtrSouthboundService;
private volatile DatagramSocket socket = null;
private final String MAP_NOTIFY = "MapNotify";
private final String MAP_REPlY = "MapReply";
private final String MAP_REQUEST = "MapRequest";
private volatile String bindingAddress = null;
- private volatile boolean stillRunning = false;
private volatile boolean alreadyInit = false;
+ private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
+ private volatile boolean listenOnXtrPort = false;
+
+ private DatagramSocket xtrSocket;
private void registerWithOSGIConsole() {
BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
}
private void unloadActions() {
- if (thread != null) {
- thread.stopRunning();
+ if (lispThread != null) {
+ lispThread.stopRunning();
}
lispSouthboundService = null;
- thread = null;
+ lispXtrSouthboundService = null;
+ lispThread = null;
+ xtrThread = null;
logger.info("LISP (RFC6830) Mapping Service is down!");
try {
Thread.sleep(1100);
}
private class LispIoThread extends Thread {
+ private volatile boolean shouldRun;
+ private volatile DatagramSocket threadSocket = null;
+ private volatile ILispSouthboundService service;
private volatile boolean running;
- public LispIoThread() {
+ public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
super("Lisp Thread");
- running = true;
+ this.threadSocket = socket;
+ this.service = service;
+ shouldRun = true;
}
@Override
public void run() {
- stillRunning = true;
+ running = true;
- int lispPortNumber = LispMessage.PORT_NUM;
int lispReceiveTimeout = 1000;
- logger.info("LISP (RFC6830) Mapping Service is running and listening on " + bindingAddress);
+ logger.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress + " port: "
+ + threadSocket.getLocalPort());
try {
- socket = new DatagramSocket(new InetSocketAddress(bindingAddress, lispPortNumber));
- socket.setSoTimeout(lispReceiveTimeout);
+
+ threadSocket.setSoTimeout(lispReceiveTimeout);
} catch (SocketException e) {
- logger.error("Cannot open socket on UDP port " + lispPortNumber, e);
+ logger.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
return;
}
- while (running) {
+ while (shouldRun) {
byte[] buffer = new byte[4096];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
- socket.receive(packet);
+ threadSocket.receive(packet);
logger.trace("Received a packet!");
} catch (SocketTimeoutException ste) {
continue;
packet.getLength()));
try {
- lispSouthboundService.handlePacket(packet);
+ this.service.handlePacket(packet);
} catch (Throwable t) {
logger.warn("Error while handling packet", t);
}
}
- socket.close();
+ threadSocket.close();
logger.trace("Socket closed");
- stillRunning = false;
+ running = false;
}
public void stopRunning() {
- running = false;
+ shouldRun = false;
+ }
+
+ public boolean isRunning() {
+ return running;
}
}
}
private void startIOThread() {
- thread = new LispIoThread();
- logger.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
- thread.start();
+ try {
+ socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
+ lispThread = new LispIoThread(socket, lispSouthboundService);
+ lispThread.start();
+ logger.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ }
+ } catch (SocketException e) {
+ logger.error("couldn't start socket {}", e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private void restartXtrThread() {
+ try {
+ stopXtrThread();
+ xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
+ xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
+ xtrThread.start();
+ logger.info("xTR Southbound Plugin is up!");
+ } catch (SocketException e) {
+ logger.warn("failed to start xtr thread: {}", e.getMessage());
+ }
}
public void onSessionInitiated(ProviderContext session) {
if (!alreadyInit) {
alreadyInit = true;
lispSouthboundService = new LispSouthboundService();
+ lispXtrSouthboundService = new LispXtrSouthboundService();
registerWithOSGIConsole();
registerRPCs(session);
logger.trace("Provider Session initialized");
private void registerRPCs(ProviderContext session) {
try {
lispSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
+ lispXtrSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
session.addRpcImplementation(LispflowmappingService.class, this);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
String action = (bindingAddress == null ? "Setting" : "Resetting");
logger.trace(action + " lisp binding address to: " + address);
bindingAddress = address;
- if (thread != null) {
- thread.stopRunning();
- while (stillRunning) {
+ if (lispThread != null) {
+ lispThread.stopRunning();
+ while (lispThread.isRunning()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
}
+ stopXtrThread();
startIOThread();
}
}
}
+ private void stopXtrThread() {
+ if (xtrThread != null) {
+ xtrThread.stopRunning();
+ while (xtrThread.isRunning()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
@Override
public Future<RpcResult<Void>> sendMapNotify(SendMapNotifyInput mapNotifyInput) {
logger.trace("sendMapNotify called!!");
}
return null;
}
+
+ @Override
+ public Future<RpcResult<Void>> shouldListenOnXtrPort(ShouldListenOnXtrPortInput input) {
+ if (listenOnXtrPort == input.isShouldListenOnXtrPort()) {
+ return null;
+ }
+ listenOnXtrPort = input.isShouldListenOnXtrPort();
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ } else {
+ stopXtrThread();
+ }
+ return null;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> setXtrPort(SetXtrPortInput input) {
+ this.xtrPort = input.getXtrPort().getValue();
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ }
+ return null;
+ }
}
}
}
+ rpc shouldListenOnXtrPort {
+ input {
+ leaf shouldListenOnXtrPort {
+ type boolean;
+ }
+ }
+ }
+
+ rpc setXtrPort {
+ input {
+ leaf xtrPort {
+ type inet:port-number;
+ }
+ }
+ }
+
notification addMapping {
uses MapRegisterNotification;
}
uses MapRequestNotification;
}
+ notification xtrRequestMapping {
+ uses MapRequestNotification;
+ }
+
}