private final OutboundQueue outboundQueueProvider;
private final MultiMsgCollector multiMsgCollector = new MultiMsgCollectorImpl();
- private int outstandingNotificationsAmount = 0;
- private boolean filteringPacketIn = false;
+ private volatile int outstandingNotificationsAmount = 0;
+ private volatile boolean filteringPacketIn = false;
private Object throttlingLock = new Object();
private int filteringHighWaterMark = 0;
}
ListenableFuture<? extends Object> offerNotification = notificationPublishService.offerNotification(packetReceived);
- outstandingNotificationsAmount += 1;
+ synchronized (throttlingLock) {
+ outstandingNotificationsAmount += 1;
+ }
if (NotificationPublishService.REJECTED.equals(offerNotification)) {
LOG.debug("notification offer rejected");
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
synchronized (throttlingLock) {
- if (outstandingNotificationsAmount > 0) {
+ if (outstandingNotificationsAmount > 1 && !filteringPacketIn) {
connectionAdapter.setPacketInFiltering(true);
filteringPacketIn = true;
filteringHighWaterMark = outstandingNotificationsAmount;