2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import com.google.common.base.MoreObjects.ToStringHelper;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.primitives.UnsignedLong;
14 import com.google.common.util.concurrent.FutureCallback;
15 import java.util.Optional;
16 import java.util.concurrent.CompletionStage;
17 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
18 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
19 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
25 private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
27 private final DataTreeModification transaction;
28 private final ShardDataTree dataTree;
29 private final TransactionIdentifier transactionId;
30 private final CompositeDataTreeCohort userCohorts;
32 private State state = State.READY;
33 private DataTreeCandidateTip candidate;
34 private FutureCallback<?> callback;
35 private Exception nextFailure;
37 SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
38 final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
39 this.dataTree = Preconditions.checkNotNull(dataTree);
40 this.transaction = Preconditions.checkNotNull(transaction);
41 this.transactionId = Preconditions.checkNotNull(transactionId);
42 this.userCohorts = Preconditions.checkNotNull(userCohorts);
45 SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
46 final TransactionIdentifier transactionId, final Exception nextFailure) {
47 this.dataTree = Preconditions.checkNotNull(dataTree);
48 this.transaction = Preconditions.checkNotNull(transaction);
49 this.transactionId = Preconditions.checkNotNull(transactionId);
50 this.userCohorts = null;
51 this.nextFailure = Preconditions.checkNotNull(nextFailure);
55 public TransactionIdentifier getIdentifier() {
60 DataTreeCandidateTip getCandidate() {
65 DataTreeModification getDataTreeModification() {
69 private void checkState(final State expected) {
70 Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
74 public void canCommit(final FutureCallback<Void> newCallback) {
75 if (state == State.CAN_COMMIT_PENDING) {
79 checkState(State.READY);
80 this.callback = Preconditions.checkNotNull(newCallback);
81 state = State.CAN_COMMIT_PENDING;
83 if (nextFailure == null) {
84 dataTree.startCanCommit(this);
86 failedCanCommit(nextFailure);
91 public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
92 checkState(State.CAN_COMMIT_COMPLETE);
93 this.callback = Preconditions.checkNotNull(newCallback);
94 state = State.PRE_COMMIT_PENDING;
96 if (nextFailure == null) {
97 dataTree.startPreCommit(this);
99 failedPreCommit(nextFailure);
104 public void abort(final FutureCallback<Void> abortCallback) {
105 if (!dataTree.startAbort(this)) {
106 abortCallback.onSuccess(null);
111 state = State.ABORTED;
113 final Optional<CompletionStage<?>> maybeAborts = userCohorts.abort();
114 if (!maybeAborts.isPresent()) {
115 abortCallback.onSuccess(null);
119 maybeAborts.get().whenComplete((noop, failure) -> {
120 if (failure != null) {
121 abortCallback.onFailure(failure);
123 abortCallback.onSuccess(null);
129 public void commit(final FutureCallback<UnsignedLong> newCallback) {
130 checkState(State.PRE_COMMIT_COMPLETE);
131 this.callback = Preconditions.checkNotNull(newCallback);
132 state = State.COMMIT_PENDING;
134 if (nextFailure == null) {
135 dataTree.startCommit(this, candidate);
137 failedCommit(nextFailure);
141 private <T> FutureCallback<T> switchState(final State newState) {
142 @SuppressWarnings("unchecked")
143 final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
144 this.callback = null;
145 LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
146 this.state = newState;
150 void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
151 checkState(State.PRE_COMMIT_COMPLETE);
152 this.candidate = Verify.verifyNotNull(dataTreeCandidate);
155 void successfulCanCommit() {
156 switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
159 void failedCanCommit(final Exception cause) {
160 switchState(State.FAILED).onFailure(cause);
164 * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
165 * any failure to validate is propagated before we record the transaction.
167 * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
168 * @param futureCallback the callback to invoke on completion, which may be immediate or async.
170 void userPreCommit(final DataTreeCandidate dataTreeCandidate, final FutureCallback<Void> futureCallback) {
173 final Optional<CompletionStage<Void>> maybeCanCommitFuture = userCohorts.canCommit(dataTreeCandidate);
174 if (!maybeCanCommitFuture.isPresent()) {
175 doUserPreCommit(futureCallback);
179 maybeCanCommitFuture.get().whenComplete((noop, failure) -> {
180 if (failure != null) {
181 futureCallback.onFailure(failure);
183 doUserPreCommit(futureCallback);
188 private void doUserPreCommit(final FutureCallback<Void> futureCallback) {
189 final Optional<CompletionStage<Void>> maybePreCommitFuture = userCohorts.preCommit();
190 if (!maybePreCommitFuture.isPresent()) {
191 futureCallback.onSuccess(null);
195 maybePreCommitFuture.get().whenComplete((noop, failure) -> {
196 if (failure != null) {
197 futureCallback.onFailure(failure);
199 futureCallback.onSuccess(null);
204 void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
205 LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
206 this.candidate = Verify.verifyNotNull(dataTreeCandidate);
207 switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
210 void failedPreCommit(final Throwable cause) {
211 if (LOG.isTraceEnabled()) {
212 LOG.trace("Transaction {} failed to prepare", transaction, cause);
214 LOG.error("Transaction {} failed to prepare", transactionId, cause);
218 switchState(State.FAILED).onFailure(cause);
221 void successfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
222 final Optional<CompletionStage<Void>> maybeCommitFuture = userCohorts.commit();
223 if (!maybeCommitFuture.isPresent()) {
224 finishSuccessfulCommit(journalIndex, onComplete);
228 maybeCommitFuture.get().whenComplete((noop, failure) -> {
229 if (failure != null) {
230 LOG.error("User cohorts failed to commit", failure);
233 finishSuccessfulCommit(journalIndex, onComplete);
237 private void finishSuccessfulCommit(final UnsignedLong journalIndex, final Runnable onComplete) {
238 switchState(State.COMMITTED).onSuccess(journalIndex);
242 void failedCommit(final Exception cause) {
243 if (LOG.isTraceEnabled()) {
244 LOG.trace("Transaction {} failed to commit", transaction, cause);
246 LOG.error("Transaction failed to commit", cause);
250 switchState(State.FAILED).onFailure(cause);
254 public State getState() {
258 void reportFailure(final Exception cause) {
259 if (nextFailure == null) {
260 this.nextFailure = Preconditions.checkNotNull(cause);
262 LOG.debug("Transaction {} already has a set failure, not updating it", transactionId, cause);
267 public boolean isFailed() {
268 return state == State.FAILED || nextFailure != null;
272 ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
273 return super.addToStringAttributes(toStringHelper).add("nextFailure", nextFailure);