import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
*/
public class TicketFinisher<OUT> implements Runnable {
-
+
private static final Logger LOG = LoggerFactory
.getLogger(TicketFinisher.class);
-
+
private final BlockingQueue<TicketResult<OUT>> queue;
private final Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping;
private final RegisteredTypeExtractor<OUT> registeredOutTypeExtractor;
-
+
/**
* @param queue
* @param popListenersMapping
* @param registeredOutTypeExtractor
*/
public TicketFinisher(BlockingQueue<TicketResult<OUT>> queue,
- Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
+ Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
RegisteredTypeExtractor<OUT> registeredOutTypeExtractor) {
this.queue = queue;
this.popListenersMapping = popListenersMapping;
try {
//TODO:: handle shutdown of queue
TicketResult<OUT> result = queue.take();
+ long before = System.nanoTime();
+ LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity());
List<OUT> processedMessages = result.getResult().get();
- LOG.debug("finishing ticket: {}", System.identityHashCode(result));
+ long after = System.nanoTime();
+ LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before);
for (OUT msg : processedMessages) {
- Class<? extends Object> registeredType =
+ Class<? extends Object> registeredType =
registeredOutTypeExtractor.extractRegisteredType(msg);
Collection<PopListener<OUT>> popListeners = popListenersMapping.get(registeredType);
if (popListeners == null) {