2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.queue;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashSet;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ScheduledThreadPoolExecutor;
19 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
20 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
21 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
23 import org.opendaylight.yangtools.yang.binding.DataObject;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
30 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
32 private static final Logger LOG = LoggerFactory
33 .getLogger(QueueKeeperLightImpl.class);
35 private Set<PopListener<DataObject>> listeners;
36 private BlockingQueue<TicketResult<DataObject>> processQueue;
37 private ScheduledThreadPoolExecutor pool;
38 private int poolSize = 10;
39 private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping;
41 private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
43 public Short extractVersion(OfHeader message) {
44 return message.getVersion();
52 listeners = Collections.synchronizedSet(new HashSet<PopListener<DataObject>>());
53 processQueue = new LinkedBlockingQueue<>(100);
54 pool = new ScheduledThreadPoolExecutor(poolSize);
55 TicketFinisher<DataObject> finisher = new TicketFinisher<>(processQueue, listeners);
56 new Thread(finisher).start();
60 * stop processing queue
62 public void shutdown() {
67 public void push(Class<? extends OfHeader> registeredMessageType, OfHeader message, ConnectionConductor conductor) {
68 TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
69 ticket.setConductor(conductor);
70 ticket.setMessage(message);
71 ticket.setRegisteredMessageType(registeredMessageType);
72 LOG.debug("ticket scheduling: {}, ticket: {}", registeredMessageType.getSimpleName(), System.identityHashCode(ticket));
73 //TODO: block if queue limit reached
74 processQueue.add(ticket);
75 scheduleTicket(ticket);
81 private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
82 pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor, translatorMapping));
86 public synchronized void addPopListener(PopListener<DataObject> listener) {
87 listeners.add(listener);
91 public synchronized boolean removePopListener(PopListener<DataObject> listener) {
92 return listeners.remove(listener);
96 * @param poolSize the poolSize to set
98 public void setPoolSize(int poolSize) {
99 this.poolSize = poolSize;
103 public void setTranslatorMapping(
104 Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping) {
105 this.translatorMapping = translatorMapping;