package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
-import akka.dispatch.OnComplete;
-import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Optional;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Future;
* @author Thomas Pantelis
*/
class EntityOwnershipShard extends Shard {
- private int transactionIDCounter = 0;
private final String localMemberName;
- private final List<BatchedModifications> retryModifications = new ArrayList<>();
+ private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
this.localMemberName = localMemberName;
+ this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
}
@Override
super.onDatastoreContext(noPersistenceDatastoreContext(context));
}
+ @Override
+ protected void onRecoveryComplete() {
+ super.onRecoveryComplete();
+
+ new CandidateListChangeListener(getSelf(), getDataStore());
+ }
+
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof RegisterCandidateLocal) {
onRegisterCandidateLocal((RegisterCandidateLocal)message);
} else if(message instanceof UnregisterCandidateLocal) {
onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
- } else {
+ } else if(message instanceof CandidateAdded){
+ onCandidateAdded((CandidateAdded) message);
+ } else if(message instanceof CandidateRemoved){
+ onCandidateRemoved((CandidateRemoved) message);
+ } else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
}
// TODO - add the listener locally.
- BatchedModifications modifications = new BatchedModifications(
- TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
- DataStoreVersions.CURRENT_VERSION, "");
- modifications.setDoCommitOnReady(true);
- modifications.setReady(true);
- modifications.setTotalMessagesSent(1);
-
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
registerCandidate.getEntity().getId(), localMemberName);
- modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
-
- tryCommitModifications(modifications);
+ commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
- private void tryCommitModifications(final BatchedModifications modifications) {
+ void tryCommitModifications(final BatchedModifications modifications) {
if(isLeader()) {
- if(isIsolatedLeader()) {
- LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
- retryModifications.add(modifications);
- } else {
- LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
-
- // Note that it's possible the commit won't get consensus and will timeout and not be applied
- // to the state. However we don't need to retry it in that case b/c it will be committed to
- // the journal first and, once a majority of followers come back on line and it is replicated,
- // it will be applied at that point.
- handleBatchedModificationsLocal(modifications, self());
- }
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
} else {
final ActorSelection leader = getLeader();
if (leader != null) {
Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
- future.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object response) {
- if(failure != null) {
- if(failure instanceof AskTimeoutException) {
- LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
- modifications.getTransactionID(), leader);
- tryCommitModifications(modifications);
- } else {
- LOG.error("BatchedModifications {} to leader {} failed",
- modifications.getTransactionID(), leader, failure);
- }
- } else {
- LOG.debug("BatchedModifications {} to leader {} succeeded",
- modifications.getTransactionID(), leader);
- }
- }
- }, getContext().dispatcher());
- } else {
- LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
- retryModifications.add(modifications);
+
+ Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
}
}
}
+ boolean hasLeader() {
+ return getLeader() != null && !isIsolatedLeader();
+ }
+
@Override
protected void onStateChanged() {
super.onStateChanged();
- if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
- LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
-
- List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
- retryModifications.clear();
- for(BatchedModifications mods: retryModificationsCopy) {
- tryCommitModifications(mods);
- }
- }
+ commitCoordinator.onStateChanged(this, isLeader());
}
private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private void onCandidateRemoved(CandidateRemoved message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("onCandidateRemoved: {}", message);
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(message.getRemovedCandidate().equals(currentOwner)){
+ writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+ }
+ }
+
+ private void onCandidateAdded(CandidateAdded message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("onCandidateAdded: {}", message);
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(currentOwner == null){
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
+ }
+ }
+
+ private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
+ LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
+
+ commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
+ }
+
+ private String newOwner(Collection<String> candidates) {
+ if(candidates.size() > 0){
+ return candidates.iterator().next();
+ }
+
+ return "";
+ }
+
+ private String getCurrentOwner(YangInstanceIdentifier entityId) {
+ DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+ Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
+ if(optionalEntityOwner.isPresent()){
+ return optionalEntityOwner.get().getValue().toString();
+ }
+ return null;
+ }
+
public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));