BUG-542 - adding overall statictics
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperLightImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.ScheduledThreadPoolExecutor;
17
18 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
19 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
20 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
21 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
22 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
24 import org.opendaylight.yangtools.yang.binding.DataContainer;
25 import org.opendaylight.yangtools.yang.binding.DataObject;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * @author mirehak
31  */
32 public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
33
34     private static final Logger LOG = LoggerFactory
35             .getLogger(QueueKeeperLightImpl.class);
36
37     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping;
38     private BlockingQueue<TicketResult<DataObject>> processQueue;
39     private ScheduledThreadPoolExecutor pool;
40     private int poolSize = 10;
41     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
42     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
43     private MessageSpy<DataContainer> messageSpy;
44
45     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
46         @Override
47         public Short extractVersion(OfHeader message) {
48             return message.getVersion();
49         }
50     };
51
52     private RegisteredTypeExtractor<OfHeader> registeredSrcTypeExtractor =
53             new RegisteredTypeExtractor<OfHeader>() {
54         @SuppressWarnings("unchecked")
55         @Override
56         public Class<? extends OfHeader> extractRegisteredType(
57                 OfHeader message) {
58             return (Class<? extends OfHeader>) message.getImplementedInterface();
59         }
60     };
61
62     private RegisteredTypeExtractor<DataObject> registeredOutTypeExtractor =
63             new RegisteredTypeExtractor<DataObject>() {
64         @SuppressWarnings("unchecked")
65         @Override
66         public Class<? extends DataObject> extractRegisteredType(
67                 DataObject message) {
68             return (Class<? extends DataObject>) message.getImplementedInterface();
69         }
70     };
71
72     /**
73      * prepare queue
74      */
75     public void init() {
76         processQueue = new LinkedBlockingQueue<>(1000);
77         pool = new ScheduledThreadPoolExecutor(poolSize);
78
79         ticketProcessorFactory = new TicketProcessorFactory<>();
80         ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
81         ticketProcessorFactory.setTranslatorMapping(translatorMapping);
82         ticketProcessorFactory.setVersionExtractor(versionExtractor);
83         ticketProcessorFactory.setSpy(messageSpy);
84
85         TicketFinisher<DataObject> finisher = new TicketFinisher<>(
86                 processQueue, popListenersMapping, registeredOutTypeExtractor);
87         new Thread(finisher).start();
88     }
89
90     /**
91      * stop processing queue
92      */
93     public void shutdown() {
94         pool.shutdown();
95     }
96
97     @Override
98     public void push(OfHeader message, ConnectionConductor conductor) {
99         push(message,conductor,QueueKeeper.QueueType.DEFAULT);
100     }
101
102     @Override
103     public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
104         messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
105         if(queueType == QueueKeeper.QueueType.DEFAULT) {
106             TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
107             ticket.setConductor(conductor);
108             ticket.setMessage(message);
109             LOG.debug("ticket scheduling: {}, ticket: {}",
110                     message.getImplementedInterface().getSimpleName(), System.identityHashCode(ticket));
111             try {
112                 processQueue.put(ticket);
113                 scheduleTicket(ticket);
114             } catch (InterruptedException e) {
115                 LOG.warn("message enqueing interrupted", e);
116             }
117         } else if (queueType == QueueKeeper.QueueType.UNORDERED){
118             List<DataObject> processedMessages = translate(message,conductor);
119             pop(processedMessages,conductor);
120         }
121     }
122
123     /**
124      * @param ticket
125      */
126     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
127         Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
128         pool.execute(ticketProcessor);
129     }
130
131     /**
132      * @param poolSize the poolSize to set
133      */
134     public void setPoolSize(int poolSize) {
135         this.poolSize = poolSize;
136     }
137
138     @Override
139     public void setTranslatorMapping(
140             Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
141         this.translatorMapping = translatorMapping;
142     }
143
144     @Override
145     public void setPopListenersMapping(
146             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenersMapping) {
147         this.popListenersMapping = popListenersMapping;
148     }
149
150     /**
151      * @param messageSpy the messageSpy to set
152      */
153     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
154         this.messageSpy = messageSpy;
155     }
156
157     private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
158         List<DataObject> result = new ArrayList<>();
159         Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
160         Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> translators = null;
161         LOG.debug("translating message: {}", messageType.getSimpleName());
162
163         Short version = versionExtractor.extractVersion(message);
164         if (version == null) {
165            throw new IllegalArgumentException("version is NULL");
166         }
167         TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
168         translators = translatorMapping.get(tKey);
169
170         LOG.debug("translatorKey: {} + {}", version, messageType.getName());
171
172         if (translators != null) {
173             for (IMDMessageTranslator<OfHeader, List<DataObject>> translator : translators) {
174                 SwitchConnectionDistinguisher cookie = null;
175                 // Pass cookie only for PACKT_IN
176                 if (messageType.equals("PacketInMessage.class")) {
177                     cookie = conductor.getAuxiliaryKey();
178                 }
179                 List<DataObject> translatorOutput = translator.translate(cookie, conductor.getSessionContext(), message);
180                 if(translatorOutput != null) {
181                     result.addAll(translatorOutput);
182                 }
183             }
184             if (messageSpy != null) {
185                 messageSpy.spyIn(message);
186                 for (DataObject outMsg : result) {
187                     messageSpy.spyOut(outMsg);
188                 }
189             }
190         } else {
191             LOG.warn("No translators for this message Type: {}", messageType);
192             messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
193         }
194         return result;
195     }
196
197     /**
198      * @param processedMessages
199      * @param conductor
200      */
201     private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
202         for (DataObject msg : processedMessages) {
203             Class<? extends Object> registeredType =
204                     registeredOutTypeExtractor.extractRegisteredType(msg);
205             Collection<PopListener<DataObject>> popListeners = popListenersMapping.get(registeredType);
206             if (popListeners == null) {
207                 LOG.warn("no popListener registered for type {}"+registeredType);
208             } else {
209                 for (PopListener<DataObject> consumer : popListeners) {
210                     consumer.onPop(msg);
211                 }
212             }
213         }
214     }
215 }