Speed up packetin throttling 78/20978/14
authorRobert Varga <rovarga@cisco.com>
Fri, 22 May 2015 11:17:51 +0000 (13:17 +0200)
committerMichal Rehak <mirehak@cisco.com>
Tue, 26 May 2015 18:35:56 +0000 (20:35 +0200)
Perform an atomic operation instead of taking a lock in the fast path.

Change-Id: If204594f375aa2f9f7295cc5321236756f01c258
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java [new file with mode: 0644]
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java

index 3a49d5be8220a7cbc688e7a200b6aea23ae80a7c..bbbffdc0f2d196a70b4778661af7f13942edc59d 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.api.openflow.connection;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
@@ -21,7 +22,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
  * </p>
  * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 25.2.2015.
  */
-public interface ConnectionContext {
+public interface ConnectionContext extends AutoCloseable {
 
     /**
      * distinguished connection states
@@ -111,4 +112,9 @@ public interface ConnectionContext {
      * Method provides propagates info about closed connection to handler for handling closing connections.
      */
     void propagateClosingConnection();
+
+    void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration);
+
+    @Override
+    void close();
 }
index eb17433141d3bc6df8fe36b55b560ffda45afd3f..8a93e96e1106ff1e15e2acc4139369191b74a1b0 100644 (file)
@@ -17,7 +17,6 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
@@ -177,16 +176,5 @@ public interface DeviceContext extends AutoCloseable,
      */
     void onPublished();
 
-
-    /**
-     * Method registers outbound queue provider into current device context's primary connection adapter.
-     *
-     * @param outboundQueueProvider
-     * @param maxQueueDepth
-     * @param barrierNanos
-     */
-    void registerOutboundQueueProvider(OutboundQueueProvider outboundQueueProvider, int maxQueueDepth, long barrierNanos);
-
-
 }
 
index 4ec203c8767b17a8c271caafd27316d70c86f5dd..0453c23eef920b9e0ad7f62d2e7a1e12024f2127 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.connection;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
@@ -29,6 +30,7 @@ public class ConnectionContextImpl implements ConnectionContext {
     private DeviceDisconnectedHandler deviceDisconnectedHandler;
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionContextImpl.class);
     private OutboundQueueProvider outboundQueueProvider;
+    private OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration;
 
     /**
      * @param connectionAdapter
@@ -94,4 +96,21 @@ public class ConnectionContextImpl implements ConnectionContext {
     public void setFeatures(final FeaturesReply featuresReply) {
         this.featuresReply = featuresReply;
     }
+
+    @Override
+    public void close() {
+        if (getConnectionAdapter().isAlive()) {
+            setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
+            getConnectionAdapter().disconnect();
+        }
+        if (outboundQueueHandlerRegistration != null) {
+            outboundQueueHandlerRegistration.close();
+            outboundQueueHandlerRegistration = null;
+        }
+    }
+
+    @Override
+    public void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration) {
+        this.outboundQueueHandlerRegistration = outboundQueueHandlerRegistration;
+    }
 }
index f176b5ceb2f9c10c8f48716cd2b48bcd2959720d..d24e4c8426d33ee4bf2f4a8fca02b75980eaee2a 100644 (file)
@@ -28,7 +28,6 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -84,51 +83,51 @@ public class DeviceContextImpl implements DeviceContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
 
+    // TODO: watermarks should be derived from effective rpc limit (75%|95%)
+    private static final int PACKETIN_LOW_WATERMARK = 15000;
+    private static final int PACKETIN_HIGH_WATERMARK = 19000;
+    // TODO: drain factor should be parametrized
+    public static final float REJECTED_DRAIN_FACTOR = 0.25f;
+
     private final ConnectionContext primaryConnectionContext;
     private final DeviceState deviceState;
     private final DataBroker dataBroker;
     private final HashedWheelTimer hashedWheelTimer;
     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
     private final TransactionChainManager txChainManager;
-    private TranslatorLibrary translatorLibrary;
     private final DeviceFlowRegistry deviceFlowRegistry;
     private final DeviceGroupRegistry deviceGroupRegistry;
     private final DeviceMeterRegistry deviceMeterRegistry;
-    private Timeout barrierTaskTimeout;
-    private NotificationService notificationService;
-    private final MessageSpy messageSpy;
-    private DeviceDisconnectedHandler deviceDisconnectedHandler;
     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
+    private final PacketInRateLimiter packetInLimiter;
+    private final MessageSpy messageSpy;
     private NotificationPublishService notificationPublishService;
+    private DeviceDisconnectedHandler deviceDisconnectedHandler;
+    private NotificationService notificationService;
+    private TranslatorLibrary translatorLibrary;
     private OutboundQueue outboundQueueProvider;
-
-    private volatile int outstandingNotificationsAmount = 0;
-    private volatile boolean filteringPacketIn = false;
-    private final Object throttlingLock = new Object();
-    private int filteringHighWaterMark = 0;
-    private OutboundQueueHandlerRegistration<?> outboundQueueHandlerRegistration;
-
-    @Override
-    public Long getReservedXid() {
-        return outboundQueueProvider.reserveEntry();
-    }
+    private Timeout barrierTaskTimeout;
 
     @VisibleForTesting
     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
                       @Nonnull final DeviceState deviceState,
                       @Nonnull final DataBroker dataBroker,
                       @Nonnull final HashedWheelTimer hashedWheelTimer,
-                      @Nonnull final MessageSpy _messageSpy) {
+                      @Nonnull final MessageSpy _messageSpy, OutboundQueueProvider outboundQueueProvider) {
         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
         this.deviceState = Preconditions.checkNotNull(deviceState);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
+        this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
         txChainManager = new TransactionChainManager(dataBroker, deviceState);
         auxiliaryConnectionContexts = new HashMap<>();
         deviceFlowRegistry = new DeviceFlowRegistryImpl();
         deviceGroupRegistry = new DeviceGroupRegistryImpl();
         deviceMeterRegistry = new DeviceMeterRegistryImpl();
         messageSpy = _messageSpy;
+
+        this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+                PACKETIN_LOW_WATERMARK, PACKETIN_HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
     }
 
     /**
@@ -139,6 +138,11 @@ public class DeviceContextImpl implements DeviceContext {
         txChainManager.initialSubmitWriteTransaction();
     }
 
+    @Override
+    public Long getReservedXid() {
+        return outboundQueueProvider.reserveEntry();
+    }
+
     @Override
     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext<?> requestContext) {
         // TODO Auto-generated method stub
@@ -268,58 +272,42 @@ public class DeviceContextImpl implements DeviceContext {
         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
 
         if (packetReceived == null) {
-            LOG.debug("Received a null packet from switch");
+            LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
             return;
         }
-        messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
 
-        final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
-        synchronized (throttlingLock) {
-            outstandingNotificationsAmount += 1;
+        if (!packetInLimiter.acquirePermit()) {
+            LOG.debug("Packet limited");
+            // TODO: save packet into emergency slot if possible
+            // FIXME: some other counter
+            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+            return;
         }
+
+        final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
             LOG.debug("notification offer rejected");
-            synchronized (throttlingLock) {
-                if (outstandingNotificationsAmount > 1 && !filteringPacketIn) {
-                    connectionAdapter.setPacketInFiltering(true);
-                    messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
-                    filteringPacketIn = true;
-                    filteringHighWaterMark = outstandingNotificationsAmount;
-                    LOG.debug("PacketIn filtering on: {}, watermark: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
-                }
-            }
+            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+            packetInLimiter.drainLowWaterMark();
+            packetInLimiter.releasePermit();
+            return;
         }
 
-        Futures.addCallback(offerNotification,
-                new FutureCallback<Object>() {
-                    @Override
-                    public void onSuccess(final Object result) {
-                        countdownFiltering();
-                        messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable t) {
-                        countdownFiltering();
-                        messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
-                        LOG.debug("notification offer failed: {}, outstanding: {}", t.getMessage(), outstandingNotificationsAmount);
-                        LOG.trace("notification offer failed..", t);
-                    }
-
-                    private void countdownFiltering() {
-                        synchronized (throttlingLock) {
-                            outstandingNotificationsAmount -= 1;
-                            if (outstandingNotificationsAmount == 0 && filteringPacketIn) {
-                                connectionAdapter.setPacketInFiltering(false);
-                                messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF);
-
-                                filteringPacketIn = false;
-                                LOG.debug("PacketIn filtering off: {}, outstanding: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
-                            }
-                        }
-                    }
-                }
-        );
+        Futures.addCallback(offerNotification, new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(final Object result) {
+                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+                packetInLimiter.releasePermit();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+                LOG.debug("notification offer failed: {}", t.getMessage());
+                LOG.trace("notification offer failed..", t);
+                packetInLimiter.releasePermit();
+            }
+        });
     }
 
     @Override
@@ -345,16 +333,9 @@ public class DeviceContextImpl implements DeviceContext {
         deviceFlowRegistry.close();
         deviceMeterRegistry.close();
 
-        outboundQueueHandlerRegistration.close();
-
-        if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
-            primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
-            primaryConnectionContext.getConnectionAdapter().disconnect();
-        }
+        primaryConnectionContext.close();
         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
-            if (connectionContext.getConnectionAdapter().isAlive()) {
-                connectionContext.getConnectionAdapter().disconnect();
-            }
+            connectionContext.close();
         }
 
         for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
@@ -424,14 +405,6 @@ public class DeviceContextImpl implements DeviceContext {
         }
     }
 
-    @Override
-    public void registerOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider, final int maxQueueDepth, final long barrierNanos) {
-        final ConnectionAdapter primaryConnectionAdapter = primaryConnectionContext.getConnectionAdapter();
-        outboundQueueHandlerRegistration = primaryConnectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
-        this.outboundQueueProvider = outboundQueueProvider;
-        primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
-    }
-
     @Override
     public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
         return new MultiMsgCollectorImpl(this, requestContext);
index 00cf64754abebf1a1cc58676bd156b00cf545ce3..4685bccbb2281249d567d1e140277e60b27471f3 100644 (file)
@@ -32,6 +32,7 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.ConnectionException;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
@@ -167,10 +168,15 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         final Short version = connectionContext.getFeatures().getVersion();
         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
 
+        connectionContext.setOutboundQueueProvider(outboundQueueProvider);
+        final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
+                connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+        connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
+
         final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId());
 
-        final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency);
-        deviceContext.registerOutboundQueueProvider(outboundQueueProvider, maxQueueDepth, barrierNanos);
+        final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
+                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider);
         deviceContext.setNotificationService(notificationService);
         deviceContext.setNotificationPublishService(notificationPublishService);
         final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()).setNodeConnector(Collections.<NodeConnector>emptyList());
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java
new file mode 100644 (file)
index 0000000..ad0a8b1
--- /dev/null
@@ -0,0 +1,48 @@
+/**
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.device;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class PacketInRateLimiter extends SimpleRatelimiter {
+    private static final Logger LOG = LoggerFactory.getLogger(PacketInRateLimiter.class);
+    private final float rejectedDrainFactor;
+    private final ConnectionAdapter connectionAdapter;
+    private final MessageSpy messageSpy;
+
+    PacketInRateLimiter(final ConnectionAdapter connectionAdapter, final int lowWatermark, final int highWatermark, final MessageSpy messageSpy, float rejectedDrainFactor) {
+        super(lowWatermark, highWatermark);
+        Preconditions.checkArgument(rejectedDrainFactor > 0 && rejectedDrainFactor < 1);
+        this.rejectedDrainFactor = rejectedDrainFactor;
+        this.connectionAdapter = Preconditions.checkNotNull(connectionAdapter);
+        this.messageSpy = Preconditions.checkNotNull(messageSpy);
+    }
+
+    @Override
+    protected void disableFlow() {
+        messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
+        connectionAdapter.setPacketInFiltering(true);
+        LOG.debug("PacketIn filtering on: {}", connectionAdapter.getRemoteAddress());
+    }
+
+    @Override
+    protected void enableFlow() {
+        messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF);
+        connectionAdapter.setPacketInFiltering(false);
+        LOG.debug("PacketIn filtering off: {}", connectionAdapter.getRemoteAddress());
+    }
+
+    public void drainLowWaterMark() {
+        adaptLowWaterMarkAndDisableFlow((int) (getOccupiedPermits() * rejectedDrainFactor));
+    }
+}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java
new file mode 100644 (file)
index 0000000..eed1ccd
--- /dev/null
@@ -0,0 +1,90 @@
+/**
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.device;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.concurrent.GuardedBy;
+
+abstract class SimpleRatelimiter {
+    private final AtomicInteger counter = new AtomicInteger();
+    private final int lowWatermark;
+    private int lowWatermarkEffective;
+    private final int highWatermark;
+    @GuardedBy("counter")
+    private volatile boolean limited;
+
+    SimpleRatelimiter(final int lowWatermark, final int highWatermark) {
+        Preconditions.checkArgument(lowWatermark >= 0);
+        Preconditions.checkArgument(highWatermark >= 0);
+        Preconditions.checkArgument(lowWatermark <= highWatermark);
+
+        this.lowWatermark = lowWatermark;
+        this.highWatermark = highWatermark;
+        lowWatermarkEffective = lowWatermark;
+    }
+
+    protected final boolean isLimited() {
+        return limited;
+    }
+
+    protected abstract void disableFlow();
+    protected abstract void enableFlow();
+
+    boolean acquirePermit() {
+        final int cnt = counter.incrementAndGet();
+        if (cnt > highWatermark) {
+            synchronized (counter) {
+                final int recheck = counter.decrementAndGet();
+                if (recheck >= highWatermark && !limited) {
+                    disableFlow();
+                    limited = true;
+                }
+            }
+            return false;
+        }
+
+        return true;
+    }
+
+    void releasePermit() {
+        final int cnt = counter.decrementAndGet();
+        if (cnt <= lowWatermarkEffective) {
+            synchronized (counter) {
+                final int recheck = counter.get();
+                if (recheck <= lowWatermarkEffective && limited) {
+                    enableFlow();
+                    limited = false;
+                    resetLowWaterMark();
+                }
+            }
+        }
+    }
+
+    void resetLowWaterMark() {
+        synchronized (counter) {
+            lowWatermarkEffective = lowWatermark;
+        }
+    }
+
+    void adaptLowWaterMarkAndDisableFlow(int temporaryLowWaterMark) {
+        if (temporaryLowWaterMark < highWatermark) {
+            synchronized (counter) {
+                lowWatermarkEffective = temporaryLowWaterMark;
+                if (!limited) {
+                    disableFlow();
+                    limited = true;
+                }
+            }
+        }
+    }
+
+    int getOccupiedPermits() {
+        return counter.get();
+    }
+}
index 556afe8efdf019d3382c78d66d6a761eba24be2d..237a145fe7d1411abfb45452fad23d1febcb3c3a 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
@@ -78,6 +79,8 @@ public class DeviceContextImplTest {
     MessageIntelligenceAgency messageIntelligenceAgency;
     @Mock
     OutboundQueueProvider outboundQueueProvider;
+    @Mock
+    ConnectionAdapter connectionAdapter;
 
     private final AtomicLong atomicLong = new AtomicLong(0);
 
@@ -109,7 +112,8 @@ public class DeviceContextImplTest {
         Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
         Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
         Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider);
-        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency);
+        Mockito.when(connectionContext.getConnectionAdapter()).thenReturn(connectionAdapter);
+        deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider);
 
         xid = new Xid(atomicLong.incrementAndGet());
         xidMulti = new Xid(atomicLong.incrementAndGet());
@@ -117,22 +121,22 @@ public class DeviceContextImplTest {
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullConnectionContext() throws Exception {
-        new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency).close();
+        new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider).close();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDataBroker() throws Exception {
-        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency).close();
+        new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider).close();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullDeviceState() throws Exception {
-        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency).close();
+        new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider).close();
     }
 
     @Test(expected = NullPointerException.class)
     public void testDeviceContextImplConstructorNullTimer() throws Exception {
-        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency).close();
+        new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider).close();
     }
 
     @Test