X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FHandshakeManagerImpl.java;h=9d4e34b90bd28e5f16ed6766e01141f93c453bd3;hb=213dc5f10a2193184808e67f5fb81edc19ccd8bc;hp=b108d336380b8429d28a512c53d9a7b19873220e;hpb=ef039b94f2d69254f64a8ecfe0f534aa55d062d6;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java index b108d33638..9d4e34b90b 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java @@ -7,13 +7,16 @@ */ package org.opendaylight.openflowplugin.openflow.md.core; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import java.util.List; +import java.util.Objects; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; -import org.opendaylight.openflowplugin.ConnectionException; import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler; import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener; @@ -23,6 +26,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,20 +36,19 @@ import org.slf4j.LoggerFactory; * */ public class HandshakeManagerImpl implements HandshakeManager { - + private static final Logger LOG = LoggerFactory .getLogger(HandshakeManagerImpl.class); - + private Short lastProposedVersion; private Short lastReceivedVersion; private final List versionOrder; - - private HelloMessage receivedHello; + + //private HelloMessage receivedHello; private final ConnectionAdapter connectionAdapter; - private GetFeaturesOutput features; private Short version; private ErrorHandler errorHandler; - + private long maxTimeout = 8000; private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS; private Short highestVersion; @@ -55,31 +58,26 @@ public class HandshakeManagerImpl implements HandshakeManager { private HandshakeListener handshakeListener; private boolean useVersionBitmap; - - @Override - public void setReceivedHello(HelloMessage receivedHello) { - this.receivedHello = receivedHello; - } - + /** - * @param connectionAdapter - * @param highestVersion + * @param connectionAdapter + * @param highestVersion * @param versionOrder */ - public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, + public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion, List versionOrder) { this.highestVersion = highestVersion; this.versionOrder = versionOrder; this.connectionAdapter = connectionAdapter; } - + @Override public void setHandshakeListener(HandshakeListener handshakeListener) { this.handshakeListener = handshakeListener; } @Override - public void shake() { + public synchronized void shake(HelloMessage receivedHello) { if (version != null) { // Some switches respond with a second HELLO acknowledging our HELLO @@ -91,10 +89,9 @@ public class HandshakeManagerImpl implements HandshakeManager { LOG.trace("handshake STARTED"); setActiveXid(20L); - HelloMessage receivedHelloLoc = receivedHello; - + try { - if (receivedHelloLoc == null) { + if (receivedHello == null) { // first Hello sending sendHelloMessage(highestVersion, getNextXid()); lastProposedVersion = highestVersion; @@ -103,50 +100,71 @@ public class HandshakeManagerImpl implements HandshakeManager { } // process the 2. and later hellos - Short remoteVersion = receivedHelloLoc.getVersion(); - List elements = receivedHelloLoc.getElements(); - setActiveXid(receivedHelloLoc.getXid()); + Short remoteVersion = receivedHello.getVersion(); + List elements = receivedHello.getElements(); + setActiveXid(receivedHello.getXid()); List remoteVersionBitmap = MessageFactory.digVersions(elements); - LOG.debug("Hello message: version={}, bitmap={}, xid={}", remoteVersion, - remoteVersionBitmap, receivedHelloLoc.getXid()); - + LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, + receivedHello.getXid(), remoteVersionBitmap); + if (useVersionBitmap && remoteVersionBitmap != null) { // versionBitmap on both sides -> ONE STEP DECISION handleVersionBitmapNegotiation(elements); - } else { - // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying + } else { + // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying handleStepByStepVersionNegotiation(remoteVersion); } } catch (Exception ex) { errorHandler.handleException(ex, null); - connectionAdapter.disconnect(); + LOG.trace("ret - shake fail - closing"); handshakeListener.onHandshakeFailure(); - LOG.trace("ret - shake fail: {}", ex.getMessage()); } } /** * @param remoteVersion - * @throws Exception + * @throws Exception */ - private void handleStepByStepVersionNegotiation(Short remoteVersion) throws Exception { - LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", + private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception { + LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}", remoteVersion, lastProposedVersion, highestVersion); - + if (lastProposedVersion == null) { - // first hello has not been sent yet, send it and either wait for next remote + // first hello has not been sent yet, send it and either wait for next remote // version or proceed lastProposedVersion = proposeNextVersion(remoteVersion); - sendHelloMessage(lastProposedVersion, getNextXid()); + final Long nextHelloXid = getNextXid(); + ListenableFuture helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid); + Futures.addCallback(helloResult, new FutureCallback() { + @Override + public void onSuccess(Void result) { + try { + stepByStepVersionSubStep(remoteVersion, lastProposedVersion); + } catch (Exception e) { + errorHandler.handleException(e, null); + handshakeListener.onHandshakeFailure(); + } + } + + @Override + public void onFailure(Throwable t) { + LOG.info("hello sending seriously failed [{}]", nextHelloXid); + LOG.trace("detail of hello send problem", t); + } + }); + } else { + stepByStepVersionSubStep(remoteVersion, lastProposedVersion); } - + } + + private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception { if (remoteVersion == lastProposedVersion) { postHandshake(lastProposedVersion, getNextXid()); LOG.trace("ret - OK - switch answered with lastProposedVersion"); } else { checkNegotiationStalling(remoteVersion); - if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) { + if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) { // wait for next version LOG.trace("ret - wait"); } else { @@ -158,7 +176,7 @@ public class HandshakeManagerImpl implements HandshakeManager { /** * @param remoteVersion - * @throws Exception + * @throws Exception */ private void handleLowerVersionProposal(Short remoteVersion) throws Exception { Short proposedVersion; @@ -167,7 +185,7 @@ public class HandshakeManagerImpl implements HandshakeManager { lastProposedVersion = proposedVersion; sendHelloMessage(proposedVersion, getNextXid()); - if (proposedVersion != remoteVersion) { + if (! Objects.equals(proposedVersion, remoteVersion)) { LOG.trace("ret - sent+wait"); } else { LOG.trace("ret - sent+OK"); @@ -177,25 +195,39 @@ public class HandshakeManagerImpl implements HandshakeManager { /** * @param elements - * @throws Exception + * @throws Exception */ private void handleVersionBitmapNegotiation(List elements) throws Exception { - Short proposedVersion; - proposedVersion = proposeCommonBitmapVersion(elements); + final Short proposedVersion = proposeCommonBitmapVersion(elements); if (lastProposedVersion == null) { // first hello has not been sent yet - sendHelloMessage(proposedVersion, getNextXid()); + Long nexHelloXid = getNextXid(); + ListenableFuture helloDone = sendHelloMessage(proposedVersion, nexHelloXid); + Futures.addCallback(helloDone, new FutureCallback() { + @Override + public void onSuccess(Void result) { + LOG.trace("ret - DONE - versionBitmap"); + postHandshake(proposedVersion, getNextXid()); + } + + @Override + public void onFailure(Throwable t) { + // NOOP + } + }); + LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid); + } else { + LOG.trace("ret - DONE - versionBitmap"); + postHandshake(proposedVersion, getNextXid()); } - postHandshake(proposedVersion, getNextXid()); - LOG.trace("ret - OK - versionBitmap"); } - + /** - * + * * @return */ private Long getNextXid() { - activeXid += 1; + activeXid += 1; return activeXid; } @@ -205,7 +237,7 @@ public class HandshakeManagerImpl implements HandshakeManager { private void setActiveXid(Long xid) { this.activeXid = xid; } - + /** * @param remoteVersion */ @@ -216,11 +248,6 @@ public class HandshakeManagerImpl implements HandshakeManager { lastReceivedVersion = remoteVersion; } - @Override - public GetFeaturesOutput getFeatures() { - return features; - } - @Override public Short getVersion() { return version; @@ -245,8 +272,9 @@ public class HandshakeManagerImpl implements HandshakeManager { } } } - + if(null == supportedHighestVersion) { + LOG.trace("versionBitmap: no common version found"); throw new IllegalArgumentException("no common version found in versionBitmap"); } } @@ -273,32 +301,57 @@ public class HandshakeManagerImpl implements HandshakeManager { } return proposal; } - + /** * send hello reply without versionBitmap * @param helloVersion * @param helloXid - * @throws Exception + * @throws Exception */ - private void sendHelloMessage(Short helloVersion, Long helloXid) throws Exception { + private ListenableFuture sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception { //Short highestVersion = ConnectionConductor.versionOrder.get(0); //final Long helloXid = 21L; HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder); - - LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", + + final SettableFuture resultFtr = SettableFuture.create(); + + LOG.debug("sending hello message: version{}, xid={}, version bitmap={}", helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements())); - - try { - RpcResult helloResult = connectionAdapter.hello(helloInput).get(maxTimeout, maxTimeoutUnit); - RpcUtil.smokeRpc(helloResult); - LOG.debug("FIRST HELLO sent."); - } catch (Exception e) { - if (LOG.isTraceEnabled()) { - LOG.trace("FIRST HELLO sent.", e); + + Future> helloResult = connectionAdapter.hello(helloInput); + + ListenableFuture> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult); + Futures.addCallback(rpcResultListenableFuture, new FutureCallback>() { + @Override + public void onSuccess(RpcResult result) { + if (result.isSuccessful()) { + LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress()); + resultFtr.set(null); + } else { + for (RpcError error : result.getErrors()) { + LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid, + error.getInfo(), error.getSeverity(), error.getMessage(), + connectionAdapter.getRemoteAddress()); + if (error.getCause() != null) { + LOG.trace("DETAIL of sending hello failure", error.getCause()); + } + } + resultFtr.cancel(false); + handshakeListener.onHandshakeFailure(); + } } - handshakeListener.onHandshakeFailure(); - throw new ConnectionException("FIRST HELLO sending failed because of connection issue."); - } + + @Override + public void onFailure(Throwable t) { + LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid, + connectionAdapter.getRemoteAddress(), t.getMessage()); + LOG.trace("DETAIL of sending of hello failure:", t); + resultFtr.cancel(false); + handshakeListener.onHandshakeFailure(); + } + }); + LOG.trace("sending hello message [{}] - result hooked ..", helloXid); + return resultFtr; } @@ -306,9 +359,9 @@ public class HandshakeManagerImpl implements HandshakeManager { * after handshake set features, register to session * @param proposedVersion * @param xid - * @throws Exception + * @throws Exception */ - protected void postHandshake(Short proposedVersion, Long xid) throws Exception { + protected void postHandshake(final Short proposedVersion, final Long xid) { // set version version = proposedVersion; @@ -319,44 +372,54 @@ public class HandshakeManagerImpl implements HandshakeManager { LOG.debug("sending feature request for version={} and xid={}", version, xid); Future> featuresFuture = connectionAdapter .getFeatures(featuresBuilder.build()); - LOG.debug("waiting for features"); - try { - RpcResult rpcFeatures = - featuresFuture.get(maxTimeout, maxTimeoutUnit); - RpcUtil.smokeRpc(rpcFeatures); - - GetFeaturesOutput featureOutput = rpcFeatures.getResult(); - - LOG.debug("obtained features: datapathId={}", - featureOutput.getDatapathId()); - LOG.debug("obtained features: auxiliaryId={}", - featureOutput.getAuxiliaryId()); - LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", - version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId()); - - handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion); - } catch (TimeoutException e) { - // handshake failed - LOG.warn("issuing disconnect during handshake, reason: future expired", e); - connectionAdapter.disconnect(); - handshakeListener.onHandshakeFailure(); - throw e; - } catch (Exception e) { - // handshake failed - LOG.warn("issuing disconnect during handshake, reason - RPC: {}", e.getMessage(), e); - connectionAdapter.disconnect(); - handshakeListener.onHandshakeFailure(); - throw e; - } - - LOG.debug("postHandshake DONE"); + + Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture), + new FutureCallback>() { + @Override + public void onSuccess(RpcResult rpcFeatures) { + LOG.trace("features are back"); + if (rpcFeatures.isSuccessful()) { + GetFeaturesOutput featureOutput = rpcFeatures.getResult(); + + LOG.debug("obtained features: datapathId={}", + featureOutput.getDatapathId()); + LOG.debug("obtained features: auxiliaryId={}", + featureOutput.getAuxiliaryId()); + LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}", + version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId()); + handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion); + } else { + // handshake failed + LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress()); + for (RpcError rpcError : rpcFeatures.getErrors()) { + LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid, + rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(), + rpcError.getCause() + ); + } + handshakeListener.onHandshakeFailure(); + } + + LOG.debug("postHandshake DONE"); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid, + connectionAdapter.getRemoteAddress(), t.getMessage()); + LOG.trace("DETAIL of sending of hello failure:", t); + } + }); + + LOG.debug("future features [{}] hooked ..", xid); + } @Override public void setUseVersionBitmap(boolean useVersionBitmap) { this.useVersionBitmap = useVersionBitmap; } - + @Override public void setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler;