Merge "handled review comments (sending rpc message to library and version negotiation )"
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.TimeUnit;
17
18 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
19 import org.opendaylight.openflowplugin.openflow.core.IMessageListener;
20 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
21 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
22 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
45 import org.opendaylight.yangtools.yang.binding.DataObject;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import com.google.common.collect.ImmutableMap;
52 import com.google.common.collect.Lists;
53
54 /**
55  * @author mirehak
56  */
57 public class ConnectionConductorImpl implements OpenflowProtocolListener,
58         SystemNotificationsListener, ConnectionConductor {
59
60     private static final Logger LOG = LoggerFactory
61             .getLogger(ConnectionConductorImpl.class);
62
63     /* variable to make BitMap-based negotiation enabled / disabled.
64      * it will help while testing and isolating issues related to processing of
65      * BitMaps from switches.
66      */
67     private static final boolean isBitmapNegotiationEnable = true;
68     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
69
70     private final ConnectionAdapter connectionAdapter;
71     private final List<Short> versionOrder;
72     private ConnectionConductor.CONDUCTOR_STATE conductorState;
73     private Short version;
74
75     private SwitchConnectionDistinguisher auxiliaryKey;
76
77     private SessionContext sessionContext;
78
79     private ImmutableMap<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
80
81     private boolean isFirstHelloNegotiation = true;
82
83
84
85     /**
86      * @param connectionAdapter
87      */
88     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
89         this.connectionAdapter = connectionAdapter;
90         conductorState = CONDUCTOR_STATE.HANDSHAKING;
91         versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
92         // TODO: add a thread pool to handle ErrorQueueHandler
93         new Thread(new ErrorQueueHandler(errorQueue)).start();
94     }
95
96     @Override
97     public void init() {
98         connectionAdapter.setMessageListener(this);
99         connectionAdapter.setSystemListener(this);
100         //TODO : Wait for library to provide interface from which we can send first hello message
101 //        sendFirstHelloMessage();
102     }
103
104
105     /**
106      * send first hello message to switch
107      */
108     private void sendFirstHelloMessage() {
109         short highestVersion = versionOrder.get(0);
110         Long helloXid = 1L;
111         HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
112         helloInputbuilder.setVersion(highestVersion);
113         helloInputbuilder.setXid(helloXid);
114         if (isBitmapNegotiationEnable) {
115             int elementsCount = highestVersion / Integer.SIZE;
116             ElementsBuilder elementsBuilder = new ElementsBuilder();
117
118             List<Elements> elementList = new ArrayList<Elements>();
119             int orderIndex = versionOrder.size();
120             int value = versionOrder.get(--orderIndex);
121             for (int index = 0; index <= elementsCount; index++) {
122                 List<Boolean> booleanList = new ArrayList<Boolean>();
123                 for (int i = 0; i < Integer.SIZE; i++) {
124                     if (value == ((index * Integer.SIZE) + i)) {
125                         booleanList.add(true);
126                         value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
127                     } else {
128                         booleanList.add(false);
129                     }
130                 }
131                 elementsBuilder.setType(HelloElementType.forValue(1));
132                 elementsBuilder.setVersionBitmap(booleanList);
133                 elementList.add(elementsBuilder.build());
134             }
135             helloInputbuilder.setElements(elementList);
136             LOG.debug("sending first hello message: version header={} , version bitmap={}", highestVersion, elementList);
137         } else {
138             LOG.debug("sending first hello message: version header={} ", highestVersion);
139         }
140         connectionAdapter.hello(helloInputbuilder.build());
141
142     }
143
144     @Override
145     public void onEchoRequestMessage(EchoRequestMessage echoRequestMessage) {
146         LOG.debug("echo request received: " + echoRequestMessage.getXid());
147         EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
148         builder.setVersion(echoRequestMessage.getVersion());
149         builder.setXid(echoRequestMessage.getXid());
150         builder.setData(echoRequestMessage.getData());
151
152         connectionAdapter.echoReply(builder.build());
153     }
154
155     @Override
156     public void onErrorMessage(ErrorMessage errorMessage) {
157         // TODO Auto-generated method stub
158         LOG.debug("error received, type: " + errorMessage.getType()
159                 + "; code: " + errorMessage.getCode());
160     }
161
162     @Override
163     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
164         // TODO Auto-generated method stub
165         LOG.debug("experimenter received, type: "
166                 + experimenterMessage.getExpType());
167     }
168
169     @Override
170     public void onFlowRemovedMessage(FlowRemovedMessage arg0) {
171         // TODO Auto-generated method stub
172     }
173
174
175     /**
176      * version negotiation happened as per following steps:
177      * 1. If HelloMessage version field has same version, continue connection processing.
178      *    If HelloMessage version is lower than supported versions, just disconnect.
179      * 2. If HelloMessage contains bitmap and common version found in bitmap
180      *    then continue connection processing. if no common version found, just disconnect.
181      * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
182      *    If Hello message received again with not supported version, just disconnect.
183      *
184      *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
185      */
186     @Override
187     public void onHelloMessage(HelloMessage hello) {
188         // do handshake
189         LOG.info("handshake STARTED");
190         checkState(CONDUCTOR_STATE.HANDSHAKING);
191
192         Short remoteVersion = hello.getVersion();
193         List<Elements> elements = hello.getElements();
194         long xid = hello.getXid();
195         short proposedVersion;
196         LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
197         try {
198             // find the version from header version field
199             proposedVersion = proposeVersion(remoteVersion);
200
201         } catch (IllegalArgumentException e) {
202             handleException(e);
203             connectionAdapter.disconnect();
204             throw e;
205         }
206
207         // sent version is equal to remote --> version is negotiated
208         if (proposedVersion == remoteVersion) {
209             LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
210             sendHelloReply(proposedVersion, ++xid);
211             postHandshake(proposedVersion, ++xid);
212
213         } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
214             try {
215                 // hello contains version bitmap, checking highest common
216                 // version in bitmap
217                 proposedVersion = proposeBitmapVersion(elements);
218             } catch (IllegalArgumentException ex) {
219                 handleException(ex);
220                 connectionAdapter.disconnect();
221                 throw ex;
222             }
223             LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
224             sendHelloReply(proposedVersion, ++xid);
225             postHandshake(proposedVersion, ++xid);
226         } else {
227             if (isFirstHelloNegotiation) {
228                 isFirstHelloNegotiation = false;
229                 LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
230                 // send hello reply with lower version number supported
231                 sendHelloReply(proposedVersion, ++xid);
232             } else {
233                 // terminate the connection.
234                 LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
235                 connectionAdapter.disconnect();
236             }
237         }
238     }
239
240     /**
241      * send hello reply
242      * @param proposedVersion
243      * @param hello
244      */
245     private void sendHelloReply(Short proposedVersion, Long xid)
246     {
247         HelloInputBuilder helloBuilder = new HelloInputBuilder();
248         helloBuilder.setVersion(proposedVersion).setXid(xid);
249         connectionAdapter.hello(helloBuilder.build());
250     }
251
252
253     /**
254      * after handshake set features, register to session
255      * @param proposedVersion
256      * @param xId
257      */
258     private void postHandshake(Short proposedVersion, Long xid) {
259         // set version
260         version = proposedVersion;
261         LOG.debug("version set: " + proposedVersion);
262         // request features
263         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
264             featuresBuilder.setVersion(version).setXid(xid);
265         LOG.debug("sending feature request for version={} and xid={}", version, xid);
266         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
267                 .getFeatures(featuresBuilder.build());
268         LOG.debug("waiting for features");
269         RpcResult<GetFeaturesOutput> rpcFeatures;
270         try {
271             rpcFeatures = featuresFuture.get(getMaxTimeout(),
272                     TimeUnit.MILLISECONDS);
273             if (!rpcFeatures.isSuccessful()) {
274                 LOG.error("obtained features problem: {}"
275                         , rpcFeatures.getErrors());
276             } else {
277                 GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
278                 LOG.debug("obtained features: datapathId={}"
279                         , featureOutput.getDatapathId());
280                 conductorState = CONDUCTOR_STATE.WORKING;
281
282                 OFSessionUtil.registerSession(this,
283                         featureOutput, version);
284                 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
285             }
286         } catch (Exception e) {
287             handleException(e);
288         }
289     }
290
291     /**
292      * @return rpc-response timeout in [ms]
293      */
294     private long getMaxTimeout() {
295         // TODO:: get from configuration
296         return 2000;
297     }
298
299     /**
300      * @param e
301      */
302     private void handleException(Exception e) {
303         try {
304             errorQueue.put(e);
305         } catch (InterruptedException e1) {
306             LOG.error(e1.getMessage(), e1);
307         }
308     }
309
310     @Override
311     public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
312         // TODO Auto-generated method stub
313     }
314
315     @Override
316     public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
317         // TODO Auto-generated method stub
318     }
319
320     @Override
321     public void onPacketInMessage(PacketInMessage message) {
322         notifyListeners(PacketInMessage.class, message);
323     }
324
325     @Override
326     public void onPortStatusMessage(PortStatusMessage arg0) {
327         // TODO Auto-generated method stub
328     }
329
330     @Override
331     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
332         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
333             // idle state in any other conductorState than WORKING means real
334             // problem and wont be handled by echoReply, but disconnection
335             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
336         } else {
337             LOG.debug("first idle state occured");
338             EchoInputBuilder builder = new EchoInputBuilder();
339             builder.setVersion(version);
340             // TODO: get xid from sessionContext
341             builder.setXid(42L);
342
343             Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
344                     .echo(builder.build());
345
346             try {
347                 // TODO: read timeout from config
348                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(5,
349                         TimeUnit.SECONDS);
350                 if (echoReplyValue.isSuccessful()) {
351                     conductorState = CONDUCTOR_STATE.WORKING;
352                 } else {
353                     for (RpcError replyError : echoReplyValue.getErrors()) {
354                         Throwable cause = replyError.getCause();
355                         LOG.error(
356                                 "while receiving echoReply in TIMEOUTING state: "
357                                         + cause.getMessage(), cause);
358                     }
359                 }
360             } catch (Exception e) {
361                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
362                         + e.getMessage(), e);
363             }
364         }
365     }
366
367     /**
368      * @param conductorState
369      *            the connectionState to set
370      */
371     @Override
372     public void setConductorState(CONDUCTOR_STATE conductorState) {
373         this.conductorState = conductorState;
374     }
375
376     @Override
377     public CONDUCTOR_STATE getConductorState() {
378         return conductorState;
379     }
380
381     /**
382      * @param handshaking
383      */
384     private void checkState(CONDUCTOR_STATE expectedState) {
385         if (!conductorState.equals(expectedState)) {
386             throw new IllegalStateException("Expected state: " + expectedState
387                     + ", actual state:" + conductorState);
388         }
389     }
390
391     @Override
392     public void onDisconnectEvent(DisconnectEvent arg0) {
393         SessionManager sessionManager = OFSessionUtil.getSessionManager();
394         sessionManager.invalidateOnDisconnect(this);
395     }
396
397     /**
398      * find supported version based on remoteVersion
399      * @param remoteVersion
400      * @return
401      */
402     protected short proposeVersion(short remoteVersion) {
403         Short proposal = null;
404         for (short offer : versionOrder) {
405             if (offer <= remoteVersion) {
406                 proposal = offer;
407                 break;
408             }
409         }
410         if (proposal == null) {
411             throw new IllegalArgumentException("unsupported version: "
412                     + remoteVersion);
413         }
414         return proposal;
415     }
416
417     /**
418      * find common highest supported bitmap version
419      * @param list
420      * @return
421      */
422     protected short proposeBitmapVersion(List<Elements> list)
423     {
424         Short supportedHighestVersion = null;
425         if((null != list) && (0 != list.size()))
426         {
427            for(Elements element : list)
428            {
429               List<Boolean> bitmap = element.getVersionBitmap();
430               // check for version bitmap
431               for(short bitPos : versionOrder)
432               {
433                   // with all the version it should work.
434                   if(bitmap.get(bitPos % Integer.SIZE))
435                   {
436                       supportedHighestVersion = bitPos;
437                       break;
438                   }
439               }
440            }
441            if(null == supportedHighestVersion)
442             {
443                 throw new IllegalArgumentException("unsupported bitmap version.");
444             }
445
446         }
447
448         return supportedHighestVersion;
449     }
450
451     @Override
452     public Short getVersion() {
453         return version;
454     }
455
456     @Override
457     public Future<Boolean> disconnect() {
458         return connectionAdapter.disconnect();
459     }
460
461     @Override
462     public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
463         this.auxiliaryKey = auxiliaryKey;
464     }
465
466     @Override
467     public void setSessionContext(SessionContext sessionContext) {
468         this.sessionContext = sessionContext;
469     }
470
471     @Override
472     public SwitchConnectionDistinguisher getAuxiliaryKey() {
473         return auxiliaryKey;
474     }
475
476     @Override
477     public SessionContext getSessionContext() {
478         return sessionContext;
479     }
480
481     /**
482      * @param listenerMapping the listenerMapping to set
483      */
484     public void setListenerMapping(
485             ImmutableMap<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
486         //TODO: adjust the listener interface
487         this.listenerMapping = listenerMapping;
488     }
489
490     /**
491      * @param messageType
492      * @param message
493      */
494     private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
495         Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
496         if (listeners != null) {
497                 for (IMDMessageListener listener : listeners) {
498                 //TODO : need to add unit-tests
499                 //listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
500             }
501         }
502     }
503
504     @Override
505     public ConnectionAdapter getConnectionAdapter() {
506         return connectionAdapter;
507     }
508 }