4e344cd68825f006b2bd944eb9661fd7affd3f67
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.Status.Failure;
15 import com.google.common.base.Preconditions;
16 import com.google.common.collect.ImmutableList;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.List;
20 import java.util.Queue;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
23 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
24 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
25 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
26 import org.opendaylight.controller.cluster.access.concepts.MemberName;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
29 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.slf4j.Logger;
35 import scala.concurrent.duration.FiniteDuration;
36
37 /**
38  * Handles commits and retries for the EntityOwnershipShard.
39  *
40  * @author Thomas Pantelis
41  */
42 class EntityOwnershipShardCommitCoordinator {
43     private static final Object COMMIT_RETRY_MESSAGE = new Object() {
44         @Override
45         public String toString() {
46             return "entityCommitRetry";
47         }
48     };
49     private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
50
51     private final Queue<Modification> pendingModifications = new LinkedList<>();
52     private final LocalHistoryIdentifier historyId;
53     private final Logger log;
54
55     private BatchedModifications inflightCommit;
56     private Cancellable retryCommitSchedule;
57     private long transactionIDCounter = 0;
58
59     EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
60         this.log = Preconditions.checkNotNull(log);
61         historyId = new LocalHistoryIdentifier(
62                 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
63     }
64
65     boolean handleMessage(Object message, EntityOwnershipShard shard) {
66         boolean handled = true;
67         if (CommitTransactionReply.isSerializedType(message)) {
68             // Successful reply from a local commit.
69             inflightCommitSucceeded(shard);
70         } else if (message instanceof akka.actor.Status.Failure) {
71             // Failure reply from a local commit.
72             inflightCommitFailure(((Failure) message).cause(), shard);
73         } else if (COMMIT_RETRY_MESSAGE.equals(message)) {
74             retryInflightCommit(shard);
75         } else {
76             handled = false;
77         }
78
79         return handled;
80     }
81
82     private void retryInflightCommit(EntityOwnershipShard shard) {
83         // Shouldn't be null happen but verify anyway
84         if (inflightCommit == null) {
85             return;
86         }
87
88         if (shard.hasLeader()) {
89             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionId());
90
91             shard.tryCommitModifications(inflightCommit);
92         } else {
93             scheduleInflightCommitRetry(shard);
94         }
95     }
96
97     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
98         // This should've originated from a failed inflight commit but verify anyway
99         if (inflightCommit == null) {
100             return;
101         }
102
103         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionId(), cause);
104
105         if (!(cause instanceof NoShardLeaderException)) {
106             // If the failure is other than NoShardLeaderException the commit may have been partially
107             // processed so retry with a new transaction ID to be safe.
108             newInflightCommitWithDifferentTransactionID();
109         }
110
111         scheduleInflightCommitRetry(shard);
112     }
113
114     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
115         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
116
117         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
118                 inflightCommit.getTransactionId(), duration);
119
120         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
121                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
122     }
123
124     void inflightCommitSucceeded(EntityOwnershipShard shard) {
125         // Shouldn't be null but verify anyway
126         if (inflightCommit == null) {
127             return;
128         }
129
130         if (retryCommitSchedule != null) {
131             retryCommitSchedule.cancel();
132         }
133
134         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionId());
135
136         inflightCommit = null;
137         commitNextBatch(shard);
138     }
139
140     void commitNextBatch(EntityOwnershipShard shard) {
141         if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
142             return;
143         }
144
145         inflightCommit = newBatchedModifications();
146         Iterator<Modification> iter = pendingModifications.iterator();
147         while (iter.hasNext()) {
148             inflightCommit.addModification(iter.next());
149             iter.remove();
150             if (inflightCommit.getModifications().size()
151                     >= shard.getDatastoreContext().getShardBatchedModificationCount()) {
152                 break;
153             }
154         }
155
156         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionId(),
157                 inflightCommit.getModifications().size());
158
159         shard.tryCommitModifications(inflightCommit);
160     }
161
162     void commitModification(Modification modification, EntityOwnershipShard shard) {
163         commitModifications(ImmutableList.of(modification), shard);
164     }
165
166     void commitModifications(List<Modification> modifications, EntityOwnershipShard shard) {
167         if (modifications.isEmpty()) {
168             return;
169         }
170
171         boolean hasLeader = shard.hasLeader();
172         if (inflightCommit != null || !hasLeader) {
173             if (log.isDebugEnabled()) {
174                 log.debug("{} - adding modifications to pending",
175                         inflightCommit != null ? "A commit is inflight" : "No shard leader");
176             }
177
178             pendingModifications.addAll(modifications);
179         } else {
180             inflightCommit = newBatchedModifications();
181             inflightCommit.addModifications(modifications);
182             shard.tryCommitModifications(inflightCommit);
183         }
184     }
185
186     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
187         shard.possiblyRemoveAllInitialCandidates(shard.getLeader());
188
189         possiblyPrunePendingCommits(shard, isLeader);
190
191         if (!isLeader && inflightCommit != null) {
192             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
193             // consensus for the commit and switched to follower due to another node with a higher term. We
194             // can't be sure if the commit was replicated to any node so we retry it here with a new
195             // transaction ID.
196             if (retryCommitSchedule != null) {
197                 retryCommitSchedule.cancel();
198             }
199
200             newInflightCommitWithDifferentTransactionID();
201             retryInflightCommit(shard);
202         } else {
203             commitNextBatch(shard);
204         }
205     }
206
207     private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) {
208         // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader.
209         // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the
210         // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not
211         // forward such commits thus we prune the pending modifications. We still should forward local candidate change
212         // commits.
213         if (shard.hasLeader() && !isLeader) {
214             // We may have already submitted a transaction for replication and commit. We don't need the base Shard to
215             // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear
216             // pending transactions and drop them.
217             shard.convertPendingTransactionsToMessages();
218
219             // Prune the inflightCommit.
220             if (inflightCommit != null) {
221                 inflightCommit = pruneModifications(inflightCommit);
222             }
223
224             // Prune the subsequent pending modifications.
225             Iterator<Modification> iter = pendingModifications.iterator();
226             while (iter.hasNext()) {
227                 Modification mod = iter.next();
228                 if (!canForwardModificationToNewLeader(mod)) {
229                     iter.remove();
230                 }
231             }
232         }
233     }
234
235     @Nullable
236     private BatchedModifications pruneModifications(BatchedModifications toPrune) {
237         BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(),
238                 toPrune.getVersion());
239         prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
240         if (toPrune.isReady()) {
241             prunedModifications.setReady(toPrune.getParticipatingShardNames());
242         }
243         prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
244         for (Modification mod: toPrune.getModifications()) {
245             if (canForwardModificationToNewLeader(mod)) {
246                 prunedModifications.addModification(mod);
247             }
248         }
249
250         return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null;
251     }
252
253     private boolean canForwardModificationToNewLeader(Modification mod) {
254         // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used
255         // to determine the new owner might be stale.
256         if (mod instanceof WriteModification) {
257             WriteModification writeMod = (WriteModification)mod;
258             boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME);
259
260             if (!canForward) {
261                 log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath());
262             }
263
264             return canForward;
265         }
266
267         return true;
268     }
269
270     private void newInflightCommitWithDifferentTransactionID() {
271         BatchedModifications newBatchedModifications = newBatchedModifications();
272         newBatchedModifications.addModifications(inflightCommit.getModifications());
273         inflightCommit = newBatchedModifications;
274     }
275
276     private BatchedModifications newBatchedModifications() {
277         BatchedModifications modifications = new BatchedModifications(
278             new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
279         modifications.setDoCommitOnReady(true);
280         modifications.setReady();
281         modifications.setTotalMessagesSent(1);
282         return modifications;
283     }
284 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.