Bug 1680 - default table-miss-entry feature should be pulled out into separate module
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.concurrent.Future;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
15
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
17 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
18 import org.opendaylight.openflowplugin.api.OFConstants;
19 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
22 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
24 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
25 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType;
26 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
27 import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.SwitchConfigFlag;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
56 import org.opendaylight.yangtools.yang.binding.DataObject;
57 import org.opendaylight.yangtools.yang.common.RpcError;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 import com.google.common.util.concurrent.Futures;
63
64 /**
65  * @author mirehak
66  */
67 public class ConnectionConductorImpl implements OpenflowProtocolListener,
68         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
69
70     /** ingress queue limit */
71     private static final int INGRESS_QUEUE_MAX_SIZE = 200;
72
73     protected static final Logger LOG = LoggerFactory
74             .getLogger(ConnectionConductorImpl.class);
75
76     /* variable to make BitMap-based negotiation enabled / disabled.
77      * it will help while testing and isolating issues related to processing of
78      * BitMaps from switches.
79      */
80     private boolean isBitmapNegotiationEnable = true;
81     protected ErrorHandler errorHandler;
82
83     private final ConnectionAdapter connectionAdapter;
84     private ConnectionConductor.CONDUCTOR_STATE conductorState;
85     private Short version;
86
87     protected SwitchConnectionDistinguisher auxiliaryKey;
88
89     protected SessionContext sessionContext;
90
91     private QueueProcessor<OfHeader, DataObject> queueProcessor;
92     private QueueKeeper<OfHeader> queue;
93     private ThreadPoolExecutor hsPool;
94     private HandshakeManager handshakeManager;
95
96     private boolean firstHelloProcessed;
97     
98     private PortFeaturesUtil portFeaturesUtils;
99
100     private int conductorId;
101
102     private int ingressMaxQueueSize;
103
104     
105     /**
106      * @param connectionAdapter
107      */
108     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
109         this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
110     }
111
112     /**
113      * @param connectionAdapter
114      * @param ingressMaxQueueSize ingress queue limit (blocking)
115      */
116     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) {
117         this.connectionAdapter = connectionAdapter;
118         this.ingressMaxQueueSize = ingressMaxQueueSize;
119         conductorState = CONDUCTOR_STATE.HANDSHAKING;
120         firstHelloProcessed = false;
121         handshakeManager = new HandshakeManagerImpl(connectionAdapter,
122                 ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder);
123         handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable);
124         handshakeManager.setHandshakeListener(this);
125         portFeaturesUtils = PortFeaturesUtil.getInstance();
126     }
127
128     @Override
129     public void init() {
130         int handshakeThreadLimit = 1;
131         hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, 
132                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 
133                 "OFHandshake-"+conductorId);
134         
135         connectionAdapter.setMessageListener(this);
136         connectionAdapter.setSystemListener(this);
137         connectionAdapter.setConnectionReadyListener(this);
138         queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize);
139     }
140
141     @Override
142     public void setQueueProcessor(QueueProcessor<OfHeader, DataObject> queueProcessor) {
143         this.queueProcessor = queueProcessor;
144     }
145
146     /**
147      * @param errorHandler the errorHandler to set
148      */
149     @Override
150     public void setErrorHandler(ErrorHandler errorHandler) {
151         this.errorHandler = errorHandler;
152         handshakeManager.setErrorHandler(errorHandler);
153     }
154
155     @Override
156     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
157         new Thread(new Runnable() {
158             @Override
159             public void run() {
160                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
161                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
162                 builder.setVersion(echoRequestMessage.getVersion());
163                 builder.setXid(echoRequestMessage.getXid());
164                 builder.setData(echoRequestMessage.getData());
165
166                 getConnectionAdapter().echoReply(builder.build());
167             }
168         }).start();
169     }
170
171     @Override
172     public void onErrorMessage(ErrorMessage errorMessage) {
173         enqueueMessage(errorMessage);
174     }
175
176     
177     /**
178      * @param message
179      */
180     private void enqueueMessage(OfHeader message) {
181         enqueueMessage(message, QueueType.DEFAULT);
182     }
183
184     /**
185      * @param message
186      * @param queueType enqueue type
187      */
188     private void enqueueMessage(OfHeader message, QueueType queueType) {
189         queue.push(message, this, queueType);
190     }
191
192     @Override
193     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
194         enqueueMessage(experimenterMessage);
195     }
196
197     @Override
198     public void onFlowRemovedMessage(FlowRemovedMessage message) {
199         enqueueMessage(message);
200     }
201
202
203     /**
204      * version negotiation happened as per following steps:
205      * 1. If HelloMessage version field has same version, continue connection processing.
206      *    If HelloMessage version is lower than supported versions, just disconnect.
207      * 2. If HelloMessage contains bitmap and common version found in bitmap
208      *    then continue connection processing. if no common version found, just disconnect.
209      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
210      * 4. If Hello message received again with not supported version, just disconnect.
211      */
212     @Override
213     public void onHelloMessage(final HelloMessage hello) {
214         LOG.debug("processing HELLO.xid: {}", hello.getXid());
215         firstHelloProcessed = true;
216         checkState(CONDUCTOR_STATE.HANDSHAKING);
217         HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
218                 hello, handshakeManager, connectionAdapter);
219         hsPool.submit(handshakeStepWrapper);
220     }
221
222     /**
223      * @return rpc-response timeout in [ms]
224      */
225     protected long getMaxTimeout() {
226         // TODO:: get from configuration
227         return 2000;
228     }
229
230     /**
231      * @return milliseconds
232      */
233     protected TimeUnit getMaxTimeoutUnit() {
234         // TODO:: get from configuration
235         return TimeUnit.MILLISECONDS;
236     }
237
238     @Override
239     public void onMultipartReplyMessage(MultipartReplyMessage message) {
240         enqueueMessage(message);
241     }
242
243     @Override
244     public void onPacketInMessage(PacketInMessage message) {
245         enqueueMessage(message, QueueKeeper.QueueType.UNORDERED);
246     }
247
248     @Override
249     public void onPortStatusMessage(PortStatusMessage message) {
250         processPortStatusMsg(message);
251         enqueueMessage(message);
252     }
253     
254     protected void processPortStatusMsg(PortStatus msg) {
255         if (msg.getReason().getIntValue() == 2) {
256             updatePort(msg);
257         } else if (msg.getReason().getIntValue() == 0) {
258             updatePort(msg);
259         } else if (msg.getReason().getIntValue() == 1) {
260             deletePort(msg);
261         }
262     }
263     
264     protected void updatePort(PortStatus msg) {
265         Long portNumber = msg.getPortNo();        
266         Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
267         
268         if(portBandwidth == null) {
269             LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString());
270         } else {
271             this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
272             this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth);                   
273         }            
274     }
275     
276     protected void deletePort(PortGrouping port) {
277         Long portNumber = port.getPortNo();
278         
279         this.getSessionContext().getPhysicalPorts().remove(portNumber);
280         this.getSessionContext().getPortsBandwidth().remove(portNumber);
281     }
282
283     @Override
284     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
285         new Thread(new Runnable() {
286             @Override
287             public void run() {
288                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
289                     // idle state in any other conductorState than WORKING means real
290                     // problem and wont be handled by echoReply, but disconnection
291                     disconnect();
292                     OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
293                 } else {
294                     LOG.debug("first idle state occured, sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
295                     EchoInputBuilder builder = new EchoInputBuilder();
296                     builder.setVersion(getVersion());
297                     builder.setXid(getSessionContext().getNextXid());
298
299                     Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
300                             .echo(builder.build());
301
302                     try {
303                         RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
304                                 getMaxTimeoutUnit());
305                         if (echoReplyValue.isSuccessful()) {
306                             setConductorState(CONDUCTOR_STATE.WORKING);
307                         } else {
308                             for (RpcError replyError : echoReplyValue.getErrors()) {
309                                 Throwable cause = replyError.getCause();
310                                 LOG.error(
311                                         "while receiving echoReply in TIMEOUTING state: "
312                                                 + cause.getMessage(), cause);
313                             }
314                             //switch issue occurred
315                             throw new Exception("switch issue occurred");
316                         }
317                     } catch (Exception e) {
318                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
319                                 + e.getMessage());
320                         errorHandler.handleException(e, sessionContext);
321                         //switch is not responding
322                         disconnect();
323                         OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
324                     }
325                 }
326             }
327
328         }).start();
329     }
330
331     /**
332      * @param conductorState
333      *            the connectionState to set
334      */
335     @Override
336     public void setConductorState(CONDUCTOR_STATE conductorState) {
337         this.conductorState = conductorState;
338     }
339
340     @Override
341     public CONDUCTOR_STATE getConductorState() {
342         return conductorState;
343     }
344
345     /**
346      * @param expectedState
347      */
348     protected void checkState(CONDUCTOR_STATE expectedState) {
349         if (!conductorState.equals(expectedState)) {
350             throw new IllegalStateException("Expected state: " + expectedState
351                     + ", actual state:" + conductorState);
352         }
353     }
354
355     @Override
356     public void onDisconnectEvent(DisconnectEvent arg0) {
357         SessionManager sessionManager = OFSessionUtil.getSessionManager();
358         sessionManager.invalidateOnDisconnect(this);
359     }
360
361     @Override
362     public Short getVersion() {
363         return version;
364     }
365
366     @Override
367     public Future<Boolean> disconnect() {
368         LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, auxiliaryKey);
369
370         Future<Boolean> result = null;
371         if (connectionAdapter.isAlive()) {
372             result = connectionAdapter.disconnect();
373         } else {
374             LOG.debug("connection already disconnected");
375             result = Futures.immediateFuture(true);
376         }
377
378         return result;
379     }
380
381     @Override
382     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
383         this.auxiliaryKey = auxiliaryKey;
384     }
385
386     @Override
387     public void setSessionContext(SessionContext sessionContext) {
388         this.sessionContext = sessionContext;
389     }
390
391     @Override
392     public SwitchConnectionDistinguisher getAuxiliaryKey() {
393         return auxiliaryKey;
394     }
395
396     @Override
397     public SessionContext getSessionContext() {
398         return sessionContext;
399     }
400
401     @Override
402     public ConnectionAdapter getConnectionAdapter() {
403         return connectionAdapter;
404     }
405
406     @Override
407     public void onConnectionReady() {
408         LOG.debug("connection is ready-to-use");
409         if (! firstHelloProcessed) {
410             HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
411                     null, handshakeManager, connectionAdapter);
412             hsPool.execute(handshakeStepWrapper);
413             firstHelloProcessed = true;
414         } else {
415             LOG.debug("already touched by hello message");
416         }
417     }
418
419     @Override
420     public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
421             Short negotiatedVersion) {
422         postHandshakeBasic(featureOutput, negotiatedVersion);
423         
424         // post-handshake actions
425         if(version == OFConstants.OFP_VERSION_1_3){
426             setDefaultConfig();
427             requestPorts();
428             requestGroupFeatures();
429             requestMeterFeatures();
430         } else if (version == OFConstants.OFP_VERSION_1_0) {
431             //  Because the GetFeaturesOutput contains information about the port
432             //  in OF1.0 (that we would otherwise get from the PortDesc) we have to pass
433             //  it up for parsing to convert into a NodeConnectorUpdate
434             enqueueMessage(featureOutput);
435         }
436         
437         requestDesc();
438     }
439
440     /**
441      * used by tests
442      * @param featureOutput
443      * @param negotiatedVersion
444      */
445     protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
446             Short negotiatedVersion) {
447         version = negotiatedVersion;
448         conductorState = CONDUCTOR_STATE.WORKING;
449         OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
450         hsPool.shutdown();
451         hsPool.purge();
452     }
453
454     private void setDefaultConfig(){
455         SetConfigInputBuilder builder = new SetConfigInputBuilder();
456         builder.setVersion(getVersion());
457         builder.setXid(getSessionContext().getNextXid());
458         SwitchConfigFlag flag = SwitchConfigFlag.FRAGNORMAL;
459         builder.setFlags(flag);
460         builder.setMissSendLen(OFConstants.OFPCML_NO_BUFFER);
461         getConnectionAdapter().setConfig(builder.build());
462     }
463
464     /*
465      *  Send an OFPMP_DESC request message to the switch
466      */
467     private void requestDesc() {
468         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
469         builder.setType(MultipartType.OFPMPDESC);
470         builder.setVersion(getVersion());
471         builder.setFlags(new MultipartRequestFlags(false));
472         builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder().build());
473         builder.setXid(getSessionContext().getNextXid());
474         getConnectionAdapter().multipartRequest(builder.build());
475     }
476
477     private void requestPorts() {
478         MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
479         builder.setType(MultipartType.OFPMPPORTDESC);
480         builder.setVersion(getVersion());
481         builder.setFlags(new MultipartRequestFlags(false));
482         builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder().build());
483         builder.setXid(getSessionContext().getNextXid());
484         getConnectionAdapter().multipartRequest(builder.build());
485     }
486     private void requestGroupFeatures(){
487         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
488         mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
489         mprInput.setVersion(getVersion());
490         mprInput.setFlags(new MultipartRequestFlags(false));
491         mprInput.setXid(getSessionContext().getNextXid());
492
493         MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = 
494                 new MultipartRequestGroupFeaturesCaseBuilder();
495         mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
496
497         LOG.debug("Send group features statistics request :{}",mprGroupFeaturesBuild);
498         getConnectionAdapter().multipartRequest(mprInput.build());
499         
500     }
501     private void requestMeterFeatures(){
502         MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
503         mprInput.setType(MultipartType.OFPMPMETERFEATURES);
504         mprInput.setVersion(getVersion());
505         mprInput.setFlags(new MultipartRequestFlags(false));
506         mprInput.setXid(getSessionContext().getNextXid());
507
508         MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild =
509                 new MultipartRequestMeterFeaturesCaseBuilder();
510         mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
511
512         LOG.debug("Send meter features statistics request :{}",mprMeterFeaturesBuild);
513         getConnectionAdapter().multipartRequest(mprInput.build());
514         
515     }
516     /**
517      * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
518      */
519     public void setBitmapNegotiationEnable(
520             boolean isBitmapNegotiationEnable) {
521         this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
522     }
523
524     protected void shutdownPool() {
525         hsPool.shutdownNow();
526         LOG.debug("pool is terminated: {}", hsPool.isTerminated());
527     }
528     
529     @Override
530     public void setId(int conductorId) {
531         this.conductorId = conductorId;
532     }
533 }