2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import com.google.common.base.Preconditions;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Queue;
17 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
18 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
19 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.MemberName;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
24 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
29 import org.slf4j.Logger;
30 import scala.concurrent.duration.FiniteDuration;
33 * Handles commits and retries for the EntityOwnershipShard.
35 * @author Thomas Pantelis
37 class EntityOwnershipShardCommitCoordinator {
38 private static final Object COMMIT_RETRY_MESSAGE = new Object() {
40 public String toString() {
41 return "entityCommitRetry";
44 private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
46 private final Queue<Modification> pendingModifications = new LinkedList<>();
47 private final LocalHistoryIdentifier historyId;
48 private final Logger log;
50 private BatchedModifications inflightCommit;
51 private Cancellable retryCommitSchedule;
52 private long transactionIDCounter = 0;
54 EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
55 this.log = Preconditions.checkNotNull(log);
56 historyId = new LocalHistoryIdentifier(
57 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
60 boolean handleMessage(Object message, EntityOwnershipShard shard) {
61 boolean handled = true;
62 if(CommitTransactionReply.isSerializedType(message)) {
63 // Successful reply from a local commit.
64 inflightCommitSucceeded(shard);
65 } else if(message instanceof akka.actor.Status.Failure) {
66 // Failure reply from a local commit.
67 inflightCommitFailure(((Failure)message).cause(), shard);
68 } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
69 retryInflightCommit(shard);
77 private void retryInflightCommit(EntityOwnershipShard shard) {
78 // Shouldn't be null happen but verify anyway
79 if(inflightCommit == null) {
83 if(shard.hasLeader()) {
84 log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
86 shard.tryCommitModifications(inflightCommit);
88 scheduleInflightCommitRetry(shard);
92 void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
93 // This should've originated from a failed inflight commit but verify anyway
94 if(inflightCommit == null) {
98 log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
100 if(!(cause instanceof NoShardLeaderException)) {
101 // If the failure is other than NoShardLeaderException the commit may have been partially
102 // processed so retry with a new transaction ID to be safe.
103 newInflightCommitWithDifferentTransactionID();
106 scheduleInflightCommitRetry(shard);
109 private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
110 FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
112 log.debug("Scheduling retry for BatchedModifications commit {} in {}",
113 inflightCommit.getTransactionID(), duration);
115 retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
116 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
119 void inflightCommitSucceeded(EntityOwnershipShard shard) {
120 // Shouldn't be null but verify anyway
121 if(inflightCommit == null) {
125 if(retryCommitSchedule != null) {
126 retryCommitSchedule.cancel();
129 log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
131 inflightCommit = null;
132 commitNextBatch(shard);
135 void commitNextBatch(EntityOwnershipShard shard) {
136 if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
140 inflightCommit = newBatchedModifications();
141 Iterator<Modification> iter = pendingModifications.iterator();
142 while(iter.hasNext()) {
143 inflightCommit.addModification(iter.next());
145 if(inflightCommit.getModifications().size() >=
146 shard.getDatastoreContext().getShardBatchedModificationCount()) {
151 log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
152 inflightCommit.getModifications().size());
154 shard.tryCommitModifications(inflightCommit);
157 void commitModification(Modification modification, EntityOwnershipShard shard) {
158 BatchedModifications modifications = newBatchedModifications();
159 modifications.addModification(modification);
160 commitModifications(modifications, shard);
163 void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
164 if(modifications.getModifications().isEmpty()) {
168 boolean hasLeader = shard.hasLeader();
169 if(inflightCommit != null || !hasLeader) {
170 if(log.isDebugEnabled()) {
171 log.debug("{} - adding modifications to pending",
172 (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
175 pendingModifications.addAll(modifications.getModifications());
177 inflightCommit = modifications;
178 shard.tryCommitModifications(inflightCommit);
182 void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
183 if(!isLeader && inflightCommit != null) {
184 // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
185 // consensus for the commit and switched to follower due to another node with a higher term. We
186 // can't be sure if the commit was replicated to any node so we retry it here with a new
188 if(retryCommitSchedule != null) {
189 retryCommitSchedule.cancel();
192 newInflightCommitWithDifferentTransactionID();
193 retryInflightCommit(shard);
195 commitNextBatch(shard);
199 private void newInflightCommitWithDifferentTransactionID() {
200 BatchedModifications newBatchedModifications = newBatchedModifications();
201 newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
202 inflightCommit = newBatchedModifications;
205 BatchedModifications newBatchedModifications() {
206 BatchedModifications modifications = new BatchedModifications(TransactionIdentifierUtils.actorNameFor(
207 new TransactionIdentifier(historyId, ++transactionIDCounter)), DataStoreVersions.CURRENT_VERSION, "");
208 modifications.setDoCommitOnReady(true);
209 modifications.setReady(true);
210 modifications.setTotalMessagesSent(1);
211 return modifications;