97e6c62037b2801e1ea1e6521cc8b77597be3335
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import com.google.common.base.Preconditions;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Queue;
17 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
18 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
19 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.MemberName;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
24 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.slf4j.Logger;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * Handles commits and retries for the EntityOwnershipShard.
33  *
34  * @author Thomas Pantelis
35  */
36 class EntityOwnershipShardCommitCoordinator {
37     private static final Object COMMIT_RETRY_MESSAGE = new Object() {
38         @Override
39         public String toString() {
40             return "entityCommitRetry";
41         }
42     };
43     private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
44
45     private final Queue<Modification> pendingModifications = new LinkedList<>();
46     private final LocalHistoryIdentifier historyId;
47     private final Logger log;
48
49     private BatchedModifications inflightCommit;
50     private Cancellable retryCommitSchedule;
51     private long transactionIDCounter = 0;
52
53     EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
54         this.log = Preconditions.checkNotNull(log);
55         historyId = new LocalHistoryIdentifier(
56                 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
57     }
58
59     boolean handleMessage(Object message, EntityOwnershipShard shard) {
60         boolean handled = true;
61         if(CommitTransactionReply.isSerializedType(message)) {
62             // Successful reply from a local commit.
63             inflightCommitSucceeded(shard);
64         } else if(message instanceof akka.actor.Status.Failure) {
65             // Failure reply from a local commit.
66             inflightCommitFailure(((Failure)message).cause(), shard);
67         } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
68             retryInflightCommit(shard);
69         } else {
70             handled = false;
71         }
72
73         return handled;
74     }
75
76     private void retryInflightCommit(EntityOwnershipShard shard) {
77         // Shouldn't be null happen but verify anyway
78         if(inflightCommit == null) {
79             return;
80         }
81
82         if(shard.hasLeader()) {
83             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
84
85             shard.tryCommitModifications(inflightCommit);
86         } else {
87             scheduleInflightCommitRetry(shard);
88         }
89     }
90
91     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
92         // This should've originated from a failed inflight commit but verify anyway
93         if(inflightCommit == null) {
94             return;
95         }
96
97         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
98
99         if(!(cause instanceof NoShardLeaderException)) {
100             // If the failure is other than NoShardLeaderException the commit may have been partially
101             // processed so retry with a new transaction ID to be safe.
102             newInflightCommitWithDifferentTransactionID();
103         }
104
105         scheduleInflightCommitRetry(shard);
106     }
107
108     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
109         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
110
111         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
112                 inflightCommit.getTransactionID(), duration);
113
114         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
115                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
116     }
117
118     void inflightCommitSucceeded(EntityOwnershipShard shard) {
119         // Shouldn't be null but verify anyway
120         if(inflightCommit == null) {
121             return;
122         }
123
124         if(retryCommitSchedule != null) {
125             retryCommitSchedule.cancel();
126         }
127
128         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
129
130         inflightCommit = null;
131         commitNextBatch(shard);
132     }
133
134     void commitNextBatch(EntityOwnershipShard shard) {
135         if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
136             return;
137         }
138
139         inflightCommit = newBatchedModifications();
140         Iterator<Modification> iter = pendingModifications.iterator();
141         while(iter.hasNext()) {
142             inflightCommit.addModification(iter.next());
143             iter.remove();
144             if(inflightCommit.getModifications().size() >=
145                     shard.getDatastoreContext().getShardBatchedModificationCount()) {
146                 break;
147             }
148         }
149
150         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
151                 inflightCommit.getModifications().size());
152
153         shard.tryCommitModifications(inflightCommit);
154     }
155
156     void commitModification(Modification modification, EntityOwnershipShard shard) {
157         BatchedModifications modifications = newBatchedModifications();
158         modifications.addModification(modification);
159         commitModifications(modifications, shard);
160     }
161
162     void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
163         if(modifications.getModifications().isEmpty()) {
164             return;
165         }
166
167         boolean hasLeader = shard.hasLeader();
168         if(inflightCommit != null || !hasLeader) {
169             if(log.isDebugEnabled()) {
170                 log.debug("{} - adding modifications to pending",
171                         (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
172             }
173
174             pendingModifications.addAll(modifications.getModifications());
175         } else {
176             inflightCommit = modifications;
177             shard.tryCommitModifications(inflightCommit);
178         }
179     }
180
181     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
182         if(!isLeader && inflightCommit != null) {
183             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
184             // consensus for the commit and switched to follower due to another node with a higher term. We
185             // can't be sure if the commit was replicated to any node so we retry it here with a new
186             // transaction ID.
187             if(retryCommitSchedule != null) {
188                 retryCommitSchedule.cancel();
189             }
190
191             newInflightCommitWithDifferentTransactionID();
192             retryInflightCommit(shard);
193         } else {
194             commitNextBatch(shard);
195         }
196     }
197
198     private void newInflightCommitWithDifferentTransactionID() {
199         BatchedModifications newBatchedModifications = newBatchedModifications();
200         newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
201         inflightCommit = newBatchedModifications;
202     }
203
204     BatchedModifications newBatchedModifications() {
205         BatchedModifications modifications = new BatchedModifications(
206             new TransactionIdentifier(historyId, ++transactionIDCounter), DataStoreVersions.CURRENT_VERSION);
207         modifications.setDoCommitOnReady(true);
208         modifications.setReady(true);
209         modifications.setTotalMessagesSent(1);
210         return modifications;
211     }
212 }