2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.queue;
10 import java.util.Collection;
11 import java.util.List;
13 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.CopyOnWriteArraySet;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.RejectedExecutionHandler;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
23 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
24 import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
25 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
26 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
27 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
28 import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
29 import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy.STATISTIC_GROUP;
30 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
32 import org.opendaylight.yangtools.yang.binding.DataContainer;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
39 * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementation focused on keeping message order while using multiple threads for the translation phase.
41 * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
42 * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
46 * <li>upon message push ticket is created and enqueued</li>
47 * <li>available threads from internal pool translate the message wrapped in ticket</li>
48 * <li>when translation of particular message is finished, result is set in future result of wrapping ticket<br>
49 * (order of tickets in queue is not touched during translate)
51 * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
53 * <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
54 * <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
55 * <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
57 * and this way the order of messages is preserved while multiple threads are used for translating
63 public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
// Class-wide SLF4J logger.
65 private static final Logger LOG = LoggerFactory
66 .getLogger(QueueProcessorLightImpl.class);
// Queue of tickets; scheduleTicket puts tickets into it and the finisher is constructed around it.
68 private BlockingQueue<TicketResult<DataObject>> ticketQueue;
// Pool that executes the Runnable processors created by ticketProcessorFactory.
69 private ThreadPoolExecutor processorPool;
// Core and maximum thread count handed to processorPool when it is created (4 unless changed via the setter).
70 private int processingPoolSize = 4;
// Executor created with a single thread; the harvester task is executed on it.
71 private ExecutorService harvesterPool;
// Executor created with a single thread; the finisher task is executed on it.
72 private ExecutorService finisherPool;
// Pop listeners keyed by data object class; injected via setter and passed to the finisher on construction.
74 protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
// Translators keyed by TranslatorKey; injected via setter and passed on to ticketProcessorFactory.
75 private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
// Creates the Runnable processors for tickets (createProcessor / createSyncProcessor).
76 private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
// Statistics sink; incoming messages are reported to it as FROM_SWITCH_ENQUEUED.
77 private MessageSpy<DataContainer> messageSpy;
// Registered message source queues; the same collection instance is passed to the harvester.
78 protected Collection<QueueKeeper<OfHeader>> messageSources;
// Task constructed over (this, messageSources) and executed on harvesterPool.
79 private QueueKeeperHarvester<OfHeader> harvester;
// Task constructed over (ticketQueue, popListenersMapping) and executed on finisherPool;
// also called directly to fire pop notifications for directly processed items.
81 protected TicketFinisher<DataObject> finisher;
// Initialisation sequence: creates the queues, the three executors, the finisher, the harvester
// and the ticket processor factory.
// NOTE(review): the enclosing method header and several structural lines (closing braces,
// `try {`, one constructor argument of the processor pool) are not present in this listing.
// One capacity is used for both the ticket queue and the processor pool's work queue.
87 int ticketQueueCapacity = 1500;
88 ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
90 * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT? Can we figure out
91 * a better lifecycle? Why does this have to be a Set?
// NOTE(review): a fresh collection is assigned here, so messageSources is null before this
// code runs and any previously held collection is replaced.
93 messageSources = new CopyOnWriteArraySet<>();
// Fixed-size pool: core size == maximum size == processingPoolSize, keep-alive 0 ms,
// bounded work queue of ticketQueueCapacity entries.
95 processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
96 TimeUnit.MILLISECONDS,
97 new ArrayBlockingQueue<Runnable>(ticketQueueCapacity),
// Rejected tasks are put into the pool's own queue with a blocking put, so the submitting
// thread waits for free space instead of receiving an exception.
99 // force blocking when pool queue is full
100 processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
102 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
104 executor.getQueue().put(r);
105 } catch (InterruptedException e) {
// Restore the interrupt flag before failing so callers up the stack can still observe it.
106 Thread.currentThread().interrupt();
107 throw new IllegalStateException(e);
// Two executors with exactly one thread each and a work queue of capacity 1,
// named "OFmsgHarvester" and "OFmsgFinisher".
112 harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0,
113 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
114 finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
115 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
// The finisher receives the current value of popListenersMapping; a mapping injected through
// the setter after this point is not passed to this finisher instance by this code.
116 finisher = new TicketFinisherImpl(
117 ticketQueue, popListenersMapping);
118 finisherPool.execute(finisher);
// The harvester gets this processor and the messageSources collection created above.
120 harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
121 harvesterPool.execute(harvester);
// NOTE(review): the factory is assigned only after the harvester task has been started;
// if the harvester can call back into this processor before the next lines run, it would
// see a null ticketProcessorFactory. messageSources is empty at this point, which
// presumably makes that unlikely — confirm against QueueKeeperHarvester.
123 ticketProcessorFactory = new TicketProcessorFactoryImpl();
124 ticketProcessorFactory.setTranslatorMapping(translatorMapping);
125 ticketProcessorFactory.setSpy(messageSpy);
126 ticketProcessorFactory.setTicketFinisher(finisher);
130 * stop processing queue
132 public void shutdown() {
133 processorPool.shutdown();
137 public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
138 messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
139 TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
140 ticket.setConductor(queueItem.getConnectionConductor());
141 ticket.setMessage(queueItem.getMessage());
142 ticket.setQueueType(queueItem.getQueueType());
144 LOG.trace("ticket scheduling: {}, ticket: {}",
145 queueItem.getMessage().getImplementedInterface().getSimpleName(),
146 System.identityHashCode(queueItem));
147 scheduleTicket(ticket);
152 public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
153 messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
154 TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
155 ticket.setConductor(queueItem.getConnectionConductor());
156 ticket.setMessage(queueItem.getMessage());
158 LOG.debug("ticket scheduling: {}, ticket: {}",
159 queueItem.getMessage().getImplementedInterface().getSimpleName(),
160 System.identityHashCode(queueItem));
162 ticketProcessorFactory.createProcessor(ticket).run();
164 // publish notification
165 finisher.firePopNotification(ticket.getDirectResult());
// Hands a ticket to the processor pool, choosing the processor by the ticket's queue type.
// NOTE(review): the `case` labels, the `try {` line and the closing braces of this method are
// not present in this listing; the branch descriptions below follow the visible statements only.
171 private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
172 switch (ticket.getQueueType()) {
// First branch: a regular processor is created and submitted to the pool, then the ticket is
// appended to ticketQueue. put() blocks while the queue is full.
174 Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
175 processorPool.execute(ticketProcessor);
177 ticketQueue.put(ticket);
178 } catch (InterruptedException e) {
// NOTE(review): the interrupt is only logged here and the thread's interrupt flag is not
// restored, unlike the rejection handler elsewhere in this class which calls
// Thread.currentThread().interrupt(). At this point the processor has already been
// submitted but the ticket is not in ticketQueue. Consider restoring the flag.
179 LOG.warn("enqeueue of unordered message ticket failed", e);
// Second branch: a sync processor is created and submitted; the ticket is not put into
// ticketQueue by the visible statements.
183 Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket);
184 processorPool.execute(ticketProcessorSync);
// Fallback: the queue type is not handled; the ticket is not scheduled, only a warning is logged.
187 LOG.warn("unsupported enqueue type: {}", ticket.getQueueType());
192 * @param poolSize the poolSize to set
194 public void setProcessingPoolSize(int poolSize) {
195 this.processingPoolSize = poolSize;
199 public void setTranslatorMapping(
200 Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
201 this.translatorMapping = translatorMapping;
205 public void setPopListenersMapping(
206 Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
207 this.popListenersMapping = popListenersMapping;
211 * @param messageSpy the messageSpy to set
213 public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
214 this.messageSpy = messageSpy;
218 public AutoCloseable registerMessageSource(QueueKeeper<OfHeader> queue) {
219 boolean added = messageSources.add(queue);
221 LOG.debug("registration of message source queue failed - already registered");
223 MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration =
224 new MessageSourcePollRegistration<>(this, queue);
225 return queuePollRegistration;
229 public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
230 return messageSources.remove(queue);
234 public Collection<QueueKeeper<OfHeader>> getMessageSources() {
235 return messageSources;
239 public HarvesterHandle getHarvesterHandle() {