Bug 4105: Change commit retry mechanism in EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22 import org.slf4j.Logger;
23 import scala.concurrent.duration.FiniteDuration;
24
25 /**
26  * Handles commits and retries for the EntityOwnershipShard.
27  *
28  * @author Thomas Pantelis
29  */
30 class EntityOwnershipShardCommitCoordinator {
31     private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry";
32
33     private final Logger log;
34     private int transactionIDCounter = 0;
35     private final String localMemberName;
36     private final Queue<Modification> pendingModifications = new LinkedList<>();
37     private BatchedModifications inflightCommit;
38     private Cancellable retryCommitSchedule;
39
40     EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
41         this.localMemberName = localMemberName;
42         this.log = log;
43     }
44
45     boolean handleMessage(Object message, EntityOwnershipShard shard) {
46         boolean handled = true;
47         if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) {
48             // Successful reply from a local commit.
49             inflightCommitSucceeded(shard);
50         } else if(message instanceof akka.actor.Status.Failure) {
51             // Failure reply from a local commit.
52             inflightCommitFailure(((Failure)message).cause(), shard);
53         } else if(message.equals(COMMIT_RETRY_MESSAGE)) {
54             retryInflightCommit(shard);
55         } else {
56             handled = false;
57         }
58
59         return handled;
60     }
61
62     private void retryInflightCommit(EntityOwnershipShard shard) {
63         // Shouldn't be null happen but verify anyway
64         if(inflightCommit == null) {
65             return;
66         }
67
68         if(shard.hasLeader()) {
69             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
70
71             shard.tryCommitModifications(inflightCommit);
72         } else {
73             scheduleInflightCommitRetry(shard);
74         }
75     }
76
77     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
78         // This should've originated from a failed inflight commit but verify anyway
79         if(inflightCommit == null) {
80             return;
81         }
82
83         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
84
85         if(!(cause instanceof NoShardLeaderException)) {
86             // If the failure is other than NoShardLeaderException the commit may have been partially
87             // processed so retry with a new transaction ID to be safe.
88             newInflightCommitWithDifferentTransactionID();
89         }
90
91         scheduleInflightCommitRetry(shard);
92     }
93
94     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
95         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
96
97         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
98                 inflightCommit.getTransactionID(), duration);
99
100         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
101                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
102     }
103
104     void inflightCommitSucceeded(EntityOwnershipShard shard) {
105         // Shouldn't be null but verify anyway
106         if(inflightCommit == null) {
107             return;
108         }
109
110         if(retryCommitSchedule != null) {
111             retryCommitSchedule.cancel();
112         }
113
114         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
115
116         inflightCommit = null;
117         commitNextBatch(shard);
118     }
119
120     void commitNextBatch(EntityOwnershipShard shard) {
121         if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
122             return;
123         }
124
125         inflightCommit = newBatchedModifications();
126         Iterator<Modification> iter = pendingModifications.iterator();
127         while(iter.hasNext()) {
128             inflightCommit.addModification(iter.next());
129             iter.remove();
130             if(inflightCommit.getModifications().size() >=
131                     shard.getDatastoreContext().getShardBatchedModificationCount()) {
132                 break;
133             }
134         }
135
136         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
137                 inflightCommit.getModifications().size());
138
139         shard.tryCommitModifications(inflightCommit);
140     }
141
142     void commitModification(Modification modification, EntityOwnershipShard shard) {
143         boolean hasLeader = shard.hasLeader();
144         if(inflightCommit != null || !hasLeader) {
145             if(log.isDebugEnabled()) {
146                 log.debug("{} - adding modification to pending",
147                         (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
148             }
149
150             pendingModifications.add(modification);
151         } else {
152             inflightCommit = newBatchedModifications();
153             inflightCommit.addModification(modification);
154
155             shard.tryCommitModifications(inflightCommit);
156         }
157     }
158
159     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
160         if(!isLeader && inflightCommit != null) {
161             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
162             // consensus for the commit and switched to follower due to another node with a higher term. We
163             // can't be sure if the commit was replicated to any node so we retry it here with a new
164             // transaction ID.
165             if(retryCommitSchedule != null) {
166                 retryCommitSchedule.cancel();
167             }
168
169             newInflightCommitWithDifferentTransactionID();
170             retryInflightCommit(shard);
171         } else {
172             commitNextBatch(shard);
173         }
174     }
175
176     private void newInflightCommitWithDifferentTransactionID() {
177         BatchedModifications newBatchedModifications = newBatchedModifications();
178         newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
179         inflightCommit = newBatchedModifications;
180     }
181
182     private BatchedModifications newBatchedModifications() {
183         BatchedModifications modifications = new BatchedModifications(
184                 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
185                 DataStoreVersions.CURRENT_VERSION, "");
186         modifications.setDoCommitOnReady(true);
187         modifications.setReady(true);
188         modifications.setTotalMessagesSent(1);
189         return modifications;
190     }
191 }