extended integration tests with OFLibrary, improved error handling for threads
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.TimeUnit;
18
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
22 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
45 import org.opendaylight.yangtools.yang.binding.DataObject;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import com.google.common.collect.Lists;
52 import com.google.common.util.concurrent.Futures;
53
54 /**
55  * @author mirehak
56  */
57 public class ConnectionConductorImpl implements OpenflowProtocolListener,
58         SystemNotificationsListener, ConnectionConductor {
59
60     private static final Logger LOG = LoggerFactory
61             .getLogger(ConnectionConductorImpl.class);
62
63     /* variable to make BitMap-based negotiation enabled / disabled.
64      * it will help while testing and isolating issues related to processing of
65      * BitMaps from switches.
66      */
67     private static final boolean isBitmapNegotiationEnable = true;
68     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
69
70     private final ConnectionAdapter connectionAdapter;
71     private final List<Short> versionOrder;
72     private ConnectionConductor.CONDUCTOR_STATE conductorState;
73     private Short version;
74
75     private SwitchConnectionDistinguisher auxiliaryKey;
76
77     private SessionContext sessionContext;
78
79     private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
80
81     private boolean isFirstHelloNegotiation = true;
82
83
84
85     /**
86      * @param connectionAdapter
87      */
88     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
89         this.connectionAdapter = connectionAdapter;
90         conductorState = CONDUCTOR_STATE.HANDSHAKING;
91         versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
92         // TODO: add a thread pool to handle ErrorQueueHandler
93         new Thread(new ErrorQueueHandler(errorQueue)).start();
94     }
95
96     @Override
97     public void init() {
98         connectionAdapter.setMessageListener(this);
99         connectionAdapter.setSystemListener(this);
100         //TODO : Wait for library to provide interface from which we can send first hello message
101 //        sendFirstHelloMessage();
102     }
103
104
105     /**
106      * send first hello message to switch
107      */
108     private void sendFirstHelloMessage() {
109         short highestVersion = versionOrder.get(0);
110         Long helloXid = 1L;
111         HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
112         helloInputbuilder.setVersion(highestVersion);
113         helloInputbuilder.setXid(helloXid);
114         if (isBitmapNegotiationEnable) {
115             int elementsCount = highestVersion / Integer.SIZE;
116             ElementsBuilder elementsBuilder = new ElementsBuilder();
117
118             List<Elements> elementList = new ArrayList<Elements>();
119             int orderIndex = versionOrder.size();
120             int value = versionOrder.get(--orderIndex);
121             for (int index = 0; index <= elementsCount; index++) {
122                 List<Boolean> booleanList = new ArrayList<Boolean>();
123                 for (int i = 0; i < Integer.SIZE; i++) {
124                     if (value == ((index * Integer.SIZE) + i)) {
125                         booleanList.add(true);
126                         value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
127                     } else {
128                         booleanList.add(false);
129                     }
130                 }
131                 elementsBuilder.setType(HelloElementType.forValue(1));
132                 elementsBuilder.setVersionBitmap(booleanList);
133                 elementList.add(elementsBuilder.build());
134             }
135             helloInputbuilder.setElements(elementList);
136             LOG.debug("sending first hello message: version header={} , version bitmap={}", highestVersion, elementList);
137         } else {
138             LOG.debug("sending first hello message: version header={} ", highestVersion);
139         }
140         connectionAdapter.hello(helloInputbuilder.build());
141
142     }
143
144     @Override
145     public void onEchoRequestMessage(EchoRequestMessage echoRequestMessage) {
146         LOG.debug("echo request received: " + echoRequestMessage.getXid());
147         EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
148         builder.setVersion(echoRequestMessage.getVersion());
149         builder.setXid(echoRequestMessage.getXid());
150         builder.setData(echoRequestMessage.getData());
151
152         connectionAdapter.echoReply(builder.build());
153     }
154
155     @Override
156     public void onErrorMessage(ErrorMessage errorMessage) {
157         // TODO Auto-generated method stub
158         LOG.debug("error received, type: " + errorMessage.getType()
159                 + "; code: " + errorMessage.getCode());
160     }
161
162     @Override
163     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
164         LOG.debug("experimenter received, type: "
165                 + experimenterMessage.getExpType());
166         notifyListeners(ExperimenterMessage.class, experimenterMessage);
167     }
168
169     @Override
170     public void onFlowRemovedMessage(FlowRemovedMessage message) {
171         notifyListeners(FlowRemovedMessage.class, message);
172     }
173
174
175     /**
176      * version negotiation happened as per following steps:
177      * 1. If HelloMessage version field has same version, continue connection processing.
178      *    If HelloMessage version is lower than supported versions, just disconnect.
179      * 2. If HelloMessage contains bitmap and common version found in bitmap
180      *    then continue connection processing. if no common version found, just disconnect.
181      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
182      *    If Hello message received again with not supported version, just disconnect.
183      *
184      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
185      */
186     @Override
187     public void onHelloMessage(HelloMessage hello) {
188         // do handshake
189         LOG.info("handshake STARTED");
190         checkState(CONDUCTOR_STATE.HANDSHAKING);
191
192         Short remoteVersion = hello.getVersion();
193         List<Elements> elements = hello.getElements();
194         long xid = hello.getXid();
195         short proposedVersion;
196         LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
197         try {
198             // find the version from header version field
199             proposedVersion = proposeVersion(remoteVersion);
200
201         } catch (IllegalArgumentException e) {
202             handleException(e);
203             connectionAdapter.disconnect();
204             throw e;
205         }
206
207         // sent version is equal to remote --> version is negotiated
208         if (proposedVersion == remoteVersion) {
209             LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
210             sendHelloReply(proposedVersion, ++xid);
211             postHandshake(proposedVersion, ++xid);
212
213         } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
214             try {
215                 // hello contains version bitmap, checking highest common
216                 // version in bitmap
217                 proposedVersion = proposeBitmapVersion(elements);
218             } catch (IllegalArgumentException ex) {
219                 handleException(ex);
220                 connectionAdapter.disconnect();
221                 throw ex;
222             }
223             LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
224             sendHelloReply(proposedVersion, ++xid);
225             postHandshake(proposedVersion, ++xid);
226         } else {
227             if (isFirstHelloNegotiation) {
228                 isFirstHelloNegotiation = false;
229                 LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
230                 // send hello reply with lower version number supported
231                 sendHelloReply(proposedVersion, ++xid);
232             } else {
233                 // terminate the connection.
234                 LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
235                 connectionAdapter.disconnect();
236             }
237         }
238     }
239
240     /**
241      * send hello reply
242      * @param proposedVersion
243      * @param hello
244      */
245     private void sendHelloReply(Short proposedVersion, Long xid)
246     {
247         HelloInputBuilder helloBuilder = new HelloInputBuilder();
248         helloBuilder.setVersion(proposedVersion).setXid(xid);
249         connectionAdapter.hello(helloBuilder.build());
250     }
251
252
253     /**
254      * after handshake set features, register to session
255      * @param proposedVersion
256      * @param xId
257      */
258     private void postHandshake(Short proposedVersion, Long xid) {
259         // set version
260         version = proposedVersion;
261         LOG.debug("version set: " + proposedVersion);
262         // request features
263         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
264         featuresBuilder.setVersion(version).setXid(xid);
265         LOG.debug("sending feature request for version={} and xid={}", version, xid);
266         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
267                 .getFeatures(featuresBuilder.build());
268         LOG.debug("waiting for features");
269         RpcResult<GetFeaturesOutput> rpcFeatures;
270         try {
271             rpcFeatures = featuresFuture.get(getMaxTimeout(),
272                     TimeUnit.MILLISECONDS);
273             if (!rpcFeatures.isSuccessful()) {
274                 LOG.error("obtained features problem: {}"
275                         , rpcFeatures.getErrors());
276             } else {
277                 GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
278                 LOG.debug("obtained features: datapathId={}",
279                         featureOutput.getDatapathId());
280                 LOG.debug("obtained features: auxiliaryId={}",
281                         featureOutput.getAuxiliaryId());
282                 conductorState = CONDUCTOR_STATE.WORKING;
283
284                 OFSessionUtil.registerSession(this,
285                         featureOutput, version);
286                 this.setListenerMapping(OFSessionUtil.getListenersMap());
287                 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
288             }
289         } catch (Exception e) {
290             //handshake failed
291             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
292             handleException(e);
293             disconnect();
294         }
295     }
296
297     /**
298      * @return rpc-response timeout in [ms]
299      */
300     private long getMaxTimeout() {
301         // TODO:: get from configuration
302         return 2000;
303     }
304
305     /**
306      * @param e
307      */
308     private void handleException(Exception e) {
309         Exception causeAndThread = new Exception("IN THREAD: "+Thread.currentThread().getName(), e);
310         try {
311             errorQueue.put(causeAndThread);
312         } catch (InterruptedException e1) {
313             LOG.error(e1.getMessage(), e1);
314         }
315     }
316
317     @Override
318     public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
319         // TODO Auto-generated method stub
320     }
321
322     @Override
323     public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
324         // TODO Auto-generated method stub
325     }
326
327     @Override
328     public void onPacketInMessage(PacketInMessage message) {
329         notifyListeners(PacketInMessage.class, message);
330     }
331
332     @Override
333     public void onPortStatusMessage(PortStatusMessage message) {
334         this.getSessionContext().processPortStatusMsg(message);
335         notifyListeners(PortStatusMessage.class, message);
336     }
337
338     @Override
339     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
340         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
341             // idle state in any other conductorState than WORKING means real
342             // problem and wont be handled by echoReply, but disconnection
343             disconnect();
344             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
345         } else {
346             LOG.debug("first idle state occured");
347             EchoInputBuilder builder = new EchoInputBuilder();
348             builder.setVersion(version);
349             // TODO: get xid from sessionContext
350             builder.setXid(42L);
351
352             Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
353                     .echo(builder.build());
354
355             try {
356                 // TODO: read timeout from config
357                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
358                         TimeUnit.SECONDS);
359                 if (echoReplyValue.isSuccessful()) {
360                     conductorState = CONDUCTOR_STATE.WORKING;
361                 } else {
362                     for (RpcError replyError : echoReplyValue.getErrors()) {
363                         Throwable cause = replyError.getCause();
364                         LOG.error(
365                                 "while receiving echoReply in TIMEOUTING state: "
366                                         + cause.getMessage(), cause);
367                     }
368                     //switch issue occurred
369                     throw new Exception("switch issue occurred");
370                 }
371             } catch (Exception e) {
372                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
373                         + e.getMessage(), e);
374                 //switch is not responding
375                 disconnect();
376                 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
377             }
378         }
379     }
380
381     /**
382      * @param conductorState
383      *            the connectionState to set
384      */
385     @Override
386     public void setConductorState(CONDUCTOR_STATE conductorState) {
387         this.conductorState = conductorState;
388     }
389
390     @Override
391     public CONDUCTOR_STATE getConductorState() {
392         return conductorState;
393     }
394
395     /**
396      * @param handshaking
397      */
398     private void checkState(CONDUCTOR_STATE expectedState) {
399         if (!conductorState.equals(expectedState)) {
400             throw new IllegalStateException("Expected state: " + expectedState
401                     + ", actual state:" + conductorState);
402         }
403     }
404
405     @Override
406     public void onDisconnectEvent(DisconnectEvent arg0) {
407         SessionManager sessionManager = OFSessionUtil.getSessionManager();
408         sessionManager.invalidateOnDisconnect(this);
409     }
410
411     /**
412      * find supported version based on remoteVersion
413      * @param remoteVersion
414      * @return
415      */
416     protected short proposeVersion(short remoteVersion) {
417         Short proposal = null;
418         for (short offer : versionOrder) {
419             if (offer <= remoteVersion) {
420                 proposal = offer;
421                 break;
422             }
423         }
424         if (proposal == null) {
425             throw new IllegalArgumentException("unsupported version: "
426                     + remoteVersion);
427         }
428         return proposal;
429     }
430
431     /**
432      * find common highest supported bitmap version
433      * @param list
434      * @return
435      */
436     protected short proposeBitmapVersion(List<Elements> list)
437     {
438         Short supportedHighestVersion = null;
439         if((null != list) && (0 != list.size()))
440         {
441            for(Elements element : list)
442            {
443               List<Boolean> bitmap = element.getVersionBitmap();
444               // check for version bitmap
445               for(short bitPos : versionOrder)
446               {
447                   // with all the version it should work.
448                   if(bitmap.get(bitPos % Integer.SIZE))
449                   {
450                       supportedHighestVersion = bitPos;
451                       break;
452                   }
453               }
454            }
455            if(null == supportedHighestVersion)
456             {
457                 throw new IllegalArgumentException("unsupported bitmap version.");
458             }
459
460         }
461
462         return supportedHighestVersion;
463     }
464
465     @Override
466     public Short getVersion() {
467         return version;
468     }
469
470     @Override
471     public Future<Boolean> disconnect() {
472         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
473         
474         Future<Boolean> result = null;
475         if (connectionAdapter.isAlive()) {
476             result = connectionAdapter.disconnect();
477         } else {
478             LOG.debug("connection already disconnected");
479             result = Futures.immediateFuture(true);
480         }
481         
482         return result; 
483     }
484
485     @Override
486     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
487         this.auxiliaryKey = auxiliaryKey;
488     }
489
490     @Override
491     public void setSessionContext(SessionContext sessionContext) {
492         this.sessionContext = sessionContext;
493     }
494
495     @Override
496     public SwitchConnectionDistinguisher getAuxiliaryKey() {
497         return auxiliaryKey;
498     }
499
500     @Override
501     public SessionContext getSessionContext() {
502         return sessionContext;
503     }
504     
505     /**
506      * @param listenerMapping the listenerMapping to set
507      */
508     public void setListenerMapping(
509             Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
510         //TODO: adjust the listener interface
511         this.listenerMapping = listenerMapping;
512     }
513
514     /**
515      * @param messageType
516      * @param message
517      */
518     private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
519         Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
520         if (listeners != null) {
521                 for (IMDMessageListener listener : listeners) {
522                     // Pass cookie only for PACKT_IN
523                     if ( messageType.equals("PacketInMessage.class")){
524                         listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
525                     } else {
526                         listener.receive(null, this.getSessionContext(), message);
527                     }
528                 }
529         } else {
530             LOG.warn("No listeners for this message Type {}", messageType);
531         }
532     }
533
534     @Override
535     public ConnectionAdapter getConnectionAdapter() {
536         return connectionAdapter;
537     }
538 }