package org.opendaylight.openflowplugin.openflow.md.core.plan;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* @author mirehak
*/
private int planItemCounter;
private boolean autoRead = true;
+ private final ExecutorService pool;
+
/**
* default ctor
*/
public ConnectionAdapterStackImpl() {
- // do nothing
+ pool = Executors.newSingleThreadExecutor();
}
@Override
@Override
public Future<RpcResult<Void>> echoReply(EchoReplyInput arg0) {
checkRpcAndNext(arg0, "echoReply");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> experimenter(ExperimenterInput arg0) {
checkRpcAndNext(arg0, "experimenter");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> flowMod(FlowModInput arg0) {
checkRpcAndNext(arg0, "flowMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> groupMod(GroupModInput arg0) {
checkRpcAndNext(arg0, "groupMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> hello(HelloInput arg0) {
checkRpcAndNext(arg0, "helloReply");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> meterMod(MeterModInput arg0) {
checkRpcAndNext(arg0, "meterMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> packetOut(PacketOutInput arg0) {
checkRpcAndNext(arg0, "packetOut");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> portMod(PortModInput arg0) {
checkRpcAndNext(arg0, "portMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> setAsync(SetAsyncInput arg0) {
checkRpcAndNext(arg0, "setAsync");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> setConfig(SetConfigInput arg0) {
checkRpcAndNext(arg0, "setConfig");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public Future<RpcResult<Void>> tableMod(TableModInput arg0) {
checkRpcAndNext(arg0, "tableMod");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
/**
* @param rpcInput
* @param rpcName
- * @param msgTmp
* @param switchTestWaitForRpc
* @return
*/
private synchronized void processRpcResponse(
final SwitchTestRcpResponseEvent rpcResponse) {
OfHeader plannedRpcResponseValue = rpcResponse.getPlannedRpcResponse();
- LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+ LOG.debug("rpc-responding to OF_LISTENER: {}", rpcResponse.getXid());
@SuppressWarnings("unchecked")
- SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+ final SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
.get(rpcResponse.getXid());
if (response != null) {
ErrorType.RPC, new Exception(
"rpc response failed (planned behavior)")));
}
- RpcResult<?> result = Rpcs.getRpcResult(successful,
+
+ final RpcResult<?> result = Rpcs.getRpcResult(successful,
plannedRpcResponseValue, errors);
- response.set(result);
+ setFutureViaPool(response, result);
} else {
String msg = "RpcResponse not expected: xid="
+ rpcResponse.getXid() + ", "
LOG.debug("rpc [" + rpcResponse.getXid() + "] .. done");
}
+ private void setFutureViaPool(final SettableFuture<RpcResult<?>> response, final RpcResult<?> result) {
+ pool.execute(new Runnable() {
+ @Override
+ public void run() {
+ response.set(result);
+ }
+ });
+ }
+
/**
* @param arg0
* rpc call content
/**
* @return rpc future result
*/
- private synchronized SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
- SettableFuture<RpcResult<Void>> result = SettableFuture.create();
- List<RpcError> errors = Collections.emptyList();
- result.set(Rpcs.getRpcResult(true, (Void) null, errors));
- return result;
+ private synchronized ListenableFuture<RpcResult<Void>> createOneWayRpcResult() {
+ return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
}
/**
@Override
public Future<RpcResult<Void>> multipartRequest(MultipartRequestInput arg0) {
checkRpcAndNext(arg0, "multipartRequestInput");
- SettableFuture<RpcResult<Void>> result = createOneWayRpcResult();
+ ListenableFuture<RpcResult<Void>> result = createOneWayRpcResult();
return result;
}
@Override
public InetSocketAddress getRemoteAddress() {
- // TODO Auto-generated method stub
- return null;
+ return InetSocketAddress.createUnresolved("unittest-odl.example.org", 4242);
}
@Override