import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
-import akka.dispatch.OnComplete;
-import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
* @author Thomas Pantelis
*/
class EntityOwnershipShard extends Shard {
- private int transactionIDCounter = 0;
private final String localMemberName;
- private final List<BatchedModifications> retryModifications = new ArrayList<>();
+ private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
this.localMemberName = localMemberName;
+ this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
}
@Override
onRegisterCandidateLocal((RegisterCandidateLocal)message);
} else if(message instanceof UnregisterCandidateLocal) {
onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
- } else {
+ } else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
}
// TODO - add the listener locally.
- BatchedModifications modifications = new BatchedModifications(
- TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
- DataStoreVersions.CURRENT_VERSION, "");
- modifications.setDoCommitOnReady(true);
- modifications.setReady(true);
- modifications.setTotalMessagesSent(1);
-
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
registerCandidate.getEntity().getId(), localMemberName);
- modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
-
- tryCommitModifications(modifications);
+ commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
- private void tryCommitModifications(final BatchedModifications modifications) {
+ void tryCommitModifications(final BatchedModifications modifications) {
if(isLeader()) {
- if(isIsolatedLeader()) {
- LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
- retryModifications.add(modifications);
- } else {
- LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
-
- // Note that it's possible the commit won't get consensus and will timeout and not be applied
- // to the state. However we don't need to retry it in that case b/c it will be committed to
- // the journal first and, once a majority of followers come back on line and it is replicated,
- // it will be applied at that point.
- handleBatchedModificationsLocal(modifications, self());
- }
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
} else {
final ActorSelection leader = getLeader();
if (leader != null) {
Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
- future.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object response) {
- if(failure != null) {
- if(failure instanceof AskTimeoutException) {
- LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
- modifications.getTransactionID(), leader);
- tryCommitModifications(modifications);
- } else {
- LOG.error("BatchedModifications {} to leader {} failed",
- modifications.getTransactionID(), leader, failure);
- }
- } else {
- LOG.debug("BatchedModifications {} to leader {} succeeded",
- modifications.getTransactionID(), leader);
- }
- }
- }, getContext().dispatcher());
- } else {
- LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
- retryModifications.add(modifications);
+
+ Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
}
}
}
+ boolean hasLeader() {
+ return getLeader() != null && !isIsolatedLeader();
+ }
+
@Override
protected void onStateChanged() {
super.onStateChanged();
- if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
- LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
-
- List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
- retryModifications.clear();
- for(BatchedModifications mods: retryModificationsCopy) {
- tryCommitModifications(mods);
- }
- }
+ commitCoordinator.onStateChanged(this, isLeader());
}
private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {