2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
13 import akka.actor.ActorRef;
14 import akka.actor.Cancellable;
15 import akka.actor.Status.Failure;
16 import com.google.common.collect.ImmutableList;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.List;
20 import java.util.Queue;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
23 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
24 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
25 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
26 import org.opendaylight.controller.cluster.access.concepts.MemberName;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
29 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.slf4j.Logger;
35 import scala.concurrent.duration.FiniteDuration;
38 * Handles commits and retries for the EntityOwnershipShard.
40 * @author Thomas Pantelis
42 class EntityOwnershipShardCommitCoordinator {
43 private static final Object COMMIT_RETRY_MESSAGE = new Object() {
45 public String toString() {
46 return "entityCommitRetry";
49 private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
51 private final Queue<Modification> pendingModifications = new LinkedList<>();
52 private final LocalHistoryIdentifier historyId;
53 private final Logger log;
55 private BatchedModifications inflightCommit;
56 private Cancellable retryCommitSchedule;
57 private long transactionIDCounter = 0;
59 EntityOwnershipShardCommitCoordinator(final MemberName localMemberName, final Logger log) {
60 this.log = requireNonNull(log);
61 historyId = new LocalHistoryIdentifier(
62 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
65 boolean handleMessage(final Object message, final EntityOwnershipShard shard) {
66 boolean handled = true;
67 if (CommitTransactionReply.isSerializedType(message)) {
68 // Successful reply from a local commit.
69 inflightCommitSucceeded(shard);
70 } else if (message instanceof akka.actor.Status.Failure) {
71 // Failure reply from a local commit.
72 inflightCommitFailure(((Failure) message).cause(), shard);
73 } else if (COMMIT_RETRY_MESSAGE.equals(message)) {
74 retryInflightCommit(shard);
82 private void retryInflightCommit(final EntityOwnershipShard shard) {
83 // Shouldn't be null happen but verify anyway
84 if (inflightCommit == null) {
88 if (shard.hasLeader()) {
89 log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionId());
91 shard.tryCommitModifications(inflightCommit);
93 scheduleInflightCommitRetry(shard);
97 void inflightCommitFailure(final Throwable cause, final EntityOwnershipShard shard) {
98 // This should've originated from a failed inflight commit but verify anyway
99 if (inflightCommit == null) {
103 log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionId(), cause);
105 if (!(cause instanceof NoShardLeaderException)) {
106 // If the failure is other than NoShardLeaderException the commit may have been partially
107 // processed so retry with a new transaction ID to be safe.
108 newInflightCommitWithDifferentTransactionID();
111 scheduleInflightCommitRetry(shard);
114 private void scheduleInflightCommitRetry(final EntityOwnershipShard shard) {
115 FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
117 log.debug("Scheduling retry for BatchedModifications commit {} in {}",
118 inflightCommit.getTransactionId(), duration);
120 retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
121 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
124 void inflightCommitSucceeded(final EntityOwnershipShard shard) {
125 // Shouldn't be null but verify anyway
126 if (inflightCommit == null) {
130 if (retryCommitSchedule != null) {
131 retryCommitSchedule.cancel();
134 log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionId());
136 inflightCommit = null;
137 commitNextBatch(shard);
140 void commitNextBatch(final EntityOwnershipShard shard) {
141 if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
145 inflightCommit = newBatchedModifications();
146 Iterator<Modification> iter = pendingModifications.iterator();
147 while (iter.hasNext()) {
148 inflightCommit.addModification(iter.next());
150 if (inflightCommit.getModifications().size()
151 >= shard.getDatastoreContext().getShardBatchedModificationCount()) {
156 log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionId(),
157 inflightCommit.getModifications().size());
159 shard.tryCommitModifications(inflightCommit);
162 void commitModification(final Modification modification, final EntityOwnershipShard shard) {
163 commitModifications(ImmutableList.of(modification), shard);
166 void commitModifications(final List<Modification> modifications, final EntityOwnershipShard shard) {
167 if (modifications.isEmpty()) {
171 boolean hasLeader = shard.hasLeader();
172 if (inflightCommit != null || !hasLeader) {
173 if (log.isDebugEnabled()) {
174 log.debug("{} - adding modifications to pending",
175 inflightCommit != null ? "A commit is inflight" : "No shard leader");
178 pendingModifications.addAll(modifications);
180 inflightCommit = newBatchedModifications();
181 inflightCommit.addModifications(modifications);
182 shard.tryCommitModifications(inflightCommit);
186 void onStateChanged(final EntityOwnershipShard shard, final boolean isLeader) {
187 shard.possiblyRemoveAllInitialCandidates(shard.getLeader());
189 possiblyPrunePendingCommits(shard, isLeader);
191 if (!isLeader && inflightCommit != null) {
192 // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
193 // consensus for the commit and switched to follower due to another node with a higher term. We
194 // can't be sure if the commit was replicated to any node so we retry it here with a new
196 if (retryCommitSchedule != null) {
197 retryCommitSchedule.cancel();
200 newInflightCommitWithDifferentTransactionID();
201 retryInflightCommit(shard);
203 commitNextBatch(shard);
207 private void possiblyPrunePendingCommits(final EntityOwnershipShard shard, final boolean isLeader) {
208 // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader.
209 // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the
210 // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not
211 // forward such commits thus we prune the pending modifications. We still should forward local candidate change
213 if (shard.hasLeader() && !isLeader) {
214 // We may have already submitted a transaction for replication and commit. We don't need the base Shard to
215 // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear
216 // pending transactions and drop them.
217 shard.convertPendingTransactionsToMessages();
219 // Prune the inflightCommit.
220 if (inflightCommit != null) {
221 inflightCommit = pruneModifications(inflightCommit);
224 // Prune the subsequent pending modifications.
225 pendingModifications.removeIf(mod -> !canForwardModificationToNewLeader(mod));
229 private @Nullable BatchedModifications pruneModifications(final BatchedModifications toPrune) {
230 BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(),
231 toPrune.getVersion());
232 prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
233 if (toPrune.isReady()) {
234 prunedModifications.setReady(toPrune.getParticipatingShardNames());
236 prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
237 for (Modification mod: toPrune.getModifications()) {
238 if (canForwardModificationToNewLeader(mod)) {
239 prunedModifications.addModification(mod);
243 return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null;
246 private boolean canForwardModificationToNewLeader(final Modification mod) {
247 // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used
248 // to determine the new owner might be stale.
249 if (mod instanceof WriteModification) {
250 WriteModification writeMod = (WriteModification)mod;
251 boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME);
254 log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath());
263 private void newInflightCommitWithDifferentTransactionID() {
264 BatchedModifications newBatchedModifications = newBatchedModifications();
265 newBatchedModifications.addModifications(inflightCommit.getModifications());
266 inflightCommit = newBatchedModifications;
269 private BatchedModifications newBatchedModifications() {
270 BatchedModifications modifications = new BatchedModifications(
271 new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
272 modifications.setDoCommitOnReady(true);
273 modifications.setReady();
274 modifications.setTotalMessagesSent(1);
275 return modifications;