Adding support and unit-test for ErrorMessage
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
24 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
25 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
26 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
27 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.common.RpcError;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.util.concurrent.Futures;
55
56 /**
57  * @author mirehak
58  */
59 public class ConnectionConductorImpl implements OpenflowProtocolListener,
60         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
61
62     protected static final Logger LOG = LoggerFactory
63             .getLogger(ConnectionConductorImpl.class);
64
65     /* variable to make BitMap-based negotiation enabled / disabled.
66      * it will help while testing and isolating issues related to processing of
67      * BitMaps from switches.
68      */
69     private static final boolean isBitmapNegotiationEnable = true;
70     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
71
72     protected final ConnectionAdapter connectionAdapter;
73     private ConnectionConductor.CONDUCTOR_STATE conductorState;
74     private Short version;
75
76     private SwitchConnectionDistinguisher auxiliaryKey;
77
78     private SessionContext sessionContext;
79
80     private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
81
82     protected boolean isFirstHelloNegotiation = true;
83
84     // TODO: use appropriate interface instead of Object
85     private QueueKeeper<Object> queueKeeper;
86
87
88
89     /**
90      * @param connectionAdapter
91      */
92     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
93         this.connectionAdapter = connectionAdapter;
94         conductorState = CONDUCTOR_STATE.HANDSHAKING;
95         new Thread(new ErrorQueueHandler(errorQueue)).start();
96     }
97
98     @Override
99     public void init() {
100         connectionAdapter.setMessageListener(this);
101         connectionAdapter.setSystemListener(this);
102         connectionAdapter.setConnectionReadyListener(this);
103     }
104     
105     /**
106      * @param queueKeeper the queueKeeper to set
107      */
108     @Override
109     public void setQueueKeeper(QueueKeeper<Object> queueKeeper) {
110         this.queueKeeper = queueKeeper;
111     }
112
113
114     /**
115      * send first hello message to switch
116      */
117     protected void sendFirstHelloMessage() {
118         Short highestVersion = ConnectionConductor.versionOrder.get(0);
119         Long helloXid = 21L;
120         HelloInput helloInput = null;
121         
122         if (isBitmapNegotiationEnable) {
123             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
124             LOG.debug("sending first hello message: vertsion header={} , version bitmap={}", 
125                     highestVersion, helloInput.getElements());
126         } else {
127             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
128             LOG.debug("sending first hello message: version header={} ", highestVersion);
129         }
130         
131         try {
132             RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
133             smokeRpc(helloResult);
134             LOG.debug("FIRST HELLO sent.");
135         } catch (Throwable e) {
136             LOG.debug("FIRST HELLO sending failed.");
137             handleException(e);
138         }
139     }
140
141     @Override
142     public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
143         new Thread(new Runnable() {
144             @Override
145             public void run() {
146                 LOG.debug("echo request received: " + echoRequestMessage.getXid());
147                 EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
148                 builder.setVersion(echoRequestMessage.getVersion());
149                 builder.setXid(echoRequestMessage.getXid());
150                 builder.setData(echoRequestMessage.getData());
151                 
152                 connectionAdapter.echoReply(builder.build());
153             }
154         }).start();            
155     }
156
157     @Override
158     public void onErrorMessage(ErrorMessage errorMessage) {
159         queueKeeper.push(ErrorMessage.class, errorMessage, this);
160        // notifyListeners(ErrorMessage.class, errorMessage);
161     }
162
163     @Override
164     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
165         queueKeeper.push(ExperimenterMessage.class, experimenterMessage, this);
166 //        notifyListeners(ExperimenterMessage.class, experimenterMessage);
167     }
168
169     @Override
170     public void onFlowRemovedMessage(FlowRemovedMessage message) {
171         notifyListeners(FlowRemovedMessage.class, message);
172     }
173
174
175     /**
176      * version negotiation happened as per following steps:
177      * 1. If HelloMessage version field has same version, continue connection processing.
178      *    If HelloMessage version is lower than supported versions, just disconnect.
179      * 2. If HelloMessage contains bitmap and common version found in bitmap
180      *    then continue connection processing. if no common version found, just disconnect.
181      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
182      *    If Hello message received again with not supported version, just disconnect.
183      *
184      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
185      */
186     @Override
187     public void onHelloMessage(final HelloMessage hello) {
188         // do handshake
189
190         new Thread(new Runnable() {
191             @Override
192             public void run() {
193                 LOG.info("handshake STARTED");
194                 checkState(CONDUCTOR_STATE.HANDSHAKING);
195                 
196                 Short remoteVersion = hello.getVersion();
197                 List<Elements> elements = hello.getElements();
198                 Long xid = hello.getXid();
199                 Short proposedVersion;
200                 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
201                 try {
202                     // find the version from header version field
203                     proposedVersion = proposeVersion(remoteVersion);
204                     
205                 } catch (IllegalArgumentException e) {
206                     handleException(e);
207                     connectionAdapter.disconnect();
208                     return;
209                 }
210                 
211                 // sent version is equal to remote --> version is negotiated
212                 if (proposedVersion == remoteVersion) {
213                     LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
214                     sendHelloReply(proposedVersion, ++xid);
215                     postHandshake(proposedVersion, ++xid);
216                     
217                 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
218                     try {
219                         // hello contains version bitmap, checking highest common
220                         // version in bitmap
221                         proposedVersion = proposeBitmapVersion(elements);
222                     } catch (IllegalArgumentException ex) {
223                         handleException(ex);
224                         connectionAdapter.disconnect();
225                         return;
226                     }
227                     LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
228                     sendHelloReply(proposedVersion, ++xid);
229                     postHandshake(proposedVersion, ++xid);
230                 } else {
231                     if (isFirstHelloNegotiation) {
232                         isFirstHelloNegotiation = false;
233                         LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
234                         // send hello reply with lower version number supported
235                         sendHelloReply(proposedVersion, ++xid);
236                     } else {
237                         // terminate the connection.
238                         LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
239                         connectionAdapter.disconnect();
240                     }
241                 }
242             }
243         }).start();
244     }
245
246     /**
247      * send hello reply
248      * @param proposedVersion
249      * @param hello
250      */
251     protected void sendHelloReply(Short proposedVersion, Long xid)
252     {
253         HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
254         RpcResult<Void> result;
255         try {
256             result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
257             smokeRpc(result);
258         } catch (Throwable e) {
259             handleException(e);
260         }
261     }
262
263
264     /**
265      * @param futureResult
266      * @throws Throwable 
267      */
268     private static void smokeRpc(RpcResult<?> result) throws Throwable {
269         if (!result.isSuccessful()) {
270             Throwable firstCause = null;
271             StringBuffer sb = new StringBuffer();
272             for (RpcError error : result.getErrors()) {
273                 if (firstCause != null) {
274                     firstCause = error.getCause();
275                 }
276                 
277                 sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
278             }
279             throw new Exception(sb.toString(), firstCause);
280         }
281     }
282
283     /**
284      * after handshake set features, register to session
285      * @param proposedVersion
286      * @param xId
287      */
288     protected void postHandshake(Short proposedVersion, Long xid) {
289         // set version
290         version = proposedVersion;
291         LOG.debug("version set: " + proposedVersion);
292         // request features
293         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
294         featuresBuilder.setVersion(version).setXid(xid);
295         LOG.debug("sending feature request for version={} and xid={}", version, xid);
296         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
297                 .getFeatures(featuresBuilder.build());
298         LOG.debug("waiting for features");
299         try {
300             RpcResult<GetFeaturesOutput> rpcFeatures = 
301                     featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
302             smokeRpc(rpcFeatures);
303             
304             GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
305             LOG.debug("obtained features: datapathId={}",
306                     featureOutput.getDatapathId());
307             LOG.debug("obtained features: auxiliaryId={}",
308                     featureOutput.getAuxiliaryId());
309             conductorState = CONDUCTOR_STATE.WORKING;
310
311             OFSessionUtil.registerSession(this,
312                     featureOutput, version);
313             this.setListenerMapping(OFSessionUtil.getListenersMap());
314             LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
315         } catch (Throwable e) {
316             //handshake failed
317             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
318             handleException(e);
319             disconnect();
320         }
321     }
322
323     /**
324      * @return rpc-response timeout in [ms]
325      */
326     private long getMaxTimeout() {
327         // TODO:: get from configuration
328         return 2000;
329     }
330     
331     /**
332      * @return milliseconds
333      */
334     private TimeUnit getMaxTimeoutUnit() {
335         // TODO:: get from configuration
336         return TimeUnit.MILLISECONDS;
337     }
338
339
340     /**
341      * @param e
342      */
343     protected void handleException(Throwable e) {
344         String sessionKeyId = null;
345         if (getSessionContext() != null) {
346             sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
347         }
348         
349         Exception causeAndThread = new Exception(
350                 "IN THREAD: "+Thread.currentThread().getName() +
351                 "; session:"+sessionKeyId, e);
352         try {
353             errorQueue.put(causeAndThread);
354         } catch (InterruptedException e1) {
355             LOG.error(e1.getMessage(), e1);
356         }
357     }
358
359     @Override
360     public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
361         // TODO Auto-generated method stub
362     }
363
364     @Override
365     public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
366         // TODO Auto-generated method stub
367     }
368
369     @Override
370     public void onPacketInMessage(PacketInMessage message) {
371         notifyListeners(PacketInMessage.class, message);
372     }
373
374     @Override
375     public void onPortStatusMessage(PortStatusMessage message) {
376         this.getSessionContext().processPortStatusMsg(message);
377         notifyListeners(PortStatusMessage.class, message);
378     }
379
380     @Override
381     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
382         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
383             // idle state in any other conductorState than WORKING means real
384             // problem and wont be handled by echoReply, but disconnection
385             disconnect();
386             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
387         } else {
388             LOG.debug("first idle state occured");
389             EchoInputBuilder builder = new EchoInputBuilder();
390             builder.setVersion(version);
391             // TODO: get xid from sessionContext
392             builder.setXid(42L);
393
394             Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
395                     .echo(builder.build());
396
397             try {
398                 // TODO: read timeout from config
399                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
400                         getMaxTimeoutUnit());
401                 if (echoReplyValue.isSuccessful()) {
402                     conductorState = CONDUCTOR_STATE.WORKING;
403                 } else {
404                     for (RpcError replyError : echoReplyValue.getErrors()) {
405                         Throwable cause = replyError.getCause();
406                         LOG.error(
407                                 "while receiving echoReply in TIMEOUTING state: "
408                                         + cause.getMessage(), cause);
409                     }
410                     //switch issue occurred
411                     throw new Exception("switch issue occurred");
412                 }
413             } catch (Exception e) {
414                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
415                         + e.getMessage(), e);
416                 //switch is not responding
417                 disconnect();
418                 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
419             }
420         }
421     }
422
423     /**
424      * @param conductorState
425      *            the connectionState to set
426      */
427     @Override
428     public void setConductorState(CONDUCTOR_STATE conductorState) {
429         this.conductorState = conductorState;
430     }
431
432     @Override
433     public CONDUCTOR_STATE getConductorState() {
434         return conductorState;
435     }
436
437     /**
438      * @param handshaking
439      */
440     protected void checkState(CONDUCTOR_STATE expectedState) {
441         if (!conductorState.equals(expectedState)) {
442             throw new IllegalStateException("Expected state: " + expectedState
443                     + ", actual state:" + conductorState);
444         }
445     }
446
447     @Override
448     public void onDisconnectEvent(DisconnectEvent arg0) {
449         SessionManager sessionManager = OFSessionUtil.getSessionManager();
450         sessionManager.invalidateOnDisconnect(this);
451     }
452
453     /**
454      * find supported version based on remoteVersion
455      * @param remoteVersion
456      * @return
457      */
458     protected short proposeVersion(short remoteVersion) {
459         Short proposal = null;
460         for (short offer : ConnectionConductor.versionOrder) {
461             if (offer <= remoteVersion) {
462                 proposal = offer;
463                 break;
464             }
465         }
466         if (proposal == null) {
467             throw new IllegalArgumentException("unsupported version: "
468                     + remoteVersion);
469         }
470         return proposal;
471     }
472
473     /**
474      * find common highest supported bitmap version
475      * @param list
476      * @return
477      */
478     protected Short proposeBitmapVersion(List<Elements> list)
479     {
480         Short supportedHighestVersion = null;
481         if((null != list) && (0 != list.size()))
482         {
483            for(Elements element : list)
484            {
485               List<Boolean> bitmap = element.getVersionBitmap();
486               // check for version bitmap
487               for(short bitPos : ConnectionConductor.versionOrder)
488               {
489                   // with all the version it should work.
490                   if(bitmap.get(bitPos % Integer.SIZE))
491                   {
492                       supportedHighestVersion = bitPos;
493                       break;
494                   }
495               }
496            }
497            if(null == supportedHighestVersion)
498             {
499                 throw new IllegalArgumentException("unsupported bitmap version.");
500             }
501
502         }
503
504         return supportedHighestVersion;
505     }
506
507     @Override
508     public Short getVersion() {
509         return version;
510     }
511
512     @Override
513     public Future<Boolean> disconnect() {
514         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
515         
516         Future<Boolean> result = null;
517         if (connectionAdapter.isAlive()) {
518             result = connectionAdapter.disconnect();
519         } else {
520             LOG.debug("connection already disconnected");
521             result = Futures.immediateFuture(true);
522         }
523         
524         return result; 
525     }
526
527     @Override
528     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
529         this.auxiliaryKey = auxiliaryKey;
530     }
531
532     @Override
533     public void setSessionContext(SessionContext sessionContext) {
534         this.sessionContext = sessionContext;
535     }
536
537     @Override
538     public SwitchConnectionDistinguisher getAuxiliaryKey() {
539         return auxiliaryKey;
540     }
541
542     @Override
543     public SessionContext getSessionContext() {
544         return sessionContext;
545     }
546     
547     /**
548      * @param listenerMapping the listenerMapping to set
549      */
550     public void setListenerMapping(
551             Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
552         this.listenerMapping = listenerMapping;
553     }
554
555     /**
556      * @param messageType
557      * @param message
558      * @deprecated use {@link QueueKeeper} strategy
559      */
560     @Deprecated
561     private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
562         Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
563         if (listeners != null) {
564                 for (IMDMessageListener listener : listeners) {
565                     // Pass cookie only for PACKT_IN
566                     if ( messageType.equals("PacketInMessage.class")){
567                         listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
568                     } else {
569                         listener.receive(null, this.getSessionContext(), message);
570                     }
571                 }
572         } else {
573             LOG.warn("No listeners for this message Type {}", messageType);
574         }
575     }
576
577     @Override
578     public ConnectionAdapter getConnectionAdapter() {
579         return connectionAdapter;
580     }
581
582     @Override
583     public void onConnectionReady() {
584         LOG.debug("connection is ready-to-use");
585         //TODO: fire first helloMessage
586         new Thread(new Runnable() {
587             @Override
588             public void run() {
589                 sendFirstHelloMessage();
590             }
591         }).start();
592     }
593     
594 }