deeb4d938215527cbe9d30c9ccee40946d562e0d
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / TicketFinisher.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.BlockingQueue;
14
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 /**
19  * @param <OUT> result type
20  *
21  */
22 public class TicketFinisher<OUT> implements Runnable {
23
24     private static final Logger LOG = LoggerFactory
25             .getLogger(TicketFinisher.class);
26
27     private final BlockingQueue<TicketResult<OUT>> queue;
28     private final Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping;
29     private final RegisteredTypeExtractor<OUT> registeredOutTypeExtractor;
30
31     /**
32      * @param queue
33      * @param popListenersMapping
34      * @param registeredOutTypeExtractor
35      */
36     public TicketFinisher(BlockingQueue<TicketResult<OUT>> queue,
37             Map<Class<? extends OUT>, Collection<PopListener<OUT>>> popListenersMapping,
38             RegisteredTypeExtractor<OUT> registeredOutTypeExtractor) {
39         this.queue = queue;
40         this.popListenersMapping = popListenersMapping;
41         this.registeredOutTypeExtractor = registeredOutTypeExtractor;
42     }
43
44
45     @Override
46     public void run() {
47         while (true) {
48             try {
49                 //TODO:: handle shutdown of queue
50                 TicketResult<OUT> result = queue.take();
51                 long before = System.nanoTime();
52                 LOG.debug("finishing ticket(before): {}, {} remain in queue, {} capacity remaining", System.identityHashCode(result),queue.size(), queue.remainingCapacity());
53                 List<OUT> processedMessages = result.getResult().get();
54                 long after = System.nanoTime();
55                 LOG.debug("finishing ticket(after): {}, {} remain in queue, {} capacity remaining, processingTime {} ns", System.identityHashCode(result),queue.size(), queue.remainingCapacity(),after-before);
56                 for (OUT msg : processedMessages) {
57                     Class<? extends Object> registeredType =
58                             registeredOutTypeExtractor.extractRegisteredType(msg);
59                     Collection<PopListener<OUT>> popListeners = popListenersMapping.get(registeredType);
60                     if (popListeners == null) {
61                         LOG.warn("no popListener registered for type {}"+registeredType);
62                     } else {
63                         for (PopListener<OUT> consumer : popListeners) {
64                             consumer.onPop(msg);
65                         }
66                     }
67                 }
68             } catch (Exception e) {
69                 LOG.error(e.getMessage(), e);
70             }
71         }
72     }
73 }