import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private ShardSnapshot restoreFromSnapshot;
-
+ private final ShardTransactionMessageRetrySupport messageRetrySupport;
protected Shard(AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = new ShardDataTree(builder.getSchemaContext());
+ store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType());
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
-
-
+ messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
}
private void setTransactionCommitTimeout() {
super.postStop();
+ messageRetrySupport.close();
+
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
} else if (BatchedModifications.class.isInstance(message)) {
handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
- commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
- getSender(), this);
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (message instanceof ReadyLocalTransaction) {
handleReadyLocalTransaction((ReadyLocalTransaction)message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
sender().tell(store.getDataTree(), self());
} else if(message instanceof ServerRemoved){
context().parent().forward(message, context());
+ } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+ messageRetrySupport.onTimerMessage(message);
} else {
super.onReceiveCommand(message);
}
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
- private void noLeaderError(String errMessage, Object message) {
- // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
- // it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
- }
-
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
// the primary/leader shard. However with timing and caching on the front-end, there's a small
// window where it could have a stale leader during leadership transitions.
//
- if(isLeader()) {
- failIfIsolatedLeader(getSender());
-
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
- if(leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(batched, getSender(),
+ "Could not commit transaction " + batched.getTransactionID());
+ } else {
// TODO: what if this is not the first batch and leadership changed in between batched messages?
// We could check if the commitCoordinator already has a cached entry and forward all the previous
// batched modifications.
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
- } else {
- noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
}
}
}
}
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
- if (isLeader()) {
- failIfIsolatedLeader(getSender());
+ LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
}
} else {
ActorSelection leader = getLeader();
- if (leader != null) {
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(message, getSender(),
+ "Could not commit transaction " + message.getTransactionID());
+ } else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
+ }
+ }
+ }
+
+ private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+ LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
+
+ boolean isIsolatedLeader = isIsolatedLeader();
+ if (isLeader() && !isIsolatedLeader) {
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ } else {
+ ActorSelection leader = getLeader();
+ if (isIsolatedLeader || leader == null) {
+ messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
+ "Could not commit transaction " + forwardedReady.getTransactionID());
} else {
- noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
+ LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
+
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
+ forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+ readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
+ leader.forward(readyLocal, getContext());
}
}
}
store.closeAllTransactionChains();
}
+
+ if(hasLeader && !isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
}
@Override
protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.incrementLeadershipChangeCount();
+
+ if(hasLeader() && !isIsolatedLeader()) {
+ messageRetrySupport.retryMessages();
+ }
}
@Override
return restoreFromSnapshot;
}
+ public TreeType getTreeType() {
+ switch (datastoreContext.getLogicalStoreType()) {
+ case CONFIGURATION:
+ return TreeType.CONFIGURATION;
+ case OPERATIONAL:
+ return TreeType.OPERATIONAL;
+ }
+
+ throw new IllegalStateException("Unhandled logical store type " + datastoreContext.getLogicalStoreType());
+ }
+
protected void verify() {
Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");