Post "Clustering optimization" updates
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / HandshakeManagerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core;
9
10 import java.util.List;
11 import java.util.Objects;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
15 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
16 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
17 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
18 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
24 import org.opendaylight.yangtools.yang.common.RpcError;
25 import org.opendaylight.yangtools.yang.common.RpcResult;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.google.common.util.concurrent.FutureCallback;
30 import com.google.common.util.concurrent.Futures;
31 import com.google.common.util.concurrent.JdkFutureAdapters;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.SettableFuture;
34
35 /**
36  * @author mirehak
37  *
38  */
39 public class HandshakeManagerImpl implements HandshakeManager {
40
41     private static final long activeXID = 20L;
42
43     private static final Logger LOG = LoggerFactory
44             .getLogger(HandshakeManagerImpl.class);
45
46     private Short lastProposedVersion;
47     private Short lastReceivedVersion;
48     private final List<Short> versionOrder;
49
50
51     private final ConnectionAdapter connectionAdapter;
52     private Short version;
53     private ErrorHandler errorHandler;
54
55
56
57     private Short highestVersion;
58
59     private Long activeXid;
60
61     private HandshakeListener handshakeListener;
62
63     private boolean useVersionBitmap;
64
65     /**
66      * @param connectionAdapter connection adaptor for switch
67      * @param highestVersion highest openflow version
68      * @param versionOrder list of version in order for connection protocol negotiation
69      */
70     public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion,
71             List<Short> versionOrder) {
72         this.highestVersion = highestVersion;
73         this.versionOrder = versionOrder;
74         this.connectionAdapter = connectionAdapter;
75     }
76
77     @Override
78     public void setHandshakeListener(HandshakeListener handshakeListener) {
79         this.handshakeListener = handshakeListener;
80     }
81
82     @Override
83     public synchronized void shake(HelloMessage receivedHello) {
84
85         if (version != null) {
86             // Some switches respond with a second HELLO acknowledging our HELLO
87             // but we've already completed the handshake based on the negotiated
88             // version and have registered this switch.
89             LOG.debug("Hello recieved after handshake already settled ... ignoring.");
90             return;
91         }
92
93         LOG.trace("handshake STARTED");
94         setActiveXid(activeXID);
95
96         try {
97             if (receivedHello == null) {
98                 // first Hello sending
99                 sendHelloMessage(highestVersion, getNextXid());
100                 lastProposedVersion = highestVersion;
101                 LOG.trace("ret - firstHello+wait");
102                 return;
103             }
104
105             // process the 2. and later hellos
106             Short remoteVersion = receivedHello.getVersion();
107             List<Elements> elements = receivedHello.getElements();
108             setActiveXid(receivedHello.getXid());
109             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
110             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion,
111                     receivedHello.getXid(), remoteVersionBitmap);
112
113             if (useVersionBitmap && remoteVersionBitmap != null) {
114                 // versionBitmap on both sides -> ONE STEP DECISION
115                 handleVersionBitmapNegotiation(elements);
116             } else {
117                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
118                 handleStepByStepVersionNegotiation(remoteVersion);
119             }
120         } catch (Exception ex) {
121             errorHandler.handleException(ex, null);
122             LOG.trace("ret - shake fail - closing");
123             handshakeListener.onHandshakeFailure();
124         }
125     }
126
127     /**
128      * @param remoteVersion remote version
129      * @throws Exception exception
130      */
131     private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
132         LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}",
133                 remoteVersion, lastProposedVersion, highestVersion);
134
135         if (lastProposedVersion == null) {
136             // first hello has not been sent yet, send it and either wait for next remote
137             // version or proceed
138             lastProposedVersion = proposeNextVersion(remoteVersion);
139             final Long nextHelloXid = getNextXid();
140             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
141             Futures.addCallback(helloResult, new FutureCallback<Void>() {
142                 @Override
143                 public void onSuccess(Void result) {
144                     try {
145                         stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
146                     } catch (Exception e) {
147                         errorHandler.handleException(e, null);
148                         handshakeListener.onHandshakeFailure();
149                     }
150                 }
151
152                 @Override
153                 public void onFailure(Throwable t) {
154                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
155                     LOG.trace("detail of hello send problem", t);
156                 }
157             });
158         } else {
159             stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
160         }
161     }
162
163     private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
164         if (remoteVersion.equals(lastProposedVersion)) {
165             postHandshake(lastProposedVersion, getNextXid());
166             LOG.trace("ret - OK - switch answered with lastProposedVersion");
167         } else {
168             checkNegotiationStalling(remoteVersion);
169
170             if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
171                 // wait for next version
172                 LOG.trace("ret - wait");
173             } else {
174                 //propose lower version
175                 handleLowerVersionProposal(remoteVersion);
176             }
177         }
178     }
179
180     /**
181      * @param remoteVersion remote version
182      * @throws Exception exception
183      */
184     private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
185         Short proposedVersion;
186         // find the version from header version field
187         proposedVersion = proposeNextVersion(remoteVersion);
188         lastProposedVersion = proposedVersion;
189         sendHelloMessage(proposedVersion, getNextXid());
190
191         if (! Objects.equals(proposedVersion, remoteVersion)) {
192             LOG.trace("ret - sent+wait");
193         } else {
194             LOG.trace("ret - sent+OK");
195             postHandshake(proposedVersion, getNextXid());
196         }
197     }
198
199     /**
200      * @param elements version elements
201      * @throws Exception exception
202      */
203     private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
204         final Short proposedVersion = proposeCommonBitmapVersion(elements);
205         if (lastProposedVersion == null) {
206             // first hello has not been sent yet
207             Long nexHelloXid = getNextXid();
208             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
209             Futures.addCallback(helloDone, new FutureCallback<Void>() {
210                 @Override
211                 public void onSuccess(Void result) {
212                     LOG.trace("ret - DONE - versionBitmap");
213                     postHandshake(proposedVersion, getNextXid());
214                 }
215
216                 @Override
217                 public void onFailure(Throwable t) {
218                     // NOOP
219                 }
220             });
221             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
222         } else {
223             LOG.trace("ret - DONE - versionBitmap");
224             postHandshake(proposedVersion, getNextXid());
225         }
226     }
227
228     /**
229      *
230      * @return next tx id
231      */
232     private Long getNextXid() {
233         activeXid += 1;
234         return activeXid;
235     }
236
237     /**
238      * @param xid tx id
239      */
240     private void setActiveXid(Long xid) {
241         this.activeXid = xid;
242     }
243
244     /**
245      * @param remoteVersion remove version
246      */
247     private void checkNegotiationStalling(Short remoteVersion) {
248         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
249             throw new IllegalStateException("version negotiation stalled: version = "+remoteVersion);
250         }
251         lastReceivedVersion = remoteVersion;
252     }
253
254     @Override
255     public Short getVersion() {
256         return version;
257     }
258
259     /**
260      * find common highest supported bitmap version
261      * @param list bitmap list
262      * @return proposed bitmap value
263      */
264     protected Short proposeCommonBitmapVersion(List<Elements> list) {
265         Short supportedHighestVersion = null;
266         if((null != list) && (0 != list.size())) {
267             for(Elements element : list) {
268                 List<Boolean> bitmap = element.getVersionBitmap();
269                 // check for version bitmap
270                 for(short bitPos : ConnectionConductor.versionOrder) {
271                     // with all the version it should work.
272                     if(bitmap.get(bitPos % Integer.SIZE)) {
273                         supportedHighestVersion = bitPos;
274                         break;
275                     }
276                 }
277             }
278
279             if(null == supportedHighestVersion) {
280                 LOG.trace("versionBitmap: no common version found");
281                 throw new IllegalArgumentException("no common version found in versionBitmap");
282             }
283         }
284
285         return supportedHighestVersion;
286     }
287
288     /**
289      * find supported version based on remoteVersion
290      * @param remoteVersion openflow version supported by remote entity
291      * @return openflow version
292      */
293     protected short proposeNextVersion(short remoteVersion) {
294         Short proposal = null;
295         for (short offer : versionOrder) {
296             if (offer <= remoteVersion) {
297                 proposal = offer;
298                 break;
299             }
300         }
301         if (proposal == null) {
302             throw new IllegalArgumentException("no equal or lower version found, unsupported version: "
303                     + remoteVersion);
304         }
305         return proposal;
306     }
307
308     /**
309      * send hello reply without versionBitmap
310      * @param helloVersion initial hello version for openflow connection negotiation
311      * @param helloXid transaction id
312      * @throws Exception
313      */
314     private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
315
316
317         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
318
319         final SettableFuture<Void> resultFtr = SettableFuture.create();
320
321         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",
322                 helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
323
324         Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
325
326         ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
327         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
328             @Override
329             public void onSuccess(RpcResult<Void> result) {
330                 if (result.isSuccessful()) {
331                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress());
332                     resultFtr.set(null);
333                 } else {
334                     for (RpcError error : result.getErrors()) {
335                         LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid,
336                                 error.getInfo(), error.getSeverity(), error.getMessage(),
337                                 connectionAdapter.getRemoteAddress());
338                         if (error.getCause() != null) {
339                             LOG.trace("DETAIL of sending hello failure", error.getCause());
340                         }
341                     }
342                     resultFtr.cancel(false);
343                     handshakeListener.onHandshakeFailure();
344                 }
345             }
346
347             @Override
348             public void onFailure(Throwable t) {
349                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
350                         connectionAdapter.getRemoteAddress(), t.getMessage());
351                 LOG.trace("DETAIL of sending of hello failure:", t);
352                 resultFtr.cancel(false);
353                 handshakeListener.onHandshakeFailure();
354             }
355         });
356         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
357         return resultFtr;
358     }
359
360
361     /**
362      * after handshake set features, register to session
363      * @param proposedVersion proposed openflow version
364      * @param xid transaction id
365      */
366     protected void postHandshake(final Short proposedVersion, final Long xid) {
367         // set version
368         version = proposedVersion;
369
370         LOG.debug("version set: {}", proposedVersion);
371         // request features
372         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
373         featuresBuilder.setVersion(version).setXid(xid);
374         LOG.debug("sending feature request for version={} and xid={}", version, xid);
375         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
376                 .getFeatures(featuresBuilder.build());
377
378         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
379                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
380                     @Override
381                     public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
382                         LOG.trace("features are back");
383                         if (rpcFeatures.isSuccessful()) {
384                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
385
386                             LOG.debug("obtained features: datapathId={}",
387                                     featureOutput.getDatapathId());
388                             LOG.debug("obtained features: auxiliaryId={}",
389                                     featureOutput.getAuxiliaryId());
390                             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
391                                     version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
392                             handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
393                         } else {
394                             // handshake failed
395                             LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress());
396                             for (RpcError rpcError : rpcFeatures.getErrors()) {
397                                 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
398                                         rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
399                                         rpcError.getCause()
400                                 );
401                             }
402                             handshakeListener.onHandshakeFailure();
403                         }
404
405                         LOG.debug("postHandshake DONE");
406                     }
407
408                     @Override
409                     public void onFailure(Throwable t) {
410                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
411                                 connectionAdapter.getRemoteAddress(), t.getMessage());
412                         LOG.trace("DETAIL of sending of hello failure:", t);
413                     }
414                 });
415
416         LOG.debug("future features [{}] hooked ..", xid);
417
418     }
419
420     @Override
421     public void setUseVersionBitmap(boolean useVersionBitmap) {
422         this.useVersionBitmap = useVersionBitmap;
423     }
424
425     @Override
426     public void setErrorHandler(ErrorHandler errorHandler) {
427         this.errorHandler = errorHandler;
428     }
429 }