sending hello upon connection established
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
24 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
25 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.common.RpcError;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.google.common.util.concurrent.Futures;
53
54 /**
55  * @author mirehak
56  */
57 public class ConnectionConductorImpl implements OpenflowProtocolListener,
58         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
59
60     protected static final Logger LOG = LoggerFactory
61             .getLogger(ConnectionConductorImpl.class);
62
    /*
     * Flag that enables / disables bitmap-based version negotiation.
     * Switching it off helps while testing and isolating issues related to
     * the processing of version bitmaps received from switches.
     */
67     private static final boolean isBitmapNegotiationEnable = true;
68     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
69
70     protected final ConnectionAdapter connectionAdapter;
71     private ConnectionConductor.CONDUCTOR_STATE conductorState;
72     private Short version;
73
74     private SwitchConnectionDistinguisher auxiliaryKey;
75
76     private SessionContext sessionContext;
77
78     private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
79
80     protected boolean isFirstHelloNegotiation = true;
81
82
83
84     /**
85      * @param connectionAdapter
86      */
87     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
88         this.connectionAdapter = connectionAdapter;
89         conductorState = CONDUCTOR_STATE.HANDSHAKING;
90         new Thread(new ErrorQueueHandler(errorQueue)).start();
91     }
92
93     @Override
94     public void init() {
95         connectionAdapter.setMessageListener(this);
96         connectionAdapter.setSystemListener(this);
97         connectionAdapter.setConnectionReadyListener(this);
98     }
99
100
101     /**
102      * send first hello message to switch
103      */
104     protected void sendFirstHelloMessage() {
105         Short highestVersion = ConnectionConductor.versionOrder.get(0);
106         Long helloXid = 21L;
107         HelloInput helloInput = null;
108         
109         if (isBitmapNegotiationEnable) {
110             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
111             LOG.debug("sending first hello message: vertsion header={} , version bitmap={}", 
112                     highestVersion, helloInput.getElements());
113         } else {
114             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
115             LOG.debug("sending first hello message: version header={} ", highestVersion);
116         }
117         
118         try {
119             RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
120             smokeRpc(helloResult);
121             LOG.debug("FIRST HELLO sent.");
122         } catch (Throwable e) {
123             LOG.debug("FIRST HELLO sending failed.");
124             handleException(e);
125         }
126     }
127
128     @Override
129     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
130         new Thread(new Runnable() {
131             @Override
132             public void run() {
133                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
134                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
135                 builder.setVersion(echoRequestMessage.getVersion());
136                 builder.setXid(echoRequestMessage.getXid());
137                 builder.setData(echoRequestMessage.getData());
138                 
139                 connectionAdapter.echoReply(builder.build());
140             }
141         }).start();            
142     }
143
144     @Override
145     public void onErrorMessage(ErrorMessage errorMessage) {
146         // TODO Auto-generated method stub
147         LOG.debug("error received, type: " + errorMessage.getType()
148                 + "; code: " + errorMessage.getCode());
149     }
150
151     @Override
152     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
153         LOG.debug("experimenter received, type: "
154                 + experimenterMessage.getExpType());
155         notifyListeners(ExperimenterMessage.class, experimenterMessage);
156     }
157
158     @Override
159     public void onFlowRemovedMessage(FlowRemovedMessage message) {
160         notifyListeners(FlowRemovedMessage.class, message);
161     }
162
163
164     /**
165      * version negotiation happened as per following steps:
166      * 1. If HelloMessage version field has same version, continue connection processing.
167      *    If HelloMessage version is lower than supported versions, just disconnect.
168      * 2. If HelloMessage contains bitmap and common version found in bitmap
169      *    then continue connection processing. if no common version found, just disconnect.
170      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
171      *    If Hello message received again with not supported version, just disconnect.
172      *
173      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
174      */
175     @Override
176     public void onHelloMessage(final HelloMessage hello) {
177         // do handshake
178
179         new Thread(new Runnable() {
180             @Override
181             public void run() {
182                 LOG.info("handshake STARTED");
183                 checkState(CONDUCTOR_STATE.HANDSHAKING);
184                 
185                 Short remoteVersion = hello.getVersion();
186                 List<Elements> elements = hello.getElements();
187                 Long xid = hello.getXid();
188                 Short proposedVersion;
189                 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
190                 try {
191                     // find the version from header version field
192                     proposedVersion = proposeVersion(remoteVersion);
193                     
194                 } catch (IllegalArgumentException e) {
195                     handleException(e);
196                     connectionAdapter.disconnect();
197                     return;
198                 }
199                 
200                 // sent version is equal to remote --> version is negotiated
201                 if (proposedVersion == remoteVersion) {
202                     LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
203                     sendHelloReply(proposedVersion, ++xid);
204                     postHandshake(proposedVersion, ++xid);
205                     
206                 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
207                     try {
208                         // hello contains version bitmap, checking highest common
209                         // version in bitmap
210                         proposedVersion = proposeBitmapVersion(elements);
211                     } catch (IllegalArgumentException ex) {
212                         handleException(ex);
213                         connectionAdapter.disconnect();
214                         return;
215                     }
216                     LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
217                     sendHelloReply(proposedVersion, ++xid);
218                     postHandshake(proposedVersion, ++xid);
219                 } else {
220                     if (isFirstHelloNegotiation) {
221                         isFirstHelloNegotiation = false;
222                         LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
223                         // send hello reply with lower version number supported
224                         sendHelloReply(proposedVersion, ++xid);
225                     } else {
226                         // terminate the connection.
227                         LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
228                         connectionAdapter.disconnect();
229                     }
230                 }
231             }
232         }).start();
233     }
234
235     /**
236      * send hello reply
237      * @param proposedVersion
238      * @param hello
239      */
240     protected void sendHelloReply(Short proposedVersion, Long xid)
241     {
242         HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
243         RpcResult<Void> result;
244         try {
245             result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
246             smokeRpc(result);
247         } catch (Throwable e) {
248             handleException(e);
249         }
250     }
251
252
253     /**
254      * @param futureResult
255      * @throws Throwable 
256      */
257     private static void smokeRpc(RpcResult<?> result) throws Throwable {
258         if (!result.isSuccessful()) {
259             Throwable firstCause = null;
260             StringBuffer sb = new StringBuffer();
261             for (RpcError error : result.getErrors()) {
262                 if (firstCause != null) {
263                     firstCause = error.getCause();
264                 }
265                 
266                 sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
267             }
268             throw new Exception(sb.toString(), firstCause);
269         }
270     }
271
272     /**
273      * after handshake set features, register to session
274      * @param proposedVersion
275      * @param xId
276      */
277     protected void postHandshake(Short proposedVersion, Long xid) {
278         // set version
279         version = proposedVersion;
280         LOG.debug("version set: " + proposedVersion);
281         // request features
282         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
283         featuresBuilder.setVersion(version).setXid(xid);
284         LOG.debug("sending feature request for version={} and xid={}", version, xid);
285         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
286                 .getFeatures(featuresBuilder.build());
287         LOG.debug("waiting for features");
288         try {
289             RpcResult<GetFeaturesOutput> rpcFeatures = 
290                     featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
291             smokeRpc(rpcFeatures);
292             
293             GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
294             LOG.debug("obtained features: datapathId={}",
295                     featureOutput.getDatapathId());
296             LOG.debug("obtained features: auxiliaryId={}",
297                     featureOutput.getAuxiliaryId());
298             conductorState = CONDUCTOR_STATE.WORKING;
299
300             OFSessionUtil.registerSession(this,
301                     featureOutput, version);
302             this.setListenerMapping(OFSessionUtil.getListenersMap());
303             LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
304         } catch (Throwable e) {
305             //handshake failed
306             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
307             handleException(e);
308             disconnect();
309         }
310     }
311
312     /**
313      * @return rpc-response timeout in [ms]
314      */
315     private long getMaxTimeout() {
316         // TODO:: get from configuration
317         return 2000;
318     }
319     
320     /**
321      * @return milliseconds
322      */
323     private TimeUnit getMaxTimeoutUnit() {
324         // TODO:: get from configuration
325         return TimeUnit.MILLISECONDS;
326     }
327
328
329     /**
330      * @param e
331      */
332     protected void handleException(Throwable e) {
333         String sessionKeyId = null;
334         if (getSessionContext() != null) {
335             sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
336         }
337         
338         Exception causeAndThread = new Exception(
339                 "IN THREAD: "+Thread.currentThread().getName() +
340                 "; session:"+sessionKeyId, e);
341         try {
342             errorQueue.put(causeAndThread);
343         } catch (InterruptedException e1) {
344             LOG.error(e1.getMessage(), e1);
345         }
346     }
347
    /**
     * Callback for multipart reply messages. Currently a no-op: the message
     * is ignored.
     *
     * @param arg0 multipart reply message (unused)
     */
    @Override
    public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
        // TODO Auto-generated method stub
    }
352
    /**
     * Callback for multipart request messages. Currently a no-op: the message
     * is ignored.
     *
     * @param arg0 multipart request message (unused)
     */
    @Override
    public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
        // TODO Auto-generated method stub
    }
357
358     @Override
359     public void onPacketInMessage(PacketInMessage message) {
360         notifyListeners(PacketInMessage.class, message);
361     }
362
363     @Override
364     public void onPortStatusMessage(PortStatusMessage message) {
365         this.getSessionContext().processPortStatusMsg(message);
366         notifyListeners(PortStatusMessage.class, message);
367     }
368
369     @Override
370     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
371         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
372             // idle state in any other conductorState than WORKING means real
373             // problem and wont be handled by echoReply, but disconnection
374             disconnect();
375             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
376         } else {
377             LOG.debug("first idle state occured");
378             EchoInputBuilder builder = new EchoInputBuilder();
379             builder.setVersion(version);
380             // TODO: get xid from sessionContext
381             builder.setXid(42L);
382
383             Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
384                     .echo(builder.build());
385
386             try {
387                 // TODO: read timeout from config
388                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
389                         getMaxTimeoutUnit());
390                 if (echoReplyValue.isSuccessful()) {
391                     conductorState = CONDUCTOR_STATE.WORKING;
392                 } else {
393                     for (RpcError replyError : echoReplyValue.getErrors()) {
394                         Throwable cause = replyError.getCause();
395                         LOG.error(
396                                 "while receiving echoReply in TIMEOUTING state: "
397                                         + cause.getMessage(), cause);
398                     }
399                     //switch issue occurred
400                     throw new Exception("switch issue occurred");
401                 }
402             } catch (Exception e) {
403                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
404                         + e.getMessage(), e);
405                 //switch is not responding
406                 disconnect();
407                 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
408             }
409         }
410     }
411
412     /**
413      * @param conductorState
414      *            the connectionState to set
415      */
416     @Override
417     public void setConductorState(CONDUCTOR_STATE conductorState) {
418         this.conductorState = conductorState;
419     }
420
421     @Override
422     public CONDUCTOR_STATE getConductorState() {
423         return conductorState;
424     }
425
426     /**
427      * @param handshaking
428      */
429     protected void checkState(CONDUCTOR_STATE expectedState) {
430         if (!conductorState.equals(expectedState)) {
431             throw new IllegalStateException("Expected state: " + expectedState
432                     + ", actual state:" + conductorState);
433         }
434     }
435
436     @Override
437     public void onDisconnectEvent(DisconnectEvent arg0) {
438         SessionManager sessionManager = OFSessionUtil.getSessionManager();
439         sessionManager.invalidateOnDisconnect(this);
440     }
441
442     /**
443      * find supported version based on remoteVersion
444      * @param remoteVersion
445      * @return
446      */
447     protected short proposeVersion(short remoteVersion) {
448         Short proposal = null;
449         for (short offer : ConnectionConductor.versionOrder) {
450             if (offer <= remoteVersion) {
451                 proposal = offer;
452                 break;
453             }
454         }
455         if (proposal == null) {
456             throw new IllegalArgumentException("unsupported version: "
457                     + remoteVersion);
458         }
459         return proposal;
460     }
461
462     /**
463      * find common highest supported bitmap version
464      * @param list
465      * @return
466      */
467     protected Short proposeBitmapVersion(List<Elements> list)
468     {
469         Short supportedHighestVersion = null;
470         if((null != list) && (0 != list.size()))
471         {
472            for(Elements element : list)
473            {
474               List<Boolean> bitmap = element.getVersionBitmap();
475               // check for version bitmap
476               for(short bitPos : ConnectionConductor.versionOrder)
477               {
478                   // with all the version it should work.
479                   if(bitmap.get(bitPos % Integer.SIZE))
480                   {
481                       supportedHighestVersion = bitPos;
482                       break;
483                   }
484               }
485            }
486            if(null == supportedHighestVersion)
487             {
488                 throw new IllegalArgumentException("unsupported bitmap version.");
489             }
490
491         }
492
493         return supportedHighestVersion;
494     }
495
496     @Override
497     public Short getVersion() {
498         return version;
499     }
500
501     @Override
502     public Future<Boolean> disconnect() {
503         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
504         
505         Future<Boolean> result = null;
506         if (connectionAdapter.isAlive()) {
507             result = connectionAdapter.disconnect();
508         } else {
509             LOG.debug("connection already disconnected");
510             result = Futures.immediateFuture(true);
511         }
512         
513         return result; 
514     }
515
516     @Override
517     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
518         this.auxiliaryKey = auxiliaryKey;
519     }
520
521     @Override
522     public void setSessionContext(SessionContext sessionContext) {
523         this.sessionContext = sessionContext;
524     }
525
526     @Override
527     public SwitchConnectionDistinguisher getAuxiliaryKey() {
528         return auxiliaryKey;
529     }
530
531     @Override
532     public SessionContext getSessionContext() {
533         return sessionContext;
534     }
535     
536     /**
537      * @param listenerMapping the listenerMapping to set
538      */
539     public void setListenerMapping(
540             Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
541         this.listenerMapping = listenerMapping;
542     }
543
544     /**
545      * @param messageType
546      * @param message
547      */
548     private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
549         Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
550         if (listeners != null) {
551                 for (IMDMessageListener listener : listeners) {
552                     // Pass cookie only for PACKT_IN
553                     if ( messageType.equals("PacketInMessage.class")){
554                         listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
555                     } else {
556                         listener.receive(null, this.getSessionContext(), message);
557                     }
558                 }
559         } else {
560             LOG.warn("No listeners for this message Type {}", messageType);
561         }
562     }
563
564     @Override
565     public ConnectionAdapter getConnectionAdapter() {
566         return connectionAdapter;
567     }
568
569     @Override
570     public void onConnectionReady() {
571         LOG.debug("connection is ready-to-use");
572         //TODO: fire first helloMessage
573         new Thread(new Runnable() {
574             @Override
575             public void run() {
576                 sendFirstHelloMessage();
577             }
578         }).start();
579     }
580     
581 }