2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.dispatch.ExecutionContexts;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.primitives.UnsignedLong;
15 import com.google.common.util.concurrent.FutureCallback;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeoutException;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Future;
27 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
28 private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
30 private final DataTreeModification transaction;
31 private final ShardDataTree dataTree;
32 private final TransactionIdentifier transactionId;
33 private final CompositeDataTreeCohort userCohorts;
35 private State state = State.READY;
36 private DataTreeCandidateTip candidate;
37 private FutureCallback<?> callback;
38 private Exception nextFailure;
40 SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
41 final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
42 this.dataTree = Preconditions.checkNotNull(dataTree);
43 this.transaction = Preconditions.checkNotNull(transaction);
44 this.transactionId = Preconditions.checkNotNull(transactionId);
45 this.userCohorts = Preconditions.checkNotNull(userCohorts);
49 public TransactionIdentifier getIdentifier() {
54 DataTreeCandidateTip getCandidate() {
59 DataTreeModification getDataTreeModification() {
63 private void checkState(final State expected) {
64 Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
68 public void canCommit(final FutureCallback<Void> newCallback) {
69 if (state == State.CAN_COMMIT_PENDING) {
73 checkState(State.READY);
74 this.callback = Preconditions.checkNotNull(newCallback);
75 state = State.CAN_COMMIT_PENDING;
76 dataTree.startCanCommit(this);
80 public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
81 checkState(State.CAN_COMMIT_COMPLETE);
82 this.callback = Preconditions.checkNotNull(newCallback);
83 state = State.PRE_COMMIT_PENDING;
85 if (nextFailure == null) {
86 dataTree.startPreCommit(this);
88 failedPreCommit(nextFailure);
93 public void abort(final FutureCallback<Void> abortCallback) {
94 if (!dataTree.startAbort(this)) {
95 abortCallback.onSuccess(null);
100 state = State.ABORTED;
102 final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
103 if (!maybeAborts.isPresent()) {
104 abortCallback.onSuccess(null);
108 final Future<Iterable<Object>> aborts = maybeAborts.get();
109 if (aborts.isCompleted()) {
110 abortCallback.onSuccess(null);
114 aborts.onComplete(new OnComplete<Iterable<Object>>() {
116 public void onComplete(final Throwable failure, final Iterable<Object> objs) {
117 if (failure != null) {
118 abortCallback.onFailure(failure);
120 abortCallback.onSuccess(null);
123 }, ExecutionContexts.global());
127 public void commit(final FutureCallback<UnsignedLong> newCallback) {
128 checkState(State.PRE_COMMIT_COMPLETE);
129 this.callback = Preconditions.checkNotNull(newCallback);
130 state = State.COMMIT_PENDING;
132 if (nextFailure == null) {
133 dataTree.startCommit(this, candidate);
135 failedCommit(nextFailure);
139 private <T> FutureCallback<T> switchState(final State newState) {
140 @SuppressWarnings("unchecked")
141 final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
142 this.callback = null;
143 LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
144 this.state = newState;
148 void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
149 checkState(State.PRE_COMMIT_COMPLETE);
150 this.candidate = Verify.verifyNotNull(dataTreeCandidate);
153 void successfulCanCommit() {
154 switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
157 void failedCanCommit(final Exception cause) {
158 switchState(State.FAILED).onFailure(cause);
162 * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
163 * any failure to validate is propagated before we record the transaction.
165 * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
166 * @throws ExecutionException if the operation fails
167 * @throws TimeoutException if the operation times out
169 // FIXME: this should be asynchronous
170 void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
172 userCohorts.canCommit(dataTreeCandidate);
173 userCohorts.preCommit();
176 void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
177 LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
178 this.candidate = Verify.verifyNotNull(dataTreeCandidate);
179 switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
182 void failedPreCommit(final Exception cause) {
183 if (LOG.isTraceEnabled()) {
184 LOG.trace("Transaction {} failed to prepare", transaction, cause);
186 LOG.error("Transaction {} failed to prepare", transactionId, cause);
190 switchState(State.FAILED).onFailure(cause);
193 void successfulCommit(final UnsignedLong journalIndex) {
195 userCohorts.commit();
196 } catch (TimeoutException | ExecutionException e) {
197 // We are probably dead, depending on what the cohorts end up doing
198 LOG.error("User cohorts failed to commit", e);
201 switchState(State.COMMITTED).onSuccess(journalIndex);
204 void failedCommit(final Exception cause) {
205 if (LOG.isTraceEnabled()) {
206 LOG.trace("Transaction {} failed to commit", transaction, cause);
208 LOG.error("Transaction failed to commit", cause);
212 switchState(State.FAILED).onFailure(cause);
216 public State getState() {
220 void reportFailure(final Exception cause) {
221 this.nextFailure = Preconditions.checkNotNull(cause);
225 public boolean isFailed() {
226 return state == State.FAILED || nextFailure != null;