+
+ if (packetReceived != null) {
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+ } else {
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ return;
+ }
+
+ ListenableFuture<?> listenableFuture = notificationPublishService.offerNotification(packetReceived);
+ if (NotificationPublishService.REJECTED.equals(listenableFuture)) {
+ LOG.debug("notification offer rejected");
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ } else if (listenableFuture.isDone()) {
+ Object x = null;
+ try {
+ x = listenableFuture.get();
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ } catch (InterruptedException e) {
+ LOG.debug("notification offer interrupted: {}", e.getMessage());
+ LOG.trace("notification offer interrupted..", e);
+ } catch (ExecutionException e) {
+ LOG.debug("notification offer failed: {}", e.getMessage());
+ LOG.trace("notification offer failed..", e);
+ } finally {
+ if (null == x) {
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ }
+ }
+ } else {
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ }
+ }
+
+ private void applyThrottling(final PacketReceived packetReceived, final ConnectionAdapter connectionAdapter) {
+ final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress();
+ LOG.debug("Notification offer refused by notification service.");
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
+ connectionAdapter.setAutoRead(false);
+
+ LOG.debug("Throttling ingress for {}", remoteAddress);
+ final ListenableFuture<Void> queueDone;
+
+ // adding first notification
+ bumperQueue.offer(packetReceived);
+ synchronized (bumperQueue) {
+ queueDone = throttledConnectionsHolder.applyThrottlingOnConnection(bumperQueue);
+ }
+ Futures.addCallback(queueDone, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Un - throttling ingress for {}", remoteAddress);
+ connectionAdapter.setAutoRead(true);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.warn("failed to offer queued notification for {}: {}", remoteAddress, t.getMessage());
+ LOG.debug("failed to offer queued notification for {}.. ", remoteAddress, t);
+ }
+ });