Optimize port status and hello message handling
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import com.google.common.util.concurrent.Futures;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
16 import org.opendaylight.openflowplugin.api.OFConstants;
17 import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
18 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
19 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManager;
27 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
28 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType;
29 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
30 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
31 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerImpl;
32 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
33 import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
34 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
54 import org.opendaylight.yangtools.yang.binding.DataObject;
55 import org.opendaylight.yangtools.yang.common.RpcError;
56 import org.opendaylight.yangtools.yang.common.RpcResult;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 /**
61  * @author mirehak
62  */
63 public class ConnectionConductorImpl implements OpenflowProtocolListener,
64         SystemNotificationsListener, ConnectionConductor,
65         ConnectionReadyListener, HandshakeListener, NotificationEnqueuer,
66         AutoCloseable {
67
68     /**
69      * ingress queue limit
70      */
71     private static final int INGRESS_QUEUE_MAX_SIZE = 200;
72
73     protected static final Logger LOG = LoggerFactory
74             .getLogger(ConnectionConductorImpl.class);
75
76     /*
77      * variable to make BitMap-based negotiation enabled / disabled. it will
78      * help while testing and isolating issues related to processing of BitMaps
79      * from switches.
80      */
81     private boolean isBitmapNegotiationEnable = true;
82     protected ErrorHandler errorHandler;
83
84     private final ConnectionAdapter connectionAdapter;
85     private ConnectionConductor.CONDUCTOR_STATE conductorState;
86     private Short version;
87
88     protected SwitchConnectionDistinguisher auxiliaryKey;
89
90     protected SessionContext sessionContext;
91
92     private QueueProcessor<OfHeader, DataObject> queueProcessor;
93     private QueueKeeper<OfHeader> queue;
94     private HandshakeManager handshakeManager;
95
96     private boolean firstHelloProcessed;
97
98     private PortFeaturesUtil portFeaturesUtils;
99
100     private int conductorId;
101
102     private int ingressMaxQueueSize;
103     private HandshakeContext handshakeContext;
104
105     /**
106      * @param connectionAdapter connection adaptor for switch
107      */
108     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
109         this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
110     }
111
112     /**
113      * @param connectionAdapter connection adaptor for switch
114      * @param ingressMaxQueueSize ingress queue limit (blocking)
115      */
116     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter,
117                                    int ingressMaxQueueSize) {
118         this.connectionAdapter = connectionAdapter;
119         this.ingressMaxQueueSize = ingressMaxQueueSize;
120         conductorState = CONDUCTOR_STATE.HANDSHAKING;
121         firstHelloProcessed = false;
122         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
123                 ConnectionConductor.VERSION_ORDER.get(0),
124                 ConnectionConductor.VERSION_ORDER);
125         handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
126         handshakeManager.setHandshakeListener(this);
127         portFeaturesUtils = PortFeaturesUtil.getInstance();
128     }
129
130     @Override
131     public void init() {
132         connectionAdapter.setMessageListener(this);
133         connectionAdapter.setSystemListener(this);
134         connectionAdapter.setConnectionReadyListener(this);
135         WaterMarkListener waterMarkListener = new WaterMarkListenerImpl(
136                 connectionAdapter);
137         queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor,
138                 ingressMaxQueueSize, waterMarkListener);
139     }
140
141     @Override
142     public void setQueueProcessor(
143             QueueProcessor<OfHeader, DataObject> queueProcessor) {
144         this.queueProcessor = queueProcessor;
145     }
146
147     /**
148      * @param errorHandler the errorHandler to set
149      */
150     @Override
151     public void setErrorHandler(ErrorHandler errorHandler) {
152         this.errorHandler = errorHandler;
153         handshakeManager.setErrorHandler(errorHandler);
154     }
155
156     @Override
157     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
158         new Thread(new Runnable() {
159             @Override
160             public void run() {
161                 LOG.debug("echo request received: "
162                         + echoRequestMessage.getXid());
163                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
164                 builder.setVersion(echoRequestMessage.getVersion());
165                 builder.setXid(echoRequestMessage.getXid());
166                 builder.setData(echoRequestMessage.getData());
167
168                 getConnectionAdapter().echoReply(builder.build());
169             }
170         }).start();
171     }
172
173     @Override
174     public void onErrorMessage(ErrorMessage errorMessage) {
175         enqueueMessage(errorMessage);
176     }
177
178     /**
179      * @param message
180      */
181     private void enqueueMessage(OfHeader message) {
182         enqueueMessage(message, QueueType.DEFAULT);
183     }
184
185     @Override
186     public void enqueueNotification(NotificationQueueWrapper notification) {
187         enqueueMessage(notification);
188     }
189
190     /**
191      * @param message
192      * @param queueType enqueue type
193      */
194     private void enqueueMessage(OfHeader message, QueueType queueType) {
195         queue.push(message, this, queueType);
196     }
197
198     @Override
199     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
200         enqueueMessage(experimenterMessage);
201     }
202
203     @Override
204     public void onFlowRemovedMessage(FlowRemovedMessage message) {
205         enqueueMessage(message);
206     }
207
208     /**
209      * version negotiation happened as per following steps: 1. If HelloMessage
210      * version field has same version, continue connection processing. If
211      * HelloMessage version is lower than supported versions, just disconnect.
212      * 2. If HelloMessage contains bitmap and common version found in bitmap
213      * then continue connection processing. if no common version found, just
214      * disconnect. 3. If HelloMessage version is not supported, send
215      * HelloMessage with lower supported version. 4. If Hello message received
216      * again with not supported version, just disconnect.
217      */
218     @Override
219     public void onHelloMessage(final HelloMessage hello) {
220         LOG.debug("processing HELLO.xid: {}", hello.getXid());
221         firstHelloProcessed = true;
222         checkState(CONDUCTOR_STATE.HANDSHAKING);
223         HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
224                 hello, handshakeManager, connectionAdapter);
225         Thread t = new Thread(handshakeStepWrapper, "OFHandshake-" + conductorId);
226         t.setDaemon(true);
227         t.start();
228     }
229
230     /**
231      * @return rpc-response timeout in [ms]
232      */
233     protected long getMaxTimeout() {
234         // TODO:: get from configuration
235         return 2000;
236     }
237
238     /**
239      * @return milliseconds
240      */
241     protected TimeUnit getMaxTimeoutUnit() {
242         // TODO:: get from configuration
243         return TimeUnit.MILLISECONDS;
244     }
245
246     @Override
247     public void onMultipartReplyMessage(MultipartReplyMessage message) {
248         enqueueMessage(message);
249     }
250
251     @Override
252     public void onPacketInMessage(PacketInMessage message) {
253         enqueueMessage(message, QueueKeeper.QueueType.UNORDERED);
254     }
255
256     @Override
257     public void onPortStatusMessage(PortStatusMessage message) {
258         try {
259             processPortStatusMsg(message);
260         } finally {
261             enqueueMessage(message);
262         }
263     }
264
265     protected void processPortStatusMsg(PortStatus msg) {
266         if (msg.getReason().getIntValue() == 2) {
267             updatePort(msg);
268         } else if (msg.getReason().getIntValue() == 0) {
269             updatePort(msg);
270         } else if (msg.getReason().getIntValue() == 1) {
271             deletePort(msg);
272         }
273     }
274
275     protected void updatePort(PortStatus msg) {
276         Long portNumber = msg.getPortNo();
277         Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
278
279         if (portBandwidth == null) {
280             LOG.debug(
281                     "can't get bandwidth info from port: {}, aborting port update",
282                     msg.toString());
283         } else {
284             if (null != this.sessionContext) {
285                 //FIXME these two properties are never used in code
286                 this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
287                 this.getSessionContext().getPortsBandwidth()
288                         .put(portNumber, portBandwidth);
289             } else {
290                 LOG.warn("Trying to process update port message before session context was created.");
291             }
292         }
293     }
294
295     protected void deletePort(PortGrouping port) {
296         Long portNumber = port.getPortNo();
297
298         this.getSessionContext().getPhysicalPorts().remove(portNumber);
299         this.getSessionContext().getPortsBandwidth().remove(portNumber);
300     }
301
302     @Override
303     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
304         new Thread(new Runnable() {
305             @Override
306             public void run() {
307                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
308                     // idle state in any other conductorState than WORKING means
309                     // real
310                     // problem and wont be handled by echoReply, but
311                     // disconnection
312                     disconnect();
313                     OFSessionUtil.getSessionManager().invalidateOnDisconnect(
314                             ConnectionConductorImpl.this);
315                 } else {
316                     LOG.debug(
317                             "first idle state occured, sessionCtx={}|auxId={}",
318                             sessionContext, auxiliaryKey);
319                     EchoInputBuilder builder = new EchoInputBuilder();
320                     builder.setVersion(getVersion());
321                     builder.setXid(getSessionContext().getNextXid());
322
323                     Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
324                             .echo(builder.build());
325
326                     try {
327                         RpcResult<EchoOutput> echoReplyValue = echoReplyFuture
328                                 .get(getMaxTimeout(), getMaxTimeoutUnit());
329                         if (echoReplyValue.isSuccessful()) {
330                             setConductorState(CONDUCTOR_STATE.WORKING);
331                         } else {
332                             for (RpcError replyError : echoReplyValue
333                                     .getErrors()) {
334                                 Throwable cause = replyError.getCause();
335                                 LOG.error(
336                                         "while receiving echoReply in TIMEOUTING state: "
337                                                 + cause.getMessage(), cause);
338                             }
339                             // switch issue occurred
340                             throw new Exception("switch issue occurred");
341                         }
342                     } catch (Exception e) {
343                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
344                                 + e.getMessage());
345                         errorHandler.handleException(e, sessionContext);
346                         // switch is not responding
347                         disconnect();
348                         OFSessionUtil.getSessionManager()
349                                 .invalidateOnDisconnect(
350                                         ConnectionConductorImpl.this);
351                     }
352                 }
353             }
354
355         }).start();
356     }
357
358     /**
359      * @param conductorState the connectionState to set
360      */
361     @Override
362     public void setConductorState(CONDUCTOR_STATE conductorState) {
363         this.conductorState = conductorState;
364     }
365
366     @Override
367     public CONDUCTOR_STATE getConductorState() {
368         return conductorState;
369     }
370
371     /**
372      * @param expectedState connection conductor state
373      */
374     protected void checkState(CONDUCTOR_STATE expectedState) {
375         if (!conductorState.equals(expectedState)) {
376             LOG.warn("State of connection to switch {} is not correct, "
377                     + "terminating the connection", connectionAdapter.getRemoteAddress());
378             throw new IllegalStateException("Expected state: " + expectedState
379                     + ", actual state:" + conductorState);
380         }
381     }
382
383     @Override
384     public void onDisconnectEvent(DisconnectEvent arg0) {
385         SessionManager sessionManager = OFSessionUtil.getSessionManager();
386         sessionManager.invalidateOnDisconnect(this);
387         close();
388     }
389
390     @Override
391     public Short getVersion() {
392         return version;
393     }
394
395     @Override
396     public Future<Boolean> disconnect() {
397         LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext,
398                 auxiliaryKey);
399
400         Future<Boolean> result = null;
401         if (connectionAdapter.isAlive()) {
402             result = connectionAdapter.disconnect();
403         } else {
404             LOG.debug("connection already disconnected");
405             result = Futures.immediateFuture(true);
406         }
407         close();
408         return result;
409     }
410
411     @Override
412     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
413         this.auxiliaryKey = auxiliaryKey;
414     }
415
416     @Override
417     public void setSessionContext(SessionContext sessionContext) {
418         this.sessionContext = sessionContext;
419     }
420
421     @Override
422     public SwitchConnectionDistinguisher getAuxiliaryKey() {
423         return auxiliaryKey;
424     }
425
426     @Override
427     public SessionContext getSessionContext() {
428         return sessionContext;
429     }
430
431     @Override
432     public ConnectionAdapter getConnectionAdapter() {
433         return connectionAdapter;
434     }
435
436     @Override
437     public void onConnectionReady() {
438         LOG.debug("connection is ready-to-use");
439         if (!firstHelloProcessed) {
440             checkState(CONDUCTOR_STATE.HANDSHAKING);
441             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
442                     null, handshakeManager, connectionAdapter);
443             Thread t = new Thread(handshakeStepWrapper, "OFHandshake-" + conductorId);
444             t.setDaemon(true);
445             t.start();
446             firstHelloProcessed = true;
447         } else {
448             LOG.debug("already touched by hello message");
449         }
450     }
451
452     @Override
453     public void onHandshakeSuccessful(GetFeaturesOutput featureOutput,
454                                       Short negotiatedVersion) {
455         postHandshakeBasic(featureOutput, negotiatedVersion);
456     }
457
458     @Override
459     public void onHandshakeFailure() {
460         LOG.info("OF handshake failed, doing cleanup.");
461         close();
462     }
463
464     /**
465      * used by tests
466      *
467      * @param featureOutput feature request output
468      * @param negotiatedVersion negotiated openflow connection version
469      */
470     protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
471                                       Short negotiatedVersion) {
472         version = negotiatedVersion;
473         if (version == OFConstants.OFP_VERSION_1_0) {
474             // Because the GetFeaturesOutput contains information about the port
475             // in OF1.0 (that we would otherwise get from the PortDesc) we have
476             // to pass
477             // it up for parsing to convert into a NodeConnectorUpdate
478             //
479             // BUG-1988 - this must be the first item in queue in order not to
480             // get behind link-up message
481             enqueueMessage(featureOutput);
482         }
483
484         SessionContext sessionContext =  OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
485         conductorState = CONDUCTOR_STATE.WORKING;
486         QueueKeeperFactory.plugQueue(queueProcessor, queue);
487     }
488
489     /**
490      * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
491      */
492     public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) {
493         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
494     }
495
496     @Override
497     public void setId(int conductorId) {
498         this.conductorId = conductorId;
499     }
500
501     @Override
502     public void close() {
503         conductorState = CONDUCTOR_STATE.RIP;
504         if (handshakeContext != null) {
505             try {
506                 handshakeContext.close();
507             } catch (Exception e) {
508                 LOG.warn("Closing handshake context failed: {}", e.getMessage());
509                 LOG.debug("Detail in hanshake context close:", e);
510             }
511         }
512     }
513
514     @Override
515     public void setHandshakeContext(HandshakeContext handshakeContext) {
516         this.handshakeContext = handshakeContext;
517     }
518 }