Do not use Strings for internal messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.util.Iterator;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
17 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22 import org.slf4j.Logger;
23 import scala.concurrent.duration.FiniteDuration;
24
25 /**
26  * Handles commits and retries for the EntityOwnershipShard.
27  *
28  * @author Thomas Pantelis
29  */
30 class EntityOwnershipShardCommitCoordinator {
31     private static final Object COMMIT_RETRY_MESSAGE = new Object() {
32         @Override
33         public String toString() {
34             return "entityCommitRetry";
35         }
36     };
37
38     private final Logger log;
39     private int transactionIDCounter = 0;
40     private final String localMemberName;
41     private final Queue<Modification> pendingModifications = new LinkedList<>();
42     private BatchedModifications inflightCommit;
43     private Cancellable retryCommitSchedule;
44
45     EntityOwnershipShardCommitCoordinator(String localMemberName, Logger log) {
46         this.localMemberName = localMemberName;
47         this.log = log;
48     }
49
50     boolean handleMessage(Object message, EntityOwnershipShard shard) {
51         boolean handled = true;
52         if(CommitTransactionReply.isSerializedType(message)) {
53             // Successful reply from a local commit.
54             inflightCommitSucceeded(shard);
55         } else if(message instanceof akka.actor.Status.Failure) {
56             // Failure reply from a local commit.
57             inflightCommitFailure(((Failure)message).cause(), shard);
58         } else if(COMMIT_RETRY_MESSAGE.equals(message)) {
59             retryInflightCommit(shard);
60         } else {
61             handled = false;
62         }
63
64         return handled;
65     }
66
67     private void retryInflightCommit(EntityOwnershipShard shard) {
68         // Shouldn't be null happen but verify anyway
69         if(inflightCommit == null) {
70             return;
71         }
72
73         if(shard.hasLeader()) {
74             log.debug("Retrying commit for BatchedModifications {}", inflightCommit.getTransactionID());
75
76             shard.tryCommitModifications(inflightCommit);
77         } else {
78             scheduleInflightCommitRetry(shard);
79         }
80     }
81
82     void inflightCommitFailure(Throwable cause, EntityOwnershipShard shard) {
83         // This should've originated from a failed inflight commit but verify anyway
84         if(inflightCommit == null) {
85             return;
86         }
87
88         log.debug("Inflight BatchedModifications {} commit failed", inflightCommit.getTransactionID(), cause);
89
90         if(!(cause instanceof NoShardLeaderException)) {
91             // If the failure is other than NoShardLeaderException the commit may have been partially
92             // processed so retry with a new transaction ID to be safe.
93             newInflightCommitWithDifferentTransactionID();
94         }
95
96         scheduleInflightCommitRetry(shard);
97     }
98
99     private void scheduleInflightCommitRetry(EntityOwnershipShard shard) {
100         FiniteDuration duration = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval();
101
102         log.debug("Scheduling retry for BatchedModifications commit {} in {}",
103                 inflightCommit.getTransactionID(), duration);
104
105         retryCommitSchedule = shard.getContext().system().scheduler().scheduleOnce(duration, shard.getSelf(),
106                 COMMIT_RETRY_MESSAGE, shard.getContext().dispatcher(), ActorRef.noSender());
107     }
108
109     void inflightCommitSucceeded(EntityOwnershipShard shard) {
110         // Shouldn't be null but verify anyway
111         if(inflightCommit == null) {
112             return;
113         }
114
115         if(retryCommitSchedule != null) {
116             retryCommitSchedule.cancel();
117         }
118
119         log.debug("BatchedModifications commit {} succeeded", inflightCommit.getTransactionID());
120
121         inflightCommit = null;
122         commitNextBatch(shard);
123     }
124
125     void commitNextBatch(EntityOwnershipShard shard) {
126         if(inflightCommit != null || pendingModifications.isEmpty() || !shard.hasLeader()) {
127             return;
128         }
129
130         inflightCommit = newBatchedModifications();
131         Iterator<Modification> iter = pendingModifications.iterator();
132         while(iter.hasNext()) {
133             inflightCommit.addModification(iter.next());
134             iter.remove();
135             if(inflightCommit.getModifications().size() >=
136                     shard.getDatastoreContext().getShardBatchedModificationCount()) {
137                 break;
138             }
139         }
140
141         log.debug("Committing next BatchedModifications {}, size {}", inflightCommit.getTransactionID(),
142                 inflightCommit.getModifications().size());
143
144         shard.tryCommitModifications(inflightCommit);
145     }
146
147     void commitModification(Modification modification, EntityOwnershipShard shard) {
148         BatchedModifications modifications = newBatchedModifications();
149         modifications.addModification(modification);
150         commitModifications(modifications, shard);
151     }
152
153     void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
154         if(modifications.getModifications().isEmpty()) {
155             return;
156         }
157
158         boolean hasLeader = shard.hasLeader();
159         if(inflightCommit != null || !hasLeader) {
160             if(log.isDebugEnabled()) {
161                 log.debug("{} - adding modifications to pending",
162                         (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
163             }
164
165             pendingModifications.addAll(modifications.getModifications());
166         } else {
167             inflightCommit = modifications;
168             shard.tryCommitModifications(inflightCommit);
169         }
170     }
171
172     void onStateChanged(EntityOwnershipShard shard, boolean isLeader) {
173         if(!isLeader && inflightCommit != null) {
174             // We're no longer the leader but we have an inflight local commit. This likely means we didn't get
175             // consensus for the commit and switched to follower due to another node with a higher term. We
176             // can't be sure if the commit was replicated to any node so we retry it here with a new
177             // transaction ID.
178             if(retryCommitSchedule != null) {
179                 retryCommitSchedule.cancel();
180             }
181
182             newInflightCommitWithDifferentTransactionID();
183             retryInflightCommit(shard);
184         } else {
185             commitNextBatch(shard);
186         }
187     }
188
189     private void newInflightCommitWithDifferentTransactionID() {
190         BatchedModifications newBatchedModifications = newBatchedModifications();
191         newBatchedModifications.getModifications().addAll(inflightCommit.getModifications());
192         inflightCommit = newBatchedModifications;
193     }
194
195     BatchedModifications newBatchedModifications() {
196         BatchedModifications modifications = new BatchedModifications(
197                 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
198                 DataStoreVersions.CURRENT_VERSION, "");
199         modifications.setDoCommitOnReady(true);
200         modifications.setReady(true);
201         modifications.setTotalMessagesSent(1);
202         return modifications;
203     }
204 }