- private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
- Callable<Void> call = new Callable<Void>() {
-
- @Override public Void call() throws Exception {
- for(ActorPath actorPath : cohortPaths){
- ActorSelection cohort = actorContext.actorSelection(actorPath);
-
- try {
- Object response =
- actorContext.executeRemoteOperation(cohort,
- message,
- ActorContext.ASK_DURATION);
-
- if (response != null && !response.getClass()
- .equals(expectedResponseClass)) {
- throw new RuntimeException(
- String.format(
- "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
- expectedResponseClass.toString(),
- response.getClass().toString())
- );
+ private ListenableFuture<Void> voidOperation(final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
+ final SettableFuture<Void> returnFuture = SettableFuture.create();
+
+ combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+
+ Throwable exceptionToPropagate = failure;
+ if(exceptionToPropagate == null) {
+ for(Object response: responses) {
+ if(!response.getClass().equals(expectedResponseClass)) {
+ exceptionToPropagate = new IllegalArgumentException(
+ String.format("Unexpected response type {}",
+ response.getClass()));
+ break;