import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ @VisibleForTesting
+ static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+
@VisibleForTesting
static final String DEFAULT_NAME = "default";
// Constructs the Shard, passing raft configuration and peer addresses up to the RaftActor base.
// The changed super() call adds DataStoreVersions.CURRENT_VERSION — presumably so the raft layer
// knows this shard's payload/serialization version; TODO confirm against the RaftActor constructor.
protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
+ super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()),
+ DataStoreVersions.CURRENT_VERSION);
this.name = name.toString();
this.datastoreContext = datastoreContext;
}
commitCoordinator = new ShardCommitCoordinator(store,
- TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
// Recomputes transactionCommitTimeout (ms) from the configured per-transaction commit timeout.
// The change halves the configured value — presumably so the shard aborts a stuck cohort well
// before the caller's own timeout fires; TODO confirm the rationale for the /2 factor.
private void setTransactionCommitTimeout() {
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
public static Props props(final ShardIdentifier name,
} else if (message instanceof ForwardedReadyTransaction) {
commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
getSender(), this);
+ } else if (message instanceof ReadyLocalTransaction) {
+ handleReadyLocalTransaction((ReadyLocalTransaction)message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
} else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
onDatastoreContext((DatastoreContext)message);
} else if(message instanceof RegisterRoleChangeListener){
roleChangeNotifier.get().forward(message, context());
- } else if (message instanceof FollowerInitialSyncUpStatus){
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
context().parent().tell(message, self());
+ } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
+ sender().tell(getShardMBean(), self());
} else {
super.onReceiveCommand(message);
}
// Periodic check (driven by TX_COMMIT_TIMEOUT_CHECK_MESSAGE) that aborts the currently-committing
// transaction if it has been idle longer than transactionCommitTimeout. The change moves the
// elapsed-time computation into CohortEntry.isExpired(...) instead of comparing timestamps here.
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
- long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
- if(elapsed > transactionCommitTimeout) {
+ if(cohortEntry.isExpired(transactionCommitTimeout)) {
LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
}
+
+ // New: also sweep the coordinator's queue so expired entries that never became
+ // the current cohort are cleaned up on the same timer tick.
+ commitCoordinator.cleanupExpiredCohortEntries();
}
private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+ applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
- Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+ Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
DataTreeCandidatePayload.create(candidate));
}
}
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
+ // Replies to the current sender with a Status.Failure wrapping NoShardLeaderException.
+ // Extracted as a shared helper so both BatchedModifications and ReadyLocalTransaction
+ // handling report "no leader" identically. The message parameter is currently unused —
+ // kept, presumably, for future retry scheduling per the TODO below.
+ private void noLeaderError(Object message) {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+
private void handleBatchedModifications(BatchedModifications batched) {
- // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+ // This message is sent to prepare the modifications transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
// BatchedModifications message, the caller sets the ready flag in the message indicating
// modifications are complete. The reply contains the cohort actor path (this actor) for the caller
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
- // it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find the leader for shard %s. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ noLeaderError(batched);
+ }
+ }
+ }
+
+ // Handles a ReadyLocalTransaction message. If this shard is the leader, the transaction is
+ // handed to the commit coordinator; any exception is reported back to the sender as a
+ // Status.Failure. If not the leader, the message is forwarded to the leader (preserving the
+ // original sender via forward()), or answered with noLeaderError(...) when no leader is known.
+ private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+ if (isLeader()) {
+ try {
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+ message.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+ leader.forward(message, getContext());
+ } else {
+ noLeaderError(message);
}
}
}
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+ return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
transactionId, transactionChainId, clientVersion);
}
@Override
protected void onRecoveryComplete() {
+ store.recoveryDone();
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());