/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22 import org.slf4j.Logger;
23 import scala.concurrent.duration.FiniteDuration;
26 * Handles commits and retries for the EntityOwnershipShard.
28 * @author Thomas Pantelis
30 class EntityOwnershipShardCommitCoordinator {
31 private static final Object COMMIT_RETRY_MESSAGE = new Object() {
33 public String toString() {
34 return "entityCommitRetry";
38 private final Logger log;
39 private int transactionIDCounter = 0;
40 private final String localMemberName;
41 private final Queue<Modification> pendingModifications = new LinkedList<>();
42 private BatchedModifications inflightCommit;
43 private Cancellable retryCommitSchedule;
45 EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
46 this.localMemberName = localMemberName;
50 boolean handleMessage(Object message, EntityOwnershipShard shard) {
51 boolean handled = true;
52 if(CommitTransactionReply.isSerializedType(message)) {
53 // Successful reply from a local commit.
54 inflightCommitSucceeded(shard);
55 } else if(message instanceof akka.actor.Status.Failure) {
56 // Failure reply from a local commit.
57 inflightCommitFailure(((Failure)message).cause(), shard);
58 } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
59 retryInflightCommit(shard);
67 private void retryInflightCommit(EntityOwnershipShard shard) {
68 // Shouldn't be null happen but verify anyway
69 if(inflightCommit == null) {
73 if(shard.hasLeader()) {
74 log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
76 shard.tryCommitModifications(inflightCommit);
78 scheduleInflightCommitRetry(shard);
82 void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
83 // This should've originated from a failed inflight commit but verify anyway
84 if(inflightCommit == null) {
88 log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
90 if(!(cause instanceof NoShardLeaderException)) {
91 // If the failure is other than NoShardLeaderException the commit may have been partially
92 // processed so retry with a new transaction ID to be safe.
93 newInflightCommitWithDifferentTransactionID();
96 scheduleInflightCommitRetry(shard);
99 private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
100 FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
102 log.debug("Scheduling retry for BatchedModifications commit {} in {}",
103 inflightCommit.getTransactionID(), duration);
105 retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
106 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
109 void inflightCommitSucceeded(EntityOwnershipShard shard) {
110 // Shouldn't be null but verify anyway
111 if(inflightCommit == null) {
115 if(retryCommitSchedule != null) {
116 retryCommitSchedule.cancel();
119 log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
121 inflightCommit = null;
122 commitNextBatch(shard);
125 void commitNextBatch(EntityOwnershipShard shard) {
126 if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
130 inflightCommit = newBatchedModifications();
131 Iterator<Modification> iter = pendingModifications.iterator();
132 while(iter.hasNext()) {
133 inflightCommit.addModification(iter.next());
135 if(inflightCommit.getModifications().size() >=
136 shard.getDatastoreContext().getShardBatchedModificationCount()) {
141 log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
142 inflightCommit.getModifications().size());
144 shard.tryCommitModifications(inflightCommit);
147 void commitModification(Modification modification, EntityOwnershipShard shard) {
148 BatchedModifications modifications = newBatchedModifications();
149 modifications.addModification(modification);
150 commitModifications(modifications, shard);
153 void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
154 if(modifications.getModifications().isEmpty()) {
158 boolean hasLeader = shard.hasLeader();
159 if(inflightCommit != null || !hasLeader) {
160 if(log.isDebugEnabled()) {
161 log.debug("{} - adding modifications to pending",
162 (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
165 pendingModifications.addAll(modifications.getModifications());
167 inflightCommit = modifications;
168 shard.tryCommitModifications(inflightCommit);
172 void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
173 if(!isLeader && inflightCommit != null) {
174 // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
175 // consensus for the commit and switched to follower due to another node with a higher term. We
176 // can't be sure if the commit was replicated to any node so we retry it here with a new
178 if(retryCommitSchedule != null) {
179 retryCommitSchedule.cancel();
182 newInflightCommitWithDifferentTransactionID();
183 retryInflightCommit(shard);
185 commitNextBatch(shard);
189 private void newInflightCommitWithDifferentTransactionID() {
190 BatchedModifications newBatchedModifications = newBatchedModifications();
191 newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
192 inflightCommit = newBatchedModifications;
195 BatchedModifications newBatchedModifications() {
196 BatchedModifications modifications = new BatchedModifications(
197 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
198 DataStoreVersions.CURRENT_VERSION, "");
199 modifications.setDoCommitOnReady(true);
200 modifications.setReady(true);
201 modifications.setTotalMessagesSent(1);
202 return modifications;