172849c9535e574ff3181e94304f7d4694aaf6ce
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22 import org.slf4j.Logger;
23 import scala.concurrent.duration.FiniteDuration;
24
25 /**
26  * Handles commits and retries for the EntityOwnershipShard.
27  *
28  * @author Thomas Pantelis
29  */
30 class EntityOwnershipShardCommitCoordinator {
31     private static final Object COMMIT_RETRY_MESSAGE = "entityCommitRetry";
32
33     private final Logger log;
34     private int transactionIDCounter = 0;
35     private final String localMemberName;
36     private final Queue<Modification> pendingModifications = new LinkedList<>();
37     private BatchedModifications inflightCommit;
38     private Cancellable retryCommitSchedule;
39
40     EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
41         this.localMemberName = localMemberName;
42         this.log = log;
43     }
44
45     boolean handleMessage(Object message, EntityOwnershipShard shard) {
46         boolean handled = true;
47         if(CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) {
48             // Successful reply from a local commit.
49             inflightCommitSucceeded(shard);
50         } else if(message instanceof akka.actor.Status.Failure) {
51             // Failure reply from a local commit.
52             inflightCommitFailure(((Failure)message).cause(), shard);
53         } else if(message.equals(COMMIT_RETRY_MESSAGE)) {
54             retryInflightCommit(shard);
55         } else {
56             handled = false;
57         }
58
59         return handled;
60     }
61
62     private void retryInflightCommit(EntityOwnershipShard shard) {
63         // Shouldn't be null happen but verify anyway
64         if(inflightCommit == null) {
65             return;
66         }
67
68         if(shard.hasLeader()) {
69             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
70
71             shard.tryCommitModifications(inflightCommit);
72         } else {
73             scheduleInflightCommitRetry(shard);
74         }
75     }
76
77     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
78         // This should've originated from a failed inflight commit but verify anyway
79         if(inflightCommit == null) {
80             return;
81         }
82
83         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
84
85         if(!(cause instanceof NoShardLeaderException)) {
86             // If the failure is other than NoShardLeaderException the commit may have been partially
87             // processed so retry with a new transaction ID to be safe.
88             newInflightCommitWithDifferentTransactionID();
89         }
90
91         scheduleInflightCommitRetry(shard);
92     }
93
94     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
95         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
96
97         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
98                 inflightCommit.getTransactionID(), duration);
99
100         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
101                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
102     }
103
104     void inflightCommitSucceeded(EntityOwnershipShard shard) {
105         // Shouldn't be null but verify anyway
106         if(inflightCommit == null) {
107             return;
108         }
109
110         if(retryCommitSchedule != null) {
111             retryCommitSchedule.cancel();
112         }
113
114         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
115
116         inflightCommit = null;
117         commitNextBatch(shard);
118     }
119
120     void commitNextBatch(EntityOwnershipShard shard) {
121         if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
122             return;
123         }
124
125         inflightCommit = newBatchedModifications();
126         Iterator<Modification> iter = pendingModifications.iterator();
127         while(iter.hasNext()) {
128             inflightCommit.addModification(iter.next());
129             iter.remove();
130             if(inflightCommit.getModifications().size() >=
131                     shard.getDatastoreContext().getShardBatchedModificationCount()) {
132                 break;
133             }
134         }
135
136         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
137                 inflightCommit.getModifications().size());
138
139         shard.tryCommitModifications(inflightCommit);
140     }
141
142     void commitModification(Modification modification, EntityOwnershipShard shard) {
143         BatchedModifications modifications = newBatchedModifications();
144         modifications.addModification(modification);
145         commitModifications(modifications, shard);
146     }
147
148     void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
149         if(modifications.getModifications().isEmpty()) {
150             return;
151         }
152
153         boolean hasLeader = shard.hasLeader();
154         if(inflightCommit != null || !hasLeader) {
155             if(log.isDebugEnabled()) {
156                 log.debug("{} - adding modifications to pending",
157                         (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
158             }
159
160             pendingModifications.addAll(modifications.getModifications());
161         } else {
162             inflightCommit = modifications;
163             shard.tryCommitModifications(inflightCommit);
164         }
165     }
166
167     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
168         if(!isLeader && inflightCommit != null) {
169             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
170             // consensus for the commit and switched to follower due to another node with a higher term. We
171             // can't be sure if the commit was replicated to any node so we retry it here with a new
172             // transaction ID.
173             if(retryCommitSchedule != null) {
174                 retryCommitSchedule.cancel();
175             }
176
177             newInflightCommitWithDifferentTransactionID();
178             retryInflightCommit(shard);
179         } else {
180             commitNextBatch(shard);
181         }
182     }
183
184     private void newInflightCommitWithDifferentTransactionID() {
185         BatchedModifications newBatchedModifications = newBatchedModifications();
186         newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
187         inflightCommit = newBatchedModifications;
188     }
189
190     BatchedModifications newBatchedModifications() {
191         BatchedModifications modifications = new BatchedModifications(
192                 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
193                 DataStoreVersions.CURRENT_VERSION, "");
194         modifications.setDoCommitOnReady(true);
195         modifications.setReady(true);
196         modifications.setTotalMessagesSent(1);
197         return modifications;
198     }
199 }