proposal of queueKeeper and default implementation preserving message order
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
24 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
25 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
26 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
27 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.common.RpcError;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.util.concurrent.Futures;
55
56 /**
57  * @author mirehak
58  */
59 public class ConnectionConductorImpl implements OpenflowProtocolListener,
60         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
61
62     protected static final Logger LOG = LoggerFactory
63             .getLogger(ConnectionConductorImpl.class);
64
65     /* variable to make BitMap-based negotiation enabled / disabled.
66      * it will help while testing and isolating issues related to processing of
67      * BitMaps from switches.
68      */
69     private static final boolean isBitmapNegotiationEnable = true;
70     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
71
72     protected final ConnectionAdapter connectionAdapter;
73     private ConnectionConductor.CONDUCTOR_STATE conductorState;
74     private Short version;
75
76     private SwitchConnectionDistinguisher auxiliaryKey;
77
78     private SessionContext sessionContext;
79
80     private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
81
82     protected boolean isFirstHelloNegotiation = true;
83
84     // TODO: use appropriate interface instead of Object
85     private QueueKeeper<Object> queueKeeper;
86
87
88
89     /**
90      * @param connectionAdapter
91      */
92     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
93         this.connectionAdapter = connectionAdapter;
94         conductorState = CONDUCTOR_STATE.HANDSHAKING;
95         new Thread(new ErrorQueueHandler(errorQueue)).start();
96     }
97
98     @Override
99     public void init() {
100         connectionAdapter.setMessageListener(this);
101         connectionAdapter.setSystemListener(this);
102         connectionAdapter.setConnectionReadyListener(this);
103     }
104     
105     /**
106      * @param queueKeeper the queueKeeper to set
107      */
108     @Override
109     public void setQueueKeeper(QueueKeeper<Object> queueKeeper) {
110         this.queueKeeper = queueKeeper;
111     }
112
113
114     /**
115      * send first hello message to switch
116      */
117     protected void sendFirstHelloMessage() {
118         Short highestVersion = ConnectionConductor.versionOrder.get(0);
119         Long helloXid = 21L;
120         HelloInput helloInput = null;
121         
122         if (isBitmapNegotiationEnable) {
123             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
124             LOG.debug("sending first hello message: vertsion header={} , version bitmap={}", 
125                     highestVersion, helloInput.getElements());
126         } else {
127             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
128             LOG.debug("sending first hello message: version header={} ", highestVersion);
129         }
130         
131         try {
132             RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
133             smokeRpc(helloResult);
134             LOG.debug("FIRST HELLO sent.");
135         } catch (Throwable e) {
136             LOG.debug("FIRST HELLO sending failed.");
137             handleException(e);
138         }
139     }
140
141     @Override
142     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
143         new Thread(new Runnable() {
144             @Override
145             public void run() {
146                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
147                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
148                 builder.setVersion(echoRequestMessage.getVersion());
149                 builder.setXid(echoRequestMessage.getXid());
150                 builder.setData(echoRequestMessage.getData());
151                 
152                 connectionAdapter.echoReply(builder.build());
153             }
154         }).start();            
155     }
156
157     @Override
158     public void onErrorMessage(ErrorMessage errorMessage) {
159         // TODO Auto-generated method stub
160         LOG.debug("error received, type: " + errorMessage.getType()
161                 + "; code: " + errorMessage.getCode());
162     }
163
164     @Override
165     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
166         queueKeeper.push(ExperimenterMessage.class, experimenterMessage, this);
167 //        notifyListeners(ExperimenterMessage.class, experimenterMessage);
168     }
169
170     @Override
171     public void onFlowRemovedMessage(FlowRemovedMessage message) {
172         notifyListeners(FlowRemovedMessage.class, message);
173     }
174
175
176     /**
177      * version negotiation happened as per following steps:
178      * 1. If HelloMessage version field has same version, continue connection processing.
179      *    If HelloMessage version is lower than supported versions, just disconnect.
180      * 2. If HelloMessage contains bitmap and common version found in bitmap
181      *    then continue connection processing. if no common version found, just disconnect.
182      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
183      *    If Hello message received again with not supported version, just disconnect.
184      *
185      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
186      */
187     @Override
188     public void onHelloMessage(final HelloMessage hello) {
189         // do handshake
190
191         new Thread(new Runnable() {
192             @Override
193             public void run() {
194                 LOG.info("handshake STARTED");
195                 checkState(CONDUCTOR_STATE.HANDSHAKING);
196                 
197                 Short remoteVersion = hello.getVersion();
198                 List<Elements> elements = hello.getElements();
199                 Long xid = hello.getXid();
200                 Short proposedVersion;
201                 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
202                 try {
203                     // find the version from header version field
204                     proposedVersion = proposeVersion(remoteVersion);
205                     
206                 } catch (IllegalArgumentException e) {
207                     handleException(e);
208                     connectionAdapter.disconnect();
209                     return;
210                 }
211                 
212                 // sent version is equal to remote --> version is negotiated
213                 if (proposedVersion == remoteVersion) {
214                     LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
215                     sendHelloReply(proposedVersion, ++xid);
216                     postHandshake(proposedVersion, ++xid);
217                     
218                 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
219                     try {
220                         // hello contains version bitmap, checking highest common
221                         // version in bitmap
222                         proposedVersion = proposeBitmapVersion(elements);
223                     } catch (IllegalArgumentException ex) {
224                         handleException(ex);
225                         connectionAdapter.disconnect();
226                         return;
227                     }
228                     LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
229                     sendHelloReply(proposedVersion, ++xid);
230                     postHandshake(proposedVersion, ++xid);
231                 } else {
232                     if (isFirstHelloNegotiation) {
233                         isFirstHelloNegotiation = false;
234                         LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
235                         // send hello reply with lower version number supported
236                         sendHelloReply(proposedVersion, ++xid);
237                     } else {
238                         // terminate the connection.
239                         LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
240                         connectionAdapter.disconnect();
241                     }
242                 }
243             }
244         }).start();
245     }
246
247     /**
248      * send hello reply
249      * @param proposedVersion
250      * @param hello
251      */
252     protected void sendHelloReply(Short proposedVersion, Long xid)
253     {
254         HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
255         RpcResult<Void> result;
256         try {
257             result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
258             smokeRpc(result);
259         } catch (Throwable e) {
260             handleException(e);
261         }
262     }
263
264
265     /**
266      * @param futureResult
267      * @throws Throwable 
268      */
269     private static void smokeRpc(RpcResult<?> result) throws Throwable {
270         if (!result.isSuccessful()) {
271             Throwable firstCause = null;
272             StringBuffer sb = new StringBuffer();
273             for (RpcError error : result.getErrors()) {
274                 if (firstCause != null) {
275                     firstCause = error.getCause();
276                 }
277                 
278                 sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
279             }
280             throw new Exception(sb.toString(), firstCause);
281         }
282     }
283
284     /**
285      * after handshake set features, register to session
286      * @param proposedVersion
287      * @param xId
288      */
289     protected void postHandshake(Short proposedVersion, Long xid) {
290         // set version
291         version = proposedVersion;
292         LOG.debug("version set: " + proposedVersion);
293         // request features
294         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
295         featuresBuilder.setVersion(version).setXid(xid);
296         LOG.debug("sending feature request for version={} and xid={}", version, xid);
297         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
298                 .getFeatures(featuresBuilder.build());
299         LOG.debug("waiting for features");
300         try {
301             RpcResult<GetFeaturesOutput> rpcFeatures = 
302                     featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
303             smokeRpc(rpcFeatures);
304             
305             GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
306             LOG.debug("obtained features: datapathId={}",
307                     featureOutput.getDatapathId());
308             LOG.debug("obtained features: auxiliaryId={}",
309                     featureOutput.getAuxiliaryId());
310             conductorState = CONDUCTOR_STATE.WORKING;
311
312             OFSessionUtil.registerSession(this,
313                     featureOutput, version);
314             this.setListenerMapping(OFSessionUtil.getListenersMap());
315             LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
316         } catch (Throwable e) {
317             //handshake failed
318             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
319             handleException(e);
320             disconnect();
321         }
322     }
323
324     /**
325      * @return rpc-response timeout in [ms]
326      */
327     private long getMaxTimeout() {
328         // TODO:: get from configuration
329         return 2000;
330     }
331     
332     /**
333      * @return milliseconds
334      */
335     private TimeUnit getMaxTimeoutUnit() {
336         // TODO:: get from configuration
337         return TimeUnit.MILLISECONDS;
338     }
339
340
341     /**
342      * @param e
343      */
344     protected void handleException(Throwable e) {
345         String sessionKeyId = null;
346         if (getSessionContext() != null) {
347             sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
348         }
349         
350         Exception causeAndThread = new Exception(
351                 "IN THREAD: "+Thread.currentThread().getName() +
352                 "; session:"+sessionKeyId, e);
353         try {
354             errorQueue.put(causeAndThread);
355         } catch (InterruptedException e1) {
356             LOG.error(e1.getMessage(), e1);
357         }
358     }
359
360     @Override
361     public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
362         // TODO Auto-generated method stub
363     }
364
365     @Override
366     public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
367         // TODO Auto-generated method stub
368     }
369
370     @Override
371     public void onPacketInMessage(PacketInMessage message) {
372         notifyListeners(PacketInMessage.class, message);
373     }
374
375     @Override
376     public void onPortStatusMessage(PortStatusMessage message) {
377         this.getSessionContext().processPortStatusMsg(message);
378         notifyListeners(PortStatusMessage.class, message);
379     }
380
381     @Override
382     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
383         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
384             // idle state in any other conductorState than WORKING means real
385             // problem and wont be handled by echoReply, but disconnection
386             disconnect();
387             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
388         } else {
389             LOG.debug("first idle state occured");
390             EchoInputBuilder builder = new EchoInputBuilder();
391             builder.setVersion(version);
392             // TODO: get xid from sessionContext
393             builder.setXid(42L);
394
395             Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
396                     .echo(builder.build());
397
398             try {
399                 // TODO: read timeout from config
400                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
401                         getMaxTimeoutUnit());
402                 if (echoReplyValue.isSuccessful()) {
403                     conductorState = CONDUCTOR_STATE.WORKING;
404                 } else {
405                     for (RpcError replyError : echoReplyValue.getErrors()) {
406                         Throwable cause = replyError.getCause();
407                         LOG.error(
408                                 "while receiving echoReply in TIMEOUTING state: "
409                                         + cause.getMessage(), cause);
410                     }
411                     //switch issue occurred
412                     throw new Exception("switch issue occurred");
413                 }
414             } catch (Exception e) {
415                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
416                         + e.getMessage(), e);
417                 //switch is not responding
418                 disconnect();
419                 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
420             }
421         }
422     }
423
424     /**
425      * @param conductorState
426      *            the connectionState to set
427      */
428     @Override
429     public void setConductorState(CONDUCTOR_STATE conductorState) {
430         this.conductorState = conductorState;
431     }
432
433     @Override
434     public CONDUCTOR_STATE getConductorState() {
435         return conductorState;
436     }
437
438     /**
439      * @param handshaking
440      */
441     protected void checkState(CONDUCTOR_STATE expectedState) {
442         if (!conductorState.equals(expectedState)) {
443             throw new IllegalStateException("Expected state: " + expectedState
444                     + ", actual state:" + conductorState);
445         }
446     }
447
448     @Override
449     public void onDisconnectEvent(DisconnectEvent arg0) {
450         SessionManager sessionManager = OFSessionUtil.getSessionManager();
451         sessionManager.invalidateOnDisconnect(this);
452     }
453
454     /**
455      * find supported version based on remoteVersion
456      * @param remoteVersion
457      * @return
458      */
459     protected short proposeVersion(short remoteVersion) {
460         Short proposal = null;
461         for (short offer : ConnectionConductor.versionOrder) {
462             if (offer <= remoteVersion) {
463                 proposal = offer;
464                 break;
465             }
466         }
467         if (proposal == null) {
468             throw new IllegalArgumentException("unsupported version: "
469                     + remoteVersion);
470         }
471         return proposal;
472     }
473
474     /**
475      * find common highest supported bitmap version
476      * @param list
477      * @return
478      */
479     protected Short proposeBitmapVersion(List<Elements> list)
480     {
481         Short supportedHighestVersion = null;
482         if((null != list) && (0 != list.size()))
483         {
484            for(Elements element : list)
485            {
486               List<Boolean> bitmap = element.getVersionBitmap();
487               // check for version bitmap
488               for(short bitPos : ConnectionConductor.versionOrder)
489               {
490                   // with all the version it should work.
491                   if(bitmap.get(bitPos % Integer.SIZE))
492                   {
493                       supportedHighestVersion = bitPos;
494                       break;
495                   }
496               }
497            }
498            if(null == supportedHighestVersion)
499             {
500                 throw new IllegalArgumentException("unsupported bitmap version.");
501             }
502
503         }
504
505         return supportedHighestVersion;
506     }
507
508     @Override
509     public Short getVersion() {
510         return version;
511     }
512
513     @Override
514     public Future<Boolean> disconnect() {
515         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
516         
517         Future<Boolean> result = null;
518         if (connectionAdapter.isAlive()) {
519             result = connectionAdapter.disconnect();
520         } else {
521             LOG.debug("connection already disconnected");
522             result = Futures.immediateFuture(true);
523         }
524         
525         return result; 
526     }
527
528     @Override
529     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
530         this.auxiliaryKey = auxiliaryKey;
531     }
532
533     @Override
534     public void setSessionContext(SessionContext sessionContext) {
535         this.sessionContext = sessionContext;
536     }
537
538     @Override
539     public SwitchConnectionDistinguisher getAuxiliaryKey() {
540         return auxiliaryKey;
541     }
542
543     @Override
544     public SessionContext getSessionContext() {
545         return sessionContext;
546     }
547     
548     /**
549      * @param listenerMapping the listenerMapping to set
550      */
551     public void setListenerMapping(
552             Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
553         this.listenerMapping = listenerMapping;
554     }
555
556     /**
557      * @param messageType
558      * @param message
559      * @deprecated use {@link QueueKeeper} strategy
560      */
561     @Deprecated
562     private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
563         Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
564         if (listeners != null) {
565                 for (IMDMessageListener listener : listeners) {
566                     // Pass cookie only for PACKT_IN
567                     if ( messageType.equals("PacketInMessage.class")){
568                         listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
569                     } else {
570                         listener.receive(null, this.getSessionContext(), message);
571                     }
572                 }
573         } else {
574             LOG.warn("No listeners for this message Type {}", messageType);
575         }
576     }
577
578     @Override
579     public ConnectionAdapter getConnectionAdapter() {
580         return connectionAdapter;
581     }
582
583     @Override
584     public void onConnectionReady() {
585         LOG.debug("connection is ready-to-use");
586         //TODO: fire first helloMessage
587         new Thread(new Runnable() {
588             @Override
589             public void run() {
590                 sendFirstHelloMessage();
591             }
592         }).start();
593     }
594     
595 }