* prepare queue
*/
public void init() {
- processQueue = new LinkedBlockingQueue<>();
+ processQueue = new LinkedBlockingQueue<>(1000);
pool = new ScheduledThreadPoolExecutor(poolSize);
ticketProcessorFactory = new TicketProcessorFactory<>();
ticket.setMessage(message);
LOG.debug("ticket scheduling: {}, ticket: {}",
message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
- //TODO: block if queue limit reached
- processQueue.add(ticket);
- scheduleTicket(ticket);
+ try {
+ processQueue.put(ticket);
+ scheduleTicket(ticket);
+ } catch (InterruptedException e) {
+ LOG.warn("message enqueing interrupted", e);
+ }
} else if (queueType == QueueKeeper.QueueType.UNORDERED){
List<DataObject> processedMessages = translate(message,conductor);
pop(processedMessages,conductor);