X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShard.java;h=629f9382a1b754c0088e5f7cab8a2474851ac9d9;hb=refs%2Fchanges%2F98%2F26798%2F1;hp=2fabf0a878ecf38783eea74eb39332d8ad27e712;hpb=0b9b1dcba996fd76e0e1bde731692570747f5efd;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 2fabf0a878..629f9382a1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -7,27 +7,54 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.pattern.Patterns; +import com.google.common.base.Optional; +import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Future; /** * Special Shard for EntityOwnership. * * @author Thomas Pantelis */ -public class EntityOwnershipShard extends Shard { +class EntityOwnershipShard extends Shard { + private final String localMemberName; + private final EntityOwnershipShardCommitCoordinator commitCoordinator; private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) { return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build(); } protected EntityOwnershipShard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) { super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext); + this.localMemberName = localMemberName; + this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG); } @Override @@ -35,22 +62,148 @@ public class EntityOwnershipShard extends Shard { super.onDatastoreContext(noPersistenceDatastoreContext(context)); } + @Override + protected void onRecoveryComplete() { + super.onRecoveryComplete(); + + new CandidateListChangeListener(getSelf(), getDataStore()); + } + + @Override + public void onReceiveCommand(final Object message) throws Exception { + if(message instanceof RegisterCandidateLocal) { + onRegisterCandidateLocal((RegisterCandidateLocal)message); + } else if(message instanceof UnregisterCandidateLocal) { + onUnregisterCandidateLocal((UnregisterCandidateLocal)message); + } else if(message instanceof CandidateAdded){ + onCandidateAdded((CandidateAdded) message); + } else if(message instanceof CandidateRemoved){ + onCandidateRemoved((CandidateRemoved) message); + } else if(!commitCoordinator.handleMessage(message, this)) { + super.onReceiveCommand(message); + } + } + + private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) { + LOG.debug("onRegisterCandidateLocal: {}", registerCandidate); + + // TODO - add the listener locally. + + NormalizedNode entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(), + registerCandidate.getEntity().getId(), localMemberName); + commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this); + + getSender().tell(SuccessReply.INSTANCE, getSelf()); + } + + void tryCommitModifications(final BatchedModifications modifications) { + if(isLeader()) { + LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID()); + + // Note that it's possible the commit won't get consensus and will timeout and not be applied + // to the state. However we don't need to retry it in that case b/c it will be committed to + // the journal first and, once a majority of followers come back on line and it is replicated, + // it will be applied at that point. + handleBatchedModificationsLocal(modifications, self()); + } else { + final ActorSelection leader = getLeader(); + if (leader != null) { + LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader); + + Future future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis( + getDatastoreContext().getShardTransactionCommitTimeoutInSeconds())); + + Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender()); + } + } + } + + boolean hasLeader() { + return getLeader() != null && !isIsolatedLeader(); + } + + @Override + protected void onStateChanged() { + super.onStateChanged(); + + commitCoordinator.onStateChanged(this, isLeader()); + } + + private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) { + // TODO - implement + getSender().tell(SuccessReply.INSTANCE, getSelf()); + } + + private void onCandidateRemoved(CandidateRemoved message) { + if(!isLeader()){ + return; + } + + LOG.debug("onCandidateRemoved: {}", message); + + String currentOwner = getCurrentOwner(message.getEntityPath()); + if(message.getRemovedCandidate().equals(currentOwner)){ + writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates())); + } + } + + private void onCandidateAdded(CandidateAdded message) { + if(!isLeader()){ + return; + } + + LOG.debug("onCandidateAdded: {}", message); + + String currentOwner = getCurrentOwner(message.getEntityPath()); + if(currentOwner == null){ + writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates())); + } + } + + private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) { + LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath); + + commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME), + ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this); + } + + private String newOwner(Collection candidates) { + if(candidates.size() > 0){ + return candidates.iterator().next(); + } + + return ""; + } + + private String getCurrentOwner(YangInstanceIdentifier entityId) { + DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot(); + Optional> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME)); + if(optionalEntityOwner.isPresent()){ + return optionalEntityOwner.get().getValue().toString(); + } + return null; + } + public static Props props(final ShardIdentifier name, final Map peerAddresses, - final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext)); + final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) { + return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName)); } private static class Creator extends AbstractShardCreator { private static final long serialVersionUID = 1L; + private final String localMemberName; + Creator(final ShardIdentifier name, final Map peerAddresses, - final DatastoreContext datastoreContext, final SchemaContext schemaContext) { + final DatastoreContext datastoreContext, final SchemaContext schemaContext, + final String localMemberName) { super(name, peerAddresses, datastoreContext, schemaContext); + this.localMemberName = localMemberName; } @Override public Shard create() throws Exception { - return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext); + return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName); } } }