import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@Override
public void onReceiveRecover(final Object message) throws Exception {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
- message.getClass().toString(), getSender());
- }
-
- if (message instanceof RecoveryFailure){
- LOG.error("{}: Recovery failed because of this cause",
- persistenceId(), ((RecoveryFailure) message).cause());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
+ getSender());
- // Even though recovery failed, we still need to finish our recovery, eg send the
- // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
- onRecoveryComplete();
- } else {
- super.onReceiveRecover(message);
- if(LOG.isTraceEnabled()) {
- appendEntriesReplyTracker.begin();
- }
+ super.onReceiveRecover(message);
+ if (LOG.isTraceEnabled()) {
+ appendEntriesReplyTracker.begin();
}
}
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (message instanceof ReadyLocalTransaction) {
handleReadyLocalTransaction((ReadyLocalTransaction)message);
- } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ } else if (CanCommitTransaction.isSerializedType(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ } else if (CommitTransaction.isSerializedType(message)) {
handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ } else if (AbortTransaction.isSerializedType(message)) {
handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
+ } else if (CloseTransactionChain.isSerializedType(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
try {
cohortEntry.commit();
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+ sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
}
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+ sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
+ getSelf());
} else {
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
}
private ActorRef createTypedTransactionActor(int transactionType,
- ShardTransactionIdentifier transactionId, String transactionChainId,
- short clientVersion ) {
+ ShardTransactionIdentifier transactionId, String transactionChainId) {
return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
- transactionId, transactionChainId, clientVersion);
+ transactionId, transactionChainId);
}
private void createTransaction(CreateTransaction createTransaction) {
}
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
- createTransaction.getVersion());
+ createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
}
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
- String transactionChainId, short clientVersion) {
+ String transactionChainId) {
ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
}
ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
- transactionChainId, clientVersion);
+ transactionChainId);
return transactionActor;
}