Bug 2103: Fix hash collisions preventing some switches from registering
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueProcessorLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.CopyOnWriteArraySet;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.RejectedExecutionHandler;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21
22 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
23 import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
24 import org.opendaylight.openflowplugin.api.statistics.MessageSpy.STATISTIC_GROUP;
25 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
26 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
28 import org.opendaylight.yangtools.yang.binding.DataContainer;
29 import org.opendaylight.yangtools.yang.binding.DataObject;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33
34 /**
35  * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
36  * <br/>
37  * There is internal thread pool of limited size ({@link QueueProcessorLightImpl#setProcessingPoolSize(int)})
38  * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
39  * <br/>
40  * Workflow:
41  * <ol>
42  * <li>upon message push ticket is created and enqueued</li>
43  * <li>available threads from internal pool translate the massage wrapped in ticket</li>
44  * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
45  *     (order of tickets in queue is not touched during translate)
46  * </li>
47  * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
48  *    <ol>
49  *      <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
50  *      <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
51  *      <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
52  *    </ol>
53  *    and this way the order of messages is preserved and also multiple threads are used by translating
54  * </li>
55  * </ol>
56  *
57  *
58  */
59 public class QueueProcessorLightImpl implements QueueProcessor<OfHeader, DataObject> {
60
61     protected static final Logger LOG = LoggerFactory
62             .getLogger(QueueProcessorLightImpl.class);
63
64     private BlockingQueue<TicketResult<DataObject>> ticketQueue;
65     private ThreadPoolExecutor processorPool;
66     private int processingPoolSize = 4;
67     private ExecutorService harvesterPool;
68     private ExecutorService finisherPool;
69
70     protected Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
71     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
72     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
73     private MessageSpy<DataContainer> messageSpy;
74     protected Collection<QueueKeeper<OfHeader>> messageSources;
75     private QueueKeeperHarvester<OfHeader> harvester;
76
77     protected TicketFinisher<DataObject> finisher;
78
79     /**
80      * prepare queue
81      */
82     public void init() {
83         int ticketQueueCapacity = 1500;
84         ticketQueue = new ArrayBlockingQueue<>(ticketQueueCapacity);
85         /*
86          * TODO FIXME - DOES THIS REALLY NEED TO BE CONCURRENT?  Can we figure out
87          * a better lifecycle?  Why does this have to be a Set?
88          */
89         messageSources = new CopyOnWriteArraySet<>();
90
91         processorPool = new ThreadPoolLoggingExecutor(processingPoolSize, processingPoolSize, 0,
92                 TimeUnit.MILLISECONDS,
93                 new ArrayBlockingQueue<Runnable>(ticketQueueCapacity),
94                 "OFmsgProcessor");
95         // force blocking when pool queue is full
96         processorPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
97             @Override
98             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
99                 try {
100                     executor.getQueue().put(r);
101                 } catch (InterruptedException e) {
102                     Thread.currentThread().interrupt();
103                     throw new IllegalStateException(e);
104                 }
105             }
106         });
107
108         harvesterPool = new ThreadPoolLoggingExecutor(1, 1, 0,
109                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgHarvester");
110         finisherPool = new ThreadPoolLoggingExecutor(1, 1, 0,
111                 TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), "OFmsgFinisher");
112         finisher = new TicketFinisherImpl(
113                 ticketQueue, popListenersMapping);
114         finisherPool.execute(finisher);
115
116         harvester = new QueueKeeperHarvester<OfHeader>(this, messageSources);
117         harvesterPool.execute(harvester);
118
119         ticketProcessorFactory = new TicketProcessorFactoryImpl();
120         ticketProcessorFactory.setTranslatorMapping(translatorMapping);
121         ticketProcessorFactory.setSpy(messageSpy);
122         ticketProcessorFactory.setTicketFinisher(finisher);
123     }
124
125     /**
126      * stop processing queue
127      */
128     public void shutdown() {
129         processorPool.shutdown();
130     }
131
132     @Override
133     public void enqueueQueueItem(QueueItem<OfHeader> queueItem) {
134         messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
135         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
136         ticket.setConductor(queueItem.getConnectionConductor());
137         ticket.setMessage(queueItem.getMessage());
138         ticket.setQueueType(queueItem.getQueueType());
139
140         LOG.trace("ticket scheduling: {}, ticket: {}",
141                 queueItem.getMessage().getImplementedInterface().getSimpleName(),
142                 System.identityHashCode(queueItem));
143         scheduleTicket(ticket);
144     }
145
146
147     @Override
148     public void directProcessQueueItem(QueueItem<OfHeader> queueItem) {
149         messageSpy.spyMessage(queueItem.getMessage(), STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
150         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
151         ticket.setConductor(queueItem.getConnectionConductor());
152         ticket.setMessage(queueItem.getMessage());
153
154         LOG.debug("ticket scheduling: {}, ticket: {}",
155                 queueItem.getMessage().getImplementedInterface().getSimpleName(),
156                 System.identityHashCode(queueItem));
157
158         ticketProcessorFactory.createProcessor(ticket).run();
159
160         // publish notification
161         finisher.firePopNotification(ticket.getDirectResult());
162     }
163
164     /**
165      * @param ticket
166      */
167     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
168         switch (ticket.getQueueType()) {
169         case DEFAULT:
170             Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
171             processorPool.execute(ticketProcessor);
172             try {
173                 ticketQueue.put(ticket);
174             } catch (InterruptedException e) {
175                 LOG.warn("enqeueue of unordered message ticket failed", e);
176             }
177             break;
178         case UNORDERED:
179             Runnable ticketProcessorSync = ticketProcessorFactory.createSyncProcessor(ticket);
180             processorPool.execute(ticketProcessorSync);
181             break;
182         default:
183             LOG.warn("unsupported enqueue type: {}", ticket.getQueueType());
184         }
185     }
186
187     /**
188      * @param poolSize the poolSize to set
189      */
190     public void setProcessingPoolSize(int poolSize) {
191         this.processingPoolSize = poolSize;
192     }
193
194     @Override
195     public void setTranslatorMapping(
196             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
197         this.translatorMapping = translatorMapping;
198     }
199
200     @Override
201     public void setPopListenersMapping(
202             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
203         this.popListenersMapping = popListenersMapping;
204     }
205
206     /**
207      * @param messageSpy the messageSpy to set
208      */
209     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
210         this.messageSpy = messageSpy;
211     }
212
213     @Override
214     public AutoCloseable registerMessageSource(QueueKeeper<OfHeader> queue) {
215         boolean added = messageSources.add(queue);
216         if (! added) {
217             LOG.debug("registration of message source queue failed - already registered");
218         }
219         MessageSourcePollRegistration<QueueKeeper<OfHeader>> queuePollRegistration =
220                 new MessageSourcePollRegistration<>(this, queue);
221         return queuePollRegistration;
222     }
223
224     @Override
225     public boolean unregisterMessageSource(QueueKeeper<OfHeader> queue) {
226         return messageSources.remove(queue);
227     }
228
229     @Override
230     public Collection<QueueKeeper<OfHeader>> getMessageSources() {
231         return messageSources;
232     }
233
234     @Override
235     public HarvesterHandle getHarvesterHandle() {
236         return harvester;
237     }
238 }