/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import com.google.common.base.Preconditions;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Queue;
17 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
18 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
19 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.MemberName;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
24 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.slf4j.Logger;
29 import scala.concurrent.duration.FiniteDuration;
32 * Handles commits and retries for the EntityOwnershipShard.
34 * @author Thomas Pantelis
36 class EntityOwnershipShardCommitCoordinator {
37 private static final Object COMMIT_RETRY_MESSAGE = new Object() {
39 public String toString() {
40 return "entityCommitRetry";
43 private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
45 private final Queue<Modification> pendingModifications = new LinkedList<>();
46 private final LocalHistoryIdentifier historyId;
47 private final Logger log;
49 private BatchedModifications inflightCommit;
50 private Cancellable retryCommitSchedule;
51 private long transactionIDCounter = 0;
53 EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
54 this.log = Preconditions.checkNotNull(log);
55 historyId = new LocalHistoryIdentifier(
56 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
59 boolean handleMessage(Object message, EntityOwnershipShard shard) {
60 boolean handled = true;
61 if(CommitTransactionReply.isSerializedType(message)) {
62 // Successful reply from a local commit.
63 inflightCommitSucceeded(shard);
64 } else if(message instanceof akka.actor.Status.Failure) {
65 // Failure reply from a local commit.
66 inflightCommitFailure(((Failure)message).cause(), shard);
67 } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
68 retryInflightCommit(shard);
76 private void retryInflightCommit(EntityOwnershipShard shard) {
77 // Shouldn't be null happen but verify anyway
78 if(inflightCommit == null) {
82 if(shard.hasLeader()) {
83 log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
85 shard.tryCommitModifications(inflightCommit);
87 scheduleInflightCommitRetry(shard);
91 void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
92 // This should've originated from a failed inflight commit but verify anyway
93 if(inflightCommit == null) {
97 log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
99 if(!(cause instanceof NoShardLeaderException)) {
100 // If the failure is other than NoShardLeaderException the commit may have been partially
101 // processed so retry with a new transaction ID to be safe.
102 newInflightCommitWithDifferentTransactionID();
105 scheduleInflightCommitRetry(shard);
108 private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
109 FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
111 log.debug("Scheduling retry for BatchedModifications commit {} in {}",
112 inflightCommit.getTransactionID(), duration);
114 retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
115 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
118 void inflightCommitSucceeded(EntityOwnershipShard shard) {
119 // Shouldn't be null but verify anyway
120 if(inflightCommit == null) {
124 if(retryCommitSchedule != null) {
125 retryCommitSchedule.cancel();
128 log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
130 inflightCommit = null;
131 commitNextBatch(shard);
134 void commitNextBatch(EntityOwnershipShard shard) {
135 if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
139 inflightCommit = newBatchedModifications();
140 Iterator<Modification> iter = pendingModifications.iterator();
141 while(iter.hasNext()) {
142 inflightCommit.addModification(iter.next());
144 if(inflightCommit.getModifications().size() >=
145 shard.getDatastoreContext().getShardBatchedModificationCount()) {
150 log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
151 inflightCommit.getModifications().size());
153 shard.tryCommitModifications(inflightCommit);
156 void commitModification(Modification modification, EntityOwnershipShard shard) {
157 BatchedModifications modifications = newBatchedModifications();
158 modifications.addModification(modification);
159 commitModifications(modifications, shard);
162 void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
163 if(modifications.getModifications().isEmpty()) {
167 boolean hasLeader = shard.hasLeader();
168 if(inflightCommit != null || !hasLeader) {
169 if(log.isDebugEnabled()) {
170 log.debug("{} - adding modifications to pending",
171 inflightCommit != null ? "A commit is inflight" : "No shard leader");
174 pendingModifications.addAll(modifications.getModifications());
176 inflightCommit = modifications;
177 shard.tryCommitModifications(inflightCommit);
181 void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
182 shard.possiblyRemoveAllInitialCandidates(shard.getLeader());
184 if(!isLeader && inflightCommit != null) {
185 // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
186 // consensus for the commit and switched to follower due to another node with a higher term. We
187 // can't be sure if the commit was replicated to any node so we retry it here with a new
189 if(retryCommitSchedule != null) {
190 retryCommitSchedule.cancel();
193 newInflightCommitWithDifferentTransactionID();
194 retryInflightCommit(shard);
196 commitNextBatch(shard);
200 private void newInflightCommitWithDifferentTransactionID() {
201 BatchedModifications newBatchedModifications = newBatchedModifications();
202 newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
203 inflightCommit = newBatchedModifications;
206 BatchedModifications newBatchedModifications() {
207 BatchedModifications modifications = new BatchedModifications(
208 new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
209 modifications.setDoCommitOnReady(true);
210 modifications.setReady(true);
211 modifications.setTotalMessagesSent(1);
212 return modifications;