- }
-
- private void abort(AbortTransaction message) {
- final ListenableFuture<Void> future = cohort.abort();
- final ActorRef sender = getSender();
- final ActorRef self = getSelf();
-
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender.tell(new AbortTransactionReply(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when aborting");
- }
- }
- }, getContext().dispatcher());
- }
-
- private void commit(CommitTransaction message) {
- // Forward the commit to the shard
- log.info("Commit transaction now + " + shardActor);
- shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext());
-
- }
-
- private void preCommit(PreCommitTransaction message) {
- final ListenableFuture<Void> future = cohort.preCommit();
- final ActorRef sender = getSender();
- final ActorRef self = getSelf();
-
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender.tell(new PreCommitTransactionReply(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when preCommitting");
- }
- }
- }, getContext().dispatcher());
-
- }
-
- private void canCommit(CanCommitTransaction message) {
- final ListenableFuture<Boolean> future = cohort.canCommit();
- final ActorRef sender = getSender();
- final ActorRef self = getSelf();
-
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- Boolean canCommit = future.get();
- sender.tell(new CanCommitTransactionReply(canCommit), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when aborting");
+
+ private final LoggingAdapter log =
+ Logging.getLogger(getContext().system(), this);
+
+ public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
+ final ActorRef shardActor, final CompositeModification modification) {
+ return Props.create(new Creator<ThreePhaseCommitCohort>() {
+ @Override
+ public ThreePhaseCommitCohort create() throws Exception {
+ return new ThreePhaseCommitCohort(cohort, shardActor,
+ modification);
+ }
+ });
+ }
+
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ canCommit(new CanCommitTransaction());
+ } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+ preCommit(new PreCommitTransaction());
+ } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ commit(new CommitTransaction());
+ } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ abort(new AbortTransaction());
+ } else {
+ throw new Exception ("Not recognized message received,message="+message);