site improvement, warnings removal, applied queueKeeper, fixed tests, errorHandler...
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.List;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14
15 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
16 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
17 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
18 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
19 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
20 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
41 import org.opendaylight.yangtools.yang.common.RpcError;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import com.google.common.util.concurrent.Futures;
47
48 /**
49  * @author mirehak
50  */
51 public class ConnectionConductorImpl implements OpenflowProtocolListener,
52         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
53
54     protected static final Logger LOG = LoggerFactory
55             .getLogger(ConnectionConductorImpl.class);
56
57     /* variable to make BitMap-based negotiation enabled / disabled.
58      * it will help while testing and isolating issues related to processing of
59      * BitMaps from switches.
60      */
61     private static final boolean isBitmapNegotiationEnable = true;
62     private ErrorHandler errorHandler;
63
64     private final ConnectionAdapter connectionAdapter;
65     private ConnectionConductor.CONDUCTOR_STATE conductorState;
66     private Short version;
67
68     private SwitchConnectionDistinguisher auxiliaryKey;
69
70     private SessionContext sessionContext;
71
72     protected boolean isFirstHelloNegotiation = true;
73
74     // TODO: use appropriate interface instead of Object
75     private QueueKeeper<Object> queueKeeper;
76
77
78
79     /**
80      * @param connectionAdapter
81      */
82     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
83         this.connectionAdapter = connectionAdapter;
84         conductorState = CONDUCTOR_STATE.HANDSHAKING;
85     }
86
87     @Override
88     public void init() {
89         connectionAdapter.setMessageListener(this);
90         connectionAdapter.setSystemListener(this);
91         connectionAdapter.setConnectionReadyListener(this);
92     }
93     
94     /**
95      * @param queueKeeper the queueKeeper to set
96      */
97     @Override
98     public void setQueueKeeper(QueueKeeper<Object> queueKeeper) {
99         this.queueKeeper = queueKeeper;
100     }
101
102     /**
103      * @param errorHandler the errorHandler to set
104      */
105     @Override
106     public void setErrorHandler(ErrorHandler errorHandler) {
107         this.errorHandler = errorHandler;
108     }
109
110     /**
111      * send first hello message to switch
112      */
113     protected void sendFirstHelloMessage() {
114         Short highestVersion = ConnectionConductor.versionOrder.get(0);
115         Long helloXid = 21L;
116         HelloInput helloInput = null;
117         
118         if (isBitmapNegotiationEnable) {
119             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
120             LOG.debug("sending first hello message: vertsion header={} , version bitmap={}", 
121                     highestVersion, MessageFactory.digVersions(helloInput.getElements()));
122         } else {
123             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
124             LOG.debug("sending first hello message: version header={} ", highestVersion);
125         }
126         
127         try {
128             RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
129             smokeRpc(helloResult);
130             LOG.debug("FIRST HELLO sent.");
131         } catch (Throwable e) {
132             LOG.debug("FIRST HELLO sending failed.");
133             errorHandler.handleException(e, getSessionContext());
134         }
135     }
136
137     @Override
138     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
139         new Thread(new Runnable() {
140             @Override
141             public void run() {
142                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
143                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
144                 builder.setVersion(echoRequestMessage.getVersion());
145                 builder.setXid(echoRequestMessage.getXid());
146                 builder.setData(echoRequestMessage.getData());
147                 
148                 getConnectionAdapter().echoReply(builder.build());
149             }
150         }).start();            
151     }
152
153     @Override
154     public void onErrorMessage(ErrorMessage errorMessage) {
155         queueKeeper.push(ErrorMessage.class, errorMessage, this);
156     }
157
158     @Override
159     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
160         queueKeeper.push(ExperimenterMessage.class, experimenterMessage, this);
161     }
162
163     @Override
164     public void onFlowRemovedMessage(FlowRemovedMessage message) {
165         queueKeeper.push(FlowRemovedMessage.class, message, this);
166     }
167
168
169     /**
170      * version negotiation happened as per following steps:
171      * 1. If HelloMessage version field has same version, continue connection processing.
172      *    If HelloMessage version is lower than supported versions, just disconnect.
173      * 2. If HelloMessage contains bitmap and common version found in bitmap
174      *    then continue connection processing. if no common version found, just disconnect.
175      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
176      *    If Hello message received again with not supported version, just disconnect.
177      *
178      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
179      */
180     @Override
181     public void onHelloMessage(final HelloMessage hello) {
182         // do handshake
183
184         new Thread(new Runnable() {
185             @Override
186             public void run() {
187                 LOG.info("handshake STARTED");
188                 checkState(CONDUCTOR_STATE.HANDSHAKING);
189                 
190                 Short remoteVersion = hello.getVersion();
191                 List<Elements> elements = hello.getElements();
192                 Long xid = hello.getXid();
193                 Short proposedVersion;
194                 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, MessageFactory.digVersions(elements));
195                 try {
196                     // find the version from header version field
197                     proposedVersion = proposeVersion(remoteVersion);
198                     
199                 } catch (IllegalArgumentException e) {
200                     errorHandler.handleException(e, getSessionContext());
201                     getConnectionAdapter().disconnect();
202                     return;
203                 }
204                 
205                 // sent version is equal to remote --> version is negotiated
206                 if (proposedVersion == remoteVersion) {
207                     LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
208                     sendHelloReply(proposedVersion, ++xid);
209                     postHandshake(proposedVersion, ++xid);
210                     
211                 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
212                     try {
213                         // hello contains version bitmap, checking highest common
214                         // version in bitmap
215                         proposedVersion = proposeBitmapVersion(elements);
216                     } catch (IllegalArgumentException ex) {
217                         errorHandler.handleException(ex, getSessionContext());
218                         getConnectionAdapter().disconnect();
219                         return;
220                     }
221                     LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
222                     sendHelloReply(proposedVersion, ++xid);
223                     postHandshake(proposedVersion, ++xid);
224                 } else {
225                     if (isFirstHelloNegotiation) {
226                         isFirstHelloNegotiation = false;
227                         LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
228                         // send hello reply with lower version number supported
229                         sendHelloReply(proposedVersion, ++xid);
230                     } else {
231                         // terminate the connection.
232                         LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
233                         getConnectionAdapter().disconnect();
234                     }
235                 }
236             }
237         }).start();
238     }
239
240     /**
241      * send hello reply
242      * @param proposedVersion
243      * @param hello
244      */
245     protected void sendHelloReply(Short proposedVersion, Long xid)
246     {
247         HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
248         RpcResult<Void> result;
249         try {
250             result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
251             smokeRpc(result);
252         } catch (Throwable e) {
253             errorHandler.handleException(e, getSessionContext());
254         }
255     }
256
257
258     /**
259      * @param futureResult
260      * @throws Throwable 
261      */
262     private static void smokeRpc(RpcResult<?> result) throws Throwable {
263         if (!result.isSuccessful()) {
264             Throwable firstCause = null;
265             StringBuffer sb = new StringBuffer();
266             for (RpcError error : result.getErrors()) {
267                 if (firstCause != null) {
268                     firstCause = error.getCause();
269                 }
270                 
271                 sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
272             }
273             throw new Exception(sb.toString(), firstCause);
274         }
275     }
276
277     /**
278      * after handshake set features, register to session
279      * @param proposedVersion
280      * @param xId
281      */
282     protected void postHandshake(Short proposedVersion, Long xid) {
283         // set version
284         version = proposedVersion;
285         LOG.debug("version set: " + proposedVersion);
286         // request features
287         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
288         featuresBuilder.setVersion(version).setXid(xid);
289         LOG.debug("sending feature request for version={} and xid={}", version, xid);
290         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
291                 .getFeatures(featuresBuilder.build());
292         LOG.debug("waiting for features");
293         try {
294             RpcResult<GetFeaturesOutput> rpcFeatures = 
295                     featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
296             smokeRpc(rpcFeatures);
297             
298             GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
299             LOG.debug("obtained features: datapathId={}",
300                     featureOutput.getDatapathId());
301             LOG.debug("obtained features: auxiliaryId={}",
302                     featureOutput.getAuxiliaryId());
303             conductorState = CONDUCTOR_STATE.WORKING;
304
305             OFSessionUtil.registerSession(this,
306                     featureOutput, version);
307             LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
308         } catch (Throwable e) {
309             //handshake failed
310             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
311             errorHandler.handleException(e, getSessionContext());
312             disconnect();
313         }
314     }
315
316     /**
317      * @return rpc-response timeout in [ms]
318      */
319     protected long getMaxTimeout() {
320         // TODO:: get from configuration
321         return 2000;
322     }
323     
324     /**
325      * @return milliseconds
326      */
327     protected TimeUnit getMaxTimeoutUnit() {
328         // TODO:: get from configuration
329         return TimeUnit.MILLISECONDS;
330     }
331
332     @Override
333     public void onMultipartReplyMessage(MultipartReplyMessage message) {
334         queueKeeper.push(MultipartReplyMessage.class, message, this);
335     }
336
337     @Override
338     public void onMultipartRequestMessage(MultipartRequestMessage message) {
339         queueKeeper.push(MultipartRequestMessage.class, message, this);
340     }
341
342     @Override
343     public void onPacketInMessage(PacketInMessage message) {
344         queueKeeper.push(PacketInMessage.class, message, this);
345     }
346
347     @Override
348     public void onPortStatusMessage(PortStatusMessage message) {
349         this.getSessionContext().processPortStatusMsg(message);
350         queueKeeper.push(PortStatusMessage.class, message, this);
351     }
352
353     @Override
354     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
355         new Thread(new Runnable() {
356             @Override
357             public void run() {
358                 if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
359                     // idle state in any other conductorState than WORKING means real
360                     // problem and wont be handled by echoReply, but disconnection
361                     disconnect();
362                     OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
363                 } else {
364                     LOG.debug("first idle state occured");
365                     EchoInputBuilder builder = new EchoInputBuilder();
366                     builder.setVersion(getVersion());
367                     builder.setXid(getSessionContext().getNextXid());
368                     
369                     Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
370                             .echo(builder.build());
371                     
372                     try {
373                         RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
374                                 getMaxTimeoutUnit());
375                         if (echoReplyValue.isSuccessful()) {
376                             setConductorState(CONDUCTOR_STATE.WORKING);
377                         } else {
378                             for (RpcError replyError : echoReplyValue.getErrors()) {
379                                 Throwable cause = replyError.getCause();
380                                 LOG.error(
381                                         "while receiving echoReply in TIMEOUTING state: "
382                                                 + cause.getMessage(), cause);
383                             }
384                             //switch issue occurred
385                             throw new Exception("switch issue occurred");
386                         }
387                     } catch (Exception e) {
388                         LOG.error("while waiting for echoReply in TIMEOUTING state: "
389                                 + e.getMessage(), e);
390                         //switch is not responding
391                         disconnect();
392                         OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
393                     }
394                 }
395             }
396             
397         }).start();
398     }
399
400     /**
401      * @param conductorState
402      *            the connectionState to set
403      */
404     @Override
405     public void setConductorState(CONDUCTOR_STATE conductorState) {
406         this.conductorState = conductorState;
407     }
408
409     @Override
410     public CONDUCTOR_STATE getConductorState() {
411         return conductorState;
412     }
413
414     /**
415      * @param handshaking
416      */
417     protected void checkState(CONDUCTOR_STATE expectedState) {
418         if (!conductorState.equals(expectedState)) {
419             throw new IllegalStateException("Expected state: " + expectedState
420                     + ", actual state:" + conductorState);
421         }
422     }
423
424     @Override
425     public void onDisconnectEvent(DisconnectEvent arg0) {
426         SessionManager sessionManager = OFSessionUtil.getSessionManager();
427         sessionManager.invalidateOnDisconnect(this);
428     }
429
430     /**
431      * find supported version based on remoteVersion
432      * @param remoteVersion
433      * @return
434      */
435     protected short proposeVersion(short remoteVersion) {
436         Short proposal = null;
437         for (short offer : ConnectionConductor.versionOrder) {
438             if (offer <= remoteVersion) {
439                 proposal = offer;
440                 break;
441             }
442         }
443         if (proposal == null) {
444             throw new IllegalArgumentException("unsupported version: "
445                     + remoteVersion);
446         }
447         return proposal;
448     }
449
450     /**
451      * find common highest supported bitmap version
452      * @param list
453      * @return
454      */
455     protected Short proposeBitmapVersion(List<Elements> list)
456     {
457         Short supportedHighestVersion = null;
458         if((null != list) && (0 != list.size()))
459         {
460            for(Elements element : list)
461            {
462               List<Boolean> bitmap = element.getVersionBitmap();
463               // check for version bitmap
464               for(short bitPos : ConnectionConductor.versionOrder)
465               {
466                   // with all the version it should work.
467                   if(bitmap.get(bitPos % Integer.SIZE))
468                   {
469                       supportedHighestVersion = bitPos;
470                       break;
471                   }
472               }
473            }
474            if(null == supportedHighestVersion)
475             {
476                 throw new IllegalArgumentException("unsupported bitmap version.");
477             }
478
479         }
480
481         return supportedHighestVersion;
482     }
483
484     @Override
485     public Short getVersion() {
486         return version;
487     }
488
489     @Override
490     public Future<Boolean> disconnect() {
491         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
492         
493         Future<Boolean> result = null;
494         if (connectionAdapter.isAlive()) {
495             result = connectionAdapter.disconnect();
496         } else {
497             LOG.debug("connection already disconnected");
498             result = Futures.immediateFuture(true);
499         }
500         
501         return result; 
502     }
503
504     @Override
505     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
506         this.auxiliaryKey = auxiliaryKey;
507     }
508
509     @Override
510     public void setSessionContext(SessionContext sessionContext) {
511         this.sessionContext = sessionContext;
512     }
513
514     @Override
515     public SwitchConnectionDistinguisher getAuxiliaryKey() {
516         return auxiliaryKey;
517     }
518
519     @Override
520     public SessionContext getSessionContext() {
521         return sessionContext;
522     }
523     
524     @Override
525     public ConnectionAdapter getConnectionAdapter() {
526         return connectionAdapter;
527     }
528
529     @Override
530     public void onConnectionReady() {
531         LOG.debug("connection is ready-to-use");
532         new Thread(new Runnable() {
533             @Override
534             public void run() {
535                 sendFirstHelloMessage();
536             }
537         }).start();
538     }
539     
540 }