Reliable SMR - fix performance issue 10/47610/7
authorMiroslav Toth <mirtoth@cisco.com>
Wed, 26 Oct 2016 13:11:26 +0000 (15:11 +0200)
committerLori Jakab <lorand.jakab@gmail.com>
Fri, 28 Oct 2016 09:52:26 +0000 (09:52 +0000)
Change-Id: I7f942037002cb983432368f54fd32c260b838e91
Signed-off-by: Miroslav Toth <mirtoth@cisco.com>
integrationtest/src/test/java/org/opendaylight/lispflowmapping/integrationtest/MappingServiceIntegrationTest.java
mappingservice/api/src/main/java/org/opendaylight/lispflowmapping/interfaces/lisp/SmrEvent.java
mappingservice/implementation/src/main/java/org/opendaylight/lispflowmapping/implementation/lisp/MapResolver.java
mappingservice/implementation/src/main/java/org/opendaylight/lispflowmapping/implementation/lisp/MapServer.java

index 735ad42a024b411212b7e67453da31cdabe04fce..f7d61dc891058cac991548a8fbd8f2d383eb91a8 100644 (file)
@@ -7,6 +7,29 @@
  */
 package org.opendaylight.lispflowmapping.integrationtest;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_RLOC_10;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_100_1_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_50_2_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D4;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D5;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_DELETE_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_100_1_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_50_2_SB;
+import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_E_SB;
+import static org.ops4j.pax.exam.CoreOptions.composite;
+import static org.ops4j.pax.exam.CoreOptions.maven;
+import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.net.InetAddresses;
@@ -124,29 +147,6 @@ import org.osgi.framework.InvalidSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_A_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_B_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_RLOC_10;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_100_1_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_C_WP_50_2_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D4;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D5;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_DELETE_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_100_1_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_D_WP_50_2_SB;
-import static org.opendaylight.lispflowmapping.integrationtest.MultiSiteScenarioUtil.SITE_E_SB;
-import static org.ops4j.pax.exam.CoreOptions.composite;
-import static org.ops4j.pax.exam.CoreOptions.maven;
-import static org.ops4j.pax.exam.karaf.options.KarafDistributionOption.editConfigurationFilePut;
-
 @RunWith(PaxExam.class)
 @ExamReactorStrategy(PerClass.class)
 public class MappingServiceIntegrationTest extends AbstractMdsalTestBase {
@@ -424,7 +424,8 @@ public class MappingServiceIntegrationTest extends AbstractMdsalTestBase {
 
         final InstanceIdType iid = new InstanceIdType(1L);
         final Eid eid1 = LispAddressUtil.asIpv4Eid("1.1.1.1", 1L);
-        final Eid eid2 = LispAddressUtil.asIpv4Eid("2.2.2.2", 1L);
+        final int expectedSmrs1 = 2;
+        final int expectedSmrs2 = 3;
 
         /* set auth */
         final Eid eid = LispAddressUtil.asIpv4PrefixBinaryEid("0.0.0.0/0", iid);
@@ -433,12 +434,9 @@ public class MappingServiceIntegrationTest extends AbstractMdsalTestBase {
         /* add subscribers */
         final String subscriberSrcRloc1 = "127.0.0.3";
         final String subscriberSrcRloc2 = "127.0.0.4";
-        final Set<SubscriberRLOC> subscriberSet = Sets.newHashSet(
-                newSubscriber(eid1, subscriberSrcRloc1), newSubscriber(eid2, subscriberSrcRloc2));
-        mapService.addData(MappingOrigin.Southbound, eid1, SubKeys.SUBSCRIBERS, subscriberSet);
-
-        final int expectedSmrs1 = 2;
-        final int expectedSmrs2 = 3;
+        final Set<SubscriberRLOC> subscriberSet1 = Sets.newHashSet(newSubscriber(eid1, subscriberSrcRloc1),
+                newSubscriber(eid1, subscriberSrcRloc2));
+        mapService.addData(MappingOrigin.Southbound, eid1, SubKeys.SUBSCRIBERS, subscriberSet1);
 
         final SocketReader reader1 = startSocketReader(subscriberSrcRloc1, 15000);
         final SocketReader reader2 = startSocketReader(subscriberSrcRloc2, 15000);
@@ -447,10 +445,10 @@ public class MappingServiceIntegrationTest extends AbstractMdsalTestBase {
         /* add mapping */
         final MappingRecord mapping1 = new MappingRecordBuilder()
                 .setEid(eid1).setTimestamp(System.currentTimeMillis()).setRecordTtl(1440).build();
-        mapService.addMapping(MappingOrigin.Northbound, mapping1.getEid(), null, mapping1, false);
+        mapService.addMapping(MappingOrigin.Northbound, eid1, null, mapping1, false);
 
         sleepForMilliseconds((timeout * expectedSmrs1) - 1500);
-        final List<MapRequest> requests1 = processBuffers(reader1, subscriberSrcRloc1, expectedSmrs1);
+        final List<MapRequest> requests1 = processSmrPackets(reader1, subscriberSrcRloc1, expectedSmrs1);
         final MapReply mapReply1 = lms.handleMapRequest(
                 new MapRequestBuilder(requests1.get(0))
                         .setItrRloc(Lists.newArrayList(new ItrRlocBuilder()
@@ -461,12 +459,12 @@ public class MappingServiceIntegrationTest extends AbstractMdsalTestBase {
 
         // sleep to get 1 extra smr request
         sleepForMilliseconds(timeout * 1);
-        final List<MapRequest> requests2 = processBuffers(reader2, subscriberSrcRloc2, expectedSmrs2);
+        final List<MapRequest> requests2 = processSmrPackets(reader2, subscriberSrcRloc2, expectedSmrs2);
         final MapReply mapReply2 = lms.handleMapRequest(
                 new MapRequestBuilder(requests2.get(0))
                         .setItrRloc(Lists.newArrayList(new ItrRlocBuilder()
                                 .setRloc(LispAddressUtil.asIpv4Rloc(subscriberSrcRloc2)).build()))
-                        .setEidItem(Lists.newArrayList(new EidItemBuilder().setEid(eid2).build()))
+                        .setEidItem(Lists.newArrayList(new EidItemBuilder().setEid(eid1).build()))
                         .setSmrInvoked(true)
                         .setSmr(false).build());
 
@@ -493,7 +491,7 @@ public class MappingServiceIntegrationTest extends AbstractMdsalTestBase {
         return SocketReader.startReadingInStandaloneThread(receivingSocket, timeout);
     }
 
-    private List<MapRequest> processBuffers(SocketReader reader, String address, int expectedSmrs) {
+    private List<MapRequest> processSmrPackets(SocketReader reader, String address, int expectedSmrs) {
         InetAddress inetAddress = null;
         try {
             inetAddress = InetAddress.getByName(address);
index 4789d3aa949a52fea29d7be0429d7ae52f388547..f56573ead1824d940268591c5edbf98a95a89a2a 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.lispflowmapping.interfaces.lisp;
 
 import java.util.List;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.eid.container.Eid;
 
 /**
  * Carries information about received SMR-invoked request.
@@ -16,12 +17,39 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev16
 public class SmrEvent {
 
     private final List<IpAddressBinary> subscriberAddresses;
+    private final Eid eid;
+    private final long nonce;
 
-    public SmrEvent(List<IpAddressBinary> subscriberAddresses) {
+    public SmrEvent(List<IpAddressBinary> subscriberAddresses, Eid eid, long nonce) {
         this.subscriberAddresses = subscriberAddresses;
+        this.eid = eid;
+        this.nonce = nonce;
     }
 
+    /**
+     * Returns the list of subscriber addresses that are subscribed to receive SMR MapRequest for a specific eid.
+     *
+     * @return the list of subscriber Addresses.
+     */
     public List<IpAddressBinary> getSubscriberAddressList() {
         return subscriberAddresses;
     }
+
+    /**
+     * Returns the eid which the xTRs are subscribed to.
+     *
+     * @return the subscribed eid.
+     */
+    public Eid getEid() {
+        return eid;
+    }
+
+    /**
+     * Returns the nonce associated to a MapRequest.
+     *
+     * @return the nonce.
+     */
+    public long getNonce() {
+        return nonce;
+    }
 }
index 2c95e0d16b69e3a0b5e1eb60589812d905fc5dfb..2c621f26225037aa113f005fd7ac582296871c96 100644 (file)
@@ -89,8 +89,11 @@ public class MapResolver implements IMapResolverAsync {
         }
         if (request.isSmrInvoked()) {
             LOG.debug("SMR-invoked request received.");
-            final SmrEvent event = new SmrEvent(LispAddressUtil.addressBinariesFromItrRlocs(request.getItrRloc()));
-            smrNotificationListener.onSmrInvokedReceived(event);
+            for (EidItem eidItem : request.getEidItem()) {
+                final SmrEvent event = new SmrEvent(LispAddressUtil.addressBinariesFromItrRlocs(request.getItrRloc()),
+                        eidItem.getEid(), request.getNonce());
+                smrNotificationListener.onSmrInvokedReceived(event);
+            }
         }
         Eid srcEid = null;
         if (request.getSourceEid() != null) {
index 59f868bfe0d908b85fc8a94061b6cdf3ed0dc966..03876853efbb1ad87e0b263483e8b128cc85fa84 100644 (file)
@@ -9,13 +9,14 @@
 package org.opendaylight.lispflowmapping.implementation.lisp;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -25,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.BooleanUtils;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
@@ -89,6 +91,7 @@ public class MapServer implements IMapServerAsync, OdlMappingserviceListener, IS
         if (notificationService != null) {
             notificationService.registerNotificationListener(this);
         }
+        scheduler = new SmrScheduler();
     }
 
     @Override
@@ -245,7 +248,6 @@ public class MapServer implements IMapServerAsync, OdlMappingserviceListener, IS
         final MapRequestBuilder mrb = MapRequestUtil.prepareSMR(eid, LispAddressUtil.toRloc(getLocalAddress()));
         LOG.trace("Built SMR packet: " + mrb.build().toString());
 
-        scheduler = new SmrScheduler();
         scheduler.scheduleSmrs(mrb, subscribers.iterator());
         addSubscribers(eid, subscribers);
     }
@@ -291,7 +293,7 @@ public class MapServer implements IMapServerAsync, OdlMappingserviceListener, IS
 
     @Override
     public void onSmrInvokedReceived(SmrEvent event) {
-        scheduler.smrReceived(event.getSubscriberAddressList());
+        scheduler.smrReceived(event);
     }
 
     /**
@@ -300,19 +302,13 @@ public class MapServer implements IMapServerAsync, OdlMappingserviceListener, IS
      * is triggered.
      */
     private class SmrScheduler {
-        private ScheduledExecutorService executor;
-
-        /**
-         * Scheduling SMRs is done in a single thread. If a use case demands better performance, this may be revisited
-         * and for multi-threading we will need ConcurrentHashMap here along with a multi-threaded executor.
-         */
-        private Map<IpAddressBinary, ScheduledFuture<?>> subscriberFutureMap = new HashMap<>();
-        private long smrNonce;
+        final int cpuCores = Runtime.getRuntime().availableProcessors();
+        private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("smr-executor-%d").build();
+        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(cpuCores * 2, threadFactory);
+        private final Map<IpAddressBinary, Map<Eid, ScheduledFuture<?>>> subscriberFutureMap = Maps.newConcurrentMap();
 
         void scheduleSmrs(MapRequestBuilder mrb, Iterator<SubscriberRLOC> subscribers) {
-            executor = Executors.newSingleThreadScheduledExecutor();
-            smrNonce = mrb.getNonce();
-
             // Using Iterator ensures that we don't get a ConcurrentModificationException when removing a SubscriberRLOC
             // from a Set.
             while (subscribers.hasNext()) {
@@ -323,20 +319,35 @@ public class MapServer implements IMapServerAsync, OdlMappingserviceListener, IS
                 } else {
                     final ScheduledFuture<?> future = executor.scheduleAtFixedRate(new CancellableRunnable(
                             mrb, subscriber), 0L, ConfigIni.getInstance().getSmrTimeout(), TimeUnit.MILLISECONDS);
-                    subscriberFutureMap.put(
-                            LispAddressUtil.addressBinaryFromAddress(subscriber.getSrcRloc().getAddress()), future);
+                    final IpAddressBinary subscriberAddress = LispAddressUtil
+                            .addressBinaryFromAddress(subscriber.getSrcRloc().getAddress());
+
+                    if (subscriberFutureMap.containsKey(subscriberAddress)) {
+                        subscriberFutureMap.get(subscriberAddress).put(mrb.getSourceEid().getEid(), future);
+                    } else {
+                        final Map<Eid, ScheduledFuture<?>> eidFutureMap = Maps.newConcurrentMap();
+                        eidFutureMap.put(mrb.getSourceEid().getEid(), future);
+                        subscriberFutureMap.put(subscriberAddress, eidFutureMap);
+                    }
                 }
             }
         }
 
-        void smrReceived(List<IpAddressBinary> subscriberAddressList) {
-            ScheduledFuture<?> future;
+        void smrReceived(SmrEvent event) {
+            final List<IpAddressBinary> subscriberAddressList = event.getSubscriberAddressList();
             for (IpAddressBinary subscriberAddress : subscriberAddressList) {
-                future = subscriberFutureMap.get(subscriberAddress);
-                if (future != null && !future.isCancelled()) {
-                    future.cancel(false);
-                    LOG.trace("SMR-invoked MapRequest received, scheduled task for subscriber {} with nonce {} has "
-                            + "been canceled", subscriberAddress.toString(), smrNonce);
+                final Map<Eid, ScheduledFuture<?>> eidFutureMap = subscriberFutureMap.get(subscriberAddress);
+                if (eidFutureMap != null) {
+                    final ScheduledFuture future = eidFutureMap.get(event.getEid());
+                    if (future != null && !future.isCancelled()) {
+                        future.cancel(false);
+                        LOG.trace("SMR-invoked MapRequest received, scheduled task for subscriber {} with nonce {} has "
+                                + "been canceled", subscriberAddress.toString(), event.getNonce());
+                        eidFutureMap.remove(event.getEid());
+                    }
+                    if (eidFutureMap.isEmpty()) {
+                        subscriberFutureMap.remove(subscriberAddress);
+                    }
                 }
             }
         }
@@ -368,32 +379,26 @@ public class MapServer implements IMapServerAsync, OdlMappingserviceListener, IS
                     } else {
                         LOG.trace("Cancelling execution of a SMR Map-Request after {} failed attempts.",
                                 executionCount - 1);
-                        subscriberFutureMap.get(subscriberAddress).cancel(false);
+                        cancelAndRemove(subscriberAddress);
                     }
                 } catch (Exception e) {
                     LOG.error("Errors encountered while handling SMR:", e);
-                    subscriberFutureMap.get(subscriberAddress).cancel(false);
-                } finally {
-                    tryToShutDownExecutor();
+                    cancelAndRemove(subscriberAddress);
                 }
                 executionCount++;
             }
-        }
 
-        private void tryToShutDownExecutor() {
-            if (!executor.isShutdown() && areAllFuturesCanceled()) {
-                executor.shutdown();
-                LOG.trace("SMR scheduler executor shutdown.");
-            }
-        }
-
-        private boolean areAllFuturesCanceled() {
-            for (Map.Entry<IpAddressBinary, ScheduledFuture<?>> entry : subscriberFutureMap.entrySet()) {
-                if (!entry.getValue().isCancelled()) {
-                    return false;
+            private void cancelAndRemove(IpAddressBinary subscriberAddress) {
+                final Map<Eid, ScheduledFuture<?>> eidFutureMap = subscriberFutureMap.get(subscriberAddress);
+                final Eid eid = mrb.getSourceEid().getEid();
+                if (eidFutureMap.containsKey(eid)) {
+                    eidFutureMap.get(eid).cancel(false);
+                }
+                eidFutureMap.remove(eid);
+                if (eidFutureMap.isEmpty()) {
+                    subscriberFutureMap.remove(subscriberAddress);
                 }
             }
-            return true;
         }
     }
 }