X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FHandshakeManagerImpl.java;h=9392e8ee7eea9b6ab175af0ce2a39ddc5c00e1fb;hb=6e5df2b3308ffdc906dc1ef84a4ddde37f8080f6;hp=a9f5a0d62edbc6a52ae3fdb7bb2d67da49fe0f4e;hpb=ad83ecdb48cdb3dee6d74ef3609dbfcbd99e5bbc;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java index a9f5a0d62e..9392e8ee7e 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java @@ -7,14 +7,16 @@ */ package org.opendaylight.openflowplugin.openflow.md.core; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import java.util.List; +import java.util.Objects; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowplugin.ConnectionException; -import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler; import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener; import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager; @@ -23,31 +25,29 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * @author mirehak - * - */ public class HandshakeManagerImpl implements HandshakeManager { - + + private static final long activeXID = 20L; + private static final Logger LOG = LoggerFactory .getLogger(HandshakeManagerImpl.class); - + private Short lastProposedVersion; private Short lastReceivedVersion; private final List versionOrder; - - private HelloMessage receivedHello; + + private final ConnectionAdapter connectionAdapter; - private GetFeaturesOutput features; private Short version; private ErrorHandler errorHandler; - - private long maxTimeout = 8000; - private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS; + + + private Short highestVersion; private Long activeXid; @@ -55,31 +55,26 @@ public class HandshakeManagerImpl implements HandshakeManager { private HandshakeListener handshakeListener; private boolean useVersionBitmap; - - @Override - public void setReceivedHello(HelloMessage receivedHello) { - this.receivedHello = receivedHello; - } - + /** - * @param connectionAdapter - * @param highestVersion - * @param versionOrder + * @param connectionAdapter connection adaptor for switch + * @param highestVersion highest openflow version + * @param versionOrder list of version in order for connection protocol negotiation */ - public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, + public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List versionOrder) { this.highestVersion = highestVersion; this.versionOrder = versionOrder; this.connectionAdapter = connectionAdapter; } - + @Override public void setHandshakeListener(HandshakeListener handshakeListener) { this.handshakeListener = handshakeListener; } @Override - public void shake() { + public synchronized void shake(HelloMessage receivedHello) { if (version != null) { // Some switches respond with a second HELLO acknowledging our HELLO @@ -90,11 +85,10 @@ public class HandshakeManagerImpl implements HandshakeManager { } LOG.trace("handshake STARTED"); - setActiveXid(20L); - HelloMessage receivedHelloLoc = receivedHello; - + setActiveXid(activeXID); + try { - if (receivedHelloLoc == null) { + if (receivedHello == null) { // first Hello sending sendHelloMessage(highestVersion, getNextXid()); lastProposedVersion = highestVersion; @@ -103,50 +97,71 @@ public class HandshakeManagerImpl implements HandshakeManager { } // process the 2. and later hellos - Short remoteVersion = receivedHelloLoc.getVersion(); - List elements = receivedHelloLoc.getElements(); - setActiveXid(receivedHelloLoc.getXid()); + Short remoteVersion = receivedHello.getVersion(); + List elements = receivedHello.getElements(); + setActiveXid(receivedHello.getXid()); List remoteVersionBitmap = MessageFactory.digVersions(elements); - LOG.debug("Hello message: version={}, bitmap={}, xid={}", remoteVersion, - remoteVersionBitmap, receivedHelloLoc.getXid()); - + LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, + receivedHello.getXid(), remoteVersionBitmap); + if (useVersionBitmap && remoteVersionBitmap != null) { // versionBitmap on both sides -> ONE STEP DECISION handleVersionBitmapNegotiation(elements); - } else { - // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying + } else { + // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying handleStepByStepVersionNegotiation(remoteVersion); } } catch (Exception ex) { - errorHandler.handleException(ex, null); - connectionAdapter.disconnect(); + errorHandler.handleException(ex); + LOG.trace("ret - shake fail - closing"); handshakeListener.onHandshakeFailure(); - LOG.trace("ret - shake fail: {}", ex.getMessage()); } } /** - * @param remoteVersion - * @throws Exception + * @param remoteVersion remote version + * @throws Exception exception */ - private void handleStepByStepVersionNegotiation(Short remoteVersion) throws Exception { - LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", + private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception { + LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion, highestVersion); - + if (lastProposedVersion == null) { - // first hello has not been sent yet, send it and either wait for next remote + // first hello has not been sent yet, send it and either wait for next remote // version or proceed lastProposedVersion = proposeNextVersion(remoteVersion); - sendHelloMessage(lastProposedVersion, getNextXid()); + final Long nextHelloXid = getNextXid(); + ListenableFuture helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid); + Futures.addCallback(helloResult, new FutureCallback() { + @Override + public void onSuccess(Void result) { + try { + stepByStepVersionSubStep(remoteVersion, lastProposedVersion); + } catch (Exception e) { + errorHandler.handleException(e); + handshakeListener.onHandshakeFailure(); + } + } + + @Override + public void onFailure(Throwable t) { + LOG.info("hello sending seriously failed [{}]", nextHelloXid); + LOG.trace("detail of hello send problem", t); + } + }); + } else { + stepByStepVersionSubStep(remoteVersion, lastProposedVersion); } - - if (remoteVersion == lastProposedVersion) { + } + + private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception { + if (remoteVersion.equals(lastProposedVersion)) { postHandshake(lastProposedVersion, getNextXid()); LOG.trace("ret - OK - switch answered with lastProposedVersion"); } else { checkNegotiationStalling(remoteVersion); - if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) { + if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) { // wait for next version LOG.trace("ret - wait"); } else { @@ -157,8 +172,8 @@ public class HandshakeManagerImpl implements HandshakeManager { } /** - * @param remoteVersion - * @throws Exception + * @param remoteVersion remote version + * @throws Exception exception */ private void handleLowerVersionProposal(Short remoteVersion) throws Exception { Short proposedVersion; @@ -167,7 +182,7 @@ public class HandshakeManagerImpl implements HandshakeManager { lastProposedVersion = proposedVersion; sendHelloMessage(proposedVersion, getNextXid()); - if (proposedVersion != remoteVersion) { + if (! Objects.equals(proposedVersion, remoteVersion)) { LOG.trace("ret - sent+wait"); } else { LOG.trace("ret - sent+OK"); @@ -176,38 +191,52 @@ public class HandshakeManagerImpl implements HandshakeManager { } /** - * @param elements - * @throws Exception + * @param elements version elements + * @throws Exception exception */ private void handleVersionBitmapNegotiation(List elements) throws Exception { - Short proposedVersion; - proposedVersion = proposeCommonBitmapVersion(elements); + final Short proposedVersion = proposeCommonBitmapVersion(elements); if (lastProposedVersion == null) { // first hello has not been sent yet - sendHelloMessage(proposedVersion, getNextXid()); + Long nexHelloXid = getNextXid(); + ListenableFuture helloDone = sendHelloMessage(proposedVersion, nexHelloXid); + Futures.addCallback(helloDone, new FutureCallback() { + @Override + public void onSuccess(Void result) { + LOG.trace("ret - DONE - versionBitmap"); + postHandshake(proposedVersion, getNextXid()); + } + + @Override + public void onFailure(Throwable t) { + // NOOP + } + }); + LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid); + } else { + LOG.trace("ret - DONE - versionBitmap"); + postHandshake(proposedVersion, getNextXid()); } - postHandshake(proposedVersion, getNextXid()); - LOG.trace("ret - OK - versionBitmap"); } - + /** - * - * @return + * + * @return next tx id */ private Long getNextXid() { - activeXid += 1; + activeXid += 1; return activeXid; } /** - * @param xid + * @param xid tx id */ private void setActiveXid(Long xid) { this.activeXid = xid; } - + /** - * @param remoteVersion + * @param remoteVersion remove version */ private void checkNegotiationStalling(Short remoteVersion) { if (lastReceivedVersion != null && lastReceivedVersion.equals(remoteVersion)) { @@ -216,11 +245,6 @@ public class HandshakeManagerImpl implements HandshakeManager { lastReceivedVersion = remoteVersion; } - @Override - public GetFeaturesOutput getFeatures() { - return features; - } - @Override public Short getVersion() { return version; @@ -228,8 +252,8 @@ public class HandshakeManagerImpl implements HandshakeManager { /** * find common highest supported bitmap version - * @param list - * @return + * @param list bitmap list + * @return proposed bitmap value */ protected Short proposeCommonBitmapVersion(List list) { Short supportedHighestVersion = null; @@ -237,7 +261,7 @@ public class HandshakeManagerImpl implements HandshakeManager { for(Elements element : list) { List bitmap = element.getVersionBitmap(); // check for version bitmap - for(short bitPos : ConnectionConductor.versionOrder) { + for(short bitPos : OFConstants.VERSION_ORDER) { // with all the version it should work. if(bitmap.get(bitPos % Integer.SIZE)) { supportedHighestVersion = bitPos; @@ -245,8 +269,9 @@ public class HandshakeManagerImpl implements HandshakeManager { } } } - + if(null == supportedHighestVersion) { + LOG.trace("versionBitmap: no common version found"); throw new IllegalArgumentException("no common version found in versionBitmap"); } } @@ -256,8 +281,8 @@ public class HandshakeManagerImpl implements HandshakeManager { /** * find supported version based on remoteVersion - * @param remoteVersion - * @return + * @param remoteVersion openflow version supported by remote entity + * @return openflow version */ protected short proposeNextVersion(short remoteVersion) { Short proposal = null; @@ -273,42 +298,66 @@ public class HandshakeManagerImpl implements HandshakeManager { } return proposal; } - + /** * send hello reply without versionBitmap - * @param helloVersion - * @param helloXid - * @throws Exception + * @param helloVersion initial hello version for openflow connection negotiation + * @param helloXid transaction id + * @throws Exception */ - private void sendHelloMessage(Short helloVersion, Long helloXid) throws Exception { - //Short highestVersion = ConnectionConductor.versionOrder.get(0); - //final Long helloXid = 21L; + private ListenableFuture sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception { + + HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder); - - LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", + + final SettableFuture resultFtr = SettableFuture.create(); + + LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements())); - - try { - RpcResult helloResult = connectionAdapter.hello(helloInput).get(maxTimeout, maxTimeoutUnit); - RpcUtil.smokeRpc(helloResult); - LOG.debug("FIRST HELLO sent."); - } catch (Exception e) { - if (LOG.isTraceEnabled()) { - LOG.trace("FIRST HELLO sent.", e); + + Future> helloResult = connectionAdapter.hello(helloInput); + + ListenableFuture> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult); + Futures.addCallback(rpcResultListenableFuture, new FutureCallback>() { + @Override + public void onSuccess(RpcResult result) { + if (result.isSuccessful()) { + LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress()); + resultFtr.set(null); + } else { + for (RpcError error : result.getErrors()) { + LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, + error.getInfo(), error.getSeverity(), error.getMessage(), + connectionAdapter.getRemoteAddress()); + if (error.getCause() != null) { + LOG.trace("DETAIL of sending hello failure", error.getCause()); + } + } + resultFtr.cancel(false); + handshakeListener.onHandshakeFailure(); + } } - handshakeListener.onHandshakeFailure(); - throw new ConnectionException("FIRST HELLO sending failed because of connection issue."); - } + + @Override + public void onFailure(Throwable t) { + LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid, + connectionAdapter.getRemoteAddress(), t.getMessage()); + LOG.trace("DETAIL of sending of hello failure:", t); + resultFtr.cancel(false); + handshakeListener.onHandshakeFailure(); + } + }); + LOG.trace("sending hello message [{}] - result hooked ..", helloXid); + return resultFtr; } /** * after handshake set features, register to session - * @param proposedVersion - * @param xId - * @throws Exception + * @param proposedVersion proposed openflow version + * @param xid transaction id */ - protected void postHandshake(Short proposedVersion, Long xid) throws Exception { + protected void postHandshake(final Short proposedVersion, final Long xid) { // set version version = proposedVersion; @@ -319,44 +368,54 @@ public class HandshakeManagerImpl implements HandshakeManager { LOG.debug("sending feature request for version={} and xid={}", version, xid); Future> featuresFuture = connectionAdapter .getFeatures(featuresBuilder.build()); - LOG.debug("waiting for features"); - try { - RpcResult rpcFeatures = - featuresFuture.get(maxTimeout, maxTimeoutUnit); - RpcUtil.smokeRpc(rpcFeatures); - - GetFeaturesOutput featureOutput = rpcFeatures.getResult(); - - LOG.debug("obtained features: datapathId={}", - featureOutput.getDatapathId()); - LOG.debug("obtained features: auxiliaryId={}", - featureOutput.getAuxiliaryId()); - LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", - version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId()); - - handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion); - } catch (TimeoutException e) { - // handshake failed - LOG.warn("issuing disconnect during handshake, reason: future expired", e); - connectionAdapter.disconnect(); - handshakeListener.onHandshakeFailure(); - throw e; - } catch (Exception e) { - // handshake failed - LOG.warn("issuing disconnect during handshake, reason - RPC: {}", e.getMessage(), e); - connectionAdapter.disconnect(); - handshakeListener.onHandshakeFailure(); - throw e; - } - - LOG.debug("postHandshake DONE"); + + Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture), + new FutureCallback>() { + @Override + public void onSuccess(RpcResult rpcFeatures) { + LOG.trace("features are back"); + if (rpcFeatures.isSuccessful()) { + GetFeaturesOutput featureOutput = rpcFeatures.getResult(); + + LOG.debug("obtained features: datapathId={}", + featureOutput.getDatapathId()); + LOG.debug("obtained features: auxiliaryId={}", + featureOutput.getAuxiliaryId()); + LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", + version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId()); + handshakeListener.onHandshakeSuccessful(featureOutput, proposedVersion); + } else { + // handshake failed + LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress()); + for (RpcError rpcError : rpcFeatures.getErrors()) { + LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid, + rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(), + rpcError.getCause() + ); + } + handshakeListener.onHandshakeFailure(); + } + + LOG.debug("postHandshake DONE"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid, + connectionAdapter.getRemoteAddress(), t.getMessage()); + LOG.trace("DETAIL of sending of hello failure:", t); + } + }); + + LOG.debug("future features [{}] hooked ..", xid); + } @Override public void setUseVersionBitmap(boolean useVersionBitmap) { this.useVersionBitmap = useVersionBitmap; } - + @Override public void setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler;