Bump odlparent to 5.0.0
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / connection / HandshakeManagerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.connection;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.concurrent.Future;
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.openflowplugin.api.OFConstants;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
26 import org.opendaylight.openflowplugin.impl.common.DeviceConnectionRateLimiter;
27 import org.opendaylight.openflowplugin.impl.util.MessageFactory;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
34 import org.opendaylight.yangtools.yang.common.RpcError;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 public class HandshakeManagerImpl implements HandshakeManager {
40
41     private static final long ACTIVE_XID = 20L;
42
43     private static final Logger LOG = LoggerFactory.getLogger(HandshakeManagerImpl.class);
44
45     private Short lastProposedVersion;
46     private Short lastReceivedVersion;
47     private final List<Short> versionOrder;
48
49     private final ConnectionAdapter connectionAdapter;
50     private Short version;
51     private final ErrorHandler errorHandler;
52
53     private final Short highestVersion;
54
55     private Long activeXid;
56
57     private final HandshakeListener handshakeListener;
58
59     private boolean useVersionBitmap; // not final just for unit test
60
61     private final DeviceConnectionRateLimiter deviceConnectionRateLimiter;
62
63     /**
64      * Constructor.
65      *
66      * @param connectionAdapter connection adaptor for switch
67      * @param highestVersion    highest openflow version
68      * @param versionOrder      list of version in order for connection protocol negotiation
69      * @param errorHandler      the ErrorHandler
70      * @param handshakeListener the HandshakeListener
71      * @param useVersionBitmap  should use negotiation bit map
72      */
73     public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List<Short> versionOrder,
74                                 ErrorHandler errorHandler, HandshakeListener handshakeListener,
75                                 boolean useVersionBitmap, DeviceConnectionRateLimiter deviceConnectionRateLimiter) {
76         this.highestVersion = highestVersion;
77         this.versionOrder = versionOrder;
78         this.connectionAdapter = connectionAdapter;
79         this.errorHandler = errorHandler;
80         this.handshakeListener = handshakeListener;
81         this.useVersionBitmap = useVersionBitmap;
82         this.deviceConnectionRateLimiter = deviceConnectionRateLimiter;
83     }
84
85     @Override
86     @SuppressWarnings("checkstyle:IllegalCatch")
87     public synchronized void shake(HelloMessage receivedHello) {
88
89         if (version != null) {
90             // Some switches respond with a second HELLO acknowledging our HELLO
91             // but we've already completed the handshake based on the negotiated
92             // version and have registered this switch.
93             LOG.debug("Hello recieved after handshake already settled ... ignoring.");
94             return;
95         }
96
97         LOG.trace("handshake STARTED");
98         setActiveXid(ACTIVE_XID);
99
100         try {
101             if (receivedHello == null) {
102                 // first Hello sending
103                 sendHelloMessage(highestVersion, getNextXid());
104                 lastProposedVersion = highestVersion;
105                 LOG.trace("ret - firstHello+wait");
106                 return;
107             }
108
109             // process the 2. and later hellos
110             Short remoteVersion = receivedHello.getVersion();
111             List<Elements> elements = receivedHello.getElements();
112             setActiveXid(receivedHello.getXid());
113             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
114             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid(),
115                       remoteVersionBitmap);
116
117             if (useVersionBitmap && remoteVersionBitmap != null) {
118                 // versionBitmap on both sides -> ONE STEP DECISION
119                 handleVersionBitmapNegotiation(elements);
120             } else {
121                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
122                 handleStepByStepVersionNegotiation(remoteVersion);
123             }
124         } catch (Exception ex) {
125             errorHandler.handleException(ex);
126             LOG.trace("ret - shake fail - closing");
127             handshakeListener.onHandshakeFailure();
128         }
129     }
130
131     /**
132      * Handles the version negotiation step by step.
133      *
134      * @param remoteVersion remote version
135      * @throws Exception exception
136      */
137     @SuppressWarnings("checkstyle:IllegalCatch")
138     private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
139         LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion,
140                   highestVersion);
141
142         if (lastProposedVersion == null) {
143             // first hello has not been sent yet, send it and either wait for next remote
144             // version or proceed
145             lastProposedVersion = proposeNextVersion(remoteVersion);
146             final Long nextHelloXid = getNextXid();
147             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
148             Futures.addCallback(helloResult, new FutureCallback<Void>() {
149                 @Override
150                 public void onSuccess(Void result) {
151                     try {
152                         stepByStepVersionSubStep(remoteVersion);
153                     } catch (Exception e) {
154                         errorHandler.handleException(e);
155                         handshakeListener.onHandshakeFailure();
156                     }
157                 }
158
159                 @Override
160                 public void onFailure(Throwable throwable) {
161                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
162                     LOG.trace("detail of hello send problem", throwable);
163                 }
164             }, MoreExecutors.directExecutor());
165         } else {
166             stepByStepVersionSubStep(remoteVersion);
167         }
168     }
169
170     private void stepByStepVersionSubStep(Short remoteVersion) {
171         if (remoteVersion >= lastProposedVersion) {
172             postHandshake(lastProposedVersion, getNextXid());
173             LOG.trace("ret - OK - switch answered with lastProposedVersion");
174         } else {
175             checkNegotiationStalling(remoteVersion);
176
177             if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
178                 // wait for next version
179                 LOG.trace("ret - wait");
180             } else {
181                 //propose lower version
182                 handleLowerVersionProposal(remoteVersion);
183             }
184         }
185     }
186
187     /**
188      * Handles a proposal for a lower version.
189      *
190      * @param remoteVersion remote version
191      * @throws Exception exception
192      */
193     private void handleLowerVersionProposal(Short remoteVersion) {
194         Short proposedVersion;
195         // find the version from header version field
196         proposedVersion = proposeNextVersion(remoteVersion);
197         lastProposedVersion = proposedVersion;
198         sendHelloMessage(proposedVersion, getNextXid());
199
200         if (!Objects.equals(proposedVersion, remoteVersion)) {
201             LOG.trace("ret - sent+wait");
202         } else {
203             LOG.trace("ret - sent+OK");
204             postHandshake(proposedVersion, getNextXid());
205         }
206     }
207
208     /**
209      * Handles the negotiation of the version bitmap.
210      *
211      * @param elements version elements
212      * @throws Exception exception
213      */
214     private void handleVersionBitmapNegotiation(List<Elements> elements) {
215         final Short proposedVersion = proposeCommonBitmapVersion(elements);
216         if (lastProposedVersion == null) {
217             // first hello has not been sent yet
218             Long nexHelloXid = getNextXid();
219             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
220             Futures.addCallback(helloDone, new FutureCallback<Void>() {
221                 @Override
222                 public void onSuccess(Void result) {
223                     LOG.trace("ret - DONE - versionBitmap");
224                     postHandshake(proposedVersion, getNextXid());
225                 }
226
227                 @Override
228                 public void onFailure(Throwable throwable) {
229                     // NOOP
230                 }
231             }, MoreExecutors.directExecutor());
232             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
233         } else {
234             LOG.trace("ret - DONE - versionBitmap");
235             postHandshake(proposedVersion, getNextXid());
236         }
237     }
238
239     private Long getNextXid() {
240         activeXid += 1;
241         return activeXid;
242     }
243
244     private void setActiveXid(Long xid) {
245         this.activeXid = xid;
246     }
247
248     /**
249      * Checks negotiation stalling.
250      *
251      * @param remoteVersion remove version
252      */
253     private void checkNegotiationStalling(Short remoteVersion) {
254         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
255             throw new IllegalStateException("version negotiation stalled: version = " + remoteVersion);
256         }
257         lastReceivedVersion = remoteVersion;
258     }
259
260     @Override
261     public Short getVersion() {
262         return version;
263     }
264
265     /**
266      * find common highest supported bitmap version.
267      *
268      * @param list bitmap list
269      * @return proposed bitmap value
270      */
271     protected Short proposeCommonBitmapVersion(List<Elements> list) {
272         Short supportedHighestVersion = null;
273         if (null != list && 0 != list.size()) {
274             for (Elements element : list) {
275                 List<Boolean> bitmap = element.getVersionBitmap();
276                 // check for version bitmap
277                 for (short bitPos : OFConstants.VERSION_ORDER) {
278                     // with all the version it should work.
279                     if (bitmap.get(bitPos % Integer.SIZE)) {
280                         supportedHighestVersion = bitPos;
281                         break;
282                     }
283                 }
284             }
285
286             if (null == supportedHighestVersion) {
287                 LOG.trace("versionBitmap: no common version found");
288                 throw new IllegalArgumentException("no common version found in versionBitmap");
289             }
290         }
291
292         return supportedHighestVersion;
293     }
294
295     /**
296      * find supported version based on remoteVersion.
297      *
298      * @param remoteVersion openflow version supported by remote entity
299      * @return openflow version
300      */
301     protected short proposeNextVersion(short remoteVersion) {
302         Short proposal = null;
303         for (short offer : versionOrder) {
304             if (offer <= remoteVersion) {
305                 proposal = offer;
306                 break;
307             }
308         }
309         if (proposal == null) {
310             throw new IllegalArgumentException(
311                     "no equal or lower version found, unsupported version: " + remoteVersion);
312         }
313         return proposal;
314     }
315
316     /**
317      * send hello reply without versionBitmap.
318      *
319      * @param helloVersion initial hello version for openflow connection negotiation
320      * @param helloXid     transaction id
321      */
322     private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) {
323
324
325         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
326
327         final SettableFuture<Void> resultFtr = SettableFuture.create();
328
329         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid,
330                   MessageFactory.digVersions(helloInput.getElements()));
331
332         Future<RpcResult<HelloOutput>> helloResult = connectionAdapter.hello(helloInput);
333
334         ListenableFuture<RpcResult<HelloOutput>> rpcResultListenableFuture
335                 = JdkFutureAdapters.listenInPoolThread(helloResult);
336         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<HelloOutput>>() {
337             @Override
338             public void onSuccess(RpcResult<HelloOutput> result) {
339                 if (result.isSuccessful()) {
340                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid,
341                               connectionAdapter.getRemoteAddress());
342                     resultFtr.set(null);
343                 } else {
344                     for (RpcError error : result.getErrors()) {
345                         LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, error.getInfo(),
346                                   error.getSeverity(), error.getMessage(), connectionAdapter.getRemoteAddress());
347                         if (error.getCause() != null) {
348                             LOG.trace("DETAIL of sending hello failure", error.getCause());
349                         }
350                     }
351                     resultFtr.cancel(false);
352                     handshakeListener.onHandshakeFailure();
353                 }
354             }
355
356             @Override
357             public void onFailure(Throwable throwable) {
358                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
359                          connectionAdapter.getRemoteAddress(), throwable.getMessage());
360                 LOG.trace("DETAIL of sending of hello failure:", throwable);
361                 resultFtr.cancel(false);
362                 handshakeListener.onHandshakeFailure();
363             }
364         }, MoreExecutors.directExecutor());
365         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
366         return resultFtr;
367     }
368
369
370     /**
371      * after handshake set features, register to session.
372      *
373      * @param proposedVersion proposed openflow version
374      * @param xid             transaction id
375      */
376     protected void postHandshake(final Short proposedVersion, final Long xid) {
377         // set version
378         version = proposedVersion;
379
380         LOG.debug("version set: {}", proposedVersion);
381         // request features
382         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
383         featuresBuilder.setVersion(version).setXid(xid);
384         LOG.debug("sending feature request for version={} and xid={}", version, xid);
385         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter.getFeatures(featuresBuilder.build());
386
387         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
388                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
389                     @Override
390                     public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
391                         LOG.trace("features are back");
392                         if (rpcFeatures.isSuccessful()) {
393                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
394                             if (!deviceConnectionRateLimiter.tryAquire()) {
395                                 LOG.warn("Openflowplugin hit the device connection rate limit threshold. Denying"
396                                         + " the connection from device {}", featureOutput.getDatapathId());
397                                 connectionAdapter.disconnect();
398                                 return;
399                             }
400
401                             LOG.debug("obtained features: datapathId={}", featureOutput.getDatapathId());
402                             LOG.debug("obtained features: auxiliaryId={}", featureOutput.getAuxiliaryId());
403                             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
404                                       version, featureOutput.getDatapathId(),
405                                       featureOutput.getAuxiliaryId());
406                             handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion);
407                         } else {
408                             // handshake failed
409                             LOG.warn("issuing disconnect during handshake [{}]",
410                                      connectionAdapter.getRemoteAddress());
411                             for (RpcError rpcError : rpcFeatures.getErrors()) {
412                                 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
413                                           rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
414                                           rpcError.getCause());
415                             }
416                             handshakeListener.onHandshakeFailure();
417                         }
418
419                         LOG.debug("postHandshake DONE");
420                     }
421
422                     @Override
423                     public void onFailure(Throwable throwable) {
424                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
425                                  connectionAdapter.getRemoteAddress(), throwable.getMessage());
426                         LOG.trace("DETAIL of sending of hello failure:", throwable);
427                     }
428                 }, MoreExecutors.directExecutor());
429         LOG.debug("future features [{}] hooked ..", xid);
430     }
431
432     /**
433      * Method for unit testing, only.
434      * This method is not thread safe and can only safely be used from a test.
435      */
436     @VisibleForTesting
437     @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") // because shake() is synchronized
438     void setUseVersionBitmap(boolean useVersionBitmap) {
439         this.useVersionBitmap = useVersionBitmap;
440     }
441
442 }