handshake refactor
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / HandshakeManagerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.JdkFutureAdapters;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.List;
16 import java.util.Objects;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
20 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
21 import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
29 import org.opendaylight.yangtools.yang.common.RpcError;
30 import org.opendaylight.yangtools.yang.common.RpcResult;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * @author mirehak
36  *
37  */
38 public class HandshakeManagerImpl implements HandshakeManager {
39
40     private static final Logger LOG = LoggerFactory
41             .getLogger(HandshakeManagerImpl.class);
42
43     private Short lastProposedVersion;
44     private Short lastReceivedVersion;
45     private final List<Short> versionOrder;
46
47     //private HelloMessage receivedHello;
48     private final ConnectionAdapter connectionAdapter;
49     private Short version;
50     private ErrorHandler errorHandler;
51
52     private long maxTimeout = 8000;
53     private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
54     private Short highestVersion;
55
56     private Long activeXid;
57
58     private HandshakeListener handshakeListener;
59
60     private boolean useVersionBitmap;
61
62     /**
63      * @param connectionAdapter
64      * @param highestVersion
65      * @param versionOrder
66      */
67     public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion,
68             List<Short> versionOrder) {
69         this.highestVersion = highestVersion;
70         this.versionOrder = versionOrder;
71         this.connectionAdapter = connectionAdapter;
72     }
73
74     @Override
75     public void setHandshakeListener(HandshakeListener handshakeListener) {
76         this.handshakeListener = handshakeListener;
77     }
78
79     @Override
80     public synchronized void shake(HelloMessage receivedHello) {
81
82         if (version != null) {
83             // Some switches respond with a second HELLO acknowledging our HELLO
84             // but we've already completed the handshake based on the negotiated
85             // version and have registered this switch.
86             LOG.debug("Hello recieved after handshake already settled ... ignoring.");
87             return;
88         }
89
90         LOG.trace("handshake STARTED");
91         setActiveXid(20L);
92
93         try {
94             if (receivedHello == null) {
95                 // first Hello sending
96                 sendHelloMessage(highestVersion, getNextXid());
97                 lastProposedVersion = highestVersion;
98                 LOG.trace("ret - firstHello+wait");
99                 return;
100             }
101
102             // process the 2. and later hellos
103             Short remoteVersion = receivedHello.getVersion();
104             List<Elements> elements = receivedHello.getElements();
105             setActiveXid(receivedHello.getXid());
106             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
107             LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion,
108                     receivedHello.getXid(), remoteVersionBitmap);
109
110             if (useVersionBitmap && remoteVersionBitmap != null) {
111                 // versionBitmap on both sides -> ONE STEP DECISION
112                 handleVersionBitmapNegotiation(elements);
113             } else {
114                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
115                 handleStepByStepVersionNegotiation(remoteVersion);
116             }
117         } catch (Exception ex) {
118             errorHandler.handleException(ex, null);
119             LOG.trace("ret - shake fail - closing");
120             handshakeListener.onHandshakeFailure();
121         }
122     }
123
124     /**
125      * @param remoteVersion
126      * @throws Exception
127      */
128     private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
129         LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}",
130                 remoteVersion, lastProposedVersion, highestVersion);
131
132         if (lastProposedVersion == null) {
133             // first hello has not been sent yet, send it and either wait for next remote
134             // version or proceed
135             lastProposedVersion = proposeNextVersion(remoteVersion);
136             final Long nextHelloXid = getNextXid();
137             ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
138             Futures.addCallback(helloResult, new FutureCallback<Void>() {
139                 @Override
140                 public void onSuccess(Void result) {
141                     try {
142                         stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
143                     } catch (Exception e) {
144                         errorHandler.handleException(e, null);
145                         handshakeListener.onHandshakeFailure();
146                     }
147                 }
148
149                 @Override
150                 public void onFailure(Throwable t) {
151                     LOG.info("hello sending seriously failed [{}]", nextHelloXid);
152                     LOG.trace("detail of hello send problem", t);
153                 }
154             });
155         } else {
156             stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
157         }
158     }
159
160     private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
161         if (remoteVersion == lastProposedVersion) {
162             postHandshake(lastProposedVersion, getNextXid());
163             LOG.trace("ret - OK - switch answered with lastProposedVersion");
164         } else {
165             checkNegotiationStalling(remoteVersion);
166
167             if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
168                 // wait for next version
169                 LOG.trace("ret - wait");
170             } else {
171                 //propose lower version
172                 handleLowerVersionProposal(remoteVersion);
173             }
174         }
175     }
176
177     /**
178      * @param remoteVersion
179      * @throws Exception
180      */
181     private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
182         Short proposedVersion;
183         // find the version from header version field
184         proposedVersion = proposeNextVersion(remoteVersion);
185         lastProposedVersion = proposedVersion;
186         sendHelloMessage(proposedVersion, getNextXid());
187
188         if (! Objects.equals(proposedVersion, remoteVersion)) {
189             LOG.trace("ret - sent+wait");
190         } else {
191             LOG.trace("ret - sent+OK");
192             postHandshake(proposedVersion, getNextXid());
193         }
194     }
195
196     /**
197      * @param elements
198      * @throws Exception
199      */
200     private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
201         final Short proposedVersion = proposeCommonBitmapVersion(elements);
202         if (lastProposedVersion == null) {
203             // first hello has not been sent yet
204             Long nexHelloXid = getNextXid();
205             ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
206             Futures.addCallback(helloDone, new FutureCallback<Void>() {
207                 @Override
208                 public void onSuccess(Void result) {
209                     LOG.trace("ret - DONE - versionBitmap");
210                     postHandshake(proposedVersion, getNextXid());
211                 }
212
213                 @Override
214                 public void onFailure(Throwable t) {
215                     // NOOP
216                 }
217             });
218             LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
219         } else {
220             LOG.trace("ret - DONE - versionBitmap");
221             postHandshake(proposedVersion, getNextXid());
222         }
223     }
224
225     /**
226      *
227      * @return
228      */
229     private Long getNextXid() {
230         activeXid += 1;
231         return activeXid;
232     }
233
234     /**
235      * @param xid
236      */
237     private void setActiveXid(Long xid) {
238         this.activeXid = xid;
239     }
240
241     /**
242      * @param remoteVersion
243      */
244     private void checkNegotiationStalling(Short remoteVersion) {
245         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
246             throw new IllegalStateException("version negotiation stalled: version = "+remoteVersion);
247         }
248         lastReceivedVersion = remoteVersion;
249     }
250
251     @Override
252     public Short getVersion() {
253         return version;
254     }
255
256     /**
257      * find common highest supported bitmap version
258      * @param list
259      * @return
260      */
261     protected Short proposeCommonBitmapVersion(List<Elements> list) {
262         Short supportedHighestVersion = null;
263         if((null != list) && (0 != list.size())) {
264             for(Elements element : list) {
265                 List<Boolean> bitmap = element.getVersionBitmap();
266                 // check for version bitmap
267                 for(short bitPos : ConnectionConductor.versionOrder) {
268                     // with all the version it should work.
269                     if(bitmap.get(bitPos % Integer.SIZE)) {
270                         supportedHighestVersion = bitPos;
271                         break;
272                     }
273                 }
274             }
275
276             if(null == supportedHighestVersion) {
277                 LOG.trace("versionBitmap: no common version found");
278                 throw new IllegalArgumentException("no common version found in versionBitmap");
279             }
280         }
281
282         return supportedHighestVersion;
283     }
284
285     /**
286      * find supported version based on remoteVersion
287      * @param remoteVersion
288      * @return
289      */
290     protected short proposeNextVersion(short remoteVersion) {
291         Short proposal = null;
292         for (short offer : versionOrder) {
293             if (offer <= remoteVersion) {
294                 proposal = offer;
295                 break;
296             }
297         }
298         if (proposal == null) {
299             throw new IllegalArgumentException("no equal or lower version found, unsupported version: "
300                     + remoteVersion);
301         }
302         return proposal;
303     }
304
305     /**
306      * send hello reply without versionBitmap
307      * @param helloVersion
308      * @param helloXid
309      * @throws Exception
310      */
311     private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
312         //Short highestVersion = ConnectionConductor.versionOrder.get(0);
313         //final Long helloXid = 21L;
314         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
315
316         final SettableFuture<Void> resultFtr = SettableFuture.create();
317
318         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",
319                 helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
320
321         Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
322
323         ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
324         Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
325             @Override
326             public void onSuccess(RpcResult<Void> result) {
327                 if (result.isSuccessful()) {
328                     LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress());
329                     resultFtr.set(null);
330                 } else {
331                     for (RpcError error : result.getErrors()) {
332                         LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid,
333                                 error.getInfo(), error.getSeverity(), error.getMessage(),
334                                 connectionAdapter.getRemoteAddress());
335                         if (error.getCause() != null) {
336                             LOG.trace("DETAIL of sending hello failure", error.getCause());
337                         }
338                     }
339                     resultFtr.cancel(false);
340                     handshakeListener.onHandshakeFailure();
341                 }
342             }
343
344             @Override
345             public void onFailure(Throwable t) {
346                 LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
347                         connectionAdapter.getRemoteAddress(), t.getMessage());
348                 LOG.trace("DETAIL of sending of hello failure:", t);
349                 resultFtr.cancel(false);
350                 handshakeListener.onHandshakeFailure();
351             }
352         });
353         LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
354         return resultFtr;
355     }
356
357
358     /**
359      * after handshake set features, register to session
360      * @param proposedVersion
361      * @param xid
362      * @throws Exception
363      */
364     protected void postHandshake(final Short proposedVersion, final Long xid) {
365         // set version
366         version = proposedVersion;
367
368         LOG.debug("version set: {}", proposedVersion);
369         // request features
370         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
371         featuresBuilder.setVersion(version).setXid(xid);
372         LOG.debug("sending feature request for version={} and xid={}", version, xid);
373         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
374                 .getFeatures(featuresBuilder.build());
375
376         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
377                 new FutureCallback<RpcResult<GetFeaturesOutput>>() {
378                     @Override
379                     public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
380                         LOG.trace("features are back");
381                         if (rpcFeatures.isSuccessful()) {
382                             GetFeaturesOutput featureOutput = rpcFeatures.getResult();
383
384                             LOG.debug("obtained features: datapathId={}",
385                                     featureOutput.getDatapathId());
386                             LOG.debug("obtained features: auxiliaryId={}",
387                                     featureOutput.getAuxiliaryId());
388                             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
389                                     version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
390                             handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
391                         } else {
392                             // handshake failed
393                             LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress());
394                             for (RpcError rpcError : rpcFeatures.getErrors()) {
395                                 LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
396                                         rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
397                                         rpcError.getCause()
398                                 );
399                             }
400                             handshakeListener.onHandshakeFailure();
401                         }
402
403                         LOG.debug("postHandshake DONE");
404                     }
405
406                     @Override
407                     public void onFailure(Throwable t) {
408                         LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
409                                 connectionAdapter.getRemoteAddress(), t.getMessage());
410                         LOG.trace("DETAIL of sending of hello failure:", t);
411                     }
412                 });
413
414         LOG.debug("future features [{}] hooked ..", xid);
415
416     }
417
418     @Override
419     public void setUseVersionBitmap(boolean useVersionBitmap) {
420         this.useVersionBitmap = useVersionBitmap;
421     }
422
423     @Override
424     public void setErrorHandler(ErrorHandler errorHandler) {
425         this.errorHandler = errorHandler;
426     }
427 }