package org.opendaylight.openflowplugin.api.openflow.connection;
import java.util.concurrent.ThreadPoolExecutor;
-
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
/**
* OF handshake context holder
*/
-public interface HandshakeContext {
+public interface HandshakeContext extends AutoCloseable {
/**
* @return handshakeManager
*/
package org.opendaylight.openflowplugin.api.openflow.md.core;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
/**
*
*/
public interface HandshakeListener {
-
+
/**
* @param featureOutput obtained
* @param version negotiated
*/
void onHandshakeFailure();
+ /**
+ * @param handshakeContext
+ */
+ void setHandshakeContext(HandshakeContext handshakeContext);
}
*/
package org.opendaylight.openflowplugin.api.openflow.md.core;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
/**
*/
Short getVersion();
- /**
- * @return obtained connection features
- */
- GetFeaturesOutput getFeatures();
-
- /**
- * @param receivedHello from switch
- */
- void setReceivedHello(HelloMessage receivedHello);
-
/**
* @param errorHandler the errorHandler to set
*/
void setUseVersionBitmap(boolean isBitmapNegotiationEnable);
/**
+ * @param receivedHello message from device we need to act upon
* process current handshake step
*/
- void shake();
+ void shake(HelloMessage receivedHello);
}
*/
package org.opendaylight.openflowplugin.impl.connection;
-import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandlerSimpleImpl;
-
import java.net.InetAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.impl.connection.listener.HandshakeListenerImpl;
import org.opendaylight.openflowplugin.impl.connection.listener.OpenflowProtocolListenerInitialImpl;
import org.opendaylight.openflowplugin.impl.connection.listener.SystemNotificationsListenerImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandlerSimpleImpl;
import org.opendaylight.openflowplugin.openflow.md.core.HandshakeManagerImpl;
import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
@Override
public void onSwitchConnected(final ConnectionAdapter connectionAdapter) {
- LOG.trace("preparing handshake");
+ LOG.trace("preparing handshake: {}", connectionAdapter.getRemoteAddress());
final int handshakeThreadLimit = 1; //TODO: move to constants/parametrize
final ThreadPoolLoggingExecutor handshakePool = createHandshakePool(
LOG.trace("prepare handshake context");
HandshakeContext handshakeContext = new HandshakeContextImpl(handshakePool, handshakeManager);
+ handshakeListener.setHandshakeContext(handshakeContext);
LOG.trace("prepare connection listeners");
final ConnectionReadyListener connectionReadyListener = new ConnectionReadyListenerImpl(
final SystemNotificationsListener systemListener = new SystemNotificationsListenerImpl(connectionContext);
connectionAdapter.setSystemListener(systemListener);
- LOG.trace("connection balet finished");
+ LOG.trace("connection ballet finished");
}
/**
/**
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
+ *
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
package org.opendaylight.openflowplugin.impl.connection;
import java.util.concurrent.ThreadPoolExecutor;
-
+import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class HandshakeContextImpl implements HandshakeContext {
+ private static Logger LOG = LoggerFactory.getLogger(HandshakeContextImpl.class);
+
private ThreadPoolExecutor handshakePool;
private HandshakeManager handshakeManager;
return handshakePool;
}
+ @Override
+ public void close() throws Exception {
+ shutdownPoolPolitely();
+ }
+
+ private void shutdownPoolPolitely() {
+ LOG.debug("terminating handshake pool");
+ handshakePool.shutdown();
+ try {
+ handshakePool.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
+ } finally {
+ handshakePool.purge();
+ if (! handshakePool.isTerminated()) {
+ handshakePool.shutdownNow();
+ }
+ LOG.debug("pool is terminated: {}", handshakePool.isTerminated());
+ }
+ }
}
/**
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
+ *
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
* oneshot listener - once connection is ready, initiate handshake (if not already started by device)
*/
public class ConnectionReadyListenerImpl implements ConnectionReadyListener {
-
+
private static final Logger LOG = LoggerFactory.getLogger(ConnectionReadyListenerImpl.class);
-
+
private ConnectionContext connectionContext;
private HandshakeContext handshakeContext;
@Override
public void onConnectionReady() {
- LOG.debug("device is connected and ready-to-use (pipeline prepared)");
+ LOG.debug("device is connected and ready-to-use (pipeline prepared): {}",
+ connectionContext.getConnectionAdapter().getRemoteAddress());
+
if (connectionContext.getConnectionState() == null) {
HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
null, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter());
*/
package org.opendaylight.openflowplugin.impl.connection.listener;
-import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
-
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
+import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private ConnectionContext connectionContext;
private DeviceConnectedHandler deviceConnectedHandler;
+ private HandshakeContext handshakeContext;
/**
* @param connectionContext
public void onHandshakeFailure() {
LOG.info("handshake failed: {}", connectionContext.getConnectionAdapter().getRemoteAddress());
connectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
- // TODO ensure that connection is closed
+ try {
+ handshakeContext.close();
+ } catch (Exception e) {
+ LOG.warn("Closing handshake context failed: {}", e.getMessage());
+ LOG.debug("Detail in hanshake context close:", e);
+ }
+ }
+
+ @Override
+ public void setHandshakeContext(HandshakeContext handshakeContext) {
+ this.handshakeContext = handshakeContext;
}
}
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.openflowplugin.openflow.md.core.HandshakeStepWrapper;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
if (checkState(ConnectionContext.CONNECTION_STATE.HANDSHAKING)) {
final HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
hello, handshakeContext.getHandshakeManager(), connectionContext.getConnectionAdapter());
- handshakeContext.getHandshakePool().submit(handshakeStepWrapper);
+ //handshakeContext.getHandshakePool().submit(handshakeStepWrapper);
+ // use up netty thread
+ handshakeStepWrapper.run();
} else {
//TODO: consider disconnecting of bad behaving device
}
package org.opendaylight.openflowplugin.openflow.md.core;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.Futures;
-
/**
* @author mirehak
*/
private int conductorId;
private int ingressMaxQueueSize;
+ private HandshakeContext handshakeContext;
/**
* @param connectionAdapter
@Override
public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
postHandshakeBasic(featureOutput, negotiatedVersion);
// post-handshake actions
/**
* used by tests
- *
+ *
* @param featureOutput
* @param negotiatedVersion
*/
this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
}
- protected void shutdownPool() {
- hsPool.shutdownNow();
- LOG.debug("pool is terminated: {}", hsPool.isTerminated());
+ @Override
+ public void setId(int conductorId) {
+ this.conductorId = conductorId;
}
- protected void shutdownPoolPolitely() {
- hsPool.shutdown();
+ @Override
+ public void close() {
+ conductorState = CONDUCTOR_STATE.RIP;
try {
- hsPool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
- shutdownPool();
+ handshakeContext.close();
+ } catch (Exception e) {
+ LOG.warn("Closing handshake context failed: {}", e.getMessage());
+ LOG.debug("Detail in hanshake context close:", e);
}
- hsPool.purge();
- LOG.debug("pool is terminated: {}", hsPool.isTerminated());
}
@Override
- public void setId(int conductorId) {
- this.conductorId = conductorId;
+ public void setHandshakeContext(HandshakeContext handshakeContext) {
+ this.handshakeContext = handshakeContext;
}
- @Override
- public void close() {
- shutdownPoolPolitely();
- conductorState = CONDUCTOR_STATE.RIP;
+ @VisibleForTesting
+ ThreadPoolExecutor getHsPool() {
+ return hsPool;
}
}
*/
package org.opendaylight.openflowplugin.openflow.md.core;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
-import org.opendaylight.openflowplugin.ConnectionException;
import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
*/
public class HandshakeManagerImpl implements HandshakeManager {
-
+
private static final Logger LOG = LoggerFactory
.getLogger(HandshakeManagerImpl.class);
-
+
private Short lastProposedVersion;
private Short lastReceivedVersion;
private final List<Short> versionOrder;
-
- private HelloMessage receivedHello;
+
+ //private HelloMessage receivedHello;
private final ConnectionAdapter connectionAdapter;
- private GetFeaturesOutput features;
private Short version;
private ErrorHandler errorHandler;
-
+
private long maxTimeout = 8000;
private TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
private Short highestVersion;
private HandshakeListener handshakeListener;
private boolean useVersionBitmap;
-
- @Override
- public void setReceivedHello(HelloMessage receivedHello) {
- this.receivedHello = receivedHello;
- }
-
+
/**
- * @param connectionAdapter
- * @param highestVersion
+ * @param connectionAdapter
+ * @param highestVersion
* @param versionOrder
*/
- public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion,
+ public HandshakeManagerImpl(ConnectionAdapter connectionAdapter, Short highestVersion,
List<Short> versionOrder) {
this.highestVersion = highestVersion;
this.versionOrder = versionOrder;
this.connectionAdapter = connectionAdapter;
}
-
+
@Override
public void setHandshakeListener(HandshakeListener handshakeListener) {
this.handshakeListener = handshakeListener;
}
@Override
- public void shake() {
+ public synchronized void shake(HelloMessage receivedHello) {
if (version != null) {
// Some switches respond with a second HELLO acknowledging our HELLO
LOG.trace("handshake STARTED");
setActiveXid(20L);
- HelloMessage receivedHelloLoc = receivedHello;
-
+
try {
- if (receivedHelloLoc == null) {
+ if (receivedHello == null) {
// first Hello sending
sendHelloMessage(highestVersion, getNextXid());
lastProposedVersion = highestVersion;
}
// process the 2. and later hellos
- Short remoteVersion = receivedHelloLoc.getVersion();
- List<Elements> elements = receivedHelloLoc.getElements();
- setActiveXid(receivedHelloLoc.getXid());
+ Short remoteVersion = receivedHello.getVersion();
+ List<Elements> elements = receivedHello.getElements();
+ setActiveXid(receivedHello.getXid());
List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
- LOG.debug("Hello message: version={}, bitmap={}, xid={}", remoteVersion,
- remoteVersionBitmap, receivedHelloLoc.getXid());
-
+ LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion,
+ receivedHello.getXid(), remoteVersionBitmap);
+
if (useVersionBitmap && remoteVersionBitmap != null) {
// versionBitmap on both sides -> ONE STEP DECISION
handleVersionBitmapNegotiation(elements);
- } else {
- // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
+ } else {
+ // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
handleStepByStepVersionNegotiation(remoteVersion);
}
} catch (Exception ex) {
errorHandler.handleException(ex, null);
- connectionAdapter.disconnect();
+ LOG.trace("ret - shake fail - closing");
handshakeListener.onHandshakeFailure();
- LOG.trace("ret - shake fail: {}", ex.getMessage());
}
}
/**
* @param remoteVersion
- * @throws Exception
+ * @throws Exception
*/
- private void handleStepByStepVersionNegotiation(Short remoteVersion) throws Exception {
- LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}",
+ private void handleStepByStepVersionNegotiation(final Short remoteVersion) throws Exception {
+ LOG.debug("remoteVersion:{} lastProposedVersion:{}, highestVersion:{}",
remoteVersion, lastProposedVersion, highestVersion);
-
+
if (lastProposedVersion == null) {
- // first hello has not been sent yet, send it and either wait for next remote
+ // first hello has not been sent yet, send it and either wait for next remote
// version or proceed
lastProposedVersion = proposeNextVersion(remoteVersion);
- sendHelloMessage(lastProposedVersion, getNextXid());
+ final Long nextHelloXid = getNextXid();
+ ListenableFuture<Void> helloResult = sendHelloMessage(lastProposedVersion, nextHelloXid);
+ Futures.addCallback(helloResult, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ try {
+ stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
+ } catch (Exception e) {
+ errorHandler.handleException(e, null);
+ handshakeListener.onHandshakeFailure();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.info("hello sending seriously failed [{}]", nextHelloXid);
+ LOG.trace("detail of hello send problem", t);
+ }
+ });
+ } else {
+ stepByStepVersionSubStep(remoteVersion, lastProposedVersion);
}
-
+ }
+
+ private void stepByStepVersionSubStep(Short remoteVersion, Short lastProposedVersion) throws Exception {
if (remoteVersion == lastProposedVersion) {
postHandshake(lastProposedVersion, getNextXid());
LOG.trace("ret - OK - switch answered with lastProposedVersion");
} else {
checkNegotiationStalling(remoteVersion);
- if (remoteVersion > (lastProposedVersion == null ? highestVersion : lastProposedVersion)) {
+ if (remoteVersion > (lastProposedVersion == null ? highestVersion : this.lastProposedVersion)) {
// wait for next version
LOG.trace("ret - wait");
} else {
/**
* @param remoteVersion
- * @throws Exception
+ * @throws Exception
*/
private void handleLowerVersionProposal(Short remoteVersion) throws Exception {
Short proposedVersion;
lastProposedVersion = proposedVersion;
sendHelloMessage(proposedVersion, getNextXid());
- if (proposedVersion != remoteVersion) {
+ if (! Objects.equals(proposedVersion, remoteVersion)) {
LOG.trace("ret - sent+wait");
} else {
LOG.trace("ret - sent+OK");
/**
* @param elements
- * @throws Exception
+ * @throws Exception
*/
private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
- Short proposedVersion;
- proposedVersion = proposeCommonBitmapVersion(elements);
+ final Short proposedVersion = proposeCommonBitmapVersion(elements);
if (lastProposedVersion == null) {
// first hello has not been sent yet
- sendHelloMessage(proposedVersion, getNextXid());
+ Long nexHelloXid = getNextXid();
+ ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
+ Futures.addCallback(helloDone, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ LOG.trace("ret - DONE - versionBitmap");
+ postHandshake(proposedVersion, getNextXid());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // NOOP
+ }
+ });
+ LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
+ } else {
+ LOG.trace("ret - DONE - versionBitmap");
+ postHandshake(proposedVersion, getNextXid());
}
- postHandshake(proposedVersion, getNextXid());
- LOG.trace("ret - OK - versionBitmap");
}
-
+
/**
- *
+ *
* @return
*/
private Long getNextXid() {
- activeXid += 1;
+ activeXid += 1;
return activeXid;
}
private void setActiveXid(Long xid) {
this.activeXid = xid;
}
-
+
/**
* @param remoteVersion
*/
lastReceivedVersion = remoteVersion;
}
- @Override
- public GetFeaturesOutput getFeatures() {
- return features;
- }
-
@Override
public Short getVersion() {
return version;
}
}
}
-
+
if(null == supportedHighestVersion) {
+ LOG.trace("versionBitmap: no common version found");
throw new IllegalArgumentException("no common version found in versionBitmap");
}
}
}
return proposal;
}
-
+
/**
* send hello reply without versionBitmap
* @param helloVersion
* @param helloXid
- * @throws Exception
+ * @throws Exception
*/
- private void sendHelloMessage(Short helloVersion, Long helloXid) throws Exception {
+ private ListenableFuture<Void> sendHelloMessage(Short helloVersion, final Long helloXid) throws Exception {
//Short highestVersion = ConnectionConductor.versionOrder.get(0);
//final Long helloXid = 21L;
HelloInput helloInput = MessageFactory.createHelloInput(helloVersion, helloXid, versionOrder);
-
- LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",
+
+ final SettableFuture<Void> resultFtr = SettableFuture.create();
+
+ LOG.debug("sending hello message: version{}, xid={}, version bitmap={}",
helloVersion, helloXid, MessageFactory.digVersions(helloInput.getElements()));
-
- try {
- RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(maxTimeout, maxTimeoutUnit);
- RpcUtil.smokeRpc(helloResult);
- LOG.debug("FIRST HELLO sent.");
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("FIRST HELLO sent.", e);
+
+ Future<RpcResult<Void>> helloResult = connectionAdapter.hello(helloInput);
+
+ ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(helloResult);
+ Futures.addCallback(rpcResultListenableFuture, new FutureCallback<RpcResult<Void>>() {
+ @Override
+ public void onSuccess(RpcResult<Void> result) {
+ if (result.isSuccessful()) {
+ LOG.debug("hello successfully sent, xid={}, addr={}", helloXid, connectionAdapter.getRemoteAddress());
+ resultFtr.set(null);
+ } else {
+ for (RpcError error : result.getErrors()) {
+ LOG.debug("hello sending failed [{}]: i:{} s:{} m:{}, addr:{}", helloXid,
+ error.getInfo(), error.getSeverity(), error.getMessage(),
+ connectionAdapter.getRemoteAddress());
+ if (error.getCause() != null) {
+ LOG.trace("DETAIL of sending hello failure", error.getCause());
+ }
+ }
+ resultFtr.cancel(false);
+ handshakeListener.onHandshakeFailure();
+ }
}
- handshakeListener.onHandshakeFailure();
- throw new ConnectionException("FIRST HELLO sending failed because of connection issue.");
- }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("sending of hello failed seriously [{}, addr:{}]: {}", helloXid,
+ connectionAdapter.getRemoteAddress(), t.getMessage());
+ LOG.trace("DETAIL of sending of hello failure:", t);
+ resultFtr.cancel(false);
+ handshakeListener.onHandshakeFailure();
+ }
+ });
+ LOG.trace("sending hello message [{}] - result hooked ..", helloXid);
+ return resultFtr;
}
* after handshake set features, register to session
* @param proposedVersion
* @param xid
- * @throws Exception
+ * @throws Exception
*/
- protected void postHandshake(Short proposedVersion, Long xid) throws Exception {
+ protected void postHandshake(final Short proposedVersion, final Long xid) {
// set version
version = proposedVersion;
LOG.debug("sending feature request for version={} and xid={}", version, xid);
Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
.getFeatures(featuresBuilder.build());
- LOG.debug("waiting for features");
- try {
- RpcResult<GetFeaturesOutput> rpcFeatures =
- featuresFuture.get(maxTimeout, maxTimeoutUnit);
- RpcUtil.smokeRpc(rpcFeatures);
-
- GetFeaturesOutput featureOutput = rpcFeatures.getResult();
-
- LOG.debug("obtained features: datapathId={}",
- featureOutput.getDatapathId());
- LOG.debug("obtained features: auxiliaryId={}",
- featureOutput.getAuxiliaryId());
- LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
- version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
-
- handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
- } catch (TimeoutException e) {
- // handshake failed
- LOG.warn("issuing disconnect during handshake, reason: future expired", e);
- connectionAdapter.disconnect();
- handshakeListener.onHandshakeFailure();
- throw e;
- } catch (Exception e) {
- // handshake failed
- LOG.warn("issuing disconnect during handshake, reason - RPC: {}", e.getMessage(), e);
- connectionAdapter.disconnect();
- handshakeListener.onHandshakeFailure();
- throw e;
- }
-
- LOG.debug("postHandshake DONE");
+
+ Futures.addCallback(JdkFutureAdapters.listenInPoolThread(featuresFuture),
+ new FutureCallback<RpcResult<GetFeaturesOutput>>() {
+ @Override
+ public void onSuccess(RpcResult<GetFeaturesOutput> rpcFeatures) {
+ LOG.trace("features are back");
+ if (rpcFeatures.isSuccessful()) {
+ GetFeaturesOutput featureOutput = rpcFeatures.getResult();
+
+ LOG.debug("obtained features: datapathId={}",
+ featureOutput.getDatapathId());
+ LOG.debug("obtained features: auxiliaryId={}",
+ featureOutput.getAuxiliaryId());
+ LOG.trace("handshake SETTLED: version={}, datapathId={}, auxiliaryId={}",
+ version, featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
+ handshakeListener.onHandshakeSuccessfull(featureOutput, proposedVersion);
+ } else {
+ // handshake failed
+ LOG.warn("issuing disconnect during handshake [{}]", connectionAdapter.getRemoteAddress());
+ for (RpcError rpcError : rpcFeatures.getErrors()) {
+ LOG.debug("handshake - features failure [{}]: i:{} | m:{} | s:{}", xid,
+ rpcError.getInfo(), rpcError.getMessage(), rpcError.getSeverity(),
+ rpcError.getCause()
+ );
+ }
+ handshakeListener.onHandshakeFailure();
+ }
+
+ LOG.debug("postHandshake DONE");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("getting feature failed seriously [{}, addr:{}]: {}", xid,
+ connectionAdapter.getRemoteAddress(), t.getMessage());
+ LOG.trace("DETAIL of sending of hello failure:", t);
+ }
+ });
+
+ LOG.debug("future features [{}] hooked ..", xid);
+
}
@Override
public void setUseVersionBitmap(boolean useVersionBitmap) {
this.useVersionBitmap = useVersionBitmap;
}
-
+
@Override
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
*
*/
public class HandshakeStepWrapper implements Runnable {
-
+
private static final Logger LOG = LoggerFactory
.getLogger(HandshakeStepWrapper.class);
-
+
private HelloMessage helloMessage;
private HandshakeManager handshakeManager;
private ConnectionAdapter connectionAdapter;
-
-
-
+
+
+
/**
* @param helloMessage
* @param handshakeManager
- * @param connectionAdapter
+ * @param connectionAdapter
*/
public HandshakeStepWrapper(HelloMessage helloMessage,
HandshakeManager handshakeManager, ConnectionAdapter connectionAdapter) {
@Override
public void run() {
if (connectionAdapter.isAlive()) {
- handshakeManager.setReceivedHello(helloMessage);
- handshakeManager.shake();
+ handshakeManager.shake(helloMessage);
} else {
LOG.debug("connection is down - skipping handshake step");
}
package org.opendaylight.openflowplugin.openflow.md.core;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import java.math.BigInteger;
import java.util.ArrayList;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
-import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeaturesV10;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
import org.opendaylight.yangtools.yang.binding.DataContainer;
private int expectedErrors = 0;
@Mock
private MessageSpy<DataContainer> messageSpy;
+ @Mock
+ HandshakeContext handshakeContext;
public void incrExperimenterMessageCounter() {
this.experimenterMessageCounter++;
connectionConductor.setQueueProcessor(queueProcessor);
connectionConductor.setErrorHandler(errorHandler);
connectionConductor.init();
+ connectionConductor.setHandshakeContext(handshakeContext);
eventPlan = new Stack<>();
adapter.setEventPlan(eventPlan);
adapter.setProceedTimeout(5000L);
libSimulation.join();
}
queueProcessor.shutdown();
- connectionConductor.shutdownPool();
+ connectionConductor.getHsPool().shutdown();
for (Exception problem : adapter.getOccuredExceptions()) {
LOG.error("during simulation on adapter side: "
EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
- EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+ (short) 0x01, getFeatureResponseMsg()));
int i = 1;
eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
- EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+ (short) 0x01, getFeatureResponseMsg()));
int i = 1;
eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
*/
private void executeNow() throws InterruptedException {
execute(true);
- connectionConductor.shutdownPool();
+ connectionConductor.getHsPool().shutdown();
}
/**
/**
* Test method for
- * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage)}
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(ExperimenterMessage)}
* .
*
* @throws InterruptedException
/**
* Test method for
- * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#processPortStatusMsg(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage)}
+ * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#processPortStatusMsg(PortStatus)}
* <br><br>
* Tests for getting features from port status message by port version
* <ul>
PortFeatures featuresMal = new PortFeatures(true, false, false, false, null, false, false, false, false, false, false, false, false, false, false, false);
PortFeaturesV10 featuresV10 = new PortFeaturesV10(true, false, false, false, false, false, false, false, false, false, false, false);
- //Malformed features
+ //Malformed features
builder.setVersion((short) 1).setPortNo(portNumber).setReason(PortReason.OFPPRADD).setCurrentFeatures(featuresMal);
connectionConductor.processPortStatusMsg(builder.build());
Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
Assert.assertTrue(connectionConductor.getSessionContext().getPhysicalPorts().isEmpty());
- //Version-features mismatch
+ //Version-features mismatch
builder.setCurrentFeatures(features);
connectionConductor.processPortStatusMsg(builder.build());
Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
*/
package org.opendaylight.openflowplugin.openflow.md.core;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.List;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-
/**
* testing handshake
*/
*/
@Before
public void setUp() {
- handshakeManager = new HandshakeManagerImpl(adapter, OFConstants.OFP_VERSION_1_3,
+ handshakeManager = new HandshakeManagerImpl(adapter, OFConstants.OFP_VERSION_1_3,
ConnectionConductor.versionOrder);
handshakeManager.setErrorHandler(errorHandler);
handshakeManager.setHandshakeListener(handshakeListener);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(resultFeatures.getResult(), version);
}
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(resultFeatures.getResult(), version);
}
/**
* Test of version negotiation Where switch version < 1.0
- * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful
+ * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful
* @throws Exception
*/
@Test
expectedErrors = 1;
Short version = (short) 0x00;
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
/**
* Test of version negotiation Where switch version < 1.0
- * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful
+ * Switch delivers first helloMessage with version 0x00 = negotiation unsuccessful
* @throws Exception
*/
@Test
expectedErrors = 1;
Short version = (short) 0x00;
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
- handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), expVersion);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
- handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), expVersion);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), version);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), version);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
- handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), expVersion);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
- handshakeManager.setReceivedHello(createHelloMessage(expVersion, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(expVersion, helloXid).build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), expVersion);
Short version = (short) 0x06;
expectedErrors = 1;
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
Short version = (short) 0x06;
expectedErrors = 1;
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
- handshakeManager.setReceivedHello(createHelloMessage(version, helloXid).build());
- handshakeManager.shake();
+ handshakeManager.shake(createHelloMessage(version, helloXid).build());
Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.setReceivedHello(helloMessage.build());
- handshakeManager.shake();
+ handshakeManager.shake(helloMessage.build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), version);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(helloMessage.build());
- handshakeManager.shake();
+ handshakeManager.shake(helloMessage.build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), version);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.setReceivedHello(helloMessage.build());
- handshakeManager.shake();
+ handshakeManager.shake(helloMessage.build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), version);
Mockito.when(adapter.getFeatures(Matchers.any(GetFeaturesInput.class)))
.thenReturn(Futures.immediateFuture(resultFeatures));
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(helloMessage.build());
- handshakeManager.shake();
+ handshakeManager.shake(helloMessage.build());
Mockito.verify(handshakeListener).onHandshakeSuccessfull(
resultFeatures.getResult(), version);
HelloMessageBuilder helloMessage = createHelloMessage(version, helloXid);
addVersionBitmap(Lists.newArrayList((short) 0x05, (short) 0x02), helloMessage);
- handshakeManager.setReceivedHello(helloMessage.build());
- handshakeManager.shake();
+ handshakeManager.shake(helloMessage.build());
Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
HelloMessageBuilder helloMessage = createHelloMessage(version, helloXid);
addVersionBitmap(Lists.newArrayList((short) 0x05, (short) 0x02), helloMessage);
- handshakeManager.shake();
+ handshakeManager.shake(null);
- handshakeManager.setReceivedHello(helloMessage.build());
- handshakeManager.shake();
+ handshakeManager.shake(helloMessage.build());
Mockito.verify(handshakeListener, Mockito.never()).onHandshakeSuccessfull(
Matchers.any(GetFeaturesOutput.class), Matchers.anyShort());
* @param helloBuilder
* @return
*/
- private static HelloMessageBuilder addVersionBitmap(List<Short> versionOrder,
+ private static HelloMessageBuilder addVersionBitmap(List<Short> versionOrder,
HelloMessageBuilder helloBuilder) {
short highestVersion = versionOrder.get(0);
int elementsCount = highestVersion / Integer.SIZE;
package org.opendaylight.openflowplugin.openflow.md.core.plan;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* @author mirehak
*/
private int planItemCounter;
private boolean autoRead = true;
+ private final ExecutorService pool;
+
/**
* default ctor
*/
public ConnectionAdapterStackImpl() {
- // do nothing
+ pool = Executors.newSingleThreadExecutor();
}
@Override
@Override
public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
checkRpcAndNext(arg0, "echoReply");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
checkRpcAndNext(arg0, "experimenter");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
checkRpcAndNext(arg0, "flowMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
checkRpcAndNext(arg0, "groupMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> hello(HelloInput arg0) {
checkRpcAndNext(arg0, "helloReply");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
checkRpcAndNext(arg0, "meterMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
checkRpcAndNext(arg0, "packetOut");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> portMod(PortModInput arg0) {
checkRpcAndNext(arg0, "portMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
checkRpcAndNext(arg0, "setAsync");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
checkRpcAndNext(arg0, "setConfig");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
checkRpcAndNext(arg0, "tableMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
/**
* @param rpcInput
* @param rpcName
- * @param msgTmp
* @param switchTestWaitForRpc
* @return
*/
private synchronized void processRpcResponse(
final SwitchTestRcpResponseEvent rpcResponse) {
OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
- LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+ LOG.debug("rpc-responding to OF_LISTENER: {}", rpcResponse.getXid());
@SuppressWarnings("unchecked")
- SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+ final SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
.get(rpcResponse.getXid());
if (response != null) {
ErrorType.RPC, new Exception(
"rpc response failed (planned behavior)")));
}
- RpcResult<?> result = Rpcs.getRpcResult(successful,
+
+ final RpcResult<?> result = Rpcs.getRpcResult(successful,
plannedRpcResponseValue, errors);
- response.set(result);
+ setFutureViaPool(response, result);
} else {
String msg = "RpcResponse not expected: xid="
+ rpcResponse.getXid() + ", "
LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
}
+ private void setFutureViaPool(final SettableFuture<RpcResult<?>> response, final RpcResult<?> result) {
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ response.set(result);
+ }
+ });
+ }
+
/**
* @param arg0
* rpc call content
/**
* @return rpc future result
*/
- private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
- SettableFuture<RpcResult<Void>> result = SettableFuture.create();
- List<RpcError> errors = Collections.emptyList();
- result.set(Rpcs.getRpcResult(true, (Void) null, errors));
- return result;
+ private synchronized ListenableFuture<RpcResult<Void>> createOneWayRpcResult() {
+ return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
}
/**
@Override
public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
checkRpcAndNext(arg0, "multipartRequestInput");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public InetSocketAddress getRemoteAddress() {
- // TODO Auto-generated method stub
- return null;
+ return InetSocketAddress.createUnresolved("unittest-odl.example.org", 4242);
}
@Override