Merge "Bug 8873 - Bundle based reconciliation to enable bundling of messages"
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueProcessorLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.CopyOnWriteArraySet;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.RejectedExecutionHandler;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
23 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
24 import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
25 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
26 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
27 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
28 import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
29 import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy.StatisticsGroup;
30 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
32 import org.opendaylight.yangtools.yang.binding.DataContainer;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37
38 /**
39  * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementation focused on preserving message order while using multiple threads for the translation phase.
40  * <br>
41  * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
42  * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
43  * <br>
44  * Workflow:
45  * <ol>
46  * <li>upon message push ticket is created and enqueued</li>
47  * <li>available threads from internal pool translate the message wrapped in the ticket</li>
48  * <li>when translation of particular message is finished, result is set in future result of wrapping ticket<br>
49  *     (order of tickets in queue is not touched during translate)
50  * </li>
51  * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
52  *    <ol>
53  *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
54  *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
55  *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
56  *    </ol>
57  *    and this way the order of messages is preserved and also multiple threads are used by translating
58  * </li>
59  * </ol>
60  *
61  *
62  */
63 public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
64
65     private static final Logger LOG = LoggerFactory
66             .getLogger(QueueProcessorLightImpl.class);
67
68     private BlockingQueue<TicketResult<DataObject>> ticketQueue;
69     private ThreadPoolExecutor processorPool;
70     private int processingPoolSize = 4;
71     private ExecutorService harvesterPool;
72     private ExecutorService finisherPool;
73
74     protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
75     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
76     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
77     private MessageSpy<DataContainer> messageSpy;
78     protected Collection<QueueKeeper<OfHeader>> messageSources;
79     private QueueKeeperHarvester<OfHeader> harvester;
80
81     protected TicketFinisher<DataObject> finisher;
82
83     /**
84      * prepare queue
85      */
86     public void init() {
87         int ticketQueueCapacity = 1500;
88         ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
89         /*
90          * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT?  Can we figure out
91          * a better lifecycle?  Why does this have to be a Set?
92          */
93         messageSources = new CopyOnWriteArraySet<>();
94
95         processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
96                 TimeUnit.MILLISECONDS,
97                 new ArrayBlockingQueue<Runnable>(ticketQueueCapacity),
98                 "OFmsgProcessor");
99         // force blocking when pool queue is full
100         processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
101             @Override
102             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
103                 try {
104                     executor.getQueue().put(r);
105                 } catch (InterruptedException e) {
106                     Thread.currentThread().interrupt();
107                     throw new IllegalStateException(e);
108                 }
109             }
110         });
111
112         harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0,
113                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
114         finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
115                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
116         finisher = new TicketFinisherImpl(
117                 ticketQueue, popListenersMapping);
118         finisherPool.execute(finisher);
119
120         harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
121         harvesterPool.execute(harvester);
122
123         ticketProcessorFactory = new TicketProcessorFactoryImpl();
124         ticketProcessorFactory.setTranslatorMapping(translatorMapping);
125         ticketProcessorFactory.setSpy(messageSpy);
126         ticketProcessorFactory.setTicketFinisher(finisher);
127     }
128
129     /**
130      * stop processing queue
131      */
132     public void shutdown() {
133         processorPool.shutdown();
134     }
135
136     @Override
137     public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
138         messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED);
139         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
140         ticket.setConductor(queueItem.getConnectionConductor());
141         ticket.setMessage(queueItem.getMessage());
142         ticket.setQueueType(queueItem.getQueueType());
143
144         LOG.trace("ticket scheduling: {}, ticket: {}",
145                 queueItem.getMessage().getImplementedInterface().getSimpleName(),
146                 System.identityHashCode(queueItem));
147         scheduleTicket(ticket);
148     }
149
150
151     @Override
152     public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
153         messageSpy.spyMessage(queueItem.getMessage(), StatisticsGroup.FROM_SWITCH_ENQUEUED);
154         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
155         ticket.setConductor(queueItem.getConnectionConductor());
156         ticket.setMessage(queueItem.getMessage());
157
158         LOG.debug("ticket scheduling: {}, ticket: {}",
159                 queueItem.getMessage().getImplementedInterface().getSimpleName(),
160                 System.identityHashCode(queueItem));
161
162         ticketProcessorFactory.createProcessor(ticket).run();
163
164         // publish notification
165         finisher.firePopNotification(ticket.getDirectResult());
166     }
167
168     /**
169      * @param ticket
170      */
171     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
172         switch (ticket.getQueueType()) {
173         case DEFAULT:
174             Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
175             processorPool.execute(ticketProcessor);
176             try {
177                 ticketQueue.put(ticket);
178             } catch (InterruptedException e) {
179                 LOG.warn("enqeueue of unordered message ticket failed", e);
180             }
181             break;
182         case UNORDERED:
183             Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket);
184             processorPool.execute(ticketProcessorSync);
185             break;
186         default:
187             LOG.warn("unsupported enqueue type: {}", ticket.getQueueType());
188         }
189     }
190
191     /**
192      * @param poolSize the poolSize to set
193      */
194     public void setProcessingPoolSize(int poolSize) {
195         this.processingPoolSize = poolSize;
196     }
197
198     @Override
199     public void setTranslatorMapping(
200             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
201         this.translatorMapping = translatorMapping;
202     }
203
204     @Override
205     public void setPopListenersMapping(
206             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
207         this.popListenersMapping = popListenersMapping;
208     }
209
210     /**
211      * @param messageSpy the messageSpy to set
212      */
213     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
214         this.messageSpy = messageSpy;
215     }
216
217     @Override
218     public AutoCloseable registerMessageSource(QueueKeeper<OfHeader> queue) {
219         boolean added = messageSources.add(queue);
220         if (! added) {
221             LOG.debug("registration of message source queue failed - already registered");
222         }
223         MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration =
224                 new MessageSourcePollRegistration<>(this, queue);
225         return queuePollRegistration;
226     }
227
228     @Override
229     public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
230         return messageSources.remove(queue);
231     }
232
233     @Override
234     public Collection<QueueKeeper<OfHeader>> getMessageSources() {
235         return messageSources;
236     }
237
238     @Override
239     public HarvesterHandle getHarvesterHandle() {
240         return harvester;
241     }
242 }