private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ @VisibleForTesting
+ static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+
@VisibleForTesting
static final String DEFAULT_NAME = "default";
}
commitCoordinator = new ShardCommitCoordinator(store,
- TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
private void setTransactionCommitTimeout() {
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
public static Props props(final ShardIdentifier name,
onDatastoreContext((DatastoreContext)message);
} else if(message instanceof RegisterRoleChangeListener){
roleChangeNotifier.get().forward(message, context());
- } else if (message instanceof FollowerInitialSyncUpStatus){
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
context().parent().tell(message, self());
+ } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
+ sender().tell(getShardMBean(), self());
} else {
super.onReceiveCommand(message);
}
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
- long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
- if(elapsed > transactionCommitTimeout) {
+ if(cohortEntry.isExpired(transactionCommitTimeout)) {
LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
}
+
+ commitCoordinator.cleanupExpiredCohortEntries();
}
private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+ applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
- Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+ Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
DataTreeCandidatePayload.create(candidate));
}
}
}
private void handleBatchedModifications(BatchedModifications batched) {
- // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+ // This message is sent to prepare the modifications transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
// BatchedModifications message, the caller sets the ready flag in the message indicating
// modifications are complete. The reply contains the cohort actor path (this actor) for the caller
@Override
protected void onRecoveryComplete() {
+ store.recoveryDone();
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());