Merge "Increase mdsal's notification queue size, make it configurable."
authorTony Tkacik <ttkacik@cisco.com>
Wed, 7 May 2014 15:42:46 +0000 (15:42 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 7 May 2014 15:42:46 +0000 (15:42 +0000)
opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyCommitHandler.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java
opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java

index f7b647dd721a1f1f9117d95bc3723867574e5064..46008b24c638241c665a7bde863f0ff5052d587a 100644 (file)
@@ -2342,7 +2342,7 @@ public class ForwardingRulesManager implements
         for (FlowConfig flowConfig : flowConfigForNode) {
             if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) {
                 if (flowConfig.installInHw() && !flowConfig.getStatus().equals(StatusCode.SUCCESS.toString())) {
-                    Status status = this.installFlowEntry(flowConfig.getFlowEntry());
+                    Status status = this.installFlowEntryAsync(flowConfig.getFlowEntry());
                     if (!status.isSuccess()) {
                         flowConfig.setStatus(status.getDescription());
                     } else {
index a35c3ed98ccc411460bf9dd5d69e0ef34bb825be..e798b681dfce3d4aaea245a16ff188b3fa46bd8b 100644 (file)
@@ -68,7 +68,7 @@ public class TopologyCommitHandler implements DataChangeListener {
             for (InstanceIdentifier<? extends DataObject> path : modification.getRemovedOperationalData()) {
                 if (path.getTargetType() == Link.class) {
                     Link link = (Link) modification.getOriginalOperationalData().get(path);
-                    msg.add(toTopoEdgeUpdate(toAdEdge(link, topology), UpdateType.CHANGED, reader));
+                    msg.add(toTopoEdgeUpdate(toAdEdge(link, topology), UpdateType.REMOVED, reader));
                 }
 
             }
index 06d6e821122617aa1642e3cfd6a5d46808f8518a..add46bd162fc5644a0f4a5bf13fed72e5ad0a6d2 100644 (file)
@@ -41,17 +41,17 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatis
     private FlowTableStatsTracker flowTableStats;
     private int unaccountedFlowsCounter = 1;
 
-    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context) {
+    FlowStatsTracker(final OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context) {
         super(context);
         this.flowStatsService = flowStatsService;
     }
-    FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, FlowTableStatsTracker flowTableStats) {
+    FlowStatsTracker(final OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, final FlowTableStatsTracker flowTableStats) {
         this(flowStatsService, context);
         this.flowTableStats = flowTableStats;
     }
 
     @Override
-    protected void cleanupSingleStat(DataModificationTransaction trans, FlowStatsEntry item) {
+    protected void cleanupSingleStat(final DataModificationTransaction trans, final FlowStatsEntry item) {
         InstanceIdentifier<?> flowRef = getNodeIdentifierBuilder()
                             .augmentation(FlowCapableNode.class)
                             .child(Table.class, new TableKey(item.getTableId()))
@@ -61,7 +61,7 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatis
     }
 
     @Override
-    protected FlowStatsEntry updateSingleStat(DataModificationTransaction trans, FlowAndStatisticsMapList map) {
+    protected FlowStatsEntry updateSingleStat(final DataModificationTransaction trans, final FlowAndStatisticsMapList map) {
         short tableId = map.getTableId();
 
         FlowBuilder flowBuilder = new FlowBuilder();
@@ -221,7 +221,7 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatis
         }
 
         this.requestAllFlowsAllTables();
-        
+
     }
     public void requestAllFlowsAllTables() {
         if (flowStatsService != null) {
@@ -254,7 +254,7 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatis
     }
 
     @Override
-    public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+    public void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
         for (Entry<InstanceIdentifier<?>, DataObject> e : change.getCreatedConfigurationData().entrySet()) {
             if (Flow.class.equals(e.getKey().getTargetType())) {
                 final Flow flow = (Flow) e.getValue();
@@ -270,11 +270,8 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatis
             if (Flow.class.equals(key.getTargetType())) {
                 @SuppressWarnings("unchecked")
                 final InstanceIdentifier<Flow> flow = (InstanceIdentifier<Flow>)key;
-                final InstanceIdentifier<?> del = InstanceIdentifier.builder(flow)
-                        .augmentation(FlowStatisticsData.class).build();
-                logger.debug("Key {} triggered remove of augmentation {}", key, del);
-
-                trans.removeOperationalData(del);
+                logger.debug("Key {} triggered remove of Flow from operational space.", key);
+                trans.removeOperationalData(flow);
             }
         }
         trans.commit();
index 94e67247c8eaa44696f83d2950bea6650732f89a..d6957385bd8278d566ba31fc0cf795539d1161c8 100644 (file)
@@ -19,7 +19,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
@@ -100,6 +103,98 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
     private ISwitchManager switchManager;
     private IDataPacketService dataPacketService;
 
+    /**
+     * Ip packets that are punted may not have their destination known by hostTracker at the time it
+     * is presented to SimpleForwardingImpl. Instead of dropping the packet, we will keep it around
+     * for a 'little' while, to accommodate any transients. See bug 590 for more details.
+     */
+    private class PendingPacketData {
+        private final static byte MAX_AGE = 2;
+
+        public final IPv4 pkt;
+        public final NodeConnector incomingNodeConnector;
+        private byte age;
+
+        public PendingPacketData(IPv4 pkt, NodeConnector incomingNodeConnector) {
+            this.pkt = pkt;
+            this.incomingNodeConnector = incomingNodeConnector;
+            this.age = 0;
+        }
+        boolean bumpAgeAndCheckIfTooOld() { return ++age > MAX_AGE; }
+    }
+    private static final int MAX_PENDING_PACKET_DESTINATIONS = 64;
+    private ConcurrentMap<InetAddress, PendingPacketData> pendingPacketDestinations;
+    private Timer pendingPacketsAgerTimer;
+
+    private class PendingPacketsAgerTimerHandler extends TimerTask {
+        @Override
+        public void run() {
+            if (pendingPacketDestinations == null) {
+                return;
+            }
+            try {
+                Iterator<ConcurrentMap.Entry<InetAddress, PendingPacketData>> iterator =
+                        pendingPacketDestinations.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    ConcurrentHashMap.Entry<InetAddress, PendingPacketData> entry = iterator.next();
+                    InetAddress dIP = entry.getKey();
+                    PendingPacketData pendingPacketData = entry.getValue();
+
+                    if (pendingPacketData.bumpAgeAndCheckIfTooOld()) {
+                        iterator.remove(); // safe to remove while iterating...
+                        log.debug("Pending packet for {} has been aged out", dIP);
+                    } else {
+                        /** Replace the entry for a key only if currently mapped to some value.
+                         * This will protect the concurrent map against a race where this thread
+                         * would be re-adding an entry that just got taken out.
+                         */
+                        pendingPacketDestinations.replace(dIP, pendingPacketData);
+                    }
+                }
+            } catch (IllegalStateException e) {
+                log.debug("IllegalStateException Received by PendingPacketsAgerTimerHandler from: {}",
+                        e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Add punted packet to pendingPackets
+     */
+    private void addToPendingPackets(InetAddress dIP, IPv4 pkt, NodeConnector incomingNodeConnector) {
+        if (pendingPacketDestinations.size() >= MAX_PENDING_PACKET_DESTINATIONS) {
+            log.info("Will not pend packet for {}: Too many destinations", dIP);
+            return;
+        }
+
+        /** TODO: The current implementation allows for up to 1 pending packet per InetAddress.
+         * This limitation is done for sake of simplicity. A potential enhancement could be to use a
+         * ConcurrentMultiMap instead of ConcurrentMap.
+         */
+        if (pendingPacketDestinations.containsKey(dIP)) {
+            log.trace("Will not pend packet for {}: Already have a packet pending", dIP);
+            return;
+        }
+
+        PendingPacketData pendingPacketData = new PendingPacketData(pkt, incomingNodeConnector);
+        pendingPacketDestinations.put(dIP, pendingPacketData);
+        log.debug("Pending packet for {}", dIP);
+    }
+
+    /**
+     * Send punted packet to given destination. This is invoked when there is a certain level of
+     * hope that the destination is known by hostTracker.
+     */
+    private void sendPendingPacket(InetAddress dIP) {
+        pendingPacketDestinations.get(dIP);
+        PendingPacketData pendingPacketData = pendingPacketDestinations.get(dIP);
+        if (pendingPacketData != null) {
+            handlePuntedIPPacket(pendingPacketData.pkt, pendingPacketData.incomingNodeConnector, false);
+            log.trace("Packet for {} is no longer pending", dIP);
+            pendingPacketDestinations.remove(dIP);
+        }
+    }
+
     /**
      * Return codes from the programming of the perHost rules in HW
      */
@@ -170,6 +265,15 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
     public void startUp() {
         allocateCaches();
         retrieveCaches();
+        nonClusterObjectCreate();
+    }
+
+    public void nonClusterObjectCreate() {
+        pendingPacketDestinations = new ConcurrentHashMap<InetAddress, PendingPacketData>();
+
+        /* Pending Packets Ager Timer to go off every 6 seconds to implement pending packet aging */
+        pendingPacketsAgerTimer = new Timer();
+        pendingPacketsAgerTimer.schedule(new PendingPacketsAgerTimerHandler(), 6000, 6000);
     }
 
     /**
@@ -838,6 +942,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
         Set<Node> switches = preparePerHostRules(host);
         if (switches != null) {
             installPerHostRules(host, switches);
+
+            // Green light for sending pending packet to this host. Safe to call if there are none.
+            sendPendingPacket(host.getNetworkAddress());
         }
     }
 
@@ -962,6 +1069,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
      *
      */
     void stop() {
+        pendingPacketsAgerTimer.cancel();
+        pendingPacketDestinations.clear();
     }
 
     public void setSwitchManager(ISwitchManager switchManager) {
@@ -985,14 +1094,14 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
             Object nextPak = formattedPak.getPayload();
             if (nextPak instanceof IPv4) {
                 log.trace("Handle punted IP packet: {}", formattedPak);
-                handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector());
+                handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector(), true);
             }
         }
         return PacketResult.IGNORED;
 
     }
 
-    private void handlePuntedIPPacket(IPv4 pkt, NodeConnector incomingNodeConnector) {
+    private void handlePuntedIPPacket(IPv4 pkt, NodeConnector incomingNodeConnector, boolean allowAddPending) {
         InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress());
         if (dIP == null || hostTracker == null) {
             log.debug("Invalid param(s) in handlePuntedIPPacket.. DestIP: {}. hostTracker: {}", dIP, hostTracker);
@@ -1026,7 +1135,12 @@ public class SimpleForwardingImpl implements IfNewHostNotify,
                 rp.setOutgoingNodeConnector(nc);
                 this.dataPacketService.transmitDataPacket(rp);
             }
-
+        } else if (allowAddPending) {
+            // If we made it here, let's hang on to the punted packet, with hopes that its destination
+            // will become available soon.
+            addToPendingPackets(dIP, pkt, incomingNodeConnector);
+        } else {
+            log.warn("Dropping punted IP packet received at {} to Host {}", incomingNodeConnector, dIP);
         }
     }
 }