OPNFLWPLUG-1010 Adopt mdsal changes proposed through weather item TSC-99
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / HandshakeManagerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.connection;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.concurrent.Future;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
23 import org.opendaylight.openflowplugin.api.OFConstants;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
27 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
28 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
35 import org.opendaylight.yangtools.yang.common.RpcError;
36 import org.opendaylight.yangtools.yang.common.RpcResult;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 public class HandshakeManagerImpl implements HandshakeManager {
41
42     private static final long ACTIVE_XID = 20L;
43
44     private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
45
46     private Short lastProposedVersion;
47     private Short lastReceivedVersion;
48     private final List<Short> versionOrder;
49
50     private final ConnectionAdapter connectionAdapter;
51     private Short version;
52     private final ErrorHandler errorHandler;
53
54     private final Short highestVersion;
55
56     private Long activeXid;
57
58     private final HandshakeListener handshakeListener;
59
60     private boolean useVersionBitmap; // not final just for unit test
61
62     private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
63
64     /**
65      * Constructor.
66      *
67      * @param connectionAdapter connection adaptor for switch
68      * @param highestVersion    highest openflow version
69      * @param versionOrder      list of version in order for connection protocol negotiation
70      * @param errorHandler      the ErrorHandler
71      * @param handshakeListener the HandshakeListener
72      * @param useVersionBitmap  should use negotiation bit map
73      */
74     public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
75                                 ErrorHandler errorHandler, HandshakeListener handshakeListener,
76                                 boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
77         this.highestVersion = highestVersion;
78         this.versionOrder = versionOrder;
79         this.connectionAdapter = connectionAdapter;
80         this.errorHandler = errorHandler;
81         this.handshakeListener = handshakeListener;
82         this.useVersionBitmap = useVersionBitmap;
83         this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
84     }
85
86     @Override
87     @SuppressWarnings("checkstyle:IllegalCatch")
88     public synchronized void shake(HelloMessage receivedHello) {
89
90         if (version != null) {
91             // Some switches respond with a second HELLO acknowledging our HELLO
92             // but we've already completed the handshake based on the negotiated
93             // version and have registered this switch.
94             LOG.debug("Hello recieved after handshake already settled ... ignoring.");
95             return;
96         }
97
98         LOG.trace("handshake STARTED");
99         setActiveXid(ACTIVE_XID);
100
101         try {
102             if (receivedHello == null) {
103                 // first Hello sending
104                 sendHelloMessage(highestVersion, getNextXid());
105                 lastProposedVersion = highestVersion;
106                 LOG.trace("ret - firstHello+wait");
107                 return;
108             }
109
110             // process the 2. and later hellos
111             Short remoteVersion = receivedHello.getVersion();
112             List<Elements> elements = receivedHello.getElements();
113             setActiveXid(receivedHello.getXid());
114             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
115             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
116                       remoteVersionBitmap);
117
118             if (useVersionBitmap && remoteVersionBitmap != null) {
119                 // versionBitmap on both sides -> ONE STEP DECISION
120                 handleVersionBitmapNegotiation(elements);
121             } else {
122                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
123                 handleStepByStepVersionNegotiation(remoteVersion);
124             }
125         } catch (Exception ex) {
126             errorHandler.handleException(ex);
127             LOG.trace("ret - shake fail - closing");
128             handshakeListener.onHandshakeFailure();
129         }
130     }
131
132     /**
133      * Handles the version negotiation step by step.
134      *
135      * @param remoteVersion remote version
136      * @throws Exception exception
137      */
138     @SuppressWarnings("checkstyle:IllegalCatch")
139     private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
140         LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
141                   highestVersion);
142
143         if (lastProposedVersion == null) {
144             // first hello has not been sent yet, send it and either wait for next remote
145             // version or proceed
146             lastProposedVersion = proposeNextVersion(remoteVersion);
147             final Long nextHelloXid = getNextXid();
148             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
149             Futures.addCallback(helloResult, new FutureCallback<Void>() {
150                 @Override
151                 public void onSuccess(Void result) {
152                     try {
153                         stepByStepVersionSubStep(remoteVersion);
154                     } catch (Exception e) {
155                         errorHandler.handleException(e);
156                         handshakeListener.onHandshakeFailure();
157                     }
158                 }
159
160                 @Override
161                 public void onFailure(Throwable throwable) {
162                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
163                     LOG.trace("detail of hello send problem", throwable);
164                 }
165             }, MoreExecutors.directExecutor());
166         } else {
167             stepByStepVersionSubStep(remoteVersion);
168         }
169     }
170
171     private void stepByStepVersionSubStep(Short remoteVersion) throws Exception {
172         if (remoteVersion.equals(lastProposedVersion)) {
173             postHandshake(lastProposedVersion, getNextXid());
174             LOG.trace("ret - OK - switch answered with lastProposedVersion");
175         } else {
176             checkNegotiationStalling(remoteVersion);
177
178             if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
179                 // wait for next version
180                 LOG.trace("ret - wait");
181             } else {
182                 //propose lower version
183                 handleLowerVersionProposal(remoteVersion);
184             }
185         }
186     }
187
188     /**
189      * Handles a proposal for a lower version.
190      *
191      * @param remoteVersion remote version
192      * @throws Exception exception
193      */
194     private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
195         Short proposedVersion;
196         // find the version from header version field
197         proposedVersion = proposeNextVersion(remoteVersion);
198         lastProposedVersion = proposedVersion;
199         sendHelloMessage(proposedVersion, getNextXid());
200
201         if (!Objects.equals(proposedVersion, remoteVersion)) {
202             LOG.trace("ret - sent+wait");
203         } else {
204             LOG.trace("ret - sent+OK");
205             postHandshake(proposedVersion, getNextXid());
206         }
207     }
208
209     /**
210      * Handles the negotiation of the version bitmap.
211      *
212      * @param elements version elements
213      * @throws Exception exception
214      */
215     private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
216         final Short proposedVersion = proposeCommonBitmapVersion(elements);
217         if (lastProposedVersion == null) {
218             // first hello has not been sent yet
219             Long nexHelloXid = getNextXid();
220             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
221             Futures.addCallback(helloDone, new FutureCallback<Void>() {
222                 @Override
223                 public void onSuccess(Void result) {
224                     LOG.trace("ret - DONE - versionBitmap");
225                     postHandshake(proposedVersion, getNextXid());
226                 }
227
228                 @Override
229                 public void onFailure(Throwable throwable) {
230                     // NOOP
231                 }
232             }, MoreExecutors.directExecutor());
233             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
234         } else {
235             LOG.trace("ret - DONE - versionBitmap");
236             postHandshake(proposedVersion, getNextXid());
237         }
238     }
239
240     private Long getNextXid() {
241         activeXid += 1;
242         return activeXid;
243     }
244
245     private void setActiveXid(Long xid) {
246         this.activeXid = xid;
247     }
248
249     /**
250      * Checks negotiation stalling.
251      *
252      * @param remoteVersion remove version
253      */
254     private void checkNegotiationStalling(Short remoteVersion) {
255         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
256             throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
257         }
258         lastReceivedVersion = remoteVersion;
259     }
260
261     @Override
262     public Short getVersion() {
263         return version;
264     }
265
266     /**
267      * find common highest supported bitmap version.
268      *
269      * @param list bitmap list
270      * @return proposed bitmap value
271      */
272     protected Short proposeCommonBitmapVersion(List<Elements> list) {
273         Short supportedHighestVersion = null;
274         if (null != list && 0 != list.size()) {
275             for (Elements element : list) {
276                 List<Boolean> bitmap = element.getVersionBitmap();
277                 // check for version bitmap
278                 for (short bitPos : OFConstants.VERSION_ORDER) {
279                     // with all the version it should work.
280                     if (bitmap.get(bitPos % Integer.SIZE)) {
281                         supportedHighestVersion = bitPos;
282                         break;
283                     }
284                 }
285             }
286
287             if (null == supportedHighestVersion) {
288                 LOG.trace("versionBitmap: no common version found");
289                 throw new IllegalArgumentException("no common version found in versionBitmap");
290             }
291         }
292
293         return supportedHighestVersion;
294     }
295
296     /**
297      * find supported version based on remoteVersion.
298      *
299      * @param remoteVersion openflow version supported by remote entity
300      * @return openflow version
301      */
302     protected short proposeNextVersion(short remoteVersion) {
303         Short proposal = null;
304         for (short offer : versionOrder) {
305             if (offer <= remoteVersion) {
306                 proposal = offer;
307                 break;
308             }
309         }
310         if (proposal == null) {
311             throw new IllegalArgumentException(
312                     "no equal or lower version found, unsupported version: " + remoteVersion);
313         }
314         return proposal;
315     }
316
317     /**
318      * send hello reply without versionBitmap.
319      *
320      * @param helloVersion initial hello version for openflow connection negotiation
321      * @param helloXid     transaction id
322      */
323     private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
324
325
326         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
327
328         final SettableFuture<Void> resultFtr = SettableFuture.create();
329
330         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
331                   MessageFactory.digVersions(helloInput.getElements()));
332
333         Future<RpcResult<HelloOutput>> helloResult = connectionAdapter.hello(helloInput);
334
335         ListenableFuture<RpcResult<HelloOutput>> rpcResultListenableFuture
336                 = JdkFutureAdapters.listenInPoolThread(helloResult);
337         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<HelloOutput>>() {
338             @Override
339             public void onSuccess(@Nonnull RpcResult<HelloOutput> result) {
340                 if (result.isSuccessful()) {
341                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
342                               connectionAdapter.getRemoteAddress());
343                     resultFtr.set(null);
344                 } else {
345                     for (RpcError error : result.getErrors()) {
346                         LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
347                                   error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
348                         if (error.getCause() != null) {
349                             LOG.trace("DETAIL of sending hello failure", error.getCause());
350                         }
351                     }
352                     resultFtr.cancel(false);
353                     handshakeListener.onHandshakeFailure();
354                 }
355             }
356
357             @Override
358             public void onFailure(Throwable throwable) {
359                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
360                          connectionAdapter.getRemoteAddress(), throwable.getMessage());
361                 LOG.trace("DETAIL of sending of hello failure:", throwable);
362                 resultFtr.cancel(false);
363                 handshakeListener.onHandshakeFailure();
364             }
365         }, MoreExecutors.directExecutor());
366         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
367         return resultFtr;
368     }
369
370
371     /**
372      * after handshake set features, register to session.
373      *
374      * @param proposedVersion proposed openflow version
375      * @param xid             transaction id
376      */
377     protected void postHandshake(final Short proposedVersion, final Long xid) {
378         // set version
379         version = proposedVersion;
380
381         LOG.debug("version set: {}", proposedVersion);
382         // request features
383         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
384         featuresBuilder.setVersion(version).setXid(xid);
385         LOG.debug("sending feature request for version={} and xid={}", version, xid);
386         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter.getFeatures(featuresBuilder.build());
387
388         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
389                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
390                     @Override
391                     public void onSuccess(@Nonnull RpcResult<GetFeaturesOutput> rpcFeatures) {
392                         LOG.trace("features are back");
393                         if (rpcFeatures.isSuccessful()) {
394                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
395                             if (!deviceConnectionRateLimiter.tryAquire()) {
396                                 LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
397                                         + " the connection from device {}", featureOutput.getDatapathId());
398                                 connectionAdapter.disconnect();
399                                 return;
400                             }
401
402                             LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
403                             LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
404                             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
405                                       version, featureOutput.getDatapathId(),
406                                       featureOutput.getAuxiliaryId());
407                             handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
408                         } else {
409                             // handshake failed
410                             LOG.warn("issuing disconnect during handshake [{}]",
411                                      connectionAdapter.getRemoteAddress());
412                             for (RpcError rpcError : rpcFeatures.getErrors()) {
413                                 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
414                                           rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
415                                           rpcError.getCause());
416                             }
417                             handshakeListener.onHandshakeFailure();
418                         }
419
420                         LOG.debug("postHandshake DONE");
421                     }
422
423                     @Override
424                     public void onFailure(Throwable throwable) {
425                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
426                                  connectionAdapter.getRemoteAddress(), throwable.getMessage());
427                         LOG.trace("DETAIL of sending of hello failure:", throwable);
428                     }
429                 }, MoreExecutors.directExecutor());
430         LOG.debug("future features [{}] hooked ..", xid);
431     }
432
433     /**
434      * Method for unit testing, only.
435      * This method is not thread safe and can only safely be used from a test.
436      */
437     @VisibleForTesting
438     @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
439     void setUseVersionBitmap(boolean useVersionBitmap) {
440         this.useVersionBitmap = useVersionBitmap;
441     }
442
443 }