BUG-1075: ingress back pressure
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueProcessorLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.Comparator;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.concurrent.ArrayBlockingQueue;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ConcurrentSkipListSet;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.RejectedExecutionHandler;
20 import java.util.concurrent.ThreadPoolExecutor;
21 import java.util.concurrent.TimeUnit;
22
23 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
24 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
25 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
26 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
28 import org.opendaylight.yangtools.yang.binding.DataContainer;
29 import org.opendaylight.yangtools.yang.binding.DataObject;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33
34 /**
35  * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
36  * <br/>
37  * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)}) 
38  * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
39  * <br/>
40  * Workflow:
41  * <ol>
42  * <li>upon message push ticket is created and enqueued</li>
43  * <li>available threads from internal pool translate the massage wrapped in ticket</li>
44  * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
45  *     (order of tickets in queue is not touched during translate)
46  * </li>
47  * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
48  *    <ol>
49  *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
50  *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
51  *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
52  *    </ol>
53  *    and this way the order of messages is preserved and also multiple threads are used by translating 
54  * </li>
55  * </ol>
56  * 
57  * 
58  */
59 public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
60
61     protected static final Logger LOG = LoggerFactory
62             .getLogger(QueueProcessorLightImpl.class);
63
64     private BlockingQueue<TicketResult<DataObject>> ticketQueue;
65     private ThreadPoolExecutor processorPool;
66     private int processingPoolSize = 4;
67     private ExecutorService harvesterPool;
68     private ExecutorService finisherPool;
69     
70     protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
71     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
72     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
73     private MessageSpy<DataContainer> messageSpy;
74     protected Collection<QueueKeeper<OfHeader>> messageSources;
75     private QueueKeeperHarvester<OfHeader> harvester;
76
77     protected TicketFinisher<DataObject> finisher;
78
79     /**
80      * prepare queue
81      */
82     public void init() {
83         int ticketQueueCapacity = 1500;
84         ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
85         messageSources = new ConcurrentSkipListSet<>(
86                 new Comparator<QueueKeeper<OfHeader>>() {
87                     @Override
88                     public int compare(QueueKeeper<OfHeader> o1,
89                             QueueKeeper<OfHeader> o2) {
90                         return Integer.valueOf(o1.hashCode()).compareTo(o2.hashCode());
91                     }
92                 });
93         
94         processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0, 
95                 TimeUnit.MILLISECONDS, 
96                 new ArrayBlockingQueue<Runnable>(ticketQueueCapacity), 
97                 "OFmsgProcessor");
98         // force blocking when pool queue is full
99         processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
100             @Override
101             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
102                 try {
103                     executor.getQueue().put(r);
104                 } catch (InterruptedException e) {
105                     Thread.currentThread().interrupt();
106                     throw new IllegalStateException(e);
107                 }
108             }
109         });
110         
111         harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
112                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
113         finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0, 
114                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
115         finisher = new TicketFinisherImpl(
116                 ticketQueue, popListenersMapping);
117         finisherPool.execute(finisher);
118         
119         harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
120         harvesterPool.execute(harvester);
121
122         ticketProcessorFactory = new TicketProcessorFactoryImpl();
123         ticketProcessorFactory.setTranslatorMapping(translatorMapping);
124         ticketProcessorFactory.setSpy(messageSpy);
125         ticketProcessorFactory.setTicketFinisher(finisher);
126     }
127
128     /**
129      * stop processing queue
130      */
131     public void shutdown() {
132         processorPool.shutdown();
133     }
134
135     @Override
136     public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
137         messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
138         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
139         ticket.setConductor(queueItem.getConnectionConductor());
140         ticket.setMessage(queueItem.getMessage());
141         ticket.setQueueType(queueItem.getQueueType());
142         
143         LOG.trace("ticket scheduling: {}, ticket: {}",
144                 queueItem.getMessage().getImplementedInterface().getSimpleName(), 
145                 System.identityHashCode(queueItem));
146         scheduleTicket(ticket);
147     }
148     
149     
150     @Override
151     public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
152         messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
153         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
154         ticket.setConductor(queueItem.getConnectionConductor());
155         ticket.setMessage(queueItem.getMessage());
156         
157         LOG.debug("ticket scheduling: {}, ticket: {}",
158                 queueItem.getMessage().getImplementedInterface().getSimpleName(), 
159                 System.identityHashCode(queueItem));
160         
161         ticketProcessorFactory.createProcessor(ticket).run();
162         
163         // publish notification
164         finisher.firePopNotification(ticket.getDirectResult());
165     }
166
167     /**
168      * @param ticket
169      */
170     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
171         switch (ticket.getQueueType()) {
172         case DEFAULT:
173             Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
174             processorPool.execute(ticketProcessor);
175             try {
176                 ticketQueue.put(ticket);
177             } catch (InterruptedException e) {
178                 LOG.warn("enqeueue of unordered message ticket failed", e);
179             }
180             break;
181         case UNORDERED:
182             Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket);
183             processorPool.execute(ticketProcessorSync);
184             break;
185         default:
186             LOG.warn("unsupported enqueue type: {}", ticket.getQueueType());
187         }
188     }
189
190     /**
191      * @param poolSize the poolSize to set
192      */
193     public void setProcessingPoolSize(int poolSize) {
194         this.processingPoolSize = poolSize;
195     }
196
197     @Override
198     public void setTranslatorMapping(
199             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
200         this.translatorMapping = translatorMapping;
201     }
202
203     @Override
204     public void setPopListenersMapping(
205             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
206         this.popListenersMapping = popListenersMapping;
207     }
208
209     /**
210      * @param messageSpy the messageSpy to set
211      */
212     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
213         this.messageSpy = messageSpy;
214     }
215
216     @Override
217     public AutoCloseable registerMessageSource(QueueKeeper<OfHeader> queue) {
218         boolean added = messageSources.add(queue);
219         if (! added) {
220             LOG.debug("registration of message source queue failed - already registered");
221         }
222         MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration = 
223                 new MessageSourcePollRegistration<>(this, queue);
224         return queuePollRegistration;
225     }
226     
227     @Override
228     public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
229         return messageSources.remove(queue);
230     }
231     
232     @Override
233     public Collection<QueueKeeper<OfHeader>> getMessageSources() {
234         return messageSources;
235     }
236     
237     @Override
238     public HarvesterHandle getHarvesterHandle() {
239         return harvester;
240     }
241 }