78682d3f3907b86f3a41db70e3761479277cfd3e
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.ScheduledThreadPoolExecutor;
17
18 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
19 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
20 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
21 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
23 import org.opendaylight.yangtools.yang.binding.DataObject;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /**
28  * @author mirehak
29  */
30 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
31
32     private static final Logger LOG = LoggerFactory
33             .getLogger(QueueKeeperLightImpl.class);
34
35     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
36     private BlockingQueue<TicketResult<DataObject>> processQueue;
37     private ScheduledThreadPoolExecutor pool;
38     private int poolSize = 10;
39     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
40     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
41     private MessageSpy<OfHeader, DataObject> messageSpy;
42
43     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
44         @Override
45         public Short extractVersion(OfHeader message) {
46             return message.getVersion();
47         }
48     };
49
50     private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
51             new RegisteredTypeExtractor<OfHeader>() {
52         @SuppressWarnings("unchecked")
53         @Override
54         public Class<? extends OfHeader> extractRegisteredType(
55                 OfHeader message) {
56             return (Class<? extends OfHeader>) message.getImplementedInterface();
57         }
58     };
59
60     private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
61             new RegisteredTypeExtractor<DataObject>() {
62         @SuppressWarnings("unchecked")
63         @Override
64         public Class<? extends DataObject> extractRegisteredType(
65                 DataObject message) {
66             return (Class<? extends DataObject>) message.getImplementedInterface();
67         }
68     };
69
70     /**
71      * prepare queue
72      */
73     public void init() {
74         processQueue = new LinkedBlockingQueue<>(1000);
75         pool = new ScheduledThreadPoolExecutor(poolSize);
76
77         ticketProcessorFactory = new TicketProcessorFactory<>();
78         ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
79         ticketProcessorFactory.setTranslatorMapping(translatorMapping);
80         ticketProcessorFactory.setVersionExtractor(versionExtractor);
81         ticketProcessorFactory.setSpy(messageSpy);
82
83         TicketFinisher<DataObject> finisher = new TicketFinisher<>(
84                 processQueue, popListenersMapping, registeredOutTypeExtractor);
85         new Thread(finisher).start();
86     }
87
88     /**
89      * stop processing queue
90      */
91     public void shutdown() {
92         pool.shutdown();
93     }
94
95     @Override
96     public void push(OfHeader message, ConnectionConductor conductor) {
97         push(message,conductor,QueueKeeper.QueueType.DEFAULT);
98     }
99
100     @Override
101     public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
102         if(queueType == QueueKeeper.QueueType.DEFAULT) {
103             TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
104             ticket.setConductor(conductor);
105             ticket.setMessage(message);
106             LOG.debug("ticket scheduling: {}, ticket: {}",
107                     message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
108             try {
109                 processQueue.put(ticket);
110                 scheduleTicket(ticket);
111             } catch (InterruptedException e) {
112                 LOG.warn("message enqueing interrupted", e);
113             }
114         } else if (queueType == QueueKeeper.QueueType.UNORDERED){
115             List<DataObject> processedMessages = translate(message,conductor);
116             pop(processedMessages,conductor);
117         }
118     }
119
120     /**
121      * @param ticket
122      */
123     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
124         Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
125         pool.execute(ticketProcessor);
126     }
127
128     /**
129      * @param poolSize the poolSize to set
130      */
131     public void setPoolSize(int poolSize) {
132         this.poolSize = poolSize;
133     }
134
135     @Override
136     public void setTranslatorMapping(
137             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
138         this.translatorMapping = translatorMapping;
139     }
140
141     @Override
142     public void setPopListenersMapping(
143             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
144         this.popListenersMapping = popListenersMapping;
145     }
146
147     /**
148      * @param messageSpy the messageSpy to set
149      */
150     public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
151         this.messageSpy = messageSpy;
152     }
153
154     private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
155         List<DataObject> result = new ArrayList<>();
156         Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
157         Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
158         LOG.debug("translating message: {}", messageType.getSimpleName());
159
160         Short version = versionExtractor.extractVersion(message);
161         if (version == null) {
162            throw new IllegalArgumentException("version is NULL");
163         }
164         TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
165         translators = translatorMapping.get(tKey);
166
167         LOG.debug("translatorKey: {} + {}", version, messageType.getName());
168
169         if (translators != null) {
170             for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
171                 SwitchConnectionDistinguisher cookie = null;
172                 // Pass cookie only for PACKT_IN
173                 if (messageType.equals("PacketInMessage.class")) {
174                     cookie = conductor.getAuxiliaryKey();
175                 }
176                 List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
177                 if(translatorOutput != null) {
178                     result.addAll(translatorOutput);
179                 }
180             }
181             if (messageSpy != null) {
182                 messageSpy.spyIn(message);
183                 messageSpy.spyOut(result);
184             }
185         } else {
186             LOG.warn("No translators for this message Type: {}", messageType);
187         }
188         return result;
189     }
190
191     private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
192         for (DataObject msg : processedMessages) {
193             Class<? extends Object> registeredType =
194                     registeredOutTypeExtractor.extractRegisteredType(msg);
195             Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
196             if (popListeners == null) {
197                 LOG.warn("no popListener registered for type {}"+registeredType);
198             } else {
199                 for (PopListener<DataObject> consumer : popListeners) {
200                     consumer.onPop(msg);
201                 }
202             }
203         }
204     }
205 }