BUG-1075: ingress back pressure
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / TicketFinisherImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.BlockingQueue;
14
15 import org.opendaylight.yangtools.yang.binding.DataObject;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  *
21  */
22 public class TicketFinisherImpl implements TicketFinisher<DataObject> {
23
24     private static final Logger LOG = LoggerFactory
25             .getLogger(TicketFinisherImpl.class);
26
27     private final Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
28
29     private boolean finished;
30
31     private BlockingQueue<TicketResult<DataObject>> queue;
32
33     /**
34      * @param queue
35      * @param popListenersMapping
36      */
37     public TicketFinisherImpl(BlockingQueue<TicketResult<DataObject>> queue,
38             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
39         this.queue = queue;
40         this.popListenersMapping = popListenersMapping;
41     }
42     
43     @Override
44     public void run() {
45         while (! finished ) {
46             try {
47                 //TODO:: handle shutdown of queue
48                 TicketResult<DataObject> result = queue.take();
49                 List<DataObject> processedMessages = result.getResult().get();
50                 firePopNotification(processedMessages);
51             } catch (Exception e) {
52                 LOG.warn("processing (translate, publish) of ticket failed", e);
53             }
54         }
55     }
56     
57     @Override
58     public void firePopNotification(List<DataObject> processedMessages) {
59         for (DataObject msg : processedMessages) {
60             Class<? extends Object> registeredType =
61                     msg.getImplementedInterface();
62             Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
63             if (popListeners == null) {
64                 LOG.warn("no popListener registered for type {}", registeredType);
65             } else {
66                 for (PopListener<DataObject> consumer : popListeners) {
67                     consumer.onPop(msg);
68                 }
69             }
70         }
71     }
72     
73     /**
74      * initiate shutdown of this worker
75      */
76     @Override
77     public void finish() {
78         finished = true;
79     }
80 }