import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
-import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import java.util.concurrent.ExecutionException;
-public class ThreePhaseCommitCohort extends UntypedActor {
+public class ThreePhaseCommitCohort extends AbstractUntypedActor {
private final DOMStoreThreePhaseCommitCohort cohort;
private final ActorRef shardActor;
private final CompositeModification modification;
});
}
+
@Override
- public void onReceive(Object message) throws Exception {
- log.debug("Received message {}", message);
-
- if (message instanceof CanCommitTransaction) {
- canCommit((CanCommitTransaction) message);
- } else if (message instanceof PreCommitTransaction) {
- preCommit((PreCommitTransaction) message);
- } else if (message instanceof CommitTransaction) {
- commit((CommitTransaction) message);
- } else if (message instanceof AbortTransaction) {
- abort((AbortTransaction) message);
+ public void handleReceive(Object message) throws Exception {
+ if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ canCommit(new CanCommitTransaction());
+ } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+ preCommit(new PreCommitTransaction());
+ } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ commit(new CommitTransaction());
+ } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ abort(new AbortTransaction());
+ } else {
+ throw new Exception ("Not recognized message received,message="+message);
}
}
public void run() {
try {
future.get();
- sender.tell(new AbortTransactionReply(), self);
+ sender.tell(new AbortTransactionReply().toSerializable(), self);
} catch (InterruptedException | ExecutionException e) {
log.error(e, "An exception happened when aborting");
}
public void run() {
try {
future.get();
- sender.tell(new PreCommitTransactionReply(), self);
+ sender.tell(new PreCommitTransactionReply().toSerializable(), self);
} catch (InterruptedException | ExecutionException e) {
log.error(e, "An exception happened when preCommitting");
}
public void run() {
try {
Boolean canCommit = future.get();
- sender.tell(new CanCommitTransactionReply(canCommit), self);
+ sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
} catch (InterruptedException | ExecutionException e) {
log.error(e, "An exception happened when aborting");
}