Simplify code using Java 8 features
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.Status.Failure;
15 import com.google.common.base.Preconditions;
16 import com.google.common.collect.ImmutableList;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.List;
20 import java.util.Queue;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
23 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
24 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
25 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
26 import org.opendaylight.controller.cluster.access.concepts.MemberName;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
29 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.slf4j.Logger;
35 import scala.concurrent.duration.FiniteDuration;
36
37 /**
38  * Handles commits and retries for the EntityOwnershipShard.
39  *
40  * @author Thomas Pantelis
41  */
42 class EntityOwnershipShardCommitCoordinator {
43     private static final Object COMMIT_RETRY_MESSAGE = new Object() {
44         @Override
45         public String toString() {
46             return "entityCommitRetry";
47         }
48     };
49     private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
50
51     private final Queue<Modification> pendingModifications = new LinkedList<>();
52     private final LocalHistoryIdentifier historyId;
53     private final Logger log;
54
55     private BatchedModifications inflightCommit;
56     private Cancellable retryCommitSchedule;
57     private long transactionIDCounter = 0;
58
59     EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
60         this.log = Preconditions.checkNotNull(log);
61         historyId = new LocalHistoryIdentifier(
62                 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
63     }
64
65     boolean handleMessage(Object message, EntityOwnershipShard shard) {
66         boolean handled = true;
67         if (CommitTransactionReply.isSerializedType(message)) {
68             // Successful reply from a local commit.
69             inflightCommitSucceeded(shard);
70         } else if (message instanceof akka.actor.Status.Failure) {
71             // Failure reply from a local commit.
72             inflightCommitFailure(((Failure) message).cause(), shard);
73         } else if (COMMIT_RETRY_MESSAGE.equals(message)) {
74             retryInflightCommit(shard);
75         } else {
76             handled = false;
77         }
78
79         return handled;
80     }
81
82     private void retryInflightCommit(EntityOwnershipShard shard) {
83         // Shouldn't be null happen but verify anyway
84         if (inflightCommit == null) {
85             return;
86         }
87
88         if (shard.hasLeader()) {
89             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionId());
90
91             shard.tryCommitModifications(inflightCommit);
92         } else {
93             scheduleInflightCommitRetry(shard);
94         }
95     }
96
97     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
98         // This should've originated from a failed inflight commit but verify anyway
99         if (inflightCommit == null) {
100             return;
101         }
102
103         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionId(), cause);
104
105         if (!(cause instanceof NoShardLeaderException)) {
106             // If the failure is other than NoShardLeaderException the commit may have been partially
107             // processed so retry with a new transaction ID to be safe.
108             newInflightCommitWithDifferentTransactionID();
109         }
110
111         scheduleInflightCommitRetry(shard);
112     }
113
114     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
115         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
116
117         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
118                 inflightCommit.getTransactionId(), duration);
119
120         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
121                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
122     }
123
124     void inflightCommitSucceeded(EntityOwnershipShard shard) {
125         // Shouldn't be null but verify anyway
126         if (inflightCommit == null) {
127             return;
128         }
129
130         if (retryCommitSchedule != null) {
131             retryCommitSchedule.cancel();
132         }
133
134         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionId());
135
136         inflightCommit = null;
137         commitNextBatch(shard);
138     }
139
140     void commitNextBatch(EntityOwnershipShard shard) {
141         if (inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
142             return;
143         }
144
145         inflightCommit = newBatchedModifications();
146         Iterator<Modification> iter = pendingModifications.iterator();
147         while (iter.hasNext()) {
148             inflightCommit.addModification(iter.next());
149             iter.remove();
150             if (inflightCommit.getModifications().size()
151                     >= shard.getDatastoreContext().getShardBatchedModificationCount()) {
152                 break;
153             }
154         }
155
156         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionId(),
157                 inflightCommit.getModifications().size());
158
159         shard.tryCommitModifications(inflightCommit);
160     }
161
162     void commitModification(Modification modification, EntityOwnershipShard shard) {
163         commitModifications(ImmutableList.of(modification), shard);
164     }
165
166     void commitModifications(List<Modification> modifications, EntityOwnershipShard shard) {
167         if (modifications.isEmpty()) {
168             return;
169         }
170
171         boolean hasLeader = shard.hasLeader();
172         if (inflightCommit != null || !hasLeader) {
173             if (log.isDebugEnabled()) {
174                 log.debug("{} - adding modifications to pending",
175                         inflightCommit != null ? "A commit is inflight" : "No shard leader");
176             }
177
178             pendingModifications.addAll(modifications);
179         } else {
180             inflightCommit = newBatchedModifications();
181             inflightCommit.addModifications(modifications);
182             shard.tryCommitModifications(inflightCommit);
183         }
184     }
185
186     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
187         shard.possiblyRemoveAllInitialCandidates(shard.getLeader());
188
189         possiblyPrunePendingCommits(shard, isLeader);
190
191         if (!isLeader && inflightCommit != null) {
192             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
193             // consensus for the commit and switched to follower due to another node with a higher term. We
194             // can't be sure if the commit was replicated to any node so we retry it here with a new
195             // transaction ID.
196             if (retryCommitSchedule != null) {
197                 retryCommitSchedule.cancel();
198             }
199
200             newInflightCommitWithDifferentTransactionID();
201             retryInflightCommit(shard);
202         } else {
203             commitNextBatch(shard);
204         }
205     }
206
207     private void possiblyPrunePendingCommits(EntityOwnershipShard shard, boolean isLeader) {
208         // If we were the leader and transitioned to follower, we'll try to forward pending commits to the new leader.
209         // However certain commits, e.g. entity owner changes, should only be committed by a valid leader as the
210         // criteria used to determine the commit may be stale. Since we're no longer a valid leader, we should not
211         // forward such commits thus we prune the pending modifications. We still should forward local candidate change
212         // commits.
213         if (shard.hasLeader() && !isLeader) {
214             // We may have already submitted a transaction for replication and commit. We don't need the base Shard to
215             // forward it since we also have it stored in the inflightCommit and handle retries. So we just clear
216             // pending transactions and drop them.
217             shard.convertPendingTransactionsToMessages();
218
219             // Prune the inflightCommit.
220             if (inflightCommit != null) {
221                 inflightCommit = pruneModifications(inflightCommit);
222             }
223
224             // Prune the subsequent pending modifications.
225             pendingModifications.removeIf(mod -> !canForwardModificationToNewLeader(mod));
226         }
227     }
228
229     @Nullable
230     private BatchedModifications pruneModifications(BatchedModifications toPrune) {
231         BatchedModifications prunedModifications = new BatchedModifications(toPrune.getTransactionId(),
232                 toPrune.getVersion());
233         prunedModifications.setDoCommitOnReady(toPrune.isDoCommitOnReady());
234         if (toPrune.isReady()) {
235             prunedModifications.setReady(toPrune.getParticipatingShardNames());
236         }
237         prunedModifications.setTotalMessagesSent(toPrune.getTotalMessagesSent());
238         for (Modification mod: toPrune.getModifications()) {
239             if (canForwardModificationToNewLeader(mod)) {
240                 prunedModifications.addModification(mod);
241             }
242         }
243
244         return !prunedModifications.getModifications().isEmpty() ? prunedModifications : null;
245     }
246
247     private boolean canForwardModificationToNewLeader(Modification mod) {
248         // If this is a WRITE of entity owner we don't want to forward it to a new leader since the criteria used
249         // to determine the new owner might be stale.
250         if (mod instanceof WriteModification) {
251             WriteModification writeMod = (WriteModification)mod;
252             boolean canForward = !writeMod.getPath().getLastPathArgument().getNodeType().equals(ENTITY_OWNER_QNAME);
253
254             if (!canForward) {
255                 log.debug("Not forwarding WRITE modification for {} to new leader", writeMod.getPath());
256             }
257
258             return canForward;
259         }
260
261         return true;
262     }
263
264     private void newInflightCommitWithDifferentTransactionID() {
265         BatchedModifications newBatchedModifications = newBatchedModifications();
266         newBatchedModifications.addModifications(inflightCommit.getModifications());
267         inflightCommit = newBatchedModifications;
268     }
269
270     private BatchedModifications newBatchedModifications() {
271         BatchedModifications modifications = new BatchedModifications(
272             new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
273         modifications.setDoCommitOnReady(true);
274         modifications.setReady();
275         modifications.setTotalMessagesSent(1);
276         return modifications;
277     }
278 }