From 4b34ce62d369b80533e57e0bc0e8bc0ac7c496b3 Mon Sep 17 00:00:00 2001
From: Robert Varga
Date: Fri, 22 May 2015 13:17:51 +0200
Subject: [PATCH] Speed up packetin throttling
Perform an atomic operation instead of taking a lock in the fast path.
Change-Id: If204594f375aa2f9f7295cc5321236756f01c258
Signed-off-by: Robert Varga
---
.../connection/ConnectionContext.java | 8 +-
.../api/openflow/device/DeviceContext.java | 12 --
.../connection/ConnectionContextImpl.java | 19 +++
.../impl/device/DeviceContextImpl.java | 131 +++++++-----------
.../impl/device/DeviceManagerImpl.java | 10 +-
.../impl/device/PacketInRateLimiter.java | 48 +++++++
.../impl/device/SimpleRatelimiter.java | 90 ++++++++++++
.../impl/device/DeviceContextImplTest.java | 14 +-
8 files changed, 233 insertions(+), 99 deletions(-)
create mode 100644 openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/PacketInRateLimiter.java
create mode 100644 openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/SimpleRatelimiter.java
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java
index 3a49d5be82..bbbffdc0f2 100644
--- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java
+++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java
@@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.api.openflow.connection;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
@@ -21,7 +22,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
*
* Created by Martin Bobak <mbobak@cisco.com> on 25.2.2015.
*/
-public interface ConnectionContext {
+public interface ConnectionContext extends AutoCloseable {
/**
* distinguished connection states
@@ -111,4 +112,9 @@ public interface ConnectionContext {
* Method provides propagates info about closed connection to handler for handling closing connections.
*/
void propagateClosingConnection();
+
+ void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration outboundQueueHandlerRegistration);
+
+ @Override
+ void close();
}
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
index eb17433141..8a93e96e11 100644
--- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
+++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
@@ -17,7 +17,6 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
@@ -177,16 +176,5 @@ public interface DeviceContext extends AutoCloseable,
*/
void onPublished();
-
- /**
- * Method registers outbound queue provider into current device context's primary connection adapter.
- *
- * @param outboundQueueProvider
- * @param maxQueueDepth
- * @param barrierNanos
- */
- void registerOutboundQueueProvider(OutboundQueueProvider outboundQueueProvider, int maxQueueDepth, long barrierNanos);
-
-
}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java
index 4ec203c876..0453c23eef 100644
--- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java
+++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java
@@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.connection;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
@@ -29,6 +30,7 @@ public class ConnectionContextImpl implements ConnectionContext {
private DeviceDisconnectedHandler deviceDisconnectedHandler;
private static final Logger LOG = LoggerFactory.getLogger(ConnectionContextImpl.class);
private OutboundQueueProvider outboundQueueProvider;
+ private OutboundQueueHandlerRegistration outboundQueueHandlerRegistration;
/**
* @param connectionAdapter
@@ -94,4 +96,21 @@ public class ConnectionContextImpl implements ConnectionContext {
public void setFeatures(final FeaturesReply featuresReply) {
this.featuresReply = featuresReply;
}
+
+ @Override
+ public void close() {
+ if (getConnectionAdapter().isAlive()) {
+ setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
+ getConnectionAdapter().disconnect();
+ }
+ if (outboundQueueHandlerRegistration != null) {
+ outboundQueueHandlerRegistration.close();
+ outboundQueueHandlerRegistration = null;
+ }
+ }
+
+ @Override
+ public void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration outboundQueueHandlerRegistration) {
+ this.outboundQueueHandlerRegistration = outboundQueueHandlerRegistration;
+ }
}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
index f176b5ceb2..d24e4c8426 100644
--- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
+++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
@@ -28,7 +28,6 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -84,51 +83,51 @@ public class DeviceContextImpl implements DeviceContext {
private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
+ // TODO: watermarks should be derived from effective rpc limit (75%|95%)
+ private static final int PACKETIN_LOW_WATERMARK = 15000;
+ private static final int PACKETIN_HIGH_WATERMARK = 19000;
+ // TODO: drain factor should be parametrized
+ public static final float REJECTED_DRAIN_FACTOR = 0.25f;
+
private final ConnectionContext primaryConnectionContext;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final HashedWheelTimer hashedWheelTimer;
private final Map auxiliaryConnectionContexts;
private final TransactionChainManager txChainManager;
- private TranslatorLibrary translatorLibrary;
private final DeviceFlowRegistry deviceFlowRegistry;
private final DeviceGroupRegistry deviceGroupRegistry;
private final DeviceMeterRegistry deviceMeterRegistry;
- private Timeout barrierTaskTimeout;
- private NotificationService notificationService;
- private final MessageSpy messageSpy;
- private DeviceDisconnectedHandler deviceDisconnectedHandler;
private final Collection closeHandlers = new HashSet<>();
+ private final PacketInRateLimiter packetInLimiter;
+ private final MessageSpy messageSpy;
private NotificationPublishService notificationPublishService;
+ private DeviceDisconnectedHandler deviceDisconnectedHandler;
+ private NotificationService notificationService;
+ private TranslatorLibrary translatorLibrary;
private OutboundQueue outboundQueueProvider;
-
- private volatile int outstandingNotificationsAmount = 0;
- private volatile boolean filteringPacketIn = false;
- private final Object throttlingLock = new Object();
- private int filteringHighWaterMark = 0;
- private OutboundQueueHandlerRegistration> outboundQueueHandlerRegistration;
-
- @Override
- public Long getReservedXid() {
- return outboundQueueProvider.reserveEntry();
- }
+ private Timeout barrierTaskTimeout;
@VisibleForTesting
DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final DeviceState deviceState,
@Nonnull final DataBroker dataBroker,
@Nonnull final HashedWheelTimer hashedWheelTimer,
- @Nonnull final MessageSpy _messageSpy) {
+ @Nonnull final MessageSpy _messageSpy, OutboundQueueProvider outboundQueueProvider) {
this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
this.deviceState = Preconditions.checkNotNull(deviceState);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
+ this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
txChainManager = new TransactionChainManager(dataBroker, deviceState);
auxiliaryConnectionContexts = new HashMap<>();
deviceFlowRegistry = new DeviceFlowRegistryImpl();
deviceGroupRegistry = new DeviceGroupRegistryImpl();
deviceMeterRegistry = new DeviceMeterRegistryImpl();
messageSpy = _messageSpy;
+
+ this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+ PACKETIN_LOW_WATERMARK, PACKETIN_HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
}
/**
@@ -139,6 +138,11 @@ public class DeviceContextImpl implements DeviceContext {
txChainManager.initialSubmitWriteTransaction();
}
+ @Override
+ public Long getReservedXid() {
+ return outboundQueueProvider.reserveEntry();
+ }
+
@Override
public > void onMessage(final M message, final RequestContext> requestContext) {
// TODO Auto-generated method stub
@@ -268,58 +272,42 @@ public class DeviceContextImpl implements DeviceContext {
final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
if (packetReceived == null) {
- LOG.debug("Received a null packet from switch");
+ LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
return;
}
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
- final ListenableFuture extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
- synchronized (throttlingLock) {
- outstandingNotificationsAmount += 1;
+ if (!packetInLimiter.acquirePermit()) {
+ LOG.debug("Packet limited");
+ // TODO: save packet into emergency slot if possible
+ // FIXME: some other counter
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ return;
}
+
+ final ListenableFuture extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
if (NotificationPublishService.REJECTED.equals(offerNotification)) {
LOG.debug("notification offer rejected");
- synchronized (throttlingLock) {
- if (outstandingNotificationsAmount > 1 && !filteringPacketIn) {
- connectionAdapter.setPacketInFiltering(true);
- messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
- filteringPacketIn = true;
- filteringHighWaterMark = outstandingNotificationsAmount;
- LOG.debug("PacketIn filtering on: {}, watermark: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
- }
- }
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ packetInLimiter.drainLowWaterMark();
+ packetInLimiter.releasePermit();
+ return;
}
- Futures.addCallback(offerNotification,
- new FutureCallback