BUG-5280: use MemberName instead of String
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import org.opendaylight.controller.cluster.access.concepts.MemberName;
17 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
18 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
21 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.modification.Modification;
23 import org.slf4j.Logger;
24 import scala.concurrent.duration.FiniteDuration;
25
26 /**
27  * Handles commits and retries for the EntityOwnershipShard.
28  *
29  * @author Thomas Pantelis
30  */
31 class EntityOwnershipShardCommitCoordinator {
32     private static final Object COMMIT_RETRY_MESSAGE = new Object() {
33         @Override
34         public String toString() {
35             return "entityCommitRetry";
36         }
37     };
38
39     private final Logger log;
40     private int transactionIDCounter = 0;
41     private final MemberName localMemberName;
42     private final Queue<Modification> pendingModifications = new LinkedList<>();
43     private BatchedModifications inflightCommit;
44     private Cancellable retryCommitSchedule;
45
46     EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
47         this.localMemberName = localMemberName;
48         this.log = log;
49     }
50
51     boolean handleMessage(Object message, EntityOwnershipShard shard) {
52         boolean handled = true;
53         if(CommitTransactionReply.isSerializedType(message)) {
54             // Successful reply from a local commit.
55             inflightCommitSucceeded(shard);
56         } else if(message instanceof akka.actor.Status.Failure) {
57             // Failure reply from a local commit.
58             inflightCommitFailure(((Failure)message).cause(), shard);
59         } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
60             retryInflightCommit(shard);
61         } else {
62             handled = false;
63         }
64
65         return handled;
66     }
67
68     private void retryInflightCommit(EntityOwnershipShard shard) {
69         // Shouldn't be null happen but verify anyway
70         if(inflightCommit == null) {
71             return;
72         }
73
74         if(shard.hasLeader()) {
75             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
76
77             shard.tryCommitModifications(inflightCommit);
78         } else {
79             scheduleInflightCommitRetry(shard);
80         }
81     }
82
83     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
84         // This should've originated from a failed inflight commit but verify anyway
85         if(inflightCommit == null) {
86             return;
87         }
88
89         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
90
91         if(!(cause instanceof NoShardLeaderException)) {
92             // If the failure is other than NoShardLeaderException the commit may have been partially
93             // processed so retry with a new transaction ID to be safe.
94             newInflightCommitWithDifferentTransactionID();
95         }
96
97         scheduleInflightCommitRetry(shard);
98     }
99
100     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
101         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
102
103         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
104                 inflightCommit.getTransactionID(), duration);
105
106         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
107                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
108     }
109
110     void inflightCommitSucceeded(EntityOwnershipShard shard) {
111         // Shouldn't be null but verify anyway
112         if(inflightCommit == null) {
113             return;
114         }
115
116         if(retryCommitSchedule != null) {
117             retryCommitSchedule.cancel();
118         }
119
120         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
121
122         inflightCommit = null;
123         commitNextBatch(shard);
124     }
125
126     void commitNextBatch(EntityOwnershipShard shard) {
127         if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
128             return;
129         }
130
131         inflightCommit = newBatchedModifications();
132         Iterator<Modification> iter = pendingModifications.iterator();
133         while(iter.hasNext()) {
134             inflightCommit.addModification(iter.next());
135             iter.remove();
136             if(inflightCommit.getModifications().size() >=
137                     shard.getDatastoreContext().getShardBatchedModificationCount()) {
138                 break;
139             }
140         }
141
142         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
143                 inflightCommit.getModifications().size());
144
145         shard.tryCommitModifications(inflightCommit);
146     }
147
148     void commitModification(Modification modification, EntityOwnershipShard shard) {
149         BatchedModifications modifications = newBatchedModifications();
150         modifications.addModification(modification);
151         commitModifications(modifications, shard);
152     }
153
154     void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
155         if(modifications.getModifications().isEmpty()) {
156             return;
157         }
158
159         boolean hasLeader = shard.hasLeader();
160         if(inflightCommit != null || !hasLeader) {
161             if(log.isDebugEnabled()) {
162                 log.debug("{} - adding modifications to pending",
163                         (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
164             }
165
166             pendingModifications.addAll(modifications.getModifications());
167         } else {
168             inflightCommit = modifications;
169             shard.tryCommitModifications(inflightCommit);
170         }
171     }
172
173     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
174         if(!isLeader && inflightCommit != null) {
175             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
176             // consensus for the commit and switched to follower due to another node with a higher term. We
177             // can't be sure if the commit was replicated to any node so we retry it here with a new
178             // transaction ID.
179             if(retryCommitSchedule != null) {
180                 retryCommitSchedule.cancel();
181             }
182
183             newInflightCommitWithDifferentTransactionID();
184             retryInflightCommit(shard);
185         } else {
186             commitNextBatch(shard);
187         }
188     }
189
190     private void newInflightCommitWithDifferentTransactionID() {
191         BatchedModifications newBatchedModifications = newBatchedModifications();
192         newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
193         inflightCommit = newBatchedModifications;
194     }
195
196     BatchedModifications newBatchedModifications() {
197         BatchedModifications modifications = new BatchedModifications(
198                 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
199                 DataStoreVersions.CURRENT_VERSION, "");
200         modifications.setDoCommitOnReady(true);
201         modifications.setReady(true);
202         modifications.setTotalMessagesSent(1);
203         return modifications;
204     }
205 }