Bug 1588 - OFConstants.java moved to openflowplugin-api module
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.concurrent.Future;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
15
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
18 import org.opendaylight.openflowplugin.api.OFConstants;
19 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
22 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
23 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
24 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
25 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
26 import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.SwitchConfigFlag;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
55 import org.opendaylight.yangtools.yang.binding.DataObject;
56 import org.opendaylight.yangtools.yang.common.RpcError;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 import com.google.common.util.concurrent.Futures;
62
63 /**
64  * @author mirehak
65  */
66 public class ConnectionConductorImpl implements OpenflowProtocolListener,
67         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
68
69     /** ingress queue limit */
70     private static final int INGRESS_QUEUE_MAX_SIZE = 200;
71
72     protected static final Logger LOG = LoggerFactory
73             .getLogger(ConnectionConductorImpl.class);
74
75     /* variable to make BitMap-based negotiation enabled / disabled.
76      * it will help while testing and isolating issues related to processing of
77      * BitMaps from switches.
78      */
79     private boolean isBitmapNegotiationEnable = true;
80     protected ErrorHandler errorHandler;
81
82     private final ConnectionAdapter connectionAdapter;
83     private ConnectionConductor.CONDUCTOR_STATE conductorState;
84     private Short version;
85
86     protected SwitchConnectionDistinguisher auxiliaryKey;
87
88     protected SessionContext sessionContext;
89
90     private QueueProcessor<OfHeader, DataObject> queueProcessor;
91     private QueueKeeper<OfHeader> queue;
92     private ThreadPoolExecutor hsPool;
93     private HandshakeManager handshakeManager;
94
95     private boolean firstHelloProcessed;
96     
97     private PortFeaturesUtil portFeaturesUtils;
98
99     private int conductorId;
100
101     private int ingressMaxQueueSize;
102
103     
104     /**
105      * @param connectionAdapter
106      */
107     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
108         this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
109     }
110
111     /**
112      * @param connectionAdapter
113      * @param ingressMaxQueueSize ingress queue limit (blocking)
114      */
115     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) {
116         this.connectionAdapter = connectionAdapter;
117         this.ingressMaxQueueSize = ingressMaxQueueSize;
118         conductorState = CONDUCTOR_STATE.HANDSHAKING;
119         firstHelloProcessed = false;
120         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
121                 ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
122         handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
123         handshakeManager.setHandshakeListener(this);
124         portFeaturesUtils = PortFeaturesUtil.getInstance();
125     }
126
127     @Override
128     public void init() {
129         int handshakeThreadLimit = 1;
130         hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, 
131                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 
132                 "OFHandshake-"+conductorId);
133         
134         connectionAdapter.setMessageListener(this);
135         connectionAdapter.setSystemListener(this);
136         connectionAdapter.setConnectionReadyListener(this);
137         queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize);
138     }
139
140     @Override
141     public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueProcessor) {
142         this.queueProcessor = queueProcessor;
143     }
144
145     /**
146      * @param errorHandler the errorHandler to set
147      */
148     @Override
149     public void setErrorHandler(ErrorHandler errorHandler) {
150         this.errorHandler = errorHandler;
151         handshakeManager.setErrorHandler(errorHandler);
152     }
153
154     @Override
155     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
156         new Thread(new Runnable() {
157             @Override
158             public void run() {
159                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
160                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
161                 builder.setVersion(echoRequestMessage.getVersion());
162                 builder.setXid(echoRequestMessage.getXid());
163                 builder.setData(echoRequestMessage.getData());
164
165                 getConnectionAdapter().echoReply(builder.build());
166             }
167         }).start();
168     }
169
170     @Override
171     public void onErrorMessage(ErrorMessage errorMessage) {
172         enqueueMessage(errorMessage);
173     }
174
175     
176     /**
177      * @param message
178      */
179     private void enqueueMessage(OfHeader message) {
180         enqueueMessage(message, QueueType.DEFAULT);
181     }
182
183     /**
184      * @param message
185      * @param queueType enqueue type
186      */
187     private void enqueueMessage(OfHeader message, QueueType queueType) {
188         queue.push(message, this, queueType);
189     }
190
191     @Override
192     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
193         enqueueMessage(experimenterMessage);
194     }
195
196     @Override
197     public void onFlowRemovedMessage(FlowRemovedMessage message) {
198         enqueueMessage(message);
199     }
200
201
202     /**
203      * version negotiation happened as per following steps:
204      * 1. If HelloMessage version field has same version, continue connection processing.
205      *    If HelloMessage version is lower than supported versions, just disconnect.
206      * 2. If HelloMessage contains bitmap and common version found in bitmap
207      *    then continue connection processing. if no common version found, just disconnect.
208      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
209      * 4. If Hello message received again with not supported version, just disconnect.
210      */
211     @Override
212     public void onHelloMessage(final HelloMessage hello) {
213         LOG.debug("processing HELLO.xid: {}", hello.getXid());
214         firstHelloProcessed = true;
215         checkState(CONDUCTOR_STATE.HANDSHAKING);
216         HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
217                 hello, handshakeManager, connectionAdapter);
218         hsPool.submit(handshakeStepWrapper);
219     }
220
221     /**
222      * @return rpc-response timeout in [ms]
223      */
224     protected long getMaxTimeout() {
225         // TODO:: get from configuration
226         return 2000;
227     }
228
229     /**
230      * @return milliseconds
231      */
232     protected TimeUnit getMaxTimeoutUnit() {
233         // TODO:: get from configuration
234         return TimeUnit.MILLISECONDS;
235     }
236
237     @Override
238     public void onMultipartReplyMessage(MultipartReplyMessage message) {
239         enqueueMessage(message);
240     }
241
242     @Override
243     public void onPacketInMessage(PacketInMessage message) {
244         enqueueMessage(message, QueueKeeper.QueueType.UNORDERED);
245     }
246
247     @Override
248     public void onPortStatusMessage(PortStatusMessage message) {
249         processPortStatusMsg(message);
250         enqueueMessage(message);
251     }
252     
253     protected void processPortStatusMsg(PortStatus msg) {
254         if (msg.getReason().getIntValue() == 2) {
255             updatePort(msg);
256         } else if (msg.getReason().getIntValue() == 0) {
257             updatePort(msg);
258         } else if (msg.getReason().getIntValue() == 1) {
259             deletePort(msg);
260         }
261     }
262     
263     protected void updatePort(PortStatus msg) {
264         Long portNumber = msg.getPortNo();        
265         Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
266         
267         if(portBandwidth == null) {
268             LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString());
269         } else {
270             this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
271             this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth);                   
272         }            
273     }
274     
275     protected void deletePort(PortGrouping port) {
276         Long portNumber = port.getPortNo();
277         
278         this.getSessionContext().getPhysicalPorts().remove(portNumber);
279         this.getSessionContext().getPortsBandwidth().remove(portNumber);
280     }
281
282     @Override
283     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
284         new Thread(new Runnable() {
285             @Override
286             public void run() {
287                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
288                     // idle state in any other conductorState than WORKING means real
289                     // problem and wont be handled by echoReply, but disconnection
290                     disconnect();
291                     OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
292                 } else {
293                     LOG.debug("first idle state occured, sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
294                     EchoInputBuilder builder = new EchoInputBuilder();
295                     builder.setVersion(getVersion());
296                     builder.setXid(getSessionContext().getNextXid());
297
298                     Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
299                             .echo(builder.build());
300
301                     try {
302                         RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
303                                 getMaxTimeoutUnit());
304                         if (echoReplyValue.isSuccessful()) {
305                             setConductorState(CONDUCTOR_STATE.WORKING);
306                         } else {
307                             for (RpcError replyError : echoReplyValue.getErrors()) {
308                                 Throwable cause = replyError.getCause();
309                                 LOG.error(
310                                         "while receiving echoReply in TIMEOUTING state: "
311                                                 + cause.getMessage(), cause);
312                             }
313                             //switch issue occurred
314                             throw new Exception("switch issue occurred");
315                         }
316                     } catch (Exception e) {
317                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
318                                 + e.getMessage());
319                         errorHandler.handleException(e, sessionContext);
320                         //switch is not responding
321                         disconnect();
322                         OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
323                     }
324                 }
325             }
326
327         }).start();
328     }
329
330     /**
331      * @param conductorState
332      *            the connectionState to set
333      */
334     @Override
335     public void setConductorState(CONDUCTOR_STATE conductorState) {
336         this.conductorState = conductorState;
337     }
338
339     @Override
340     public CONDUCTOR_STATE getConductorState() {
341         return conductorState;
342     }
343
344     /**
345      * @param expectedState
346      */
347     protected void checkState(CONDUCTOR_STATE expectedState) {
348         if (!conductorState.equals(expectedState)) {
349             throw new IllegalStateException("Expected state: " + expectedState
350                     + ", actual state:" + conductorState);
351         }
352     }
353
354     @Override
355     public void onDisconnectEvent(DisconnectEvent arg0) {
356         SessionManager sessionManager = OFSessionUtil.getSessionManager();
357         sessionManager.invalidateOnDisconnect(this);
358     }
359
360     @Override
361     public Short getVersion() {
362         return version;
363     }
364
365     @Override
366     public Future<Boolean> disconnect() {
367         LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
368
369         Future<Boolean> result = null;
370         if (connectionAdapter.isAlive()) {
371             result = connectionAdapter.disconnect();
372         } else {
373             LOG.debug("connection already disconnected");
374             result = Futures.immediateFuture(true);
375         }
376
377         return result;
378     }
379
380     @Override
381     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
382         this.auxiliaryKey = auxiliaryKey;
383     }
384
385     @Override
386     public void setSessionContext(SessionContext sessionContext) {
387         this.sessionContext = sessionContext;
388     }
389
390     @Override
391     public SwitchConnectionDistinguisher getAuxiliaryKey() {
392         return auxiliaryKey;
393     }
394
395     @Override
396     public SessionContext getSessionContext() {
397         return sessionContext;
398     }
399
400     @Override
401     public ConnectionAdapter getConnectionAdapter() {
402         return connectionAdapter;
403     }
404
405     @Override
406     public void onConnectionReady() {
407         LOG.debug("connection is ready-to-use");
408         if (! firstHelloProcessed) {
409             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
410                     null, handshakeManager, connectionAdapter);
411             hsPool.execute(handshakeStepWrapper);
412             firstHelloProcessed = true;
413         } else {
414             LOG.debug("already touched by hello message");
415         }
416     }
417
418     @Override
419     public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
420             Short negotiatedVersion) {
421         postHandshakeBasic(featureOutput, negotiatedVersion);
422         
423         // post-handshake actions
424         if(version == OFConstants.OFP_VERSION_1_3){
425             setDefaultConfig();
426             requestPorts();
427             requestGroupFeatures();
428             requestMeterFeatures();
429         } else if (version == OFConstants.OFP_VERSION_1_0) {
430             //  Because the GetFeaturesOutput contains information about the port
431             //  in OF1.0 (that we would otherwise get from the PortDesc) we have to pass
432             //  it up for parsing to convert into a NodeConnectorUpdate
433             enqueueMessage(featureOutput);
434         }
435         
436         requestDesc();
437     }
438
439     /**
440      * used by tests
441      * @param featureOutput
442      * @param negotiatedVersion
443      */
444     protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
445             Short negotiatedVersion) {
446         version = negotiatedVersion;
447         conductorState = CONDUCTOR_STATE.WORKING;
448         OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
449         hsPool.shutdown();
450         hsPool.purge();
451     }
452
453     private void setDefaultConfig(){
454         SetConfigInputBuilder builder = new SetConfigInputBuilder();
455         builder.setVersion(getVersion());
456         builder.setXid(getSessionContext().getNextXid());
457         SwitchConfigFlag flag = SwitchConfigFlag.FRAGNORMAL;
458         builder.setFlags(flag);
459         builder.setMissSendLen(OFConstants.OFPCML_NO_BUFFER);
460         getConnectionAdapter().setConfig(builder.build());
461     }
462
463     /*
464      *  Send an OFPMP_DESC request message to the switch
465      */
466     private void requestDesc() {
467         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
468         builder.setType(MultipartType.OFPMPDESC);
469         builder.setVersion(getVersion());
470         builder.setFlags(new MultipartRequestFlags(false));
471         builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder().build());
472         builder.setXid(getSessionContext().getNextXid());
473         getConnectionAdapter().multipartRequest(builder.build());
474     }
475
476     private void requestPorts() {
477         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
478         builder.setType(MultipartType.OFPMPPORTDESC);
479         builder.setVersion(getVersion());
480         builder.setFlags(new MultipartRequestFlags(false));
481         builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder().build());
482         builder.setXid(getSessionContext().getNextXid());
483         getConnectionAdapter().multipartRequest(builder.build());
484     }
485     private void requestGroupFeatures(){
486         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
487         mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
488         mprInput.setVersion(getVersion());
489         mprInput.setFlags(new MultipartRequestFlags(false));
490         mprInput.setXid(getSessionContext().getNextXid());
491
492         MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = 
493                 new MultipartRequestGroupFeaturesCaseBuilder();
494         mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
495
496         LOG.debug("Send group features statistics request :{}",mprGroupFeaturesBuild);
497         getConnectionAdapter().multipartRequest(mprInput.build());
498         
499     }
500     private void requestMeterFeatures(){
501         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
502         mprInput.setType(MultipartType.OFPMPMETERFEATURES);
503         mprInput.setVersion(getVersion());
504         mprInput.setFlags(new MultipartRequestFlags(false));
505         mprInput.setXid(getSessionContext().getNextXid());
506
507         MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild =
508                 new MultipartRequestMeterFeaturesCaseBuilder();
509         mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
510
511         LOG.debug("Send meter features statistics request :{}",mprMeterFeaturesBuild);
512         getConnectionAdapter().multipartRequest(mprInput.build());
513         
514     }
515     /**
516      * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
517      */
518     public void setBitmapNegotiationEnable(
519             boolean isBitmapNegotiationEnable) {
520         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
521     }
522
523     protected void shutdownPool() {
524         hsPool.shutdownNow();
525         LOG.debug("pool is terminated: {}", hsPool.isTerminated());
526     }
527     
528     @Override
529     public void setId(int conductorId) {
530         this.conductorId = conductorId;
531     }
532 }