import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
+ // TODO: watermarks should be derived from effective rpc limit (75%|95%)
+ private static final int PACKETIN_LOW_WATERMARK = 15000;
+ private static final int PACKETIN_HIGH_WATERMARK = 19000;
+ // TODO: drain factor should be parametrized
+ public static final float REJECTED_DRAIN_FACTOR = 0.25f;
+
private final ConnectionContext primaryConnectionContext;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final HashedWheelTimer hashedWheelTimer;
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
private final TransactionChainManager txChainManager;
- private TranslatorLibrary translatorLibrary;
private final DeviceFlowRegistry deviceFlowRegistry;
private final DeviceGroupRegistry deviceGroupRegistry;
private final DeviceMeterRegistry deviceMeterRegistry;
- private Timeout barrierTaskTimeout;
- private NotificationService notificationService;
- private final MessageSpy messageSpy;
- private DeviceDisconnectedHandler deviceDisconnectedHandler;
private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
+ private final PacketInRateLimiter packetInLimiter;
+ private final MessageSpy messageSpy;
private NotificationPublishService notificationPublishService;
+ private DeviceDisconnectedHandler deviceDisconnectedHandler;
+ private NotificationService notificationService;
+ private TranslatorLibrary translatorLibrary;
private OutboundQueue outboundQueueProvider;
-
- private volatile int outstandingNotificationsAmount = 0;
- private volatile boolean filteringPacketIn = false;
- private final Object throttlingLock = new Object();
- private int filteringHighWaterMark = 0;
- private OutboundQueueHandlerRegistration<?> outboundQueueHandlerRegistration;
-
- @Override
- public Long getReservedXid() {
- return outboundQueueProvider.reserveEntry();
- }
+ private Timeout barrierTaskTimeout;
@VisibleForTesting
DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final DeviceState deviceState,
@Nonnull final DataBroker dataBroker,
@Nonnull final HashedWheelTimer hashedWheelTimer,
- @Nonnull final MessageSpy _messageSpy) {
+ @Nonnull final MessageSpy _messageSpy, OutboundQueueProvider outboundQueueProvider) {
this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
this.deviceState = Preconditions.checkNotNull(deviceState);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
+ this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
txChainManager = new TransactionChainManager(dataBroker, deviceState);
auxiliaryConnectionContexts = new HashMap<>();
deviceFlowRegistry = new DeviceFlowRegistryImpl();
deviceGroupRegistry = new DeviceGroupRegistryImpl();
deviceMeterRegistry = new DeviceMeterRegistryImpl();
messageSpy = _messageSpy;
+
+ this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+ PACKETIN_LOW_WATERMARK, PACKETIN_HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
}
/**
txChainManager.initialSubmitWriteTransaction();
}
+ @Override
+ public Long getReservedXid() {
+ return outboundQueueProvider.reserveEntry();
+ }
+
@Override
public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext<?> requestContext) {
// TODO Auto-generated method stub
final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
if (packetReceived == null) {
- LOG.debug("Received a null packet from switch");
+ LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
return;
}
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
- final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
- synchronized (throttlingLock) {
- outstandingNotificationsAmount += 1;
+ if (!packetInLimiter.acquirePermit()) {
+ LOG.debug("Packet limited");
+ // TODO: save packet into emergency slot if possible
+ // FIXME: some other counter
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ return;
}
+
+ final ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
if (NotificationPublishService.REJECTED.equals(offerNotification)) {
LOG.debug("notification offer rejected");
- synchronized (throttlingLock) {
- if (outstandingNotificationsAmount > 1 && !filteringPacketIn) {
- connectionAdapter.setPacketInFiltering(true);
- messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_ON);
- filteringPacketIn = true;
- filteringHighWaterMark = outstandingNotificationsAmount;
- LOG.debug("PacketIn filtering on: {}, watermark: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
- }
- }
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ packetInLimiter.drainLowWaterMark();
+ packetInLimiter.releasePermit();
+ return;
}
- Futures.addCallback(offerNotification,
- new FutureCallback<Object>() {
- @Override
- public void onSuccess(final Object result) {
- countdownFiltering();
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
- }
-
- @Override
- public void onFailure(final Throwable t) {
- countdownFiltering();
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
- LOG.debug("notification offer failed: {}, outstanding: {}", t.getMessage(), outstandingNotificationsAmount);
- LOG.trace("notification offer failed..", t);
- }
-
- private void countdownFiltering() {
- synchronized (throttlingLock) {
- outstandingNotificationsAmount -= 1;
- if (outstandingNotificationsAmount == 0 && filteringPacketIn) {
- connectionAdapter.setPacketInFiltering(false);
- messageSpy.spyMessage(DeviceContext.class, MessageSpy.STATISTIC_GROUP.OFJ_BACKPRESSURE_OFF);
-
- filteringPacketIn = false;
- LOG.debug("PacketIn filtering off: {}, outstanding: {}", connectionAdapter.getRemoteAddress(), outstandingNotificationsAmount);
- }
- }
- }
- }
- );
+ Futures.addCallback(offerNotification, new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(final Object result) {
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ packetInLimiter.releasePermit();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ LOG.debug("notification offer failed: {}", t.getMessage());
+ LOG.trace("notification offer failed..", t);
+ packetInLimiter.releasePermit();
+ }
+ });
}
@Override
deviceFlowRegistry.close();
deviceMeterRegistry.close();
- outboundQueueHandlerRegistration.close();
-
- if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
- primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
- primaryConnectionContext.getConnectionAdapter().disconnect();
- }
+ primaryConnectionContext.close();
for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
- if (connectionContext.getConnectionAdapter().isAlive()) {
- connectionContext.getConnectionAdapter().disconnect();
- }
+ connectionContext.close();
}
for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
}
}
- @Override
- public void registerOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider, final int maxQueueDepth, final long barrierNanos) {
- final ConnectionAdapter primaryConnectionAdapter = primaryConnectionContext.getConnectionAdapter();
- outboundQueueHandlerRegistration = primaryConnectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
- this.outboundQueueProvider = outboundQueueProvider;
- primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
- }
-
@Override
public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
return new MultiMsgCollectorImpl(this, requestContext);