From 4b34ce62d369b80533e57e0bc0e8bc0ac7c496b3 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 22 May 2015 13:17:51 +0200 Subject: [PATCH] Speed up packetin throttling Perform an atomic operation instead of taking a lock in the fast path. Change-Id: If204594f375aa2f9f7295cc5321236756f01c258 Signed-off-by: Robert Varga --- .../connection/ConnectionContext.java | 8 +- .../api/openflow/device/DeviceContext.java | 12 -- .../connection/ConnectionContextImpl.java | 19 +++ .../impl/device/DeviceContextImpl.java | 131 +++++++----------- .../impl/device/DeviceManagerImpl.java | 10 +- .../impl/device/PacketInRateLimiter.java | 48 +++++++ .../impl/device/SimpleRatelimiter.java | 90 ++++++++++++ .../impl/device/DeviceContextImplTest.java | 14 +- 8 files changed, 233 insertions(+), 99 deletions(-) create mode 100644 openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java create mode 100644 openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java index 3a49d5be82..bbbffdc0f2 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java @@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.api.openflow.connection; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply; @@ -21,7 +22,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 *

* Created by Martin Bobak <mbobak@cisco.com> on 25.2.2015. */ -public interface ConnectionContext { +public interface ConnectionContext extends AutoCloseable { /** * distinguished connection states @@ -111,4 +112,9 @@ public interface ConnectionContext { * Method provides propagates info about closed connection to handler for handling closing connections. */ void propagateClosingConnection(); + + void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration outboundQueueHandlerRegistration); + + @Override + void close(); } diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java index eb17433141..8a93e96e11 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java @@ -17,7 +17,6 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor; @@ -177,16 +176,5 @@ public interface DeviceContext extends AutoCloseable, */ void onPublished(); - - /** - * Method registers outbound queue provider into current device context's primary connection adapter. - * - * @param outboundQueueProvider - * @param maxQueueDepth - * @param barrierNanos - */ - void registerOutboundQueueProvider(OutboundQueueProvider outboundQueueProvider, int maxQueueDepth, long barrierNanos); - - } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java index 4ec203c876..0453c23eef 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.connection; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler; @@ -29,6 +30,7 @@ public class ConnectionContextImpl implements ConnectionContext { private DeviceDisconnectedHandler deviceDisconnectedHandler; private static final Logger LOG = LoggerFactory.getLogger(ConnectionContextImpl.class); private OutboundQueueProvider outboundQueueProvider; + private OutboundQueueHandlerRegistration outboundQueueHandlerRegistration; /** * @param connectionAdapter @@ -94,4 +96,21 @@ public class ConnectionContextImpl implements ConnectionContext { public void setFeatures(final FeaturesReply featuresReply) { this.featuresReply = featuresReply; } + + @Override + public void close() { + if (getConnectionAdapter().isAlive()) { + setConnectionState(ConnectionContext.CONNECTION_STATE.RIP); + getConnectionAdapter().disconnect(); + } + if (outboundQueueHandlerRegistration != null) { + outboundQueueHandlerRegistration.close(); + outboundQueueHandlerRegistration = null; + } + } + + @Override + public void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration outboundQueueHandlerRegistration) { + this.outboundQueueHandlerRegistration = outboundQueueHandlerRegistration; + } } diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index f176b5ceb2..d24e4c8426 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -28,7 +28,6 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; -import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; @@ -84,51 +83,51 @@ public class DeviceContextImpl implements DeviceContext { private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class); + // TODO: watermarks should be derived from effective rpc limit (75%|95%) + private static final int PACKETIN_LOW_WATERMARK = 15000; + private static final int PACKETIN_HIGH_WATERMARK = 19000; + // TODO: drain factor should be parametrized + public static final float REJECTED_DRAIN_FACTOR = 0.25f; + private final ConnectionContext primaryConnectionContext; private final DeviceState deviceState; private final DataBroker dataBroker; private final HashedWheelTimer hashedWheelTimer; private final Map auxiliaryConnectionContexts; private final TransactionChainManager txChainManager; - private TranslatorLibrary translatorLibrary; private final DeviceFlowRegistry deviceFlowRegistry; private final DeviceGroupRegistry deviceGroupRegistry; private final DeviceMeterRegistry deviceMeterRegistry; - private Timeout barrierTaskTimeout; - private NotificationService notificationService; - private final MessageSpy messageSpy; - private DeviceDisconnectedHandler deviceDisconnectedHandler; private final Collection closeHandlers = new HashSet<>(); + private final PacketInRateLimiter packetInLimiter; + private final MessageSpy messageSpy; private NotificationPublishService notificationPublishService; + private DeviceDisconnectedHandler deviceDisconnectedHandler; + private NotificationService notificationService; + private TranslatorLibrary translatorLibrary; private OutboundQueue outboundQueueProvider; - - private volatile int outstandingNotificationsAmount = 0; - private volatile boolean filteringPacketIn = false; - private final Object throttlingLock = new Object(); - private int filteringHighWaterMark = 0; - private OutboundQueueHandlerRegistration outboundQueueHandlerRegistration; - - @Override - public Long getReservedXid() { - return outboundQueueProvider.reserveEntry(); - } + private Timeout barrierTaskTimeout; @VisibleForTesting DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext, @Nonnull final DeviceState deviceState, @Nonnull final DataBroker dataBroker, @Nonnull final HashedWheelTimer hashedWheelTimer, - @Nonnull final MessageSpy _messageSpy) { + @Nonnull final MessageSpy _messageSpy, OutboundQueueProvider outboundQueueProvider) { this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext); this.deviceState = Preconditions.checkNotNull(deviceState); this.dataBroker = Preconditions.checkNotNull(dataBroker); this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer); + this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider); txChainManager = new TransactionChainManager(dataBroker, deviceState); auxiliaryConnectionContexts = new HashMap<>(); deviceFlowRegistry = new DeviceFlowRegistryImpl(); deviceGroupRegistry = new DeviceGroupRegistryImpl(); deviceMeterRegistry = new DeviceMeterRegistryImpl(); messageSpy = _messageSpy; + + this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(), + PACKETIN_LOW_WATERMARK, PACKETIN_HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR); } /** @@ -139,6 +138,11 @@ public class DeviceContextImpl implements DeviceContext { txChainManager.initialSubmitWriteTransaction(); } + @Override + public Long getReservedXid() { + return outboundQueueProvider.reserveEntry(); + } + @Override public > void onMessage(final M message, final RequestContext requestContext) { // TODO Auto-generated method stub @@ -268,58 +272,42 @@ public class DeviceContextImpl implements DeviceContext { final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null); if (packetReceived == null) { - LOG.debug("Received a null packet from switch"); + LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress()); return; } - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); - final ListenableFuture offerNotification = notificationPublishService.offerNotification(packetReceived); - synchronized (throttlingLock) { - outstandingNotificationsAmount += 1; + if (!packetInLimiter.acquirePermit()) { + LOG.debug("Packet limited"); + // TODO: save packet into emergency slot if possible + // FIXME: some other counter + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); + return; } + + final ListenableFuture offerNotification = notificationPublishService.offerNotification(packetReceived); if (NotificationPublishService.REJECTED.equals(offerNotification)) { LOG.debug("notification offer rejected"); - synchronized (throttlingLock) { - if (outstandingNotificationsAmount > 1 && !filteringPacketIn) { - connectionAdapter.setPacketInFiltering(true); - messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON); - filteringPacketIn = true; - filteringHighWaterMark = outstandingNotificationsAmount; - LOG.debug("PacketIn filtering on: {}, watermark: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount); - } - } + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); + packetInLimiter.drainLowWaterMark(); + packetInLimiter.releasePermit(); + return; } - Futures.addCallback(offerNotification, - new FutureCallback() { - @Override - public void onSuccess(final Object result) { - countdownFiltering(); - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); - } - - @Override - public void onFailure(final Throwable t) { - countdownFiltering(); - messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED); - LOG.debug("notification offer failed: {}, outstanding: {}", t.getMessage(), outstandingNotificationsAmount); - LOG.trace("notification offer failed..", t); - } - - private void countdownFiltering() { - synchronized (throttlingLock) { - outstandingNotificationsAmount -= 1; - if (outstandingNotificationsAmount == 0 && filteringPacketIn) { - connectionAdapter.setPacketInFiltering(false); - messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF); - - filteringPacketIn = false; - LOG.debug("PacketIn filtering off: {}, outstanding: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount); - } - } - } - } - ); + Futures.addCallback(offerNotification, new FutureCallback() { + @Override + public void onSuccess(final Object result) { + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + packetInLimiter.releasePermit(); + } + + @Override + public void onFailure(final Throwable t) { + messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED); + LOG.debug("notification offer failed: {}", t.getMessage()); + LOG.trace("notification offer failed..", t); + packetInLimiter.releasePermit(); + } + }); } @Override @@ -345,16 +333,9 @@ public class DeviceContextImpl implements DeviceContext { deviceFlowRegistry.close(); deviceMeterRegistry.close(); - outboundQueueHandlerRegistration.close(); - - if (primaryConnectionContext.getConnectionAdapter().isAlive()) { - primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP); - primaryConnectionContext.getConnectionAdapter().disconnect(); - } + primaryConnectionContext.close(); for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) { - if (connectionContext.getConnectionAdapter().isAlive()) { - connectionContext.getConnectionAdapter().disconnect(); - } + connectionContext.close(); } for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) { @@ -424,14 +405,6 @@ public class DeviceContextImpl implements DeviceContext { } } - @Override - public void registerOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider, final int maxQueueDepth, final long barrierNanos) { - final ConnectionAdapter primaryConnectionAdapter = primaryConnectionContext.getConnectionAdapter(); - outboundQueueHandlerRegistration = primaryConnectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos); - this.outboundQueueProvider = outboundQueueProvider; - primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider); - } - @Override public MultiMsgCollector getMultiMsgCollector(final RequestContext> requestContext) { return new MultiMsgCollectorImpl(this, requestContext); diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 00cf64754a..4685bccbb2 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -32,6 +32,7 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.ConnectionException; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; @@ -167,10 +168,15 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable { final Short version = connectionContext.getFeatures().getVersion(); final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version); + connectionContext.setOutboundQueueProvider(outboundQueueProvider); + final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = + connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos); + connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); + final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId()); - final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency); - deviceContext.registerOutboundQueueProvider(outboundQueueProvider, maxQueueDepth, barrierNanos); + final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, + hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider); deviceContext.setNotificationService(notificationService); deviceContext.setNotificationPublishService(notificationPublishService); final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()).setNodeConnector(Collections.emptyList()); diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java new file mode 100644 index 0000000000..ad0a8b1b74 --- /dev/null +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.impl.device; + +import com.google.common.base.Preconditions; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class PacketInRateLimiter extends SimpleRatelimiter { + private static final Logger LOG = LoggerFactory.getLogger(PacketInRateLimiter.class); + private final float rejectedDrainFactor; + private final ConnectionAdapter connectionAdapter; + private final MessageSpy messageSpy; + + PacketInRateLimiter(final ConnectionAdapter connectionAdapter, final int lowWatermark, final int highWatermark, final MessageSpy messageSpy, float rejectedDrainFactor) { + super(lowWatermark, highWatermark); + Preconditions.checkArgument(rejectedDrainFactor > 0 && rejectedDrainFactor < 1); + this.rejectedDrainFactor = rejectedDrainFactor; + this.connectionAdapter = Preconditions.checkNotNull(connectionAdapter); + this.messageSpy = Preconditions.checkNotNull(messageSpy); + } + + @Override + protected void disableFlow() { + messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON); + connectionAdapter.setPacketInFiltering(true); + LOG.debug("PacketIn filtering on: {}", connectionAdapter.getRemoteAddress()); + } + + @Override + protected void enableFlow() { + messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF); + connectionAdapter.setPacketInFiltering(false); + LOG.debug("PacketIn filtering off: {}", connectionAdapter.getRemoteAddress()); + } + + public void drainLowWaterMark() { + adaptLowWaterMarkAndDisableFlow((int) (getOccupiedPermits() * rejectedDrainFactor)); + } +} diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java new file mode 100644 index 0000000000..eed1ccdecf --- /dev/null +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.impl.device; + +import com.google.common.base.Preconditions; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.GuardedBy; + +abstract class SimpleRatelimiter { + private final AtomicInteger counter = new AtomicInteger(); + private final int lowWatermark; + private int lowWatermarkEffective; + private final int highWatermark; + @GuardedBy("counter") + private volatile boolean limited; + + SimpleRatelimiter(final int lowWatermark, final int highWatermark) { + Preconditions.checkArgument(lowWatermark >= 0); + Preconditions.checkArgument(highWatermark >= 0); + Preconditions.checkArgument(lowWatermark <= highWatermark); + + this.lowWatermark = lowWatermark; + this.highWatermark = highWatermark; + lowWatermarkEffective = lowWatermark; + } + + protected final boolean isLimited() { + return limited; + } + + protected abstract void disableFlow(); + protected abstract void enableFlow(); + + boolean acquirePermit() { + final int cnt = counter.incrementAndGet(); + if (cnt > highWatermark) { + synchronized (counter) { + final int recheck = counter.decrementAndGet(); + if (recheck >= highWatermark && !limited) { + disableFlow(); + limited = true; + } + } + return false; + } + + return true; + } + + void releasePermit() { + final int cnt = counter.decrementAndGet(); + if (cnt <= lowWatermarkEffective) { + synchronized (counter) { + final int recheck = counter.get(); + if (recheck <= lowWatermarkEffective && limited) { + enableFlow(); + limited = false; + resetLowWaterMark(); + } + } + } + } + + void resetLowWaterMark() { + synchronized (counter) { + lowWatermarkEffective = lowWatermark; + } + } + + void adaptLowWaterMarkAndDisableFlow(int temporaryLowWaterMark) { + if (temporaryLowWaterMark < highWatermark) { + synchronized (counter) { + lowWatermarkEffective = temporaryLowWaterMark; + if (!limited) { + disableFlow(); + limited = true; + } + } + } + } + + int getOccupiedPermits() { + return counter.get(); + } +} diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java index 556afe8efd..237a145fe7 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java @@ -21,6 +21,7 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.ReadTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; @@ -78,6 +79,8 @@ public class DeviceContextImplTest { MessageIntelligenceAgency messageIntelligenceAgency; @Mock OutboundQueueProvider outboundQueueProvider; + @Mock + ConnectionAdapter connectionAdapter; private final AtomicLong atomicLong = new AtomicLong(0); @@ -109,7 +112,8 @@ public class DeviceContextImplTest { Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx); Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx); Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider); - deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency); + Mockito.when(connectionContext.getConnectionAdapter()).thenReturn(connectionAdapter); + deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider); xid = new Xid(atomicLong.incrementAndGet()); xidMulti = new Xid(atomicLong.incrementAndGet()); @@ -117,22 +121,22 @@ public class DeviceContextImplTest { @Test(expected = NullPointerException.class) public void testDeviceContextImplConstructorNullConnectionContext() throws Exception { - new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency).close(); + new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider).close(); } @Test(expected = NullPointerException.class) public void testDeviceContextImplConstructorNullDataBroker() throws Exception { - new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency).close(); + new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider).close(); } @Test(expected = NullPointerException.class) public void testDeviceContextImplConstructorNullDeviceState() throws Exception { - new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency).close(); + new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider).close(); } @Test(expected = NullPointerException.class) public void testDeviceContextImplConstructorNullTimer() throws Exception { - new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency).close(); + new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider).close(); } @Test -- 2.36.6