OPNFLWPLUG-1049 : Device connection hold time to maintain delay between
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / HandshakeManagerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.connection;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.math.BigInteger;
18 import java.time.LocalDateTime;
19 import java.util.List;
20 import java.util.Objects;
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowplugin.api.OFConstants;
23 import org.opendaylight.openflowplugin.api.openflow.connection.DeviceConnectionStatusProvider;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
27 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
28 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
35 import org.opendaylight.yangtools.yang.common.RpcError;
36 import org.opendaylight.yangtools.yang.common.RpcResult;
37 import org.opendaylight.yangtools.yang.common.Uint64;
38 import org.opendaylight.yangtools.yang.common.Uint8;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public class HandshakeManagerImpl implements HandshakeManager {
43
44     private static final long ACTIVE_XID = 20L;
45
46     private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
47
48     private Short lastProposedVersion;
49     private Short lastReceivedVersion;
50     private final List<Short> versionOrder;
51
52     private final ConnectionAdapter connectionAdapter;
53     private Short version;
54     private final ErrorHandler errorHandler;
55
56     private final Short highestVersion;
57
58     private Long activeXid;
59
60     private final HandshakeListener handshakeListener;
61
62     private boolean useVersionBitmap; // not final just for unit test
63
64     private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
65     private final int deviceConnectionHoldTime;
66     private final DeviceConnectionStatusProvider deviceConnectionStatusProvider;
67
68     /**
69      * Constructor.
70      *
71      * @param connectionAdapter connection adaptor for switch
72      * @param highestVersion    highest openflow version
73      * @param versionOrder      list of version in order for connection protocol negotiation
74      * @param errorHandler      the ErrorHandler
75      * @param handshakeListener the HandshakeListener
76      * @param useVersionBitmap  should use negotiation bit map
77      * @param deviceConnectionRateLimiter  device connection rate limiter utility
78      * @param deviceConnectionHoldTime  deivce connection hold time in seconds
79      * @param deviceConnectionStatusProvider  utility for maintaining device connection states
80      */
81     public HandshakeManagerImpl(final ConnectionAdapter connectionAdapter, final Short highestVersion,
82                                 final List<Short> versionOrder, final ErrorHandler errorHandler,
83                                 final HandshakeListener handshakeListener, final boolean useVersionBitmap,
84                                 final DeviceConnectionRateLimiter deviceConnectionRateLimiter,
85                                 final int deviceConnectionHoldTime,
86                                 final DeviceConnectionStatusProvider deviceConnectionStatusProvider) {
87         this.highestVersion = highestVersion;
88         this.versionOrder = versionOrder;
89         this.connectionAdapter = connectionAdapter;
90         this.errorHandler = errorHandler;
91         this.handshakeListener = handshakeListener;
92         this.useVersionBitmap = useVersionBitmap;
93         this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
94         this.deviceConnectionHoldTime = deviceConnectionHoldTime;
95         this.deviceConnectionStatusProvider = deviceConnectionStatusProvider;
96     }
97
98     @Override
99     @SuppressWarnings("checkstyle:IllegalCatch")
100     public synchronized void shake(final HelloMessage receivedHello) {
101
102         if (version != null) {
103             // Some switches respond with a second HELLO acknowledging our HELLO
104             // but we've already completed the handshake based on the negotiated
105             // version and have registered this switch.
106             LOG.debug("Hello recieved after handshake already settled ... ignoring.");
107             return;
108         }
109
110         LOG.trace("handshake STARTED");
111         setActiveXid(ACTIVE_XID);
112
113         try {
114             if (receivedHello == null) {
115                 // first Hello sending
116                 sendHelloMessage(highestVersion, getNextXid());
117                 lastProposedVersion = highestVersion;
118                 LOG.trace("ret - firstHello+wait");
119                 return;
120             }
121
122             // process the 2. and later hellos
123             Uint8 remoteVersion = receivedHello.getVersion();
124             List<Elements> elements = receivedHello.getElements();
125             setActiveXid(receivedHello.getXid().toJava());
126             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
127             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
128                       remoteVersionBitmap);
129
130             if (useVersionBitmap && remoteVersionBitmap != null) {
131                 // versionBitmap on both sides -> ONE STEP DECISION
132                 handleVersionBitmapNegotiation(elements);
133             } else {
134                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
135                 handleStepByStepVersionNegotiation(remoteVersion.toJava());
136             }
137         } catch (Exception ex) {
138             errorHandler.handleException(ex);
139             LOG.trace("ret - shake fail - closing");
140             handshakeListener.onHandshakeFailure();
141         }
142     }
143
144     /**
145      * Handles the version negotiation step by step.
146      *
147      * @param remoteVersion remote version
148      * @throws Exception exception
149      */
150     @SuppressWarnings("checkstyle:IllegalCatch")
151     private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
152         LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
153                   highestVersion);
154
155         if (lastProposedVersion == null) {
156             // first hello has not been sent yet, send it and either wait for next remote
157             // version or proceed
158             lastProposedVersion = proposeNextVersion(remoteVersion);
159             final Long nextHelloXid = getNextXid();
160             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
161             Futures.addCallback(helloResult, new FutureCallback<Void>() {
162                 @Override
163                 public void onSuccess(final Void result) {
164                     try {
165                         stepByStepVersionSubStep(remoteVersion);
166                     } catch (Exception e) {
167                         errorHandler.handleException(e);
168                         handshakeListener.onHandshakeFailure();
169                     }
170                 }
171
172                 @Override
173                 public void onFailure(final Throwable throwable) {
174                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
175                     LOG.trace("detail of hello send problem", throwable);
176                 }
177             }, MoreExecutors.directExecutor());
178         } else {
179             stepByStepVersionSubStep(remoteVersion);
180         }
181     }
182
183     private void stepByStepVersionSubStep(final Short remoteVersion) {
184         if (remoteVersion >= lastProposedVersion) {
185             postHandshake(lastProposedVersion, getNextXid());
186             LOG.trace("ret - OK - switch answered with lastProposedVersion");
187         } else {
188             checkNegotiationStalling(remoteVersion);
189
190             if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
191                 // wait for next version
192                 LOG.trace("ret - wait");
193             } else {
194                 //propose lower version
195                 handleLowerVersionProposal(remoteVersion);
196             }
197         }
198     }
199
200     /**
201      * Handles a proposal for a lower version.
202      *
203      * @param remoteVersion remote version
204      * @throws Exception exception
205      */
206     private void handleLowerVersionProposal(final Short remoteVersion) {
207         Short proposedVersion;
208         // find the version from header version field
209         proposedVersion = proposeNextVersion(remoteVersion);
210         lastProposedVersion = proposedVersion;
211         sendHelloMessage(proposedVersion, getNextXid());
212
213         if (!Objects.equals(proposedVersion, remoteVersion)) {
214             LOG.trace("ret - sent+wait");
215         } else {
216             LOG.trace("ret - sent+OK");
217             postHandshake(proposedVersion, getNextXid());
218         }
219     }
220
221     /**
222      * Handles the negotiation of the version bitmap.
223      *
224      * @param elements version elements
225      * @throws Exception exception
226      */
227     private void handleVersionBitmapNegotiation(final List<Elements> elements) {
228         final Short proposedVersion = proposeCommonBitmapVersion(elements);
229         if (lastProposedVersion == null) {
230             // first hello has not been sent yet
231             Long nexHelloXid = getNextXid();
232             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
233             Futures.addCallback(helloDone, new FutureCallback<Void>() {
234                 @Override
235                 public void onSuccess(final Void result) {
236                     LOG.trace("ret - DONE - versionBitmap");
237                     postHandshake(proposedVersion, getNextXid());
238                 }
239
240                 @Override
241                 public void onFailure(final Throwable throwable) {
242                     // NOOP
243                 }
244             }, MoreExecutors.directExecutor());
245             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
246         } else {
247             LOG.trace("ret - DONE - versionBitmap");
248             postHandshake(proposedVersion, getNextXid());
249         }
250     }
251
252     private Long getNextXid() {
253         activeXid += 1;
254         return activeXid;
255     }
256
257     private void setActiveXid(final Long xid) {
258         this.activeXid = xid;
259     }
260
261     /**
262      * Checks negotiation stalling.
263      *
264      * @param remoteVersion remove version
265      */
266     private void checkNegotiationStalling(final Short remoteVersion) {
267         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
268             throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
269         }
270         lastReceivedVersion = remoteVersion;
271     }
272
273     @Override
274     public Short getVersion() {
275         return version;
276     }
277
278     /**
279      * find common highest supported bitmap version.
280      *
281      * @param list bitmap list
282      * @return proposed bitmap value
283      */
284     protected Short proposeCommonBitmapVersion(final List<Elements> list) {
285         Short supportedHighestVersion = null;
286         if (null != list && 0 != list.size()) {
287             for (Elements element : list) {
288                 List<Boolean> bitmap = element.getVersionBitmap();
289                 // check for version bitmap
290                 for (short bitPos : OFConstants.VERSION_ORDER) {
291                     // with all the version it should work.
292                     if (bitmap.get(bitPos % Integer.SIZE)) {
293                         supportedHighestVersion = bitPos;
294                         break;
295                     }
296                 }
297             }
298
299             if (null == supportedHighestVersion) {
300                 LOG.trace("versionBitmap: no common version found");
301                 throw new IllegalArgumentException("no common version found in versionBitmap");
302             }
303         }
304
305         return supportedHighestVersion;
306     }
307
308     /**
309      * find supported version based on remoteVersion.
310      *
311      * @param remoteVersion openflow version supported by remote entity
312      * @return openflow version
313      */
314     protected short proposeNextVersion(final short remoteVersion) {
315         Short proposal = null;
316         for (short offer : versionOrder) {
317             if (offer <= remoteVersion) {
318                 proposal = offer;
319                 break;
320             }
321         }
322         if (proposal == null) {
323             throw new IllegalArgumentException(
324                     "no equal or lower version found, unsupported version: " + remoteVersion);
325         }
326         return proposal;
327     }
328
329     /**
330      * send hello reply without versionBitmap.
331      *
332      * @param helloVersion initial hello version for openflow connection negotiation
333      * @param helloXid     transaction id
334      */
335     private ListenableFuture<Void> sendHelloMessage(final Short helloVersion, final Long helloXid) {
336
337
338         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
339
340         final SettableFuture<Void> resultFtr = SettableFuture.create();
341
342         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
343                   MessageFactory.digVersions(helloInput.getElements()));
344
345         Futures.addCallback(connectionAdapter.hello(helloInput), new FutureCallback<RpcResult<HelloOutput>>() {
346             @Override
347             public void onSuccess(final RpcResult<HelloOutput> result) {
348                 if (result.isSuccessful()) {
349                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
350                               connectionAdapter.getRemoteAddress());
351                     resultFtr.set(null);
352                 } else {
353                     for (RpcError error : result.getErrors()) {
354                         LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
355                                   error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
356                         if (error.getCause() != null) {
357                             LOG.trace("DETAIL of sending hello failure", error.getCause());
358                         }
359                     }
360                     resultFtr.cancel(false);
361                     handshakeListener.onHandshakeFailure();
362                 }
363             }
364
365             @Override
366             public void onFailure(final Throwable throwable) {
367                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
368                          connectionAdapter.getRemoteAddress(), throwable.getMessage());
369                 LOG.trace("DETAIL of sending of hello failure:", throwable);
370                 resultFtr.cancel(false);
371                 handshakeListener.onHandshakeFailure();
372             }
373         }, MoreExecutors.directExecutor());
374         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
375         return resultFtr;
376     }
377
378
379     /**
380      * after handshake set features, register to session.
381      *
382      * @param proposedVersion proposed openflow version
383      * @param xid             transaction id
384      */
385     protected void postHandshake(final Short proposedVersion, final Long xid) {
386         // set version
387         version = proposedVersion;
388
389         LOG.debug("version set: {}", proposedVersion);
390         // request features
391         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
392         featuresBuilder.setVersion(version).setXid(xid);
393         LOG.debug("sending feature request for version={} and xid={}", version, xid);
394
395         Futures.addCallback(connectionAdapter.getFeatures(featuresBuilder.build()),
396                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
397                     @Override
398                     public void onSuccess(final RpcResult<GetFeaturesOutput> rpcFeatures) {
399                         LOG.trace("features are back");
400                         if (rpcFeatures.isSuccessful()) {
401                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
402
403                             final Uint64 dpId = featureOutput.getDatapathId();
404                             BigInteger datapathId = dpId == null ? null : dpId.toJava();
405                             connectionAdapter.setDatapathId(datapathId);
406                             if (datapathId == null || !isAllowedToConnect(datapathId)) {
407                                 connectionAdapter.disconnect();
408                                 return;
409                             }
410
411                             LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
412                             LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
413                             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
414                                       version, featureOutput.getDatapathId(),
415                                       featureOutput.getAuxiliaryId());
416                             handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
417                         } else {
418                             // handshake failed
419                             LOG.warn("issuing disconnect during handshake [{}]",
420                                      connectionAdapter.getRemoteAddress());
421                             for (RpcError rpcError : rpcFeatures.getErrors()) {
422                                 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
423                                           rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
424                                           rpcError.getCause());
425                             }
426                             handshakeListener.onHandshakeFailure();
427                         }
428
429                         LOG.debug("postHandshake DONE");
430                     }
431
432                     @Override
433                     public void onFailure(final Throwable throwable) {
434                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
435                                  connectionAdapter.getRemoteAddress(), throwable.getMessage());
436                         LOG.trace("DETAIL of sending of hello failure:", throwable);
437                     }
438                 }, MoreExecutors.directExecutor());
439         LOG.debug("future features [{}] hooked ..", xid);
440     }
441
442     public boolean isAllowedToConnect(BigInteger nodeId) {
443         // The device isn't allowed for connection till device connection hold time is over
444         if (deviceConnectionHoldTime > 0) {
445             LocalDateTime lastConnectionTime = deviceConnectionStatusProvider.getDeviceLastConnectionTime(nodeId);
446             if (lastConnectionTime == null) {
447                 LOG.debug("Initial connection attempt by device {} to the controller node. Allowing to connect after {}"
448                         + "seconds", nodeId, deviceConnectionHoldTime);
449                 deviceConnectionStatusProvider.addDeviceLastConnectionTime(nodeId, LocalDateTime.now());
450                 return false;
451             } else if (LocalDateTime.now().isBefore(lastConnectionTime.plusSeconds(deviceConnectionHoldTime))) {
452                 LOG.trace("Device trying to connect before the connection delay {} seconds, disconnecting the device "
453                                 + "{}", deviceConnectionHoldTime, nodeId);
454                 return false;
455             }
456         }
457
458         if (!deviceConnectionRateLimiter.tryAquire()) {
459             LOG.debug("Permit not acquired for device {}, disconnecting the device.", nodeId);
460             connectionAdapter.disconnect();
461             return false;
462         }
463         return true;
464     }
465
466     /**
467      * Method for unit testing, only.
468      * This method is not thread safe and can only safely be used from a test.
469      */
470     @VisibleForTesting
471     @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
472     void setUseVersionBitmap(final boolean useVersionBitmap) {
473         this.useVersionBitmap = useVersionBitmap;
474     }
475
476 }