Bug 1764 - moved Session related interfaces to openflowplugin-api
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueProcessorLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.CopyOnWriteArraySet;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.RejectedExecutionHandler;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21
22 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
23 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
24 import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
25 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
26 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
27 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
28 import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
29 import org.opendaylight.openflowplugin.api.statistics.MessageSpy.STATISTIC_GROUP;
30 import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
31 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
33 import org.opendaylight.yangtools.yang.binding.DataContainer;
34 import org.opendaylight.yangtools.yang.binding.DataObject;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38
39 /**
40  * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
41  * <br/>
42  * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
43  * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
44  * <br/>
45  * Workflow:
46  * <ol>
47  * <li>upon message push ticket is created and enqueued</li>
48  * <li>available threads from internal pool translate the massage wrapped in ticket</li>
49  * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
50  *     (order of tickets in queue is not touched during translate)
51  * </li>
52  * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
53  *    <ol>
54  *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
55  *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
56  *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
57  *    </ol>
58  *    and this way the order of messages is preserved and also multiple threads are used by translating
59  * </li>
60  * </ol>
61  *
62  *
63  */
64 public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
65
66     protected static final Logger LOG = LoggerFactory
67             .getLogger(QueueProcessorLightImpl.class);
68
69     private BlockingQueue<TicketResult<DataObject>> ticketQueue;
70     private ThreadPoolExecutor processorPool;
71     private int processingPoolSize = 4;
72     private ExecutorService harvesterPool;
73     private ExecutorService finisherPool;
74
75     protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
76     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
77     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
78     private MessageSpy<DataContainer> messageSpy;
79     protected Collection<QueueKeeper<OfHeader>> messageSources;
80     private QueueKeeperHarvester<OfHeader> harvester;
81
82     protected TicketFinisher<DataObject> finisher;
83
84     /**
85      * prepare queue
86      */
87     public void init() {
88         int ticketQueueCapacity = 1500;
89         ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
90         /*
91          * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT?  Can we figure out
92          * a better lifecycle?  Why does this have to be a Set?
93          */
94         messageSources = new CopyOnWriteArraySet<>();
95
96         processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
97                 TimeUnit.MILLISECONDS,
98                 new ArrayBlockingQueue<Runnable>(ticketQueueCapacity),
99                 "OFmsgProcessor");
100         // force blocking when pool queue is full
101         processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
102             @Override
103             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
104                 try {
105                     executor.getQueue().put(r);
106                 } catch (InterruptedException e) {
107                     Thread.currentThread().interrupt();
108                     throw new IllegalStateException(e);
109                 }
110             }
111         });
112
113         harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0,
114                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
115         finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
116                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
117         finisher = new TicketFinisherImpl(
118                 ticketQueue, popListenersMapping);
119         finisherPool.execute(finisher);
120
121         harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
122         harvesterPool.execute(harvester);
123
124         ticketProcessorFactory = new TicketProcessorFactoryImpl();
125         ticketProcessorFactory.setTranslatorMapping(translatorMapping);
126         ticketProcessorFactory.setSpy(messageSpy);
127         ticketProcessorFactory.setTicketFinisher(finisher);
128     }
129
130     /**
131      * stop processing queue
132      */
133     public void shutdown() {
134         processorPool.shutdown();
135     }
136
137     @Override
138     public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
139         messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
140         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
141         ticket.setConductor(queueItem.getConnectionConductor());
142         ticket.setMessage(queueItem.getMessage());
143         ticket.setQueueType(queueItem.getQueueType());
144
145         LOG.trace("ticket scheduling: {}, ticket: {}",
146                 queueItem.getMessage().getImplementedInterface().getSimpleName(),
147                 System.identityHashCode(queueItem));
148         scheduleTicket(ticket);
149     }
150
151
152     @Override
153     public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
154         messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
155         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
156         ticket.setConductor(queueItem.getConnectionConductor());
157         ticket.setMessage(queueItem.getMessage());
158
159         LOG.debug("ticket scheduling: {}, ticket: {}",
160                 queueItem.getMessage().getImplementedInterface().getSimpleName(),
161                 System.identityHashCode(queueItem));
162
163         ticketProcessorFactory.createProcessor(ticket).run();
164
165         // publish notification
166         finisher.firePopNotification(ticket.getDirectResult());
167     }
168
169     /**
170      * @param ticket
171      */
172     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
173         switch (ticket.getQueueType()) {
174         case DEFAULT:
175             Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
176             processorPool.execute(ticketProcessor);
177             try {
178                 ticketQueue.put(ticket);
179             } catch (InterruptedException e) {
180                 LOG.warn("enqeueue of unordered message ticket failed", e);
181             }
182             break;
183         case UNORDERED:
184             Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket);
185             processorPool.execute(ticketProcessorSync);
186             break;
187         default:
188             LOG.warn("unsupported enqueue type: {}", ticket.getQueueType());
189         }
190     }
191
192     /**
193      * @param poolSize the poolSize to set
194      */
195     public void setProcessingPoolSize(int poolSize) {
196         this.processingPoolSize = poolSize;
197     }
198
199     @Override
200     public void setTranslatorMapping(
201             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
202         this.translatorMapping = translatorMapping;
203     }
204
205     @Override
206     public void setPopListenersMapping(
207             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
208         this.popListenersMapping = popListenersMapping;
209     }
210
211     /**
212      * @param messageSpy the messageSpy to set
213      */
214     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
215         this.messageSpy = messageSpy;
216     }
217
218     @Override
219     public AutoCloseable registerMessageSource(QueueKeeper<OfHeader> queue) {
220         boolean added = messageSources.add(queue);
221         if (! added) {
222             LOG.debug("registration of message source queue failed - already registered");
223         }
224         MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration =
225                 new MessageSourcePollRegistration<>(this, queue);
226         return queuePollRegistration;
227     }
228
229     @Override
230     public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
231         return messageSources.remove(queue);
232     }
233
234     @Override
235     public Collection<QueueKeeper<OfHeader>> getMessageSources() {
236         return messageSources;
237     }
238
239     @Override
240     public HarvesterHandle getHarvesterHandle() {
241         return harvester;
242     }
243 }