2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22 import org.slf4j.Logger;
23 import scala.concurrent.duration.FiniteDuration;
26 * Handles commits and retries for the EntityOwnershipShard.
28 * @author Thomas Pantelis
30 class EntityOwnershipShardCommitCoordinator {
31 private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry";
33 private final Logger log;
34 private int transactionIDCounter = 0;
35 private final String localMemberName;
36 private final Queue<Modification> pendingModifications = new LinkedList<>();
37 private BatchedModifications inflightCommit;
38 private Cancellable retryCommitSchedule;
/**
 * Creates a coordinator for the given member.
 *
 * @param localMemberName name of the local member, used to build transaction IDs
 * @param log logger used for all debug output of this coordinator
 */
EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
    this.log = log;
    this.localMemberName = localMemberName;
}
45 boolean handleMessage(Object message, EntityOwnershipShard shard) {
46 boolean handled = true;
47 if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) {
48 // Successful reply from a local commit.
49 inflightCommitSucceeded(shard);
50 } else if(message instanceof akka.actor.Status.Failure) {
51 // Failure reply from a local commit.
52 inflightCommitFailure(((Failure)message).cause(), shard);
53 } else if(message.equals(COMMIT_RETRY_MESSAGE)) {
54 retryInflightCommit(shard);
62 private void retryInflightCommit(EntityOwnershipShard shard) {
63 // Shouldn't be null happen but verify anyway
64 if(inflightCommit == null) {
68 if(shard.hasLeader()) {
69 log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
71 shard.tryCommitModifications(inflightCommit);
73 scheduleInflightCommitRetry(shard);
77 void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
78 // This should've originated from a failed inflight commit but verify anyway
79 if(inflightCommit == null) {
83 log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
85 if(!(cause instanceof NoShardLeaderException)) {
86 // If the failure is other than NoShardLeaderException the commit may have been partially
87 // processed so retry with a new transaction ID to be safe.
88 newInflightCommitWithDifferentTransactionID();
91 scheduleInflightCommitRetry(shard);
94 private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
95 FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
97 log.debug("Scheduling retry for BatchedModifications commit {} in {}",
98 inflightCommit.getTransactionID(), duration);
100 retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
101 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
104 void inflightCommitSucceeded(EntityOwnershipShard shard) {
105 // Shouldn't be null but verify anyway
106 if(inflightCommit == null) {
110 if(retryCommitSchedule != null) {
111 retryCommitSchedule.cancel();
114 log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
116 inflightCommit = null;
117 commitNextBatch(shard);
120 void commitNextBatch(EntityOwnershipShard shard) {
121 if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
125 inflightCommit = newBatchedModifications();
126 Iterator<Modification> iter = pendingModifications.iterator();
127 while(iter.hasNext()) {
128 inflightCommit.addModification(iter.next());
130 if(inflightCommit.getModifications().size() >=
131 shard.getDatastoreContext().getShardBatchedModificationCount()) {
136 log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
137 inflightCommit.getModifications().size());
139 shard.tryCommitModifications(inflightCommit);
142 void commitModification(Modification modification, EntityOwnershipShard shard) {
143 BatchedModifications modifications = newBatchedModifications();
144 modifications.addModification(modification);
145 commitModifications(modifications, shard);
148 void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
149 if(modifications.getModifications().isEmpty()) {
153 boolean hasLeader = shard.hasLeader();
154 if(inflightCommit != null || !hasLeader) {
155 if(log.isDebugEnabled()) {
156 log.debug("{} - adding modifications to pending",
157 (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
160 pendingModifications.addAll(modifications.getModifications());
162 inflightCommit = modifications;
163 shard.tryCommitModifications(inflightCommit);
167 void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
168 if(!isLeader && inflightCommit != null) {
169 // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
170 // consensus for the commit and switched to follower due to another node with a higher term. We
171 // can't be sure if the commit was replicated to any node so we retry it here with a new
173 if(retryCommitSchedule != null) {
174 retryCommitSchedule.cancel();
177 newInflightCommitWithDifferentTransactionID();
178 retryInflightCommit(shard);
180 commitNextBatch(shard);
184 private void newInflightCommitWithDifferentTransactionID() {
185 BatchedModifications newBatchedModifications = newBatchedModifications();
186 newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
187 inflightCommit = newBatchedModifications;
190 BatchedModifications newBatchedModifications() {
191 BatchedModifications modifications = new BatchedModifications(
192 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
193 DataStoreVersions.CURRENT_VERSION, "");
194 modifications.setDoCommitOnReady(true);
195 modifications.setReady(true);
196 modifications.setTotalMessagesSent(1);
197 return modifications;