e7c8ca8622581a9bc49c8c8cbb138f7e4e025d57
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / TicketFinisher.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.ExecutionException;
15
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * @param <OUT> result type
21  *
22  */
23 public class TicketFinisher<OUT> implements Runnable {
24
25     private static final Logger LOG = LoggerFactory
26             .getLogger(TicketFinisher.class);
27
28     private final BlockingQueue<TicketResult<OUT>> queue;
29     private final Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping;
30     private final RegisteredTypeExtractor<OUT> registeredOutTypeExtractor;
31
32     /**
33      * @param queue
34      * @param popListenersMapping
35      * @param registeredOutTypeExtractor
36      */
37     public TicketFinisher(BlockingQueue<TicketResult<OUT>> queue,
38             Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
39             RegisteredTypeExtractor<OUT> registeredOutTypeExtractor) {
40         this.queue = queue;
41         this.popListenersMapping = popListenersMapping;
42         this.registeredOutTypeExtractor = registeredOutTypeExtractor;
43     }
44
45
46     @Override
47     public void run() {
48         while (true) {
49             try {
50                 //TODO:: handle shutdown of queue
51                 TicketResult<OUT> result = queue.take();
52                 long before = System.nanoTime();
53                 LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity());
54                 List<OUT> processedMessages = result.getResult().get();
55                 long after = System.nanoTime();
56                 LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before);
57                 for (OUT msg : processedMessages) {
58                     Class<? extends Object> registeredType =
59                             registeredOutTypeExtractor.extractRegisteredType(msg);
60                     Collection<PopListener<OUT>> popListeners = popListenersMapping.get(registeredType);
61                     if (popListeners == null) {
62                         LOG.warn("no popListener registered for type {}"+registeredType);
63                     } else {
64                         for (PopListener<OUT> consumer : popListeners) {
65                             consumer.onPop(msg);
66                         }
67                     }
68                 }
69             } catch (Exception e) {
70                 LOG.error(e.getMessage(), e);
71             }
72         }
73     }
74 }