*
*/
public class TicketFinisher<OUT> implements Runnable {
-
+
private static final Logger LOG = LoggerFactory
.getLogger(TicketFinisher.class);
-
+
private final BlockingQueue<TicketResult<OUT>> queue;
private final Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping;
private final RegisteredTypeExtractor<OUT> registeredOutTypeExtractor;
-
+
/**
* @param queue
* @param popListenersMapping
* @param registeredOutTypeExtractor
*/
public TicketFinisher(BlockingQueue<TicketResult<OUT>> queue,
- Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
+ Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
RegisteredTypeExtractor<OUT> registeredOutTypeExtractor) {
this.queue = queue;
this.popListenersMapping = popListenersMapping;
try {
//TODO:: handle shutdown of queue
TicketResult<OUT> result = queue.take();
+ long before = System.nanoTime();
+ LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity());
List<OUT> processedMessages = result.getResult().get();
- LOG.debug("finishing ticket: {}", System.identityHashCode(result));
+ long after = System.nanoTime();
+ LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before);
for (OUT msg : processedMessages) {
- Class<? extends Object> registeredType =
+ Class<? extends Object> registeredType =
registeredOutTypeExtractor.extractRegisteredType(msg);
Collection<PopListener<OUT>> popListeners = popListenersMapping.get(registeredType);
if (popListeners == null) {
protected static final Logger LOG = LoggerFactory
.getLogger(TicketProcessorFactory.class);
-
+
protected VersionExtractor<IN> versionExtractor;
protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
protected MessageSpy<IN, OUT> spy;
-
+
/**
* @param versionExtractor the versionExtractor to set
*/
* @return runnable ticket processor
*/
public Runnable createProcessor(final Ticket<IN, OUT> ticket) {
-
+
Runnable ticketProcessor = new Runnable() {
@Override
public void run() {
if (messageType.equals("PacketInMessage.class")) {
cookie = conductor.getAuxiliaryKey();
}
+ long start = System.nanoTime();
List<OUT> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
+ long end = System.nanoTime();
+ LOG.debug("translator: {} elapsed time {} ns",translator,end-start);
if(translatorOutput != null && !translatorOutput.isEmpty()) {
result.addAll(translatorOutput);
}