fix for BUG-956 - deadlock by rpc invocation
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / HandshakeManagerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core;
9
10 import java.util.List;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.TimeUnit;
13
14 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
20 import org.opendaylight.yangtools.yang.common.RpcResult;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 /**
25  * @author mirehak
26  *
27  */
28 public class HandshakeManagerImpl implements HandshakeManager {
29     
30     private static final Logger LOG = LoggerFactory
31             .getLogger(HandshakeManagerImpl.class);
32     
33     private Short lastProposedVersion;
34     private Short lastReceivedVersion;
35     private final List<Short> versionOrder;
36     
37     private HelloMessage receivedHello;
38     private final ConnectionAdapter connectionAdapter;
39     private GetFeaturesOutput features;
40     private Short version;
41     private ErrorHandler errorHandler;
42     
43     private long maxTimeout = 1000;
44     private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
45     private Short highestVersion;
46
47     private Long activeXid;
48
49     private HandshakeListener handshakeListener;
50
51     private boolean useVersionBitmap;
52     
53     @Override
54     public void setReceivedHello(HelloMessage receivedHello) {
55         this.receivedHello = receivedHello;
56     }
57     
58     /**
59      * @param connectionAdapter 
60      * @param highestVersion 
61      * @param versionOrder
62      */
63     public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, 
64             List<Short> versionOrder) {
65         this.highestVersion = highestVersion;
66         this.versionOrder = versionOrder;
67         this.connectionAdapter = connectionAdapter;
68     }
69     
70     @Override
71     public void setHandshakeListener(HandshakeListener handshakeListener) {
72         this.handshakeListener = handshakeListener;
73     }
74
75     @Override
76     public synchronized void shake() {
77         LOG.trace("handshake STARTED");
78         setActiveXid(20L);
79         HelloMessage receivedHelloLoc = receivedHello;
80         
81         try {
82             if (receivedHelloLoc == null) {
83                 // first Hello sending
84                 sendHelloMessage(highestVersion, getNextXid());
85                 lastProposedVersion = highestVersion;
86                 LOG.debug("ret - firstHello+wait");
87                 return;
88             }
89
90             // process the 2. and later hellos
91             Short remoteVersion = receivedHelloLoc.getVersion();
92             List<Elements> elements = receivedHelloLoc.getElements();
93             setActiveXid(receivedHelloLoc.getXid());
94             List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
95             LOG.debug("Hello message: version={}, bitmap={}, xid={}", remoteVersion, 
96                     remoteVersionBitmap, receivedHelloLoc.getXid());
97         
98             if (useVersionBitmap && remoteVersionBitmap != null) {
99                 // versionBitmap on both sides -> ONE STEP DECISION
100                 handleVersionBitmapNegotiation(elements);
101             } else { 
102                 // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying 
103                 handleStepByStepVersionNegotiation(remoteVersion);
104             }
105         } catch (Throwable ex) {
106             errorHandler.handleException(ex, null);
107             connectionAdapter.disconnect();
108             LOG.debug("ret - "+ex.getMessage());
109         }
110     }
111
112     /**
113      * @param remoteVersion
114      * @throws Throwable 
115      */
116     private void handleStepByStepVersionNegotiation(Short remoteVersion) throws Throwable {
117         LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", 
118                 remoteVersion, lastProposedVersion, highestVersion);
119         
120         if (lastProposedVersion == null) {
121             // first hello has not been sent yet, send it and either wait for next remote 
122             // version or proceed
123             lastProposedVersion = proposeNextVersion(remoteVersion);
124             sendHelloMessage(lastProposedVersion, getNextXid());
125         }
126         
127         if (remoteVersion == lastProposedVersion) {
128             postHandshake(lastProposedVersion, getNextXid());
129             LOG.debug("ret - OK - switch answered with lastProposedVersion");
130         } else {
131             checkNegotiationStalling(remoteVersion);
132
133             if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
134                 // wait for next version
135                 LOG.debug("ret - wait");
136             } else {
137                 //propose lower version
138                 handleLowerVersionProposal(remoteVersion);
139             }
140         }
141     }
142
143     /**
144      * @param remoteVersion
145      * @throws Throwable 
146      */
147     private void handleLowerVersionProposal(Short remoteVersion) throws Throwable {
148         Short proposedVersion;
149         // find the version from header version field
150         proposedVersion = proposeNextVersion(remoteVersion);
151         lastProposedVersion = proposedVersion;
152         sendHelloMessage(proposedVersion, getNextXid());
153
154         if (proposedVersion != remoteVersion) {
155             LOG.debug("ret - sent+wait");
156         } else {
157             LOG.debug("ret - sent+OK");
158             postHandshake(proposedVersion, getNextXid());
159         }
160     }
161
162     /**
163      * @param elements
164      * @throws Throwable 
165      */
166     private void handleVersionBitmapNegotiation(List<Elements> elements) throws Throwable {
167         Short proposedVersion;
168         proposedVersion = proposeCommonBitmapVersion(elements);
169         if (lastProposedVersion == null) {
170             // first hello has not been sent yet
171             sendHelloMessage(proposedVersion, getNextXid());
172         }
173         postHandshake(proposedVersion, getNextXid());
174         LOG.debug("ret - OK - versionBitmap");
175     }
176     
177     /**
178      * 
179      * @return
180      */
181     private Long getNextXid() {
182         activeXid += 1; 
183         return activeXid;
184     }
185
186     /**
187      * @param xid
188      */
189     private void setActiveXid(Long xid) {
190         this.activeXid = xid;
191     }
192     
193     /**
194      * @param remoteVersion
195      */
196     private void checkNegotiationStalling(Short remoteVersion) {
197         if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) {
198             throw new IllegalStateException("version negotiation stalled: version = "+remoteVersion);
199         }
200         lastReceivedVersion = remoteVersion;
201     }
202
203     @Override
204     public GetFeaturesOutput getFeatures() {
205         return features;
206     }
207     
208     @Override
209     public Short getVersion() {
210         return version;
211     }
212
213     /**
214      * find common highest supported bitmap version
215      * @param list
216      * @return
217      */
218     protected Short proposeCommonBitmapVersion(List<Elements> list) {
219         Short supportedHighestVersion = null;
220         if((null != list) && (0 != list.size())) {
221             for(Elements element : list) {
222                 List<Boolean> bitmap = element.getVersionBitmap();
223                 // check for version bitmap
224                 for(short bitPos : ConnectionConductor.versionOrder) {
225                     // with all the version it should work.
226                     if(bitmap.get(bitPos % Integer.SIZE)) {
227                         supportedHighestVersion = bitPos;
228                         break;
229                     }
230                 }
231             }
232             
233             if(null == supportedHighestVersion) {
234                 throw new IllegalArgumentException("no common version found in versionBitmap");
235             }
236         }
237
238         return supportedHighestVersion;
239     }
240
241     /**
242      * find supported version based on remoteVersion
243      * @param remoteVersion
244      * @return
245      */
246     protected short proposeNextVersion(short remoteVersion) {
247         Short proposal = null;
248         for (short offer : versionOrder) {
249             if (offer <= remoteVersion) {
250                 proposal = offer;
251                 break;
252             }
253         }
254         if (proposal == null) {
255             throw new IllegalArgumentException("no equal or lower version found, unsupported version: "
256                     + remoteVersion);
257         }
258         return proposal;
259     }
260     
261     /**
262      * send hello reply without versionBitmap
263      * @param helloVersion
264      * @param helloXid
265      * @throws Throwable 
266      */
267     private void sendHelloMessage(Short helloVersion, Long helloXid) throws Throwable {
268         //Short highestVersion = ConnectionConductor.versionOrder.get(0);
269         //final Long helloXid = 21L;
270         HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
271         
272         LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", 
273                 helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
274         
275         try {
276             RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(maxTimeout, maxTimeoutUnit);
277             RpcUtil.smokeRpc(helloResult);
278             LOG.debug("FIRST HELLO sent.");
279         } catch (Throwable e) {
280             LOG.debug("FIRST HELLO sending failed.");
281             throw e;
282         }
283     }
284
285
286     /**
287      * after handshake set features, register to session
288      * @param proposedVersion
289      * @param xId
290      * @throws Throwable 
291      */
292     protected void postHandshake(Short proposedVersion, Long xid) throws Throwable {
293         // set version
294         long maxTimeout = 3000;
295         version = proposedVersion;
296
297         LOG.debug("version set: " + proposedVersion);
298         // request features
299         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
300         featuresBuilder.setVersion(version).setXid(xid);
301         LOG.debug("sending feature request for version={} and xid={}", version, xid);
302         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
303                 .getFeatures(featuresBuilder.build());
304         LOG.debug("waiting for features");
305         try {
306             RpcResult<GetFeaturesOutput> rpcFeatures = 
307                     featuresFuture.get(maxTimeout, maxTimeoutUnit);
308             RpcUtil.smokeRpc(rpcFeatures);
309             
310             GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
311             
312             LOG.debug("obtained features: datapathId={}",
313                     featureOutput.getDatapathId());
314             LOG.debug("obtained features: auxiliaryId={}",
315                     featureOutput.getAuxiliaryId());
316             LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", 
317                     version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
318             
319             handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
320         } catch (Throwable e) {
321             //handshake failed
322             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
323             connectionAdapter.disconnect();
324             throw e;
325         }
326         
327         LOG.debug("postHandshake DONE");
328     }
329
330     @Override
331     public void setUseVersionBitmap(boolean useVersionBitmap) {
332         this.useVersionBitmap = useVersionBitmap;
333     }
334     
335     @Override
336     public void setErrorHandler(ErrorHandler errorHandler) {
337         this.errorHandler = errorHandler;
338     }
339 }