*/
package org.opendaylight.lispflowmapping.integrationtest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_RLOC_10;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_100_1_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_50_2_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D4;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D5;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_DELETE_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_100_1_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_50_2_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_E_SB;
+import static org.ops4j.pax.exam.CoreOptions.composite;
+import static org.ops4j.pax.exam.CoreOptions.maven;
+import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_RLOC_10;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_100_1_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_50_2_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D4;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D5;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_DELETE_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_100_1_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_50_2_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_E_SB;
-import static org.ops4j.pax.exam.CoreOptions.composite;
-import static org.ops4j.pax.exam.CoreOptions.maven;
-import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
-
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerClass.class)
public class MappingServiceIntegrationTest extends AbstractMdsalTestBase {
final InstanceIdType iid = new InstanceIdType(1L);
final Eid eid1 = LispAddressUtil.asIpv4Eid("1.1.1.1", 1L);
- final Eid eid2 = LispAddressUtil.asIpv4Eid("2.2.2.2", 1L);
+ final int expectedSmrs1 = 2;
+ final int expectedSmrs2 = 3;
/* set auth */
final Eid eid = LispAddressUtil.asIpv4PrefixBinaryEid("0.0.0.0/0", iid);
/* add subscribers */
final String subscriberSrcRloc1 = "127.0.0.3";
final String subscriberSrcRloc2 = "127.0.0.4";
- final Set<SubscriberRLOC> subscriberSet = Sets.newHashSet(
- newSubscriber(eid1, subscriberSrcRloc1), newSubscriber(eid2, subscriberSrcRloc2));
- mapService.addData(MappingOrigin.Southbound, eid1, SubKeys.SUBSCRIBERS, subscriberSet);
-
- final int expectedSmrs1 = 2;
- final int expectedSmrs2 = 3;
+ final Set<SubscriberRLOC> subscriberSet1 = Sets.newHashSet(newSubscriber(eid1, subscriberSrcRloc1),
+ newSubscriber(eid1, subscriberSrcRloc2));
+ mapService.addData(MappingOrigin.Southbound, eid1, SubKeys.SUBSCRIBERS, subscriberSet1);
final SocketReader reader1 = startSocketReader(subscriberSrcRloc1, 15000);
final SocketReader reader2 = startSocketReader(subscriberSrcRloc2, 15000);
/* add mapping */
final MappingRecord mapping1 = new MappingRecordBuilder()
.setEid(eid1).setTimestamp(System.currentTimeMillis()).setRecordTtl(1440).build();
- mapService.addMapping(MappingOrigin.Northbound, mapping1.getEid(), null, mapping1, false);
+ mapService.addMapping(MappingOrigin.Northbound, eid1, null, mapping1, false);
sleepForMilliseconds((timeout * expectedSmrs1) - 1500);
- final List<MapRequest> requests1 = processBuffers(reader1, subscriberSrcRloc1, expectedSmrs1);
+ final List<MapRequest> requests1 = processSmrPackets(reader1, subscriberSrcRloc1, expectedSmrs1);
final MapReply mapReply1 = lms.handleMapRequest(
new MapRequestBuilder(requests1.get(0))
.setItrRloc(Lists.newArrayList(new ItrRlocBuilder()
// sleep to get 1 extra smr request
sleepForMilliseconds(timeout * 1);
- final List<MapRequest> requests2 = processBuffers(reader2, subscriberSrcRloc2, expectedSmrs2);
+ final List<MapRequest> requests2 = processSmrPackets(reader2, subscriberSrcRloc2, expectedSmrs2);
final MapReply mapReply2 = lms.handleMapRequest(
new MapRequestBuilder(requests2.get(0))
.setItrRloc(Lists.newArrayList(new ItrRlocBuilder()
.setRloc(LispAddressUtil.asIpv4Rloc(subscriberSrcRloc2)).build()))
- .setEidItem(Lists.newArrayList(new EidItemBuilder().setEid(eid2).build()))
+ .setEidItem(Lists.newArrayList(new EidItemBuilder().setEid(eid1).build()))
.setSmrInvoked(true)
.setSmr(false).build());
return SocketReader.startReadingInStandaloneThread(receivingSocket, timeout);
}
- private List<MapRequest> processBuffers(SocketReader reader, String address, int expectedSmrs) {
+ private List<MapRequest> processSmrPackets(SocketReader reader, String address, int expectedSmrs) {
InetAddress inetAddress = null;
try {
inetAddress = InetAddress.getByName(address);
import java.util.List;
import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
/**
* Carries information about received SMR-invoked request.
public class SmrEvent {
private final List<IpAddressBinary> subscriberAddresses;
+ private final Eid eid;
+ private final long nonce;
- public SmrEvent(List<IpAddressBinary> subscriberAddresses) {
+ public SmrEvent(List<IpAddressBinary> subscriberAddresses, Eid eid, long nonce) {
this.subscriberAddresses = subscriberAddresses;
+ this.eid = eid;
+ this.nonce = nonce;
}
+ /**
+ * Returns the list of subscriber addresses that are subscribed to receive SMR MapRequest for a specific eid.
+ *
+ * @return the list of subscriber Addresses.
+ */
public List<IpAddressBinary> getSubscriberAddressList() {
return subscriberAddresses;
}
+
+ /**
+ * Returns the eid which the xTRs are subscribed to.
+ *
+ * @return the subscribed eid.
+ */
+ public Eid getEid() {
+ return eid;
+ }
+
+ /**
+ * Returns the nonce associated to a MapRequest.
+ *
+ * @return the nonce.
+ */
+ public long getNonce() {
+ return nonce;
+ }
}
}
if (request.isSmrInvoked()) {
LOG.debug("SMR-invoked request received.");
- final SmrEvent event = new SmrEvent(LispAddressUtil.addressBinariesFromItrRlocs(request.getItrRloc()));
- smrNotificationListener.onSmrInvokedReceived(event);
+ for (EidItem eidItem : request.getEidItem()) {
+ final SmrEvent event = new SmrEvent(LispAddressUtil.addressBinariesFromItrRlocs(request.getItrRloc()),
+ eidItem.getEid(), request.getNonce());
+ smrNotificationListener.onSmrInvokedReceived(event);
+ }
}
Eid srcEid = null;
if (request.getSourceEid() != null) {
package org.opendaylight.lispflowmapping.implementation.lisp;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.BooleanUtils;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
if (notificationService != null) {
notificationService.registerNotificationListener(this);
}
+ scheduler = new SmrScheduler();
}
@Override
final MapRequestBuilder mrb = MapRequestUtil.prepareSMR(eid, LispAddressUtil.toRloc(getLocalAddress()));
LOG.trace("Built SMR packet: " + mrb.build().toString());
- scheduler = new SmrScheduler();
scheduler.scheduleSmrs(mrb, subscribers.iterator());
addSubscribers(eid, subscribers);
}
@Override
public void onSmrInvokedReceived(SmrEvent event) {
- scheduler.smrReceived(event.getSubscriberAddressList());
+ scheduler.smrReceived(event);
}
/**
* is triggered.
*/
private class SmrScheduler {
- private ScheduledExecutorService executor;
-
- /**
- * Scheduling SMRs is done in a single thread. If a use case demands better performance, this may be revisited
- * and for multi-threading we will need ConcurrentHashMap here along with a multi-threaded executor.
- */
- private Map<IpAddressBinary, ScheduledFuture<?>> subscriberFutureMap = new HashMap<>();
- private long smrNonce;
+ final int cpuCores = Runtime.getRuntime().availableProcessors();
+ private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("smr-executor-%d").build();
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(cpuCores * 2, threadFactory);
+ private final Map<IpAddressBinary, Map<Eid, ScheduledFuture<?>>> subscriberFutureMap = Maps.newConcurrentMap();
void scheduleSmrs(MapRequestBuilder mrb, Iterator<SubscriberRLOC> subscribers) {
- executor = Executors.newSingleThreadScheduledExecutor();
- smrNonce = mrb.getNonce();
-
// Using Iterator ensures that we don't get a ConcurrentModificationException when removing a SubscriberRLOC
// from a Set.
while (subscribers.hasNext()) {
} else {
final ScheduledFuture<?> future = executor.scheduleAtFixedRate(new CancellableRunnable(
mrb, subscriber), 0L, ConfigIni.getInstance().getSmrTimeout(), TimeUnit.MILLISECONDS);
- subscriberFutureMap.put(
- LispAddressUtil.addressBinaryFromAddress(subscriber.getSrcRloc().getAddress()), future);
+ final IpAddressBinary subscriberAddress = LispAddressUtil
+ .addressBinaryFromAddress(subscriber.getSrcRloc().getAddress());
+
+ if (subscriberFutureMap.containsKey(subscriberAddress)) {
+ subscriberFutureMap.get(subscriberAddress).put(mrb.getSourceEid().getEid(), future);
+ } else {
+ final Map<Eid, ScheduledFuture<?>> eidFutureMap = Maps.newConcurrentMap();
+ eidFutureMap.put(mrb.getSourceEid().getEid(), future);
+ subscriberFutureMap.put(subscriberAddress, eidFutureMap);
+ }
}
}
}
- void smrReceived(List<IpAddressBinary> subscriberAddressList) {
- ScheduledFuture<?> future;
+ void smrReceived(SmrEvent event) {
+ final List<IpAddressBinary> subscriberAddressList = event.getSubscriberAddressList();
for (IpAddressBinary subscriberAddress : subscriberAddressList) {
- future = subscriberFutureMap.get(subscriberAddress);
- if (future != null && !future.isCancelled()) {
- future.cancel(false);
- LOG.trace("SMR-invoked MapRequest received, scheduled task for subscriber {} with nonce {} has "
- + "been canceled", subscriberAddress.toString(), smrNonce);
+ final Map<Eid, ScheduledFuture<?>> eidFutureMap = subscriberFutureMap.get(subscriberAddress);
+ if (eidFutureMap != null) {
+ final ScheduledFuture future = eidFutureMap.get(event.getEid());
+ if (future != null && !future.isCancelled()) {
+ future.cancel(false);
+ LOG.trace("SMR-invoked MapRequest received, scheduled task for subscriber {} with nonce {} has "
+ + "been canceled", subscriberAddress.toString(), event.getNonce());
+ eidFutureMap.remove(event.getEid());
+ }
+ if (eidFutureMap.isEmpty()) {
+ subscriberFutureMap.remove(subscriberAddress);
+ }
}
}
}
} else {
LOG.trace("Cancelling execution of a SMR Map-Request after {} failed attempts.",
executionCount - 1);
- subscriberFutureMap.get(subscriberAddress).cancel(false);
+ cancelAndRemove(subscriberAddress);
}
} catch (Exception e) {
LOG.error("Errors encountered while handling SMR:", e);
- subscriberFutureMap.get(subscriberAddress).cancel(false);
- } finally {
- tryToShutDownExecutor();
+ cancelAndRemove(subscriberAddress);
}
executionCount++;
}
- }
- private void tryToShutDownExecutor() {
- if (!executor.isShutdown() && areAllFuturesCanceled()) {
- executor.shutdown();
- LOG.trace("SMR scheduler executor shutdown.");
- }
- }
-
- private boolean areAllFuturesCanceled() {
- for (Map.Entry<IpAddressBinary, ScheduledFuture<?>> entry : subscriberFutureMap.entrySet()) {
- if (!entry.getValue().isCancelled()) {
- return false;
+ private void cancelAndRemove(IpAddressBinary subscriberAddress) {
+ final Map<Eid, ScheduledFuture<?>> eidFutureMap = subscriberFutureMap.get(subscriberAddress);
+ final Eid eid = mrb.getSourceEid().getEid();
+ if (eidFutureMap.containsKey(eid)) {
+ eidFutureMap.get(eid).cancel(false);
+ }
+ eidFutureMap.remove(eid);
+ if (eidFutureMap.isEmpty()) {
+ subscriberFutureMap.remove(subscriberAddress);
}
}
- return true;
}
}
}