return commitCoordinator;
}
- protected DatastoreContext getDatastoreContext() {
+ public DatastoreContext getDatastoreContext() {
return datastoreContext;
}
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
-import akka.dispatch.OnComplete;
-import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
* @author Thomas Pantelis
*/
class EntityOwnershipShard extends Shard {
- private int transactionIDCounter = 0;
private final String localMemberName;
- private final List<BatchedModifications> retryModifications = new ArrayList<>();
+ private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
this.localMemberName = localMemberName;
+ this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
}
@Override
onRegisterCandidateLocal((RegisterCandidateLocal)message);
} else if(message instanceof UnregisterCandidateLocal) {
onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
- } else {
+ } else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
}
// TODO - add the listener locally.
- BatchedModifications modifications = new BatchedModifications(
- TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
- DataStoreVersions.CURRENT_VERSION, "");
- modifications.setDoCommitOnReady(true);
- modifications.setReady(true);
- modifications.setTotalMessagesSent(1);
-
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
registerCandidate.getEntity().getId(), localMemberName);
- modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
-
- tryCommitModifications(modifications);
+ commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
- private void tryCommitModifications(final BatchedModifications modifications) {
+ void tryCommitModifications(final BatchedModifications modifications) {
if(isLeader()) {
- if(isIsolatedLeader()) {
- LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
- retryModifications.add(modifications);
- } else {
- LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
-
- // Note that it's possible the commit won't get consensus and will timeout and not be applied
- // to the state. However we don't need to retry it in that case b/c it will be committed to
- // the journal first and, once a majority of followers come back on line and it is replicated,
- // it will be applied at that point.
- handleBatchedModificationsLocal(modifications, self());
- }
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
} else {
final ActorSelection leader = getLeader();
if (leader != null) {
Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
- future.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object response) {
- if(failure != null) {
- if(failure instanceof AskTimeoutException) {
- LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
- modifications.getTransactionID(), leader);
- tryCommitModifications(modifications);
- } else {
- LOG.error("BatchedModifications {} to leader {} failed",
- modifications.getTransactionID(), leader, failure);
- }
- } else {
- LOG.debug("BatchedModifications {} to leader {} succeeded",
- modifications.getTransactionID(), leader);
- }
- }
- }, getContext().dispatcher());
- } else {
- LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
-
- retryModifications.add(modifications);
+
+ Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
}
}
}
+ boolean hasLeader() {
+ return getLeader() != null && !isIsolatedLeader();
+ }
+
@Override
protected void onStateChanged() {
super.onStateChanged();
- if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
- LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
-
- List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
- retryModifications.clear();
- for(BatchedModifications mods: retryModificationsCopy) {
- tryCommitModifications(mods);
- }
- }
+ commitCoordinator.onStateChanged(this, isLeader());
}
private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Status.Failure;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.slf4j.Logger;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Handles commits and retries for the EntityOwnershipShard.
+ *
+ * @author Thomas Pantelis
+ */
+class EntityOwnershipShardCommitCoordinator {
+ private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry";
+
+ private final Logger log;
+ private int transactionIDCounter = 0;
+ private final String localMemberName;
+ private final Queue<Modification> pendingModifications = new LinkedList<>();
+ private BatchedModifications inflightCommit;
+ private Cancellable retryCommitSchedule;
+
+ EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
+ this.localMemberName = localMemberName;
+ this.log = log;
+ }
+
+ boolean handleMessage(Object message, EntityOwnershipShard shard) {
+ boolean handled = true;
+ if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) {
+ // Successful reply from a local commit.
+ inflightCommitSucceeded(shard);
+ } else if(message instanceof akka.actor.Status.Failure) {
+ // Failure reply from a local commit.
+ inflightCommitFailure(((Failure)message).cause(), shard);
+ } else if(message.equals(COMMIT_RETRY_MESSAGE)) {
+ retryInflightCommit(shard);
+ } else {
+ handled = false;
+ }
+
+ return handled;
+ }
+
+ private void retryInflightCommit(EntityOwnershipShard shard) {
+ // Shouldn't be null happen but verify anyway
+ if(inflightCommit == null) {
+ return;
+ }
+
+ if(shard.hasLeader()) {
+ log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
+
+ shard.tryCommitModifications(inflightCommit);
+ } else {
+ scheduleInflightCommitRetry(shard);
+ }
+ }
+
+ void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
+ // This should've originated from a failed inflight commit but verify anyway
+ if(inflightCommit == null) {
+ return;
+ }
+
+ log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
+
+ if(!(cause instanceof NoShardLeaderException)) {
+ // If the failure is other than NoShardLeaderException the commit may have been partially
+ // processed so retry with a new transaction ID to be safe.
+ newInflightCommitWithDifferentTransactionID();
+ }
+
+ scheduleInflightCommitRetry(shard);
+ }
+
+ private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
+ FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
+
+ log.debug("Scheduling retry for BatchedModifications commit {} in {}",
+ inflightCommit.getTransactionID(), duration);
+
+ retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
+ COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
+ }
+
+ void inflightCommitSucceeded(EntityOwnershipShard shard) {
+ // Shouldn't be null but verify anyway
+ if(inflightCommit == null) {
+ return;
+ }
+
+ if(retryCommitSchedule != null) {
+ retryCommitSchedule.cancel();
+ }
+
+ log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
+
+ inflightCommit = null;
+ commitNextBatch(shard);
+ }
+
+ void commitNextBatch(EntityOwnershipShard shard) {
+ if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
+ return;
+ }
+
+ inflightCommit = newBatchedModifications();
+ Iterator<Modification> iter = pendingModifications.iterator();
+ while(iter.hasNext()) {
+ inflightCommit.addModification(iter.next());
+ iter.remove();
+ if(inflightCommit.getModifications().size() >=
+ shard.getDatastoreContext().getShardBatchedModificationCount()) {
+ break;
+ }
+ }
+
+ log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
+ inflightCommit.getModifications().size());
+
+ shard.tryCommitModifications(inflightCommit);
+ }
+
+ void commitModification(Modification modification, EntityOwnershipShard shard) {
+ boolean hasLeader = shard.hasLeader();
+ if(inflightCommit != null || !hasLeader) {
+ if(log.isDebugEnabled()) {
+ log.debug("{} - adding modification to pending",
+ (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
+ }
+
+ pendingModifications.add(modification);
+ } else {
+ inflightCommit = newBatchedModifications();
+ inflightCommit.addModification(modification);
+
+ shard.tryCommitModifications(inflightCommit);
+ }
+ }
+
+ void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
+ if(!isLeader && inflightCommit != null) {
+ // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
+ // consensus for the commit and switched to follower due to another node with a higher term. We
+ // can't be sure if the commit was replicated to any node so we retry it here with a new
+ // transaction ID.
+ if(retryCommitSchedule != null) {
+ retryCommitSchedule.cancel();
+ }
+
+ newInflightCommitWithDifferentTransactionID();
+ retryInflightCommit(shard);
+ } else {
+ commitNextBatch(shard);
+ }
+ }
+
+ private void newInflightCommitWithDifferentTransactionID() {
+ BatchedModifications newBatchedModifications = newBatchedModifications();
+ newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
+ inflightCommit = newBatchedModifications;
+ }
+
+ private BatchedModifications newBatchedModifications() {
+ BatchedModifications modifications = new BatchedModifications(
+ TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
+ DataStoreVersions.CURRENT_VERSION, "");
+ modifications.setDoCommitOnReady(true);
+ modifications.setReady(true);
+ modifications.setTotalMessagesSent(1);
+ return modifications;
+ }
+}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("RegisterCandidateLocal [candidate=").append(candidate.getClass()).append(", entity=")
- .append(entity).append("]");
+ builder.append("RegisterCandidateLocal [entity=").append(entity).append(", candidate=").append(candidate)
+ .append("]");
return builder.toString();
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_QNAME;
MapNode entityTypeMapNode = (MapNode) childNode.get();
Optional<MapEntryNode> entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates(
childMap, child, key));
- assertEquals("Missing " + childMap.toString() + " entry for " + key, true, entityTypeEntry.isPresent());
+ if(!entityTypeEntry.isPresent()) {
+ fail("Missing " + childMap.toString() + " entry for " + key + ". Actual: " + entityTypeMapNode.getValue());
+ }
return entityTypeEntry.get();
}
}
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
ShardTestKit kit = new ShardTestKit(getSystem());
- dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
+ shardBatchedModificationCount(5);
String peerId = actorFactory.generateActorId("leader");
TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
MockLeader leader = peer.underlyingActor();
assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
leader.modificationsReceived, 5, TimeUnit.SECONDS));
- verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
+ LOCAL_MEMBER_NAME);
+
+ shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
- leader.modificationsReceived = new CountDownLatch(2);
+ // Test with initial commit timeout and subsequent retry.
+
+ leader.modificationsReceived = new CountDownLatch(1);
leader.sendReply = false;
shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
leader.modificationsReceived, 5, TimeUnit.SECONDS));
- verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
+ verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
+ LOCAL_MEMBER_NAME);
+
+ // Send a bunch of registration messages quickly and verify.
+
+ int max = 100;
+ leader.delay = 4;
+ leader.modificationsReceived = new CountDownLatch(max);
+ List<YangInstanceIdentifier> entityIds = new ArrayList<>();
+ for(int i = 1; i <= max; i++) {
+ YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
+ entityIds.add(id);
+ shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
+ }
+
+ assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+ leader.modificationsReceived, 10, TimeUnit.SECONDS));
+
+ // Sleep a little to ensure no additional BatchedModifications are received.
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+ List<Modification> receivedMods = leader.getAndClearReceivedModifications();
+ for(int i = 0; i < max; i++) {
+ verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
+ }
+
+ assertEquals("# modifications received", max, receivedMods.size());
}
private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
}
- private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
+ private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
YangInstanceIdentifier entityId, String candidateName) throws Exception {
- assertEquals("BatchedModifications size", 1, mods.getModifications().size());
- assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
- verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
+ assertEquals("BatchedModifications size", 1, mods.size());
+ verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
+ }
+
+ private void verifyBatchedEntityCandidate(Modification mod, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) throws Exception {
+ assertEquals("Modification type", MergeModification.class, mod.getClass());
+ verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
entityId, candidateName);
}
public static class MockLeader extends UntypedActor {
volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
- volatile BatchedModifications receivedModifications;
+ List<Modification> receivedModifications = new ArrayList<>();
volatile boolean sendReply = true;
+ volatile long delay;
@Override
public void onReceive(Object message) {
if(message instanceof BatchedModifications) {
- receivedModifications = (BatchedModifications) message;
- modificationsReceived.countDown();
+ if(delay > 0) {
+ Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
+ }
+
if(sendReply) {
+ BatchedModifications mods = (BatchedModifications) message;
+ synchronized (receivedModifications) {
+ for(int i = 0; i < mods.getModifications().size(); i++) {
+ receivedModifications.add(mods.getModifications().get(i));
+ modificationsReceived.countDown();
+ }
+ }
+
getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
} else {
sendReply = true;
}
}
}
+
+ List<Modification> getAndClearReceivedModifications() {
+ synchronized (receivedModifications) {
+ List<Modification> ret = new ArrayList<>(receivedModifications);
+ receivedModifications.clear();
+ return ret;
+ }
+ }
}
}