}
@Override
- public void consume(DataObject message) {
+ public void consume(final DataObject message) {
LOG.debug("Consume msg");
if (disconnectOccured ) {
return;
if (message instanceof OfHeader) {
LOG.debug("OFheader msg received");
RpcResponseKey key = createRpcResponseKey((OfHeader) message);
- SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
+ final SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
if (rpcFuture != null) {
LOG.debug("corresponding rpcFuture found");
- List<RpcError> errors = Collections.emptyList();
- rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ List<RpcError> errors = Collections.emptyList();
+ LOG.debug("before setting rpcFuture");
+ rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
+ LOG.debug("after setting rpcFuture");
+ }
+ }).start();
responseCache.invalidate(key);
} else {
LOG.warn("received unexpected rpc response: "+key);
io.netty.util.concurrent.Future<? super Void> future)
throws Exception {
transportResult.set(future.isSuccess());
- transportResult.setException(future.cause());
+ if (!future.isSuccess()) {
+ transportResult.setException(future.cause());
+ }
}
});
return transportResult;