Speed up packetin throttling
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
index f176b5ceb2f9c10c8f48716cd2b48bcd2959720d..d24e4c8426d33ee4bf2f4a8fca02b75980eaee2a 100644 (file)
@@ -28,7 +28,6 @@ import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -84,51 +83,51 @@ public class DeviceContextImpl implements DeviceContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
 
+    // TODO: watermarks should be derived from effective rpc limit (75%|95%)
+    private static final int PACKETIN_LOW_WATERMARK = 15000;
+    private static final int PACKETIN_HIGH_WATERMARK = 19000;
+    // TODO: drain factor should be parametrized
+    public static final float REJECTED_DRAIN_FACTOR = 0.25f;
+
     private final ConnectionContext primaryConnectionContext;
     private final DeviceState deviceState;
     private final DataBroker dataBroker;
     private final HashedWheelTimer hashedWheelTimer;
     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
     private final TransactionChainManager txChainManager;
-    private TranslatorLibrary translatorLibrary;
     private final DeviceFlowRegistry deviceFlowRegistry;
     private final DeviceGroupRegistry deviceGroupRegistry;
     private final DeviceMeterRegistry deviceMeterRegistry;
-    private Timeout barrierTaskTimeout;
-    private NotificationService notificationService;
-    private final MessageSpy messageSpy;
-    private DeviceDisconnectedHandler deviceDisconnectedHandler;
     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
+    private final PacketInRateLimiter packetInLimiter;
+    private final MessageSpy messageSpy;
     private NotificationPublishService notificationPublishService;
+    private DeviceDisconnectedHandler deviceDisconnectedHandler;
+    private NotificationService notificationService;
+    private TranslatorLibrary translatorLibrary;
     private OutboundQueue outboundQueueProvider;
-
-    private volatile int outstandingNotificationsAmount = 0;
-    private volatile boolean filteringPacketIn = false;
-    private final Object throttlingLock = new Object();
-    private int filteringHighWaterMark = 0;
-    private OutboundQueueHandlerRegistration<?> outboundQueueHandlerRegistration;
-
-    @Override
-    public Long getReservedXid() {
-        return outboundQueueProvider.reserveEntry();
-    }
+    private Timeout barrierTaskTimeout;
 
     @VisibleForTesting
     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
                       @Nonnull final DeviceState deviceState,
                       @Nonnull final DataBroker dataBroker,
                       @Nonnull final HashedWheelTimer hashedWheelTimer,
-                      @Nonnull final MessageSpy _messageSpy) {
+                      @Nonnull final MessageSpy _messageSpy, OutboundQueueProvider outboundQueueProvider) {
         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
         this.deviceState = Preconditions.checkNotNull(deviceState);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
+        this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
         txChainManager = new TransactionChainManager(dataBroker, deviceState);
         auxiliaryConnectionContexts = new HashMap<>();
         deviceFlowRegistry = new DeviceFlowRegistryImpl();
         deviceGroupRegistry = new DeviceGroupRegistryImpl();
         deviceMeterRegistry = new DeviceMeterRegistryImpl();
         messageSpy = _messageSpy;
+
+        this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+                PACKETIN_LOW_WATERMARK, PACKETIN_HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
     }
 
     /**
@@ -139,6 +138,11 @@ public class DeviceContextImpl implements DeviceContext {
         txChainManager.initialSubmitWriteTransaction();
     }
 
+    @Override
+    public Long getReservedXid() {
+        return outboundQueueProvider.reserveEntry();
+    }
+
     @Override
     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext<?> requestContext) {
         // TODO Auto-generated method stub
@@ -268,58 +272,42 @@ public class DeviceContextImpl implements DeviceContext {
         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
 
         if (packetReceived == null) {
-            LOG.debug("Received a null packet from switch");
+            LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
             return;
         }
-        messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
 
-        final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
-        synchronized (throttlingLock) {
-            outstandingNotificationsAmount += 1;
+        if (!packetInLimiter.acquirePermit()) {
+            LOG.debug("Packet limited");
+            // TODO: save packet into emergency slot if possible
+            // FIXME: some other counter
+            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+            return;
         }
+
+        final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
             LOG.debug("notification offer rejected");
-            synchronized (throttlingLock) {
-                if (outstandingNotificationsAmount > 1 && !filteringPacketIn) {
-                    connectionAdapter.setPacketInFiltering(true);
-                    messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
-                    filteringPacketIn = true;
-                    filteringHighWaterMark = outstandingNotificationsAmount;
-                    LOG.debug("PacketIn filtering on: {}, watermark: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
-                }
-            }
+            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+            packetInLimiter.drainLowWaterMark();
+            packetInLimiter.releasePermit();
+            return;
         }
 
-        Futures.addCallback(offerNotification,
-                new FutureCallback<Object>() {
-                    @Override
-                    public void onSuccess(final Object result) {
-                        countdownFiltering();
-                        messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable t) {
-                        countdownFiltering();
-                        messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
-                        LOG.debug("notification offer failed: {}, outstanding: {}", t.getMessage(), outstandingNotificationsAmount);
-                        LOG.trace("notification offer failed..", t);
-                    }
-
-                    private void countdownFiltering() {
-                        synchronized (throttlingLock) {
-                            outstandingNotificationsAmount -= 1;
-                            if (outstandingNotificationsAmount == 0 && filteringPacketIn) {
-                                connectionAdapter.setPacketInFiltering(false);
-                                messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF);
-
-                                filteringPacketIn = false;
-                                LOG.debug("PacketIn filtering off: {}, outstanding: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
-                            }
-                        }
-                    }
-                }
-        );
+        Futures.addCallback(offerNotification, new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(final Object result) {
+                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+                packetInLimiter.releasePermit();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+                LOG.debug("notification offer failed: {}", t.getMessage());
+                LOG.trace("notification offer failed..", t);
+                packetInLimiter.releasePermit();
+            }
+        });
     }
 
     @Override
@@ -345,16 +333,9 @@ public class DeviceContextImpl implements DeviceContext {
         deviceFlowRegistry.close();
         deviceMeterRegistry.close();
 
-        outboundQueueHandlerRegistration.close();
-
-        if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
-            primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
-            primaryConnectionContext.getConnectionAdapter().disconnect();
-        }
+        primaryConnectionContext.close();
         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
-            if (connectionContext.getConnectionAdapter().isAlive()) {
-                connectionContext.getConnectionAdapter().disconnect();
-            }
+            connectionContext.close();
         }
 
         for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
@@ -424,14 +405,6 @@ public class DeviceContextImpl implements DeviceContext {
         }
     }
 
-    @Override
-    public void registerOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider, final int maxQueueDepth, final long barrierNanos) {
-        final ConnectionAdapter primaryConnectionAdapter = primaryConnectionContext.getConnectionAdapter();
-        outboundQueueHandlerRegistration = primaryConnectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
-        this.outboundQueueProvider = outboundQueueProvider;
-        primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
-    }
-
     @Override
     public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
         return new MultiMsgCollectorImpl(this, requestContext);