2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.connection;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.SettableFuture;
14 import java.util.Queue;
15 import java.util.concurrent.ArrayBlockingQueue;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
20 import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer;
21 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
22 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
23 import org.opendaylight.yangtools.yang.binding.Notification;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * Created by Martin Bobak <mbobak@cisco.com> on 8.5.2015.
30 public class ThrottledNotificationsOffererImpl<T extends Notification> implements ThrottledNotificationsOfferer<T>, Runnable {
32 private static final Logger LOG = LoggerFactory.getLogger(ThrottledNotificationsOffererImpl.class);
33 private final Map<Queue<T>, SettableFuture<Void>> throttledQueues = new ConcurrentHashMap<>();
34 private final ThreadPoolLoggingExecutor throttleWorkerPool;
35 private final NotificationPublishService notificationPublishService;
36 private final MessageSpy<Class<?>> messageIntelligenceAgency;
37 private boolean finishing = false;
38 private CountDownLatch sleeperLatch = new CountDownLatch(0);
41 * @param notificationPublishService
42 * @param messageIntelligenceAgency
44 public ThrottledNotificationsOffererImpl(final NotificationPublishService notificationPublishService, final MessageSpy<Class<?>> messageIntelligenceAgency) {
45 this.notificationPublishService = notificationPublishService;
46 this.messageIntelligenceAgency = messageIntelligenceAgency;
47 throttleWorkerPool = new ThreadPoolLoggingExecutor(
48 1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "throttleWorkerPool");
49 throttleWorkerPool.execute(this);
50 LOG.info("throttled worker started");
54 public ListenableFuture<Void> applyThrottlingOnConnection(final Queue<T> notificationsQueue) {
55 SettableFuture<Void> throttleWatching = SettableFuture.create();
56 throttledQueues.put(notificationsQueue, throttleWatching);
57 synchronized (throttledQueues) {
58 sleeperLatch.countDown();
60 return throttleWatching;
67 if (throttledQueues.isEmpty()) {
69 synchronized (throttledQueues) {
70 if (throttledQueues.isEmpty()) {
71 sleeperLatch = new CountDownLatch(1);
76 } catch (InterruptedException e) {
80 for (Map.Entry<Queue<T>, SettableFuture<Void>> throttledTuple : throttledQueues.entrySet()) {
81 Queue<T> key = throttledTuple.getKey();
82 T notification = key.poll();
83 if (notification == null) {
85 // free throttling and announce via future
86 throttledTuple.getValue().set(null);
87 throttledQueues.remove(key);
91 notificationPublishService.putNotification(notification);
92 messageIntelligenceAgency.spyMessage(notification.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
93 } catch (InterruptedException e) {
94 LOG.trace("putNotification failed.. ", e);
95 messageIntelligenceAgency.spyMessage(notification.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
105 LOG.info("throttled worker finishing");
109 public boolean isThrottlingEffective(final Queue<T> notificationsQueue) {
110 return throttledQueues.containsKey(notificationsQueue);
114 public void close() throws SecurityException {
116 throttleWorkerPool.shutdown();
117 if (!throttleWorkerPool.isTerminated()) {
119 throttleWorkerPool.awaitTermination(2L, TimeUnit.SECONDS);
120 } catch (InterruptedException e) {
121 LOG.debug("Exception during pool shutdown: {}", e.getMessage());
123 if (!throttleWorkerPool.isTerminated()) {
124 throttleWorkerPool.shutdownNow();