85403b8024c56be6efeb0bbd51d2fdfe7d2c4c4a
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / ThrottledNotificationsOffererImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.connection;
10
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.SettableFuture;
13 import java.util.Map;
14 import java.util.Queue;
15 import java.util.concurrent.ArrayBlockingQueue;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
20 import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer;
21 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
22 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
23 import org.opendaylight.yangtools.yang.binding.Notification;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * Created by Martin Bobak <mbobak@cisco.com> on 8.5.2015.
29  */
30 public class ThrottledNotificationsOffererImpl<T extends Notification> implements ThrottledNotificationsOfferer<T>, Runnable {
31
32     private static final Logger LOG = LoggerFactory.getLogger(ThrottledNotificationsOffererImpl.class);
33     private final Map<Queue<T>, SettableFuture<Void>> throttledQueues = new ConcurrentHashMap<>();
34     private final ThreadPoolLoggingExecutor throttleWorkerPool;
35     private final NotificationPublishService notificationPublishService;
36     private final MessageSpy messageIntelligenceAgency;
37     private boolean finishing = false;
38     private CountDownLatch sleeperLatch = new CountDownLatch(0);
39
40     /**
41      * @param notificationPublishService
42      * @param messageIntelligenceAgency
43      */
44     public ThrottledNotificationsOffererImpl(final NotificationPublishService notificationPublishService, final MessageSpy messageIntelligenceAgency) {
45         this.notificationPublishService = notificationPublishService;
46         this.messageIntelligenceAgency = messageIntelligenceAgency;
47         throttleWorkerPool = new ThreadPoolLoggingExecutor(
48                 1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "throttleWorkerPool");
49         throttleWorkerPool.execute(this);
50         LOG.info("throttled worker started");
51     }
52
53     @Override
54     public ListenableFuture<Void> applyThrottlingOnConnection(final Queue<T> notificationsQueue) {
55         SettableFuture<Void> throttleWatching = SettableFuture.create();
56         throttledQueues.put(notificationsQueue, throttleWatching);
57         synchronized (throttledQueues) {
58             sleeperLatch.countDown();
59         }
60         return throttleWatching;
61     }
62
63
64     @Override
65     public void run() {
66         while (!finishing) {
67             if (throttledQueues.isEmpty()) {
68                 // do some sleeping
69                 synchronized (throttledQueues) {
70                     if (throttledQueues.isEmpty()) {
71                         sleeperLatch = new CountDownLatch(1);
72                     }
73                 }
74                 try {
75                     sleeperLatch.await();
76                 } catch (InterruptedException e) {
77                     // NOOP
78                 }
79             } else {
80                 for (Map.Entry<Queue<T>, SettableFuture<Void>> throttledTuple : throttledQueues.entrySet()) {
81                     Queue<T> key = throttledTuple.getKey();
82                     T notification = key.poll();
83                     if (notification == null) {
84                         synchronized (key) {
85                             // free throttling and announce via future
86                             throttledTuple.getValue().set(null);
87                             throttledQueues.remove(key);
88                         }
89                     } else {
90                         try {
91                             notificationPublishService.putNotification(notification);
92                             messageIntelligenceAgency.spyMessage(notification.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
93                         } catch (InterruptedException e) {
94                             LOG.trace("putNotification failed.. ", e);
95                             messageIntelligenceAgency.spyMessage(notification.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
96                         }
97                     }
98                     if (finishing) {
99                         break;
100                     }
101                 }
102             }
103         }
104
105         LOG.info("throttled worker finishing");
106     }
107
108     @Override
109     public boolean isThrottlingEffective(final Queue<T> notificationsQueue) {
110         return throttledQueues.containsKey(notificationsQueue);
111     }
112
113     @Override
114     public void close() throws SecurityException {
115         finishing = true;
116         throttleWorkerPool.shutdown();
117         if (!throttleWorkerPool.isTerminated()) {
118             try {
119                 throttleWorkerPool.awaitTermination(2L, TimeUnit.SECONDS);
120             } catch (InterruptedException e) {
121                 LOG.debug("Exception during pool shutdown: {}", e.getMessage());
122             } finally {
123                 if (!throttleWorkerPool.isTerminated()) {
124                     throttleWorkerPool.shutdownNow();
125                 }
126             }
127         }
128     }
129 }