import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
private SystemNotificationsListener systemListener;
private boolean disconnectOccured = false;
+ private ExecutorService threadPool;
/**
* default ctor
.concurrencyLevel(1)
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
.removalListener(new ResponseRemovalListener()).build();
+ threadPool = Executors.newCachedThreadPool();
LOG.info("ConnectionAdapter created");
-
-
}
/**
@Override
public void consume(final DataObject message) {
+ threadPool.execute(new Runnable() {
+ @Override
+ public void run() {
+ consumeIntern(message);
+ }
+ });
+ }
+
+ protected void consumeIntern(final DataObject message) {
LOG.debug("Consume msg");
if (disconnectOccured ) {
return;
final SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
if (rpcFuture != null) {
LOG.debug("corresponding rpcFuture found");
- new Thread(new Runnable() {
- @Override
- public void run() {
- List<RpcError> errors = Collections.emptyList();
- LOG.debug("before setting rpcFuture");
- rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
- LOG.debug("after setting rpcFuture");
- }
- }).start();
+ List<RpcError> errors = Collections.emptyList();
+ LOG.debug("before setting rpcFuture");
+ rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
+ LOG.debug("after setting rpcFuture");
responseCache.invalidate(key);
} else {
LOG.warn("received unexpected rpc response: "+key);