* prepare queue
*/
public void init() {
- processQueue = new LinkedBlockingQueue<>(100);
+ processQueue = new LinkedBlockingQueue<>();
pool = new ScheduledThreadPoolExecutor(poolSize);
-
+
ticketProcessorFactory = new TicketProcessorFactory<>();
ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
ticketProcessorFactory.setTranslatorMapping(translatorMapping);
ticketProcessorFactory.setVersionExtractor(versionExtractor);
ticketProcessorFactory.setSpy(messageSpy);
-
+
TicketFinisher<DataObject> finisher = new TicketFinisher<>(
processQueue, popListenersMapping, registeredOutTypeExtractor);
new Thread(finisher).start();
public void push(OfHeader message, ConnectionConductor conductor) {
push(message,conductor,QueueKeeper.QueueType.DEFAULT);
}
-
+
@Override
public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
if(queueType == QueueKeeper.QueueType.DEFAULT) {
Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
this.popListenersMapping = popListenersMapping;
}
-
+
/**
* @param messageSpy the messageSpy to set
*/
public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
this.messageSpy = messageSpy;
}
-
+
private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
List<DataObject> result = new ArrayList<>();
Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
}
return result;
}
-
+
private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
for (DataObject msg : processedMessages) {
- Class<? extends Object> registeredType =
+ Class<? extends Object> registeredType =
registeredOutTypeExtractor.extractRegisteredType(msg);
Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
if (popListeners == null) {