2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import org.opendaylight.controller.cluster.access.concepts.MemberName;
17 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
18 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
21 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.modification.Modification;
23 import org.slf4j.Logger;
24 import scala.concurrent.duration.FiniteDuration;
27 * Handles commits and retries for the EntityOwnershipShard.
29 * @author Thomas Pantelis
31 class EntityOwnershipShardCommitCoordinator {
32 private static final Object COMMIT_RETRY_MESSAGE = new Object() {
34 public String toString() {
35 return "entityCommitRetry";
39 private final Logger log;
40 private int transactionIDCounter = 0;
41 private final MemberName localMemberName;
42 private final Queue<Modification> pendingModifications = new LinkedList<>();
43 private BatchedModifications inflightCommit;
44 private Cancellable retryCommitSchedule;
46 EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
47 this.localMemberName = localMemberName;
51 boolean handleMessage(Object message, EntityOwnershipShard shard) {
52 boolean handled = true;
53 if(CommitTransactionReply.isSerializedType(message)) {
54 // Successful reply from a local commit.
55 inflightCommitSucceeded(shard);
56 } else if(message instanceof akka.actor.Status.Failure) {
57 // Failure reply from a local commit.
58 inflightCommitFailure(((Failure)message).cause(), shard);
59 } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
60 retryInflightCommit(shard);
68 private void retryInflightCommit(EntityOwnershipShard shard) {
69 // Shouldn't be null happen but verify anyway
70 if(inflightCommit == null) {
74 if(shard.hasLeader()) {
75 log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
77 shard.tryCommitModifications(inflightCommit);
79 scheduleInflightCommitRetry(shard);
83 void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
84 // This should've originated from a failed inflight commit but verify anyway
85 if(inflightCommit == null) {
89 log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
91 if(!(cause instanceof NoShardLeaderException)) {
92 // If the failure is other than NoShardLeaderException the commit may have been partially
93 // processed so retry with a new transaction ID to be safe.
94 newInflightCommitWithDifferentTransactionID();
97 scheduleInflightCommitRetry(shard);
100 private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
101 FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
103 log.debug("Scheduling retry for BatchedModifications commit {} in {}",
104 inflightCommit.getTransactionID(), duration);
106 retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
107 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
110 void inflightCommitSucceeded(EntityOwnershipShard shard) {
111 // Shouldn't be null but verify anyway
112 if(inflightCommit == null) {
116 if(retryCommitSchedule != null) {
117 retryCommitSchedule.cancel();
120 log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
122 inflightCommit = null;
123 commitNextBatch(shard);
126 void commitNextBatch(EntityOwnershipShard shard) {
127 if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
131 inflightCommit = newBatchedModifications();
132 Iterator<Modification> iter = pendingModifications.iterator();
133 while(iter.hasNext()) {
134 inflightCommit.addModification(iter.next());
136 if(inflightCommit.getModifications().size() >=
137 shard.getDatastoreContext().getShardBatchedModificationCount()) {
142 log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
143 inflightCommit.getModifications().size());
145 shard.tryCommitModifications(inflightCommit);
148 void commitModification(Modification modification, EntityOwnershipShard shard) {
149 BatchedModifications modifications = newBatchedModifications();
150 modifications.addModification(modification);
151 commitModifications(modifications, shard);
154 void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
155 if(modifications.getModifications().isEmpty()) {
159 boolean hasLeader = shard.hasLeader();
160 if(inflightCommit != null || !hasLeader) {
161 if(log.isDebugEnabled()) {
162 log.debug("{} - adding modifications to pending",
163 (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
166 pendingModifications.addAll(modifications.getModifications());
168 inflightCommit = modifications;
169 shard.tryCommitModifications(inflightCommit);
173 void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
174 if(!isLeader && inflightCommit != null) {
175 // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
176 // consensus for the commit and switched to follower due to another node with a higher term. We
177 // can't be sure if the commit was replicated to any node so we retry it here with a new
179 if(retryCommitSchedule != null) {
180 retryCommitSchedule.cancel();
183 newInflightCommitWithDifferentTransactionID();
184 retryInflightCommit(shard);
186 commitNextBatch(shard);
190 private void newInflightCommitWithDifferentTransactionID() {
191 BatchedModifications newBatchedModifications = newBatchedModifications();
192 newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
193 inflightCommit = newBatchedModifications;
196 BatchedModifications newBatchedModifications() {
197 BatchedModifications modifications = new BatchedModifications(
198 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
199 DataStoreVersions.CURRENT_VERSION, "");
200 modifications.setDoCommitOnReady(true);
201 modifications.setReady(true);
202 modifications.setTotalMessagesSent(1);
203 return modifications;