Made Notification and Data service available from SessionManager
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashSet;
13 import java.util.Map;
14 import java.util.Set;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ScheduledThreadPoolExecutor;
18
19 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
20 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
21 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
23 import org.opendaylight.yangtools.yang.binding.DataObject;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * @author mirehak
29  */
30 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
31     
32     private static final Logger LOG = LoggerFactory
33             .getLogger(QueueKeeperLightImpl.class);
34     
35     private Set<PopListener<DataObject>> listeners;
36     private BlockingQueue<TicketResult<DataObject>> processQueue;
37     private ScheduledThreadPoolExecutor pool;
38     private int poolSize = 10;
39     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping;
40     
41     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
42         @Override
43         public Short extractVersion(OfHeader message) {
44             return message.getVersion();
45         }
46     }; 
47     
48     /**
49      * prepare queue
50      */
51     public void init() {
52         listeners = Collections.synchronizedSet(new HashSet<PopListener<DataObject>>());
53         processQueue = new LinkedBlockingQueue<>(100);
54         pool = new ScheduledThreadPoolExecutor(poolSize);
55         TicketFinisher<DataObject> finisher = new TicketFinisher<>(processQueue, listeners);
56         new Thread(finisher).start();
57     }
58     
59     /**
60      * stop processing queue
61      */
62     public void shutdown() {
63         pool.shutdown();
64     }
65
66     @Override
67     public void push(Class<? extends OfHeader> registeredMessageType, OfHeader message, ConnectionConductor conductor) {
68         TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
69         ticket.setConductor(conductor);
70         ticket.setMessage(message);
71         ticket.setRegisteredMessageType(registeredMessageType);
72         LOG.debug("ticket scheduling: {}, ticket: {}", registeredMessageType.getSimpleName(), System.identityHashCode(ticket));
73         //TODO: block if queue limit reached 
74         processQueue.add(ticket);
75         scheduleTicket(ticket);
76     }
77
78     /**
79      * @param ticket
80      */
81     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
82         pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor, translatorMapping));
83     }
84
85     @Override
86     public synchronized void addPopListener(PopListener<DataObject> listener) {
87         listeners.add(listener);
88     }
89
90     @Override
91     public synchronized boolean removePopListener(PopListener<DataObject> listener) {
92         return listeners.remove(listener);
93     }
94
95     /**
96      * @param poolSize the poolSize to set
97      */
98     public void setPoolSize(int poolSize) {
99         this.poolSize = poolSize;
100     }
101
102     @Override
103     public void setTranslatorMapping(
104             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping) {
105         this.translatorMapping = translatorMapping;
106     }
107 }