fixed it-test (added bundles)
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.TimeUnit;
18
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
22 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
23 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.common.RpcError;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.google.common.collect.Lists;
53 import com.google.common.util.concurrent.Futures;
54
55 /**
56  * @author mirehak
57  */
58 public class ConnectionConductorImpl implements OpenflowProtocolListener,
59         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
60
61     private static final Logger LOG = LoggerFactory
62             .getLogger(ConnectionConductorImpl.class);
63
64     /* variable to make BitMap-based negotiation enabled / disabled.
65      * it will help while testing and isolating issues related to processing of
66      * BitMaps from switches.
67      */
68     private static final boolean isBitmapNegotiationEnable = true;
69     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
70
71     private final ConnectionAdapter connectionAdapter;
72     private final List<Short> versionOrder;
73     private ConnectionConductor.CONDUCTOR_STATE conductorState;
74     private Short version;
75
76     private SwitchConnectionDistinguisher auxiliaryKey;
77
78     private SessionContext sessionContext;
79
80     private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
81
82     private boolean isFirstHelloNegotiation = true;
83
84
85
86     /**
87      * @param connectionAdapter
88      */
89     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
90         this.connectionAdapter = connectionAdapter;
91         conductorState = CONDUCTOR_STATE.HANDSHAKING;
92         versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
93         // TODO: add a thread pool to handle ErrorQueueHandler
94         new Thread(new ErrorQueueHandler(errorQueue)).start();
95     }
96
97     @Override
98     public void init() {
99         connectionAdapter.setMessageListener(this);
100         connectionAdapter.setSystemListener(this);
101         connectionAdapter.setConnectionReadyListener(this);
102     }
103
104
105     /**
106      * send first hello message to switch
107      */
108     private void sendFirstHelloMessage() {
109         short highestVersion = versionOrder.get(0);
110         Long helloXid = 1L;
111         HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
112         helloInputbuilder.setVersion(highestVersion);
113         helloInputbuilder.setXid(helloXid);
114         if (isBitmapNegotiationEnable) {
115             int elementsCount = highestVersion / Integer.SIZE;
116             ElementsBuilder elementsBuilder = new ElementsBuilder();
117
118             List<Elements> elementList = new ArrayList<Elements>();
119             int orderIndex = versionOrder.size();
120             int value = versionOrder.get(--orderIndex);
121             for (int index = 0; index <= elementsCount; index++) {
122                 List<Boolean> booleanList = new ArrayList<Boolean>();
123                 for (int i = 0; i < Integer.SIZE; i++) {
124                     if (value == ((index * Integer.SIZE) + i)) {
125                         booleanList.add(true);
126                         value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
127                     } else {
128                         booleanList.add(false);
129                     }
130                 }
131                 elementsBuilder.setType(HelloElementType.forValue(1));
132                 elementsBuilder.setVersionBitmap(booleanList);
133                 elementList.add(elementsBuilder.build());
134             }
135             helloInputbuilder.setElements(elementList);
136             LOG.debug("sending first hello message: version header={} , version bitmap={}", highestVersion, elementList);
137         } else {
138             LOG.debug("sending first hello message: version header={} ", highestVersion);
139         }
140         connectionAdapter.hello(helloInputbuilder.build());
141
142     }
143
144     @Override
145     public void onEchoRequestMessage(EchoRequestMessage echoRequestMessage) {
146         LOG.debug("echo request received: " + echoRequestMessage.getXid());
147         EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
148         builder.setVersion(echoRequestMessage.getVersion());
149         builder.setXid(echoRequestMessage.getXid());
150         builder.setData(echoRequestMessage.getData());
151
152         connectionAdapter.echoReply(builder.build());
153     }
154
155     @Override
156     public void onErrorMessage(ErrorMessage errorMessage) {
157         // TODO Auto-generated method stub
158         LOG.debug("error received, type: " + errorMessage.getType()
159                 + "; code: " + errorMessage.getCode());
160     }
161
162     @Override
163     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
164         LOG.debug("experimenter received, type: "
165                 + experimenterMessage.getExpType());
166         notifyListeners(ExperimenterMessage.class, experimenterMessage);
167     }
168
169     @Override
170     public void onFlowRemovedMessage(FlowRemovedMessage message) {
171         notifyListeners(FlowRemovedMessage.class, message);
172     }
173
174
175     /**
176      * version negotiation happened as per following steps:
177      * 1. If HelloMessage version field has same version, continue connection processing.
178      *    If HelloMessage version is lower than supported versions, just disconnect.
179      * 2. If HelloMessage contains bitmap and common version found in bitmap
180      *    then continue connection processing. if no common version found, just disconnect.
181      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
182      *    If Hello message received again with not supported version, just disconnect.
183      *
184      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
185      */
186     @Override
187     public void onHelloMessage(final HelloMessage hello) {
188         // do handshake
189         LOG.info("handshake STARTED");
190         checkState(CONDUCTOR_STATE.HANDSHAKING);
191
192         new Thread(new Runnable() {
193
194             @Override
195             public void run() {
196                 Short remoteVersion = hello.getVersion();
197                 List<Elements> elements = hello.getElements();
198                 long xid = hello.getXid();
199                 short proposedVersion;
200                 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
201                 try {
202                     // find the version from header version field
203                     proposedVersion = proposeVersion(remoteVersion);
204                     
205                 } catch (IllegalArgumentException e) {
206                     handleException(e);
207                     connectionAdapter.disconnect();
208                     throw e;
209                 }
210                 
211                 // sent version is equal to remote --> version is negotiated
212                 if (proposedVersion == remoteVersion) {
213                     LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
214                     sendHelloReply(proposedVersion, ++xid);
215                     postHandshake(proposedVersion, ++xid);
216                     
217                 } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
218                     try {
219                         // hello contains version bitmap, checking highest common
220                         // version in bitmap
221                         proposedVersion = proposeBitmapVersion(elements);
222                     } catch (IllegalArgumentException ex) {
223                         handleException(ex);
224                         connectionAdapter.disconnect();
225                         throw ex;
226                     }
227                     LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
228                     sendHelloReply(proposedVersion, ++xid);
229                     postHandshake(proposedVersion, ++xid);
230                 } else {
231                     if (isFirstHelloNegotiation) {
232                         isFirstHelloNegotiation = false;
233                         LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
234                         // send hello reply with lower version number supported
235                         sendHelloReply(proposedVersion, ++xid);
236                     } else {
237                         // terminate the connection.
238                         LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
239                         connectionAdapter.disconnect();
240                     }
241                 }
242             }
243             
244         }).start();
245     }
246
247     /**
248      * send hello reply
249      * @param proposedVersion
250      * @param hello
251      */
252     private void sendHelloReply(Short proposedVersion, Long xid)
253     {
254         HelloInputBuilder helloBuilder = new HelloInputBuilder();
255         helloBuilder.setVersion(proposedVersion).setXid(xid);
256         connectionAdapter.hello(helloBuilder.build());
257     }
258
259
260     /**
261      * after handshake set features, register to session
262      * @param proposedVersion
263      * @param xId
264      */
265     private void postHandshake(Short proposedVersion, Long xid) {
266         // set version
267         version = proposedVersion;
268         LOG.debug("version set: " + proposedVersion);
269         // request features
270         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
271         featuresBuilder.setVersion(version).setXid(xid);
272         LOG.debug("sending feature request for version={} and xid={}", version, xid);
273         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
274                 .getFeatures(featuresBuilder.build());
275         LOG.debug("waiting for features");
276         RpcResult<GetFeaturesOutput> rpcFeatures;
277         try {
278             rpcFeatures = featuresFuture.get(getMaxTimeout(),
279                     TimeUnit.MILLISECONDS);
280             if (!rpcFeatures.isSuccessful()) {
281                 LOG.error("obtained features problem: {}"
282                         , rpcFeatures.getErrors());
283             } else {
284                 GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
285                 LOG.debug("obtained features: datapathId={}",
286                         featureOutput.getDatapathId());
287                 LOG.debug("obtained features: auxiliaryId={}",
288                         featureOutput.getAuxiliaryId());
289                 conductorState = CONDUCTOR_STATE.WORKING;
290
291                 OFSessionUtil.registerSession(this,
292                         featureOutput, version);
293                 this.setListenerMapping(OFSessionUtil.getListenersMap());
294                 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
295             }
296         } catch (Exception e) {
297             //handshake failed
298             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
299             handleException(e);
300             disconnect();
301         }
302     }
303
304     /**
305      * @return rpc-response timeout in [ms]
306      */
307     private long getMaxTimeout() {
308         // TODO:: get from configuration
309         return 2000;
310     }
311
312     /**
313      * @param e
314      */
315     private void handleException(Exception e) {
316         Exception causeAndThread = new Exception("IN THREAD: "+Thread.currentThread().getName(), e);
317         try {
318             errorQueue.put(causeAndThread);
319         } catch (InterruptedException e1) {
320             LOG.error(e1.getMessage(), e1);
321         }
322     }
323
324     @Override
325     public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
326         // TODO Auto-generated method stub
327     }
328
329     @Override
330     public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
331         // TODO Auto-generated method stub
332     }
333
334     @Override
335     public void onPacketInMessage(PacketInMessage message) {
336         notifyListeners(PacketInMessage.class, message);
337     }
338
339     @Override
340     public void onPortStatusMessage(PortStatusMessage message) {
341         this.getSessionContext().processPortStatusMsg(message);
342         notifyListeners(PortStatusMessage.class, message);
343     }
344
345     @Override
346     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
347         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
348             // idle state in any other conductorState than WORKING means real
349             // problem and wont be handled by echoReply, but disconnection
350             disconnect();
351             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
352         } else {
353             LOG.debug("first idle state occured");
354             EchoInputBuilder builder = new EchoInputBuilder();
355             builder.setVersion(version);
356             // TODO: get xid from sessionContext
357             builder.setXid(42L);
358
359             Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
360                     .echo(builder.build());
361
362             try {
363                 // TODO: read timeout from config
364                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
365                         TimeUnit.SECONDS);
366                 if (echoReplyValue.isSuccessful()) {
367                     conductorState = CONDUCTOR_STATE.WORKING;
368                 } else {
369                     for (RpcError replyError : echoReplyValue.getErrors()) {
370                         Throwable cause = replyError.getCause();
371                         LOG.error(
372                                 "while receiving echoReply in TIMEOUTING state: "
373                                         + cause.getMessage(), cause);
374                     }
375                     //switch issue occurred
376                     throw new Exception("switch issue occurred");
377                 }
378             } catch (Exception e) {
379                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
380                         + e.getMessage(), e);
381                 //switch is not responding
382                 disconnect();
383                 OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
384             }
385         }
386     }
387
388     /**
389      * @param conductorState
390      *            the connectionState to set
391      */
392     @Override
393     public void setConductorState(CONDUCTOR_STATE conductorState) {
394         this.conductorState = conductorState;
395     }
396
397     @Override
398     public CONDUCTOR_STATE getConductorState() {
399         return conductorState;
400     }
401
402     /**
403      * @param handshaking
404      */
405     private void checkState(CONDUCTOR_STATE expectedState) {
406         if (!conductorState.equals(expectedState)) {
407             throw new IllegalStateException("Expected state: " + expectedState
408                     + ", actual state:" + conductorState);
409         }
410     }
411
412     @Override
413     public void onDisconnectEvent(DisconnectEvent arg0) {
414         SessionManager sessionManager = OFSessionUtil.getSessionManager();
415         sessionManager.invalidateOnDisconnect(this);
416     }
417
418     /**
419      * find supported version based on remoteVersion
420      * @param remoteVersion
421      * @return
422      */
423     protected short proposeVersion(short remoteVersion) {
424         Short proposal = null;
425         for (short offer : versionOrder) {
426             if (offer <= remoteVersion) {
427                 proposal = offer;
428                 break;
429             }
430         }
431         if (proposal == null) {
432             throw new IllegalArgumentException("unsupported version: "
433                     + remoteVersion);
434         }
435         return proposal;
436     }
437
438     /**
439      * find common highest supported bitmap version
440      * @param list
441      * @return
442      */
443     protected short proposeBitmapVersion(List<Elements> list)
444     {
445         Short supportedHighestVersion = null;
446         if((null != list) && (0 != list.size()))
447         {
448            for(Elements element : list)
449            {
450               List<Boolean> bitmap = element.getVersionBitmap();
451               // check for version bitmap
452               for(short bitPos : versionOrder)
453               {
454                   // with all the version it should work.
455                   if(bitmap.get(bitPos % Integer.SIZE))
456                   {
457                       supportedHighestVersion = bitPos;
458                       break;
459                   }
460               }
461            }
462            if(null == supportedHighestVersion)
463             {
464                 throw new IllegalArgumentException("unsupported bitmap version.");
465             }
466
467         }
468
469         return supportedHighestVersion;
470     }
471
472     @Override
473     public Short getVersion() {
474         return version;
475     }
476
477     @Override
478     public Future<Boolean> disconnect() {
479         LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
480         
481         Future<Boolean> result = null;
482         if (connectionAdapter.isAlive()) {
483             result = connectionAdapter.disconnect();
484         } else {
485             LOG.debug("connection already disconnected");
486             result = Futures.immediateFuture(true);
487         }
488         
489         return result; 
490     }
491
492     @Override
493     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
494         this.auxiliaryKey = auxiliaryKey;
495     }
496
497     @Override
498     public void setSessionContext(SessionContext sessionContext) {
499         this.sessionContext = sessionContext;
500     }
501
502     @Override
503     public SwitchConnectionDistinguisher getAuxiliaryKey() {
504         return auxiliaryKey;
505     }
506
507     @Override
508     public SessionContext getSessionContext() {
509         return sessionContext;
510     }
511     
512     /**
513      * @param listenerMapping the listenerMapping to set
514      */
515     public void setListenerMapping(
516             Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
517         //TODO: adjust the listener interface
518         this.listenerMapping = listenerMapping;
519     }
520
521     /**
522      * @param messageType
523      * @param message
524      */
525     private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
526         Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
527         if (listeners != null) {
528                 for (IMDMessageListener listener : listeners) {
529                     // Pass cookie only for PACKT_IN
530                     if ( messageType.equals("PacketInMessage.class")){
531                         listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
532                     } else {
533                         listener.receive(null, this.getSessionContext(), message);
534                     }
535                 }
536         } else {
537             LOG.warn("No listeners for this message Type {}", messageType);
538         }
539     }
540
541     @Override
542     public ConnectionAdapter getConnectionAdapter() {
543         return connectionAdapter;
544     }
545
546     @Override
547     public void onConnectionReady() {
548         // TODO Auto-generated method stub
549     }
550 }