From: Tony Tkacik Date: Wed, 7 May 2014 15:42:46 +0000 (+0000) Subject: Merge "Increase mdsal's notification queue size, make it configurable." X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~125 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3425d61ef729f17632b7c764bc93ce2df1d9345a;hp=f02aa37017e8cf62375bd84e29b0a6df324891a4 Merge "Increase mdsal's notification queue size, make it configurable." --- diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index f7b647dd72..46008b24c6 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -2342,7 +2342,7 @@ public class ForwardingRulesManager implements for (FlowConfig flowConfig : flowConfigForNode) { if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) { if (flowConfig.installInHw() && !flowConfig.getStatus().equals(StatusCode.SUCCESS.toString())) { - Status status = this.installFlowEntry(flowConfig.getFlowEntry()); + Status status = this.installFlowEntryAsync(flowConfig.getFlowEntry()); if (!status.isSuccess()) { flowConfig.setStatus(status.getDescription()); } else { diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyCommitHandler.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyCommitHandler.java index a35c3ed98c..e798b681df 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyCommitHandler.java +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyCommitHandler.java @@ -68,7 +68,7 @@ public class TopologyCommitHandler implements DataChangeListener { for (InstanceIdentifier path : modification.getRemovedOperationalData()) { if (path.getTargetType() == Link.class) { Link link = (Link) modification.getOriginalOperationalData().get(path); - msg.add(toTopoEdgeUpdate(toAdEdge(link, topology), UpdateType.CHANGED, reader)); + msg.add(toTopoEdgeUpdate(toAdEdge(link, topology), UpdateType.REMOVED, reader)); } } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java index 06d6e82112..add46bd162 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java @@ -41,17 +41,17 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker flowRef = getNodeIdentifierBuilder() .augmentation(FlowCapableNode.class) .child(Table.class, new TableKey(item.getTableId())) @@ -61,7 +61,7 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker, DataObject> change) { + public void onDataChanged(final DataChangeEvent, DataObject> change) { for (Entry, DataObject> e : change.getCreatedConfigurationData().entrySet()) { if (Flow.class.equals(e.getKey().getTargetType())) { final Flow flow = (Flow) e.getValue(); @@ -270,11 +270,8 @@ final class FlowStatsTracker extends AbstractListeningStatsTracker flow = (InstanceIdentifier)key; - final InstanceIdentifier del = InstanceIdentifier.builder(flow) - .augmentation(FlowStatisticsData.class).build(); - logger.debug("Key {} triggered remove of augmentation {}", key, del); - - trans.removeOperationalData(del); + logger.debug("Key {} triggered remove of Flow from operational space.", key); + trans.removeOperationalData(flow); } } trans.commit(); diff --git a/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java b/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java index 94e67247c8..d6957385bd 100644 --- a/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java +++ b/opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleForwardingImpl.java @@ -19,7 +19,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.Timer; +import java.util.TimerTask; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; @@ -100,6 +103,98 @@ public class SimpleForwardingImpl implements IfNewHostNotify, private ISwitchManager switchManager; private IDataPacketService dataPacketService; + /** + * Ip packets that are punted may not have their destination known by hostTracker at the time it + * is presented to SimpleForwardingImpl. Instead of dropping the packet, we will keep it around + * for a 'little' while, to accommodate any transients. See bug 590 for more details. + */ + private class PendingPacketData { + private final static byte MAX_AGE = 2; + + public final IPv4 pkt; + public final NodeConnector incomingNodeConnector; + private byte age; + + public PendingPacketData(IPv4 pkt, NodeConnector incomingNodeConnector) { + this.pkt = pkt; + this.incomingNodeConnector = incomingNodeConnector; + this.age = 0; + } + boolean bumpAgeAndCheckIfTooOld() { return ++age > MAX_AGE; } + } + private static final int MAX_PENDING_PACKET_DESTINATIONS = 64; + private ConcurrentMap pendingPacketDestinations; + private Timer pendingPacketsAgerTimer; + + private class PendingPacketsAgerTimerHandler extends TimerTask { + @Override + public void run() { + if (pendingPacketDestinations == null) { + return; + } + try { + Iterator> iterator = + pendingPacketDestinations.entrySet().iterator(); + while (iterator.hasNext()) { + ConcurrentHashMap.Entry entry = iterator.next(); + InetAddress dIP = entry.getKey(); + PendingPacketData pendingPacketData = entry.getValue(); + + if (pendingPacketData.bumpAgeAndCheckIfTooOld()) { + iterator.remove(); // safe to remove while iterating... + log.debug("Pending packet for {} has been aged out", dIP); + } else { + /** Replace the entry for a key only if currently mapped to some value. + * This will protect the concurrent map against a race where this thread + * would be re-adding an entry that just got taken out. + */ + pendingPacketDestinations.replace(dIP, pendingPacketData); + } + } + } catch (IllegalStateException e) { + log.debug("IllegalStateException Received by PendingPacketsAgerTimerHandler from: {}", + e.getMessage()); + } + } + } + + /** + * Add punted packet to pendingPackets + */ + private void addToPendingPackets(InetAddress dIP, IPv4 pkt, NodeConnector incomingNodeConnector) { + if (pendingPacketDestinations.size() >= MAX_PENDING_PACKET_DESTINATIONS) { + log.info("Will not pend packet for {}: Too many destinations", dIP); + return; + } + + /** TODO: The current implementation allows for up to 1 pending packet per InetAddress. + * This limitation is done for sake of simplicity. A potential enhancement could be to use a + * ConcurrentMultiMap instead of ConcurrentMap. + */ + if (pendingPacketDestinations.containsKey(dIP)) { + log.trace("Will not pend packet for {}: Already have a packet pending", dIP); + return; + } + + PendingPacketData pendingPacketData = new PendingPacketData(pkt, incomingNodeConnector); + pendingPacketDestinations.put(dIP, pendingPacketData); + log.debug("Pending packet for {}", dIP); + } + + /** + * Send punted packet to given destination. This is invoked when there is a certain level of + * hope that the destination is known by hostTracker. + */ + private void sendPendingPacket(InetAddress dIP) { + pendingPacketDestinations.get(dIP); + PendingPacketData pendingPacketData = pendingPacketDestinations.get(dIP); + if (pendingPacketData != null) { + handlePuntedIPPacket(pendingPacketData.pkt, pendingPacketData.incomingNodeConnector, false); + log.trace("Packet for {} is no longer pending", dIP); + pendingPacketDestinations.remove(dIP); + } + } + /** * Return codes from the programming of the perHost rules in HW */ @@ -170,6 +265,15 @@ public class SimpleForwardingImpl implements IfNewHostNotify, public void startUp() { allocateCaches(); retrieveCaches(); + nonClusterObjectCreate(); + } + + public void nonClusterObjectCreate() { + pendingPacketDestinations = new ConcurrentHashMap(); + + /* Pending Packets Ager Timer to go off every 6 seconds to implement pending packet aging */ + pendingPacketsAgerTimer = new Timer(); + pendingPacketsAgerTimer.schedule(new PendingPacketsAgerTimerHandler(), 6000, 6000); } /** @@ -838,6 +942,9 @@ public class SimpleForwardingImpl implements IfNewHostNotify, Set switches = preparePerHostRules(host); if (switches != null) { installPerHostRules(host, switches); + + // Green light for sending pending packet to this host. Safe to call if there are none. + sendPendingPacket(host.getNetworkAddress()); } } @@ -962,6 +1069,8 @@ public class SimpleForwardingImpl implements IfNewHostNotify, * */ void stop() { + pendingPacketsAgerTimer.cancel(); + pendingPacketDestinations.clear(); } public void setSwitchManager(ISwitchManager switchManager) { @@ -985,14 +1094,14 @@ public class SimpleForwardingImpl implements IfNewHostNotify, Object nextPak = formattedPak.getPayload(); if (nextPak instanceof IPv4) { log.trace("Handle punted IP packet: {}", formattedPak); - handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector()); + handlePuntedIPPacket((IPv4) nextPak, inPkt.getIncomingNodeConnector(), true); } } return PacketResult.IGNORED; } - private void handlePuntedIPPacket(IPv4 pkt, NodeConnector incomingNodeConnector) { + private void handlePuntedIPPacket(IPv4 pkt, NodeConnector incomingNodeConnector, boolean allowAddPending) { InetAddress dIP = NetUtils.getInetAddress(pkt.getDestinationAddress()); if (dIP == null || hostTracker == null) { log.debug("Invalid param(s) in handlePuntedIPPacket.. DestIP: {}. hostTracker: {}", dIP, hostTracker); @@ -1026,7 +1135,12 @@ public class SimpleForwardingImpl implements IfNewHostNotify, rp.setOutgoingNodeConnector(nc); this.dataPacketService.transmitDataPacket(rp); } - + } else if (allowAddPending) { + // If we made it here, let's hang on to the punted packet, with hopes that its destination + // will become available soon. + addToPendingPackets(dIP, pkt, incomingNodeConnector); + } else { + log.warn("Dropping punted IP packet received at {} to Host {}", incomingNodeConnector, dIP); } } }