e906ce116043c8368894f65810d147df2f17c3b7
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import com.google.common.base.Preconditions;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Queue;
17 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
18 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
19 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.MemberName;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
24 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
29 import org.slf4j.Logger;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Handles commits and retries for the EntityOwnershipShard.
34  *
35  * @author Thomas Pantelis
36  */
37 class EntityOwnershipShardCommitCoordinator {
38     private static final Object COMMIT_RETRY_MESSAGE = new Object() {
39         @Override
40         public String toString() {
41             return "entityCommitRetry";
42         }
43     };
44     private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
45
46     private final Queue<Modification> pendingModifications = new LinkedList<>();
47     private final LocalHistoryIdentifier historyId;
48     private final Logger log;
49
50     private BatchedModifications inflightCommit;
51     private Cancellable retryCommitSchedule;
52     private long transactionIDCounter = 0;
53
54     EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
55         this.log = Preconditions.checkNotNull(log);
56         historyId = new LocalHistoryIdentifier(
57                 ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
58     }
59
60     boolean handleMessage(Object message, EntityOwnershipShard shard) {
61         boolean handled = true;
62         if(CommitTransactionReply.isSerializedType(message)) {
63             // Successful reply from a local commit.
64             inflightCommitSucceeded(shard);
65         } else if(message instanceof akka.actor.Status.Failure) {
66             // Failure reply from a local commit.
67             inflightCommitFailure(((Failure)message).cause(), shard);
68         } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
69             retryInflightCommit(shard);
70         } else {
71             handled = false;
72         }
73
74         return handled;
75     }
76
77     private void retryInflightCommit(EntityOwnershipShard shard) {
78         // Shouldn't be null happen but verify anyway
79         if(inflightCommit == null) {
80             return;
81         }
82
83         if(shard.hasLeader()) {
84             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
85
86             shard.tryCommitModifications(inflightCommit);
87         } else {
88             scheduleInflightCommitRetry(shard);
89         }
90     }
91
92     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
93         // This should've originated from a failed inflight commit but verify anyway
94         if(inflightCommit == null) {
95             return;
96         }
97
98         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
99
100         if(!(cause instanceof NoShardLeaderException)) {
101             // If the failure is other than NoShardLeaderException the commit may have been partially
102             // processed so retry with a new transaction ID to be safe.
103             newInflightCommitWithDifferentTransactionID();
104         }
105
106         scheduleInflightCommitRetry(shard);
107     }
108
109     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
110         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
111
112         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
113                 inflightCommit.getTransactionID(), duration);
114
115         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
116                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
117     }
118
119     void inflightCommitSucceeded(EntityOwnershipShard shard) {
120         // Shouldn't be null but verify anyway
121         if(inflightCommit == null) {
122             return;
123         }
124
125         if(retryCommitSchedule != null) {
126             retryCommitSchedule.cancel();
127         }
128
129         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
130
131         inflightCommit = null;
132         commitNextBatch(shard);
133     }
134
135     void commitNextBatch(EntityOwnershipShard shard) {
136         if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
137             return;
138         }
139
140         inflightCommit = newBatchedModifications();
141         Iterator<Modification> iter = pendingModifications.iterator();
142         while(iter.hasNext()) {
143             inflightCommit.addModification(iter.next());
144             iter.remove();
145             if(inflightCommit.getModifications().size() >=
146                     shard.getDatastoreContext().getShardBatchedModificationCount()) {
147                 break;
148             }
149         }
150
151         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
152                 inflightCommit.getModifications().size());
153
154         shard.tryCommitModifications(inflightCommit);
155     }
156
157     void commitModification(Modification modification, EntityOwnershipShard shard) {
158         BatchedModifications modifications = newBatchedModifications();
159         modifications.addModification(modification);
160         commitModifications(modifications, shard);
161     }
162
163     void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
164         if(modifications.getModifications().isEmpty()) {
165             return;
166         }
167
168         boolean hasLeader = shard.hasLeader();
169         if(inflightCommit != null || !hasLeader) {
170             if(log.isDebugEnabled()) {
171                 log.debug("{} - adding modifications to pending",
172                         (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
173             }
174
175             pendingModifications.addAll(modifications.getModifications());
176         } else {
177             inflightCommit = modifications;
178             shard.tryCommitModifications(inflightCommit);
179         }
180     }
181
182     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
183         if(!isLeader && inflightCommit != null) {
184             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
185             // consensus for the commit and switched to follower due to another node with a higher term. We
186             // can't be sure if the commit was replicated to any node so we retry it here with a new
187             // transaction ID.
188             if(retryCommitSchedule != null) {
189                 retryCommitSchedule.cancel();
190             }
191
192             newInflightCommitWithDifferentTransactionID();
193             retryInflightCommit(shard);
194         } else {
195             commitNextBatch(shard);
196         }
197     }
198
199     private void newInflightCommitWithDifferentTransactionID() {
200         BatchedModifications newBatchedModifications = newBatchedModifications();
201         newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
202         inflightCommit = newBatchedModifications;
203     }
204
205     BatchedModifications newBatchedModifications() {
206         BatchedModifications modifications = new BatchedModifications(TransactionIdentifierUtils.actorNameFor(
207             new TransactionIdentifier(historyId, ++transactionIDCounter)), DataStoreVersions.CURRENT_VERSION, "");
208         modifications.setDoCommitOnReady(true);
209         modifications.setReady(true);
210         modifications.setTotalMessagesSent(1);
211         return modifications;
212     }
213 }