Perform an atomic operation instead of taking a lock in the fast path.
Change-Id: If204594f375aa2f9f7295cc5321236756f01c258
Signed-off-by: Robert Varga <rovarga@cisco.com>
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
* </p>
* Created by Martin Bobak <mbobak@cisco.com> on 25.2.2015.
*/
-public interface ConnectionContext {
+public interface ConnectionContext extends AutoCloseable {
/**
* distinguished connection states
* Method provides propagates info about closed connection to handler for handling closing connections.
*/
void propagateClosingConnection();
+
+ void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration);
+
+ @Override
+ void close();
}
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
*/
void onPublished();
-
- /**
- * Method registers outbound queue provider into current device context's primary connection adapter.
- *
- * @param outboundQueueProvider
- * @param maxQueueDepth
- * @param barrierNanos
- */
- void registerOutboundQueueProvider(OutboundQueueProvider outboundQueueProvider, int maxQueueDepth, long barrierNanos);
-
-
}
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
private DeviceDisconnectedHandler deviceDisconnectedHandler;
private static final Logger LOG = LoggerFactory.getLogger(ConnectionContextImpl.class);
private OutboundQueueProvider outboundQueueProvider;
+ private OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration;
/**
* @param connectionAdapter
public void setFeatures(final FeaturesReply featuresReply) {
this.featuresReply = featuresReply;
}
+
+ @Override
+ public void close() {
+ if (getConnectionAdapter().isAlive()) {
+ setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
+ getConnectionAdapter().disconnect();
+ }
+ if (outboundQueueHandlerRegistration != null) {
+ outboundQueueHandlerRegistration.close();
+ outboundQueueHandlerRegistration = null;
+ }
+ }
+
+ @Override
+ public void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration) {
+ this.outboundQueueHandlerRegistration = outboundQueueHandlerRegistration;
+ }
}
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
+ // TODO: watermarks should be derived from effective rpc limit (75%|95%)
+ private static final int PACKETIN_LOW_WATERMARK = 15000;
+ private static final int PACKETIN_HIGH_WATERMARK = 19000;
+ // TODO: drain factor should be parametrized
+ public static final float REJECTED_DRAIN_FACTOR = 0.25f;
+
private final ConnectionContext primaryConnectionContext;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final HashedWheelTimer hashedWheelTimer;
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
private final TransactionChainManager txChainManager;
- private TranslatorLibrary translatorLibrary;
private final DeviceFlowRegistry deviceFlowRegistry;
private final DeviceGroupRegistry deviceGroupRegistry;
private final DeviceMeterRegistry deviceMeterRegistry;
- private Timeout barrierTaskTimeout;
- private NotificationService notificationService;
- private final MessageSpy messageSpy;
- private DeviceDisconnectedHandler deviceDisconnectedHandler;
private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
+ private final PacketInRateLimiter packetInLimiter;
+ private final MessageSpy messageSpy;
private NotificationPublishService notificationPublishService;
+ private DeviceDisconnectedHandler deviceDisconnectedHandler;
+ private NotificationService notificationService;
+ private TranslatorLibrary translatorLibrary;
private OutboundQueue outboundQueueProvider;
-
- private volatile int outstandingNotificationsAmount = 0;
- private volatile boolean filteringPacketIn = false;
- private final Object throttlingLock = new Object();
- private int filteringHighWaterMark = 0;
- private OutboundQueueHandlerRegistration<?> outboundQueueHandlerRegistration;
-
- @Override
- public Long getReservedXid() {
- return outboundQueueProvider.reserveEntry();
- }
+ private Timeout barrierTaskTimeout;
@VisibleForTesting
DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final DeviceState deviceState,
@Nonnull final DataBroker dataBroker,
@Nonnull final HashedWheelTimer hashedWheelTimer,
- @Nonnull final MessageSpy _messageSpy) {
+ @Nonnull final MessageSpy _messageSpy, OutboundQueueProvider outboundQueueProvider) {
this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
this.deviceState = Preconditions.checkNotNull(deviceState);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
+ this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
txChainManager = new TransactionChainManager(dataBroker, deviceState);
auxiliaryConnectionContexts = new HashMap<>();
deviceFlowRegistry = new DeviceFlowRegistryImpl();
deviceGroupRegistry = new DeviceGroupRegistryImpl();
deviceMeterRegistry = new DeviceMeterRegistryImpl();
messageSpy = _messageSpy;
+
+ this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+ PACKETIN_LOW_WATERMARK, PACKETIN_HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
}
/**
txChainManager.initialSubmitWriteTransaction();
}
+ @Override
+ public Long getReservedXid() {
+ return outboundQueueProvider.reserveEntry();
+ }
+
@Override
public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext<?> requestContext) {
// TODO Auto-generated method stub
final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
if (packetReceived == null) {
- LOG.debug("Received a null packet from switch");
+ LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
return;
}
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
- final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
- synchronized (throttlingLock) {
- outstandingNotificationsAmount += 1;
+ if (!packetInLimiter.acquirePermit()) {
+ LOG.debug("Packet limited");
+ // TODO: save packet into emergency slot if possible
+ // FIXME: some other counter
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ return;
}
+
+ final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
if (NotificationPublishService.REJECTED.equals(offerNotification)) {
LOG.debug("notification offer rejected");
- synchronized (throttlingLock) {
- if (outstandingNotificationsAmount > 1 && !filteringPacketIn) {
- connectionAdapter.setPacketInFiltering(true);
- messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
- filteringPacketIn = true;
- filteringHighWaterMark = outstandingNotificationsAmount;
- LOG.debug("PacketIn filtering on: {}, watermark: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
- }
- }
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ packetInLimiter.drainLowWaterMark();
+ packetInLimiter.releasePermit();
+ return;
}
- Futures.addCallback(offerNotification,
- new FutureCallback<Object>() {
- @Override
- public void onSuccess(final Object result) {
- countdownFiltering();
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
- }
-
- @Override
- public void onFailure(final Throwable t) {
- countdownFiltering();
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
- LOG.debug("notification offer failed: {}, outstanding: {}", t.getMessage(), outstandingNotificationsAmount);
- LOG.trace("notification offer failed..", t);
- }
-
- private void countdownFiltering() {
- synchronized (throttlingLock) {
- outstandingNotificationsAmount -= 1;
- if (outstandingNotificationsAmount == 0 && filteringPacketIn) {
- connectionAdapter.setPacketInFiltering(false);
- messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF);
-
- filteringPacketIn = false;
- LOG.debug("PacketIn filtering off: {}, outstanding: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
- }
- }
- }
- }
- );
+ Futures.addCallback(offerNotification, new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(final Object result) {
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ packetInLimiter.releasePermit();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ LOG.debug("notification offer failed: {}", t.getMessage());
+ LOG.trace("notification offer failed..", t);
+ packetInLimiter.releasePermit();
+ }
+ });
}
@Override
deviceFlowRegistry.close();
deviceMeterRegistry.close();
- outboundQueueHandlerRegistration.close();
-
- if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
- primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
- primaryConnectionContext.getConnectionAdapter().disconnect();
- }
+ primaryConnectionContext.close();
for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
- if (connectionContext.getConnectionAdapter().isAlive()) {
- connectionContext.getConnectionAdapter().disconnect();
- }
+ connectionContext.close();
}
for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
}
}
- @Override
- public void registerOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider, final int maxQueueDepth, final long barrierNanos) {
- final ConnectionAdapter primaryConnectionAdapter = primaryConnectionContext.getConnectionAdapter();
- outboundQueueHandlerRegistration = primaryConnectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
- this.outboundQueueProvider = outboundQueueProvider;
- primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
- }
-
@Override
public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
return new MultiMsgCollectorImpl(this, requestContext);
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
final Short version = connectionContext.getFeatures().getVersion();
final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
+ connectionContext.setOutboundQueueProvider(outboundQueueProvider);
+ final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
+ connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+ connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
+
final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId());
- final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, hashedWheelTimer, messageIntelligenceAgency);
- deviceContext.registerOutboundQueueProvider(outboundQueueProvider, maxQueueDepth, barrierNanos);
+ final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
+ hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider);
deviceContext.setNotificationService(notificationService);
deviceContext.setNotificationPublishService(notificationPublishService);
final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()).setNodeConnector(Collections.<NodeConnector>emptyList());
--- /dev/null
+/**
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.device;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class PacketInRateLimiter extends SimpleRatelimiter {
+ private static final Logger LOG = LoggerFactory.getLogger(PacketInRateLimiter.class);
+ private final float rejectedDrainFactor;
+ private final ConnectionAdapter connectionAdapter;
+ private final MessageSpy messageSpy;
+
+ PacketInRateLimiter(final ConnectionAdapter connectionAdapter, final int lowWatermark, final int highWatermark, final MessageSpy messageSpy, float rejectedDrainFactor) {
+ super(lowWatermark, highWatermark);
+ Preconditions.checkArgument(rejectedDrainFactor > 0 && rejectedDrainFactor < 1);
+ this.rejectedDrainFactor = rejectedDrainFactor;
+ this.connectionAdapter = Preconditions.checkNotNull(connectionAdapter);
+ this.messageSpy = Preconditions.checkNotNull(messageSpy);
+ }
+
+ @Override
+ protected void disableFlow() {
+ messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
+ connectionAdapter.setPacketInFiltering(true);
+ LOG.debug("PacketIn filtering on: {}", connectionAdapter.getRemoteAddress());
+ }
+
+ @Override
+ protected void enableFlow() {
+ messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF);
+ connectionAdapter.setPacketInFiltering(false);
+ LOG.debug("PacketIn filtering off: {}", connectionAdapter.getRemoteAddress());
+ }
+
+ public void drainLowWaterMark() {
+ adaptLowWaterMarkAndDisableFlow((int) (getOccupiedPermits() * rejectedDrainFactor));
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.device;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.concurrent.GuardedBy;
+
+abstract class SimpleRatelimiter {
+ private final AtomicInteger counter = new AtomicInteger();
+ private final int lowWatermark;
+ private int lowWatermarkEffective;
+ private final int highWatermark;
+ @GuardedBy("counter")
+ private volatile boolean limited;
+
+ SimpleRatelimiter(final int lowWatermark, final int highWatermark) {
+ Preconditions.checkArgument(lowWatermark >= 0);
+ Preconditions.checkArgument(highWatermark >= 0);
+ Preconditions.checkArgument(lowWatermark <= highWatermark);
+
+ this.lowWatermark = lowWatermark;
+ this.highWatermark = highWatermark;
+ lowWatermarkEffective = lowWatermark;
+ }
+
+ protected final boolean isLimited() {
+ return limited;
+ }
+
+ protected abstract void disableFlow();
+ protected abstract void enableFlow();
+
+ boolean acquirePermit() {
+ final int cnt = counter.incrementAndGet();
+ if (cnt > highWatermark) {
+ synchronized (counter) {
+ final int recheck = counter.decrementAndGet();
+ if (recheck >= highWatermark && !limited) {
+ disableFlow();
+ limited = true;
+ }
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+ void releasePermit() {
+ final int cnt = counter.decrementAndGet();
+ if (cnt <= lowWatermarkEffective) {
+ synchronized (counter) {
+ final int recheck = counter.get();
+ if (recheck <= lowWatermarkEffective && limited) {
+ enableFlow();
+ limited = false;
+ resetLowWaterMark();
+ }
+ }
+ }
+ }
+
+ void resetLowWaterMark() {
+ synchronized (counter) {
+ lowWatermarkEffective = lowWatermark;
+ }
+ }
+
+ void adaptLowWaterMarkAndDisableFlow(int temporaryLowWaterMark) {
+ if (temporaryLowWaterMark < highWatermark) {
+ synchronized (counter) {
+ lowWatermarkEffective = temporaryLowWaterMark;
+ if (!limited) {
+ disableFlow();
+ limited = true;
+ }
+ }
+ }
+ }
+
+ int getOccupiedPermits() {
+ return counter.get();
+ }
+}
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
MessageIntelligenceAgency messageIntelligenceAgency;
@Mock
OutboundQueueProvider outboundQueueProvider;
+ @Mock
+ ConnectionAdapter connectionAdapter;
private final AtomicLong atomicLong = new AtomicLong(0);
Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider);
- deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency);
+ Mockito.when(connectionContext.getConnectionAdapter()).thenReturn(connectionAdapter);
+ deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider);
xid = new Xid(atomicLong.incrementAndGet());
xidMulti = new Xid(atomicLong.incrementAndGet());
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullConnectionContext() throws Exception {
- new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency).close();
+ new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDataBroker() throws Exception {
- new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency).close();
+ new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, outboundQueueProvider).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDeviceState() throws Exception {
- new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency).close();
+ new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, outboundQueueProvider).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullTimer() throws Exception {
- new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency).close();
+ new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, outboundQueueProvider).close();
}
@Test