import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
/**
- * @author mirehak
+ * This processing mechanism based on queue. Processing consists of 2 steps: translate and publish.
+ * Proposed workflow (might slightly deviate in implementations):
+ * <ol>
+ * <li>messages of input type are pushed in (via {@link QueueKeeper#push(Object, ConnectionConductor)} and similar)</li>
+ * <li>ticket (executable task) is build upon each pushed message and enqueued</li>
+ * <li>ticket is translated using appropriate translator</li>
+ * <li>ticket is dequeued and result is published by appropriate popListener</li>
+ * </ol>
+ * Message order might be not important, e.g. when speed is of the essence
* @param <IN> source type
* @param <OUT> result type
*/
public interface QueueKeeper<IN, OUT> {
- public enum QueueType {DEFAULT, UNORDERED}
+ /** type of message enqueue */
+ public enum QueueType {
+ /** ordered processing */
+ DEFAULT,
+ /** unordered processing - bypass fair processing */
+ UNORDERED}
/**
- * @param translatorMapping
+ * @param translatorMapping translators for message processing
*/
void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping);
/**
+ * enqueue message for processing using {@link QueueType#DEFAULT}
* @param message
- * @param conductor
+ * @param conductor source of message
*/
void push(IN message, ConnectionConductor conductor);
/**
+ * enqueue message for processing
* @param message
- * @param conductor
- * @param ordered - true if message order matters, false otherwise
+ * @param conductor source of message
+ * @param queueType - {@link QueueType#DEFAULT} if message order matters, {@link QueueType#UNORDERED} otherwise
*/
void push(IN message, ConnectionConductor conductor, QueueType queueType);
/**
- * @param popListenersMapping the popListenersMapping to set
+ * @param popListenersMapping listeners invoked when processing done
*/
void setPopListenersMapping(Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping);
}
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
- * @author mirehak
+ * {@link QueueKeeper} implementation focused to keep order and use up mutiple threads for translation phase.
+ * <br/>
+ * There is internal thread pool of limited size ({@link QueueKeeperLightImpl#setPoolSize(int)})
+ * dedicated to translation. Then there is singleThreadPool dedicated to publishing (via popListeners)
+ * <br/>
+ * Workflow:
+ * <ol>
+ * <li>upon message push ticket is created and enqueued</li>
+ * <li>available threads from internal pool translate the massage wrapped in ticket</li>
+ * <li>when translation of particular message is finished, result is set in future result of wrapping ticket</br>
+ * (order of tickets in queue is not touched during translate)
+ * </li>
+ * <li>at the end of queue there is {@link TicketFinisher} running in singleThreadPool and for each ticket it does:
+ * <ol>
+ * <li>invoke blocking {@link BlockingQueue#take()} method in order to get the oldest ticket</li>
+ * <li>invoke blocking {@link Future#get()} on the dequeued ticket</li>
+ * <li>as soon as the result of translation is available, appropriate popListener is invoked</li>
+ * </ol>
+ * and this way the order of messages is preserved and also multiple threads are used by translating
+ * </li>
+ * </ol>
+ *
+ *
*/
public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {