2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.Status.Failure;
15 import com.google.common.base.Preconditions;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.Queue;
19 import javax.annotation.Nullable;
20 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
23 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
24 import org.opendaylight.controller.cluster.access.concepts.MemberName;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
27 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
32 import org.slf4j.Logger;
33 import scala.concurrent.duration.FiniteDuration;
36 * Handles commits and retries for the EntityOwnershipShard.
38 * @author Thomas Pantelis
40 class EntityOwnershipShardCommitCoordinator {
41 private static final Object COMMIT_RETRY_MESSAGE = new Object() {
43 public String toString() {
44 return "entityCommitRetry";
47 private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
49 private final Queue<Modification> pendingModifications = new LinkedList<>();
50 private final LocalHistoryIdentifier historyId;
51 private final Logger log;
53 private BatchedModifications inflightCommit;
54 private Cancellable retryCommitSchedule;
55 private long transactionIDCounter = 0;
57 EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
58 this.log = Preconditions.checkNotNull(log);
59 historyId = new LocalHistoryIdentifier(
60 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
63 boolean handleMessage(Object message, EntityOwnershipShard shard) {
64 boolean handled = true;
65 if (CommitTransactionReply.isSerializedType(message)) {
66 // Successful reply from a local commit.
67 inflightCommitSucceeded(shard);
68 } else if (message instanceof akka.actor.Status.Failure) {
69 // Failure reply from a local commit.
70 inflightCommitFailure(((Failure) message).cause(), shard);
71 } else if (COMMIT_RETRY_MESSAGE.equals(message)) {
72 retryInflightCommit(shard);
80 private void retryInflightCommit(EntityOwnershipShard shard) {
81 // Shouldn't be null happen but verify anyway
82 if (inflightCommit == null) {
86 if (shard.hasLeader()) {
87 log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionId());
89 shard.tryCommitModifications(inflightCommit);
91 scheduleInflightCommitRetry(shard);
95 void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
96 // This should've originated from a failed inflight commit but verify anyway
97 if (inflightCommit == null) {
101 log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionId(), cause);
103 if (!(cause instanceof NoShardLeaderException)) {
104 // If the failure is other than NoShardLeaderException the commit may have been partially
105 // processed so retry with a new transaction ID to be safe.
106 newInflightCommitWithDifferentTransactionID();
109 scheduleInflightCommitRetry(shard);
112 private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
113 FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
115 log.debug("Scheduling retry for BatchedModifications commit {} in {}",
116 inflightCommit.getTransactionId(), duration);
118 retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
119 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
122 void inflightCommitSucceeded(EntityOwnershipShard shard) {
123 // Shouldn't be null but verify anyway
124 if (inflightCommit == null) {
128 if (retryCommitSchedule != null) {
129 retryCommitSchedule.cancel();
132 log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionId());
134 inflightCommit = null;
135 commitNextBatch(shard);
138 void commitNextBatch(EntityOwnershipShard shard) {
139 if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
143 inflightCommit = newBatchedModifications();
144 Iterator<Modification> iter = pendingModifications.iterator();
145 while (iter.hasNext()) {
146 inflightCommit.addModification(iter.next());
148 if (inflightCommit.getModifications().size()
149 >= shard.getDatastoreContext().getShardBatchedModificationCount()) {
154 log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionId(),
155 inflightCommit.getModifications().size());
157 shard.tryCommitModifications(inflightCommit);
160 void commitModification(Modification modification, EntityOwnershipShard shard) {
161 BatchedModifications modifications = newBatchedModifications();
162 modifications.addModification(modification);
163 commitModifications(modifications, shard);
166 void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
167 if (modifications.getModifications().isEmpty()) {
171 boolean hasLeader = shard.hasLeader();
172 if (inflightCommit != null || !hasLeader) {
173 if (log.isDebugEnabled()) {
174 log.debug("{} - adding modifications to pending",
175 inflightCommit != null ? "A commit is inflight" : "No shard leader");
178 pendingModifications.addAll(modifications.getModifications());
180 inflightCommit = modifications;
181 shard.tryCommitModifications(inflightCommit);
185 void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
186 shard.possiblyRemoveAllInitialCandidates(shard.getLeader());
188 possiblyPrunePendingCommits(shard, isLeader);
190 if (!isLeader && inflightCommit != null) {
191 // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
192 // consensus for the commit and switched to follower due to another node with a higher term. We
193 // can't be sure if the commit was replicated to any node so we retry it here with a new
195 if (retryCommitSchedule != null) {
196 retryCommitSchedule.cancel();
199 newInflightCommitWithDifferentTransactionID();
200 retryInflightCommit(shard);
202 commitNextBatch(shard);
206 private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) {
207 // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader.
208 // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the
209 // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not
210 // forward such commits thus we prune the pending modifications. We still should forward local candidate change
212 if (shard.hasLeader() && !isLeader) {
213 // We may have already submitted a transaction for replication and commit. We don't need the base Shard to
214 // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear
215 // pending transactions and drop them.
216 shard.convertPendingTransactionsToMessages();
218 // Prune the inflightCommit.
219 if (inflightCommit != null) {
220 inflightCommit = pruneModifications(inflightCommit);
223 // Prune the subsequent pending modifications.
224 Iterator<Modification> iter = pendingModifications.iterator();
225 while (iter.hasNext()) {
226 Modification mod = iter.next();
227 if (!canForwardModificationToNewLeader(mod)) {
235 private BatchedModifications pruneModifications(BatchedModifications toPrune) {
236 BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(),
237 toPrune.getVersion());
238 prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
239 prunedModifications.setReady(toPrune.isReady());
240 prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
241 for (Modification mod: toPrune.getModifications()) {
242 if (canForwardModificationToNewLeader(mod)) {
243 prunedModifications.addModification(mod);
247 return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null;
250 private boolean canForwardModificationToNewLeader(Modification mod) {
251 // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used
252 // to determine the new owner might be stale.
253 if (mod instanceof WriteModification) {
254 WriteModification writeMod = (WriteModification)mod;
255 boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME);
258 log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath());
267 private void newInflightCommitWithDifferentTransactionID() {
268 BatchedModifications newBatchedModifications = newBatchedModifications();
269 newBatchedModifications.addModifications(inflightCommit.getModifications());
270 inflightCommit = newBatchedModifications;
273 BatchedModifications newBatchedModifications() {
274 BatchedModifications modifications = new BatchedModifications(
275 new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
276 modifications.setDoCommitOnReady(true);
277 modifications.setReady(true);
278 modifications.setTotalMessagesSent(1);
279 return modifications;