+ return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
+ }
+
+ private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
+ Callable<Void> call = new Callable<Void>() {
+
+ @Override public Void call() throws Exception {
+ for(ActorPath actorPath : cohortPaths){
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ try {
+ Object response =
+ actorContext.executeRemoteOperation(cohort,
+ message,
+ ActorContext.ASK_DURATION);
+
+ if (response != null && !response.getClass()
+ .equals(expectedResponseClass)) {
+ throw new RuntimeException(
+ String.format(
+ "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
+ expectedResponseClass.toString(),
+ response.getClass().toString())
+ );
+ }
+ } catch(TimeoutException e){
+ LOG.error(String.format("A timeout occurred when processing operation : %s", message));
+ }
+ }
+ return null;
+ }
+ };
+
+ return executor.submit(call);