Merge "Reduce use of JdkFutureAdapters"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / HandshakeManagerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.connection;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.List;
18 import java.util.Objects;
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowplugin.api.OFConstants;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
24 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
25 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
32 import org.opendaylight.yangtools.yang.common.RpcError;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 public class HandshakeManagerImpl implements HandshakeManager {
38
39     private static final long ACTIVE_XID = 20L;
40
41     private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
42
43     private Short lastProposedVersion;
44     private Short lastReceivedVersion;
45     private final List<Short> versionOrder;
46
47     private final ConnectionAdapter connectionAdapter;
48     private Short version;
49     private final ErrorHandler errorHandler;
50
51     private final Short highestVersion;
52
53     private Long activeXid;
54
55     private final HandshakeListener handshakeListener;
56
57     private boolean useVersionBitmap; // not final just for unit test
58
59     private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
60
61     /**
62      * Constructor.
63      *
64      * @param connectionAdapter connection adaptor for switch
65      * @param highestVersion    highest openflow version
66      * @param versionOrder      list of version in order for connection protocol negotiation
67      * @param errorHandler      the ErrorHandler
68      * @param handshakeListener the HandshakeListener
69      * @param useVersionBitmap  should use negotiation bit map
70      */
71     public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
72                                 ErrorHandler errorHandler, HandshakeListener handshakeListener,
73                                 boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
74         this.highestVersion = highestVersion;
75         this.versionOrder = versionOrder;
76         this.connectionAdapter = connectionAdapter;
77         this.errorHandler = errorHandler;
78         this.handshakeListener = handshakeListener;
79         this.useVersionBitmap = useVersionBitmap;
80         this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
81     }
82
83     @Override
84     @SuppressWarnings("checkstyle:IllegalCatch")
85     public synchronized void shake(HelloMessage receivedHello) {
86
87         if (version != null) {
88             // Some switches respond with a second HELLO acknowledging our HELLO
89             // but we've already completed the handshake based on the negotiated
90             // version and have registered this switch.
91             LOG.debug("Hello recieved after handshake already settled ... ignoring.");
92             return;
93         }
94
95         LOG.trace("handshake STARTED");
96         setActiveXid(ACTIVE_XID);
97
98         try {
99             if (receivedHello == null) {
100                 // first Hello sending
101                 sendHelloMessage(highestVersion, getNextXid());
102                 lastProposedVersion = highestVersion;
103                 LOG.trace("ret - firstHello+wait");
104                 return;
105             }
106
107             // process the 2. and later hellos
108             Short remoteVersion = receivedHello.getVersion();
109             List<Elements> elements = receivedHello.getElements();
110             setActiveXid(receivedHello.getXid());
111             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
112             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
113                       remoteVersionBitmap);
114
115             if (useVersionBitmap && remoteVersionBitmap != null) {
116                 // versionBitmap on both sides -> ONE STEP DECISION
117                 handleVersionBitmapNegotiation(elements);
118             } else {
119                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
120                 handleStepByStepVersionNegotiation(remoteVersion);
121             }
122         } catch (Exception ex) {
123             errorHandler.handleException(ex);
124             LOG.trace("ret - shake fail - closing");
125             handshakeListener.onHandshakeFailure();
126         }
127     }
128
129     /**
130      * Handles the version negotiation step by step.
131      *
132      * @param remoteVersion remote version
133      * @throws Exception exception
134      */
135     @SuppressWarnings("checkstyle:IllegalCatch")
136     private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
137         LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
138                   highestVersion);
139
140         if (lastProposedVersion == null) {
141             // first hello has not been sent yet, send it and either wait for next remote
142             // version or proceed
143             lastProposedVersion = proposeNextVersion(remoteVersion);
144             final Long nextHelloXid = getNextXid();
145             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
146             Futures.addCallback(helloResult, new FutureCallback<Void>() {
147                 @Override
148                 public void onSuccess(Void result) {
149                     try {
150                         stepByStepVersionSubStep(remoteVersion);
151                     } catch (Exception e) {
152                         errorHandler.handleException(e);
153                         handshakeListener.onHandshakeFailure();
154                     }
155                 }
156
157                 @Override
158                 public void onFailure(Throwable throwable) {
159                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
160                     LOG.trace("detail of hello send problem", throwable);
161                 }
162             }, MoreExecutors.directExecutor());
163         } else {
164             stepByStepVersionSubStep(remoteVersion);
165         }
166     }
167
168     private void stepByStepVersionSubStep(Short remoteVersion) {
169         if (remoteVersion >= lastProposedVersion) {
170             postHandshake(lastProposedVersion, getNextXid());
171             LOG.trace("ret - OK - switch answered with lastProposedVersion");
172         } else {
173             checkNegotiationStalling(remoteVersion);
174
175             if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
176                 // wait for next version
177                 LOG.trace("ret - wait");
178             } else {
179                 //propose lower version
180                 handleLowerVersionProposal(remoteVersion);
181             }
182         }
183     }
184
185     /**
186      * Handles a proposal for a lower version.
187      *
188      * @param remoteVersion remote version
189      * @throws Exception exception
190      */
191     private void handleLowerVersionProposal(Short remoteVersion) {
192         Short proposedVersion;
193         // find the version from header version field
194         proposedVersion = proposeNextVersion(remoteVersion);
195         lastProposedVersion = proposedVersion;
196         sendHelloMessage(proposedVersion, getNextXid());
197
198         if (!Objects.equals(proposedVersion, remoteVersion)) {
199             LOG.trace("ret - sent+wait");
200         } else {
201             LOG.trace("ret - sent+OK");
202             postHandshake(proposedVersion, getNextXid());
203         }
204     }
205
206     /**
207      * Handles the negotiation of the version bitmap.
208      *
209      * @param elements version elements
210      * @throws Exception exception
211      */
212     private void handleVersionBitmapNegotiation(List<Elements> elements) {
213         final Short proposedVersion = proposeCommonBitmapVersion(elements);
214         if (lastProposedVersion == null) {
215             // first hello has not been sent yet
216             Long nexHelloXid = getNextXid();
217             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
218             Futures.addCallback(helloDone, new FutureCallback<Void>() {
219                 @Override
220                 public void onSuccess(Void result) {
221                     LOG.trace("ret - DONE - versionBitmap");
222                     postHandshake(proposedVersion, getNextXid());
223                 }
224
225                 @Override
226                 public void onFailure(Throwable throwable) {
227                     // NOOP
228                 }
229             }, MoreExecutors.directExecutor());
230             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
231         } else {
232             LOG.trace("ret - DONE - versionBitmap");
233             postHandshake(proposedVersion, getNextXid());
234         }
235     }
236
237     private Long getNextXid() {
238         activeXid += 1;
239         return activeXid;
240     }
241
242     private void setActiveXid(Long xid) {
243         this.activeXid = xid;
244     }
245
246     /**
247      * Checks negotiation stalling.
248      *
249      * @param remoteVersion remove version
250      */
251     private void checkNegotiationStalling(Short remoteVersion) {
252         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
253             throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
254         }
255         lastReceivedVersion = remoteVersion;
256     }
257
258     @Override
259     public Short getVersion() {
260         return version;
261     }
262
263     /**
264      * find common highest supported bitmap version.
265      *
266      * @param list bitmap list
267      * @return proposed bitmap value
268      */
269     protected Short proposeCommonBitmapVersion(List<Elements> list) {
270         Short supportedHighestVersion = null;
271         if (null != list && 0 != list.size()) {
272             for (Elements element : list) {
273                 List<Boolean> bitmap = element.getVersionBitmap();
274                 // check for version bitmap
275                 for (short bitPos : OFConstants.VERSION_ORDER) {
276                     // with all the version it should work.
277                     if (bitmap.get(bitPos % Integer.SIZE)) {
278                         supportedHighestVersion = bitPos;
279                         break;
280                     }
281                 }
282             }
283
284             if (null == supportedHighestVersion) {
285                 LOG.trace("versionBitmap: no common version found");
286                 throw new IllegalArgumentException("no common version found in versionBitmap");
287             }
288         }
289
290         return supportedHighestVersion;
291     }
292
293     /**
294      * find supported version based on remoteVersion.
295      *
296      * @param remoteVersion openflow version supported by remote entity
297      * @return openflow version
298      */
299     protected short proposeNextVersion(short remoteVersion) {
300         Short proposal = null;
301         for (short offer : versionOrder) {
302             if (offer <= remoteVersion) {
303                 proposal = offer;
304                 break;
305             }
306         }
307         if (proposal == null) {
308             throw new IllegalArgumentException(
309                     "no equal or lower version found, unsupported version: " + remoteVersion);
310         }
311         return proposal;
312     }
313
314     /**
315      * send hello reply without versionBitmap.
316      *
317      * @param helloVersion initial hello version for openflow connection negotiation
318      * @param helloXid     transaction id
319      */
320     private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) {
321
322
323         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
324
325         final SettableFuture<Void> resultFtr = SettableFuture.create();
326
327         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
328                   MessageFactory.digVersions(helloInput.getElements()));
329
330         Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
331             @Override
332             public void onSuccess(RpcResult<HelloOutput> result) {
333                 if (result.isSuccessful()) {
334                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
335                               connectionAdapter.getRemoteAddress());
336                     resultFtr.set(null);
337                 } else {
338                     for (RpcError error : result.getErrors()) {
339                         LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
340                                   error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
341                         if (error.getCause() != null) {
342                             LOG.trace("DETAIL of sending hello failure", error.getCause());
343                         }
344                     }
345                     resultFtr.cancel(false);
346                     handshakeListener.onHandshakeFailure();
347                 }
348             }
349
350             @Override
351             public void onFailure(Throwable throwable) {
352                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
353                          connectionAdapter.getRemoteAddress(), throwable.getMessage());
354                 LOG.trace("DETAIL of sending of hello failure:", throwable);
355                 resultFtr.cancel(false);
356                 handshakeListener.onHandshakeFailure();
357             }
358         }, MoreExecutors.directExecutor());
359         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
360         return resultFtr;
361     }
362
363
364     /**
365      * after handshake set features, register to session.
366      *
367      * @param proposedVersion proposed openflow version
368      * @param xid             transaction id
369      */
370     protected void postHandshake(final Short proposedVersion, final Long xid) {
371         // set version
372         version = proposedVersion;
373
374         LOG.debug("version set: {}", proposedVersion);
375         // request features
376         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
377         featuresBuilder.setVersion(version).setXid(xid);
378         LOG.debug("sending feature request for version={} and xid={}", version, xid);
379
380         Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
381                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
382                     @Override
383                     public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
384                         LOG.trace("features are back");
385                         if (rpcFeatures.isSuccessful()) {
386                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
387                             if (!deviceConnectionRateLimiter.tryAquire()) {
388                                 LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
389                                         + " the connection from device {}", featureOutput.getDatapathId());
390                                 connectionAdapter.disconnect();
391                                 return;
392                             }
393
394                             LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
395                             LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
396                             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
397                                       version, featureOutput.getDatapathId(),
398                                       featureOutput.getAuxiliaryId());
399                             handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
400                         } else {
401                             // handshake failed
402                             LOG.warn("issuing disconnect during handshake [{}]",
403                                      connectionAdapter.getRemoteAddress());
404                             for (RpcError rpcError : rpcFeatures.getErrors()) {
405                                 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
406                                           rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
407                                           rpcError.getCause());
408                             }
409                             handshakeListener.onHandshakeFailure();
410                         }
411
412                         LOG.debug("postHandshake DONE");
413                     }
414
415                     @Override
416                     public void onFailure(Throwable throwable) {
417                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
418                                  connectionAdapter.getRemoteAddress(), throwable.getMessage());
419                         LOG.trace("DETAIL of sending of hello failure:", throwable);
420                     }
421                 }, MoreExecutors.directExecutor());
422         LOG.debug("future features [{}] hooked ..", xid);
423     }
424
425     /**
426      * Method for unit testing, only.
427      * This method is not thread safe and can only safely be used from a test.
428      */
429     @VisibleForTesting
430     @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
431     void setUseVersionBitmap(boolean useVersionBitmap) {
432         this.useVersionBitmap = useVersionBitmap;
433     }
434
435 }