bug 2446 - High priority (control) queue stop reading from channel if is full
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.concurrent.Future;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
15
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
18 import org.opendaylight.openflowplugin.api.OFConstants;
19 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManager;
27 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
28 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType;
29 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
30 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
31 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerImpl;
32 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
33 import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
34 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
61 import org.opendaylight.yangtools.yang.binding.DataObject;
62 import org.opendaylight.yangtools.yang.common.RpcError;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 import com.google.common.util.concurrent.Futures;
68
69 /**
70  * @author mirehak
71  */
72 public class ConnectionConductorImpl implements OpenflowProtocolListener,
73         SystemNotificationsListener, ConnectionConductor,
74         ConnectionReadyListener, HandshakeListener, NotificationEnqueuer,
75         AutoCloseable {
76
77     /**
78      * ingress queue limit
79      */
80     private static final int INGRESS_QUEUE_MAX_SIZE = 200;
81
82     protected static final Logger LOG = LoggerFactory
83             .getLogger(ConnectionConductorImpl.class);
84
85     /*
86      * variable to make BitMap-based negotiation enabled / disabled. it will
87      * help while testing and isolating issues related to processing of BitMaps
88      * from switches.
89      */
90     private boolean isBitmapNegotiationEnable = true;
91     protected ErrorHandler errorHandler;
92
93     private final ConnectionAdapter connectionAdapter;
94     private ConnectionConductor.CONDUCTOR_STATE conductorState;
95     private Short version;
96
97     protected SwitchConnectionDistinguisher auxiliaryKey;
98
99     protected SessionContext sessionContext;
100
101     private QueueProcessor<OfHeader, DataObject> queueProcessor;
102     private QueueKeeper<OfHeader> queue;
103     private ThreadPoolExecutor hsPool;
104     private HandshakeManager handshakeManager;
105
106     private boolean firstHelloProcessed;
107
108     private PortFeaturesUtil portFeaturesUtils;
109
110     private int conductorId;
111
112     private int ingressMaxQueueSize;
113
114     /**
115      * @param connectionAdapter
116      */
117     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
118         this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
119     }
120
121     /**
122      * @param connectionAdapter
123      * @param ingressMaxQueueSize
124      *            ingress queue limit (blocking)
125      */
126     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter,
127             int ingressMaxQueueSize) {
128         this.connectionAdapter = connectionAdapter;
129         this.ingressMaxQueueSize = ingressMaxQueueSize;
130         conductorState = CONDUCTOR_STATE.HANDSHAKING;
131         firstHelloProcessed = false;
132         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
133                 ConnectionConductor.versionOrder.get(0),
134                 ConnectionConductor.versionOrder);
135         handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
136         handshakeManager.setHandshakeListener(this);
137         portFeaturesUtils = PortFeaturesUtil.getInstance();
138     }
139
140     @Override
141     public void init() {
142         int handshakeThreadLimit = 1;
143         hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit,
144                 handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
145                 new LinkedBlockingQueue<Runnable>(), "OFHandshake-"
146                         + conductorId);
147
148         connectionAdapter.setMessageListener(this);
149         connectionAdapter.setSystemListener(this);
150         connectionAdapter.setConnectionReadyListener(this);
151         WaterMarkListener waterMarkListener = new WaterMarkListenerImpl(
152                 connectionAdapter);
153         queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor,
154                 ingressMaxQueueSize, waterMarkListener);
155     }
156
157     @Override
158     public void setQueueProcessor(
159             QueueProcessor<OfHeader, DataObject> queueProcessor) {
160         this.queueProcessor = queueProcessor;
161     }
162
163     /**
164      * @param errorHandler
165      *            the errorHandler to set
166      */
167     @Override
168     public void setErrorHandler(ErrorHandler errorHandler) {
169         this.errorHandler = errorHandler;
170         handshakeManager.setErrorHandler(errorHandler);
171     }
172
173     @Override
174     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
175         new Thread(new Runnable() {
176             @Override
177             public void run() {
178                 LOG.debug("echo request received: "
179                         + echoRequestMessage.getXid());
180                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
181                 builder.setVersion(echoRequestMessage.getVersion());
182                 builder.setXid(echoRequestMessage.getXid());
183                 builder.setData(echoRequestMessage.getData());
184
185                 getConnectionAdapter().echoReply(builder.build());
186             }
187         }).start();
188     }
189
190     @Override
191     public void onErrorMessage(ErrorMessage errorMessage) {
192         enqueueMessage(errorMessage);
193     }
194
195     /**
196      * @param message
197      */
198     private void enqueueMessage(OfHeader message) {
199         enqueueMessage(message, QueueType.DEFAULT);
200     }
201
202     @Override
203     public void enqueueNotification(NotificationQueueWrapper notification) {
204         enqueueMessage(notification);
205     }
206
207     /**
208      * @param message
209      * @param queueType
210      *            enqueue type
211      */
212     private void enqueueMessage(OfHeader message, QueueType queueType) {
213         queue.push(message, this, queueType);
214     }
215
216     @Override
217     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
218         enqueueMessage(experimenterMessage);
219     }
220
221     @Override
222     public void onFlowRemovedMessage(FlowRemovedMessage message) {
223         enqueueMessage(message);
224     }
225
226     /**
227      * version negotiation happened as per following steps: 1. If HelloMessage
228      * version field has same version, continue connection processing. If
229      * HelloMessage version is lower than supported versions, just disconnect.
230      * 2. If HelloMessage contains bitmap and common version found in bitmap
231      * then continue connection processing. if no common version found, just
232      * disconnect. 3. If HelloMessage version is not supported, send
233      * HelloMessage with lower supported version. 4. If Hello message received
234      * again with not supported version, just disconnect.
235      */
236     @Override
237     public void onHelloMessage(final HelloMessage hello) {
238         LOG.debug("processing HELLO.xid: {}", hello.getXid());
239         firstHelloProcessed = true;
240         checkState(CONDUCTOR_STATE.HANDSHAKING);
241         HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
242                 hello, handshakeManager, connectionAdapter);
243         hsPool.submit(handshakeStepWrapper);
244     }
245
246     /**
247      * @return rpc-response timeout in [ms]
248      */
249     protected long getMaxTimeout() {
250         // TODO:: get from configuration
251         return 2000;
252     }
253
254     /**
255      * @return milliseconds
256      */
257     protected TimeUnit getMaxTimeoutUnit() {
258         // TODO:: get from configuration
259         return TimeUnit.MILLISECONDS;
260     }
261
262     @Override
263     public void onMultipartReplyMessage(MultipartReplyMessage message) {
264         enqueueMessage(message);
265     }
266
267     @Override
268     public void onPacketInMessage(PacketInMessage message) {
269         enqueueMessage(message, QueueKeeper.QueueType.UNORDERED);
270     }
271
272     @Override
273     public void onPortStatusMessage(PortStatusMessage message) {
274         processPortStatusMsg(message);
275         enqueueMessage(message);
276     }
277
278     protected void processPortStatusMsg(PortStatus msg) {
279         if (msg.getReason().getIntValue() == 2) {
280             updatePort(msg);
281         } else if (msg.getReason().getIntValue() == 0) {
282             updatePort(msg);
283         } else if (msg.getReason().getIntValue() == 1) {
284             deletePort(msg);
285         }
286     }
287
288     protected void updatePort(PortStatus msg) {
289         Long portNumber = msg.getPortNo();
290         Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
291
292         if (portBandwidth == null) {
293             LOG.debug(
294                     "can't get bandwidth info from port: {}, aborting port update",
295                     msg.toString());
296         } else {
297             this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
298             this.getSessionContext().getPortsBandwidth()
299                     .put(portNumber, portBandwidth);
300         }
301     }
302
303     protected void deletePort(PortGrouping port) {
304         Long portNumber = port.getPortNo();
305
306         this.getSessionContext().getPhysicalPorts().remove(portNumber);
307         this.getSessionContext().getPortsBandwidth().remove(portNumber);
308     }
309
310     @Override
311     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
312         new Thread(new Runnable() {
313             @Override
314             public void run() {
315                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
316                     // idle state in any other conductorState than WORKING means
317                     // real
318                     // problem and wont be handled by echoReply, but
319                     // disconnection
320                     disconnect();
321                     OFSessionUtil.getSessionManager().invalidateOnDisconnect(
322                             ConnectionConductorImpl.this);
323                 } else {
324                     LOG.debug(
325                             "first idle state occured, sessionCtx={}|auxId={}",
326                             sessionContext, auxiliaryKey);
327                     EchoInputBuilder builder = new EchoInputBuilder();
328                     builder.setVersion(getVersion());
329                     builder.setXid(getSessionContext().getNextXid());
330
331                     Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
332                             .echo(builder.build());
333
334                     try {
335                         RpcResult<EchoOutput> echoReplyValue = echoReplyFuture
336                                 .get(getMaxTimeout(), getMaxTimeoutUnit());
337                         if (echoReplyValue.isSuccessful()) {
338                             setConductorState(CONDUCTOR_STATE.WORKING);
339                         } else {
340                             for (RpcError replyError : echoReplyValue
341                                     .getErrors()) {
342                                 Throwable cause = replyError.getCause();
343                                 LOG.error(
344                                         "while receiving echoReply in TIMEOUTING state: "
345                                                 + cause.getMessage(), cause);
346                             }
347                             // switch issue occurred
348                             throw new Exception("switch issue occurred");
349                         }
350                     } catch (Exception e) {
351                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
352                                 + e.getMessage());
353                         errorHandler.handleException(e, sessionContext);
354                         // switch is not responding
355                         disconnect();
356                         OFSessionUtil.getSessionManager()
357                                 .invalidateOnDisconnect(
358                                         ConnectionConductorImpl.this);
359                     }
360                 }
361             }
362
363         }).start();
364     }
365
366     /**
367      * @param conductorState
368      *            the connectionState to set
369      */
370     @Override
371     public void setConductorState(CONDUCTOR_STATE conductorState) {
372         this.conductorState = conductorState;
373     }
374
375     @Override
376     public CONDUCTOR_STATE getConductorState() {
377         return conductorState;
378     }
379
380     /**
381      * @param expectedState
382      */
383     protected void checkState(CONDUCTOR_STATE expectedState) {
384         if (!conductorState.equals(expectedState)) {
385             throw new IllegalStateException("Expected state: " + expectedState
386                     + ", actual state:" + conductorState);
387         }
388     }
389
390     @Override
391     public void onDisconnectEvent(DisconnectEvent arg0) {
392         SessionManager sessionManager = OFSessionUtil.getSessionManager();
393         sessionManager.invalidateOnDisconnect(this);
394         close();
395     }
396
397     @Override
398     public Short getVersion() {
399         return version;
400     }
401
402     @Override
403     public Future<Boolean> disconnect() {
404         LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext,
405                 auxiliaryKey);
406
407         Future<Boolean> result = null;
408         if (connectionAdapter.isAlive()) {
409             result = connectionAdapter.disconnect();
410         } else {
411             LOG.debug("connection already disconnected");
412             result = Futures.immediateFuture(true);
413         }
414         close();
415         return result;
416     }
417
418     @Override
419     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
420         this.auxiliaryKey = auxiliaryKey;
421     }
422
423     @Override
424     public void setSessionContext(SessionContext sessionContext) {
425         this.sessionContext = sessionContext;
426     }
427
428     @Override
429     public SwitchConnectionDistinguisher getAuxiliaryKey() {
430         return auxiliaryKey;
431     }
432
433     @Override
434     public SessionContext getSessionContext() {
435         return sessionContext;
436     }
437
438     @Override
439     public ConnectionAdapter getConnectionAdapter() {
440         return connectionAdapter;
441     }
442
443     @Override
444     public void onConnectionReady() {
445         LOG.debug("connection is ready-to-use");
446         if (!firstHelloProcessed) {
447             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
448                     null, handshakeManager, connectionAdapter);
449             hsPool.execute(handshakeStepWrapper);
450             firstHelloProcessed = true;
451         } else {
452             LOG.debug("already touched by hello message");
453         }
454     }
455
456     @Override
457     public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
458             Short negotiatedVersion) {
459         postHandshakeBasic(featureOutput, negotiatedVersion);
460
461         // post-handshake actions
462         if (version == OFConstants.OFP_VERSION_1_3) {
463             requestPorts();
464             requestGroupFeatures();
465             requestMeterFeatures();
466         }
467
468         requestDesc();
469     }
470
471     @Override
472     public void onHandshakeFailure() {
473         LOG.info("OF handshake failed, doing cleanup.");
474         close();
475     }
476
477     /**
478      * used by tests
479      * 
480      * @param featureOutput
481      * @param negotiatedVersion
482      */
483     protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
484             Short negotiatedVersion) {
485         version = negotiatedVersion;
486         if (version == OFConstants.OFP_VERSION_1_0) {
487             // Because the GetFeaturesOutput contains information about the port
488             // in OF1.0 (that we would otherwise get from the PortDesc) we have
489             // to pass
490             // it up for parsing to convert into a NodeConnectorUpdate
491             //
492             // BUG-1988 - this must be the first item in queue in order not to
493             // get behind link-up message
494             enqueueMessage(featureOutput);
495         }
496
497         OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
498         hsPool.shutdown();
499         hsPool.purge();
500         conductorState = CONDUCTOR_STATE.WORKING;
501         QueueKeeperFactory.plugQueue(queueProcessor, queue);
502     }
503
504     /*
505      * Send an OFPMP_DESC request message to the switch
506      */
507     private void requestDesc() {
508         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
509         builder.setType(MultipartType.OFPMPDESC);
510         builder.setVersion(getVersion());
511         builder.setFlags(new MultipartRequestFlags(false));
512         builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder()
513                 .build());
514         builder.setXid(getSessionContext().getNextXid());
515         getConnectionAdapter().multipartRequest(builder.build());
516     }
517
518     private void requestPorts() {
519         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
520         builder.setType(MultipartType.OFPMPPORTDESC);
521         builder.setVersion(getVersion());
522         builder.setFlags(new MultipartRequestFlags(false));
523         builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder()
524                 .build());
525         builder.setXid(getSessionContext().getNextXid());
526         getConnectionAdapter().multipartRequest(builder.build());
527     }
528
529     private void requestGroupFeatures() {
530         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
531         mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
532         mprInput.setVersion(getVersion());
533         mprInput.setFlags(new MultipartRequestFlags(false));
534         mprInput.setXid(getSessionContext().getNextXid());
535
536         MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder();
537         mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
538
539         LOG.debug("Send group features statistics request :{}",
540                 mprGroupFeaturesBuild);
541         getConnectionAdapter().multipartRequest(mprInput.build());
542
543     }
544
545     private void requestMeterFeatures() {
546         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
547         mprInput.setType(MultipartType.OFPMPMETERFEATURES);
548         mprInput.setVersion(getVersion());
549         mprInput.setFlags(new MultipartRequestFlags(false));
550         mprInput.setXid(getSessionContext().getNextXid());
551
552         MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder();
553         mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
554
555         LOG.debug("Send meter features statistics request :{}",
556                 mprMeterFeaturesBuild);
557         getConnectionAdapter().multipartRequest(mprInput.build());
558
559     }
560
561     /**
562      * @param isBitmapNegotiationEnable
563      *            the isBitmapNegotiationEnable to set
564      */
565     public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) {
566         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
567     }
568
569     protected void shutdownPool() {
570         hsPool.shutdownNow();
571         LOG.debug("pool is terminated: {}", hsPool.isTerminated());
572     }
573
574     protected void shutdownPoolPolitely() {
575         hsPool.shutdown();
576         try {
577             hsPool.awaitTermination(1, TimeUnit.SECONDS);
578         } catch (InterruptedException e) {
579             LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
580             shutdownPool();
581         }
582         hsPool.purge();
583         LOG.debug("pool is terminated: {}", hsPool.isTerminated());
584     }
585
586     @Override
587     public void setId(int conductorId) {
588         this.conductorId = conductorId;
589     }
590
591     @Override
592     public void close() {
593         shutdownPoolPolitely();
594         conductorState = CONDUCTOR_STATE.RIP;
595     }
596 }