2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.dispatch.ExecutionContexts;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.primitives.UnsignedLong;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.Optional;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
24 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
30 final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
// Class-wide logger and a shared, already-completed Void future.
31 private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
32 private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
// Collaborators assigned once in the constructor.
33 private final DataTreeModification transaction;
34 private final ShardDataTree dataTree;
35 private final TransactionIdentifier transactionId;
36 private final CompositeDataTreeCohort userCohorts;
// Current phase of this cohort; a new cohort starts out READY.
38 private State state = State.READY;
// Prepared candidate: stored by successfulPreCommit() and passed to dataTree.startCommit() by commit().
39 private DataTreeCandidateTip candidate;
// Callback of the phase currently in flight; switchState() reads it and then clears it.
40 private FutureCallback<?> callback;
// Failure recorded by reportFailure(); when non-null, preCommit() calls failedPreCommit() with it.
41 private Exception nextFailure;
// Creates a cohort for a single transaction. Each of the four arguments is passed through
// Preconditions.checkNotNull before being stored, so none of them may be null.
43 SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction,
44 final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
45 this.dataTree = Preconditions.checkNotNull(dataTree);
46 this.transaction = Preconditions.checkNotNull(transaction);
47 this.transactionId = Preconditions.checkNotNull(transactionId);
48 this.userCohorts = Preconditions.checkNotNull(userCohorts);
// Accessor for the identifier of the transaction this cohort tracks.
// NOTE(review): the method body is not visible in this extract; presumably it returns the
// transactionId field - confirm against the full file.
52 public TransactionIdentifier getIdentifier() {
// Accessor for the prepared candidate tip.
// NOTE(review): the method body is not visible in this extract; presumably it returns the
// candidate field, which is only assigned in successfulPreCommit() - confirm against the full file.
57 DataTreeCandidateTip getCandidate() {
// Accessor for the modification carried by this cohort.
// NOTE(review): the method body is not visible in this extract; presumably it returns the
// transaction field - confirm against the full file.
63 DataTreeModification getDataTreeModification() {
// Verifies that the cohort is currently in the expected state. On a mismatch Guava's
// Preconditions.checkState throws IllegalStateException carrying the formatted message.
67 private void checkState(final State expected) {
68 Preconditions.checkState(state == expected, "State %s does not match expected state %s", state, expected);
// Starts the canCommit phase and stores newCallback (must be non-null) for its outcome.
72 public void canCommit(final FutureCallback<Void> newCallback) {
// A canCommit that is already pending is special-cased before the READY check.
// NOTE(review): the body of this branch is not visible in this extract - presumably an early return; confirm.
73 if (state == State.CAN_COMMIT_PENDING) {
// Otherwise the cohort must be READY. The callback and the new state are recorded before the data tree
// is asked to start the canCommit.
77 checkState(State.READY);
78 this.callback = Preconditions.checkNotNull(newCallback);
79 state = State.CAN_COMMIT_PENDING;
80 dataTree.startCanCommit(this);
// Starts the preCommit phase and stores newCallback (must be non-null) for its outcome.
// Only legal in state CAN_COMMIT_COMPLETE; otherwise checkState() rejects the call.
84 public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
85 checkState(State.CAN_COMMIT_COMPLETE);
86 this.callback = Preconditions.checkNotNull(newCallback);
87 state = State.PRE_COMMIT_PENDING;
// nextFailure is set by reportFailure(). With no recorded failure the data tree starts the pre-commit.
89 if (nextFailure == null) {
90 dataTree.startPreCommit(this);
// NOTE(review): the line separating the two branches is not visible in this extract (presumably "} else {").
// A previously reported failure fails the pre-commit with that exception instead.
92 failedPreCommit(nextFailure);
// Aborts the transaction: notifies the data tree, marks the cohort ABORTED, then aborts the user cohorts
// and returns a future for the abort.
// NOTE(review): several lines of this method are not visible in this extract - the bodies of both early
// branches, the success branch of the completion handler and the final return. Confirm against the full file.
97 public ListenableFuture<Void> abort() {
98 dataTree.startAbort(this);
99 state = State.ABORTED;
// userCohorts.abort() may produce no future at all; that case is handled separately.
101 final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
102 if (!maybeAborts.isPresent()) {
// An abort future that has already completed is special-cased as well.
106 final Future<Iterable<Object>> aborts = maybeAborts.get();
107 if (aborts.isCompleted()) {
// Otherwise the Scala future is bridged to a Guava SettableFuture; a failure of the user-cohort abort
// is propagated as the exception of that future.
111 final SettableFuture<Void> ret = SettableFuture.create();
112 aborts.onComplete(new OnComplete<Iterable<Object>>() {
114 public void onComplete(final Throwable failure, final Iterable<Object> objs) {
115 if (failure != null) {
116 ret.setException(failure);
// The completion handler is registered with the global execution context.
121 }, ExecutionContexts.global());
// Starts the commit phase and stores newCallback (must be non-null) for its outcome.
// Only legal in state PRE_COMMIT_COMPLETE; otherwise checkState() rejects the call.
127 public void commit(final FutureCallback<UnsignedLong> newCallback) {
128 checkState(State.PRE_COMMIT_COMPLETE);
129 this.callback = Preconditions.checkNotNull(newCallback);
130 state = State.COMMIT_PENDING;
// The candidate handed over here is the one recorded by successfulPreCommit().
131 dataTree.startCommit(this, candidate);
// Moves the cohort to newState and takes the stored callback out of the callback field, which is cleared.
// NOTE(review): the return statement is not visible in this extract; callers invoke onSuccess/onFailure
// on the result, so presumably ret is returned - confirm.
134 private <T> FutureCallback<T> switchState(final State newState) {
// Unchecked cast: the field is a FutureCallback<?>; the caller chooses T to match the callback type
// that was registered for the phase being completed.
135 @SuppressWarnings("unchecked")
136 final FutureCallback<T> ret = (FutureCallback<T>) this.callback;
137 this.callback = null;
// The debug message is emitted before the assignment so that it shows the old state.
138 LOG.debug("Transaction {} changing state from {} to {}", transactionId, state, newState);
139 this.state = newState;
// Completes the canCommit phase successfully: moves to CAN_COMMIT_COMPLETE and invokes onSuccess(null)
// on the stored callback (canCommit() registers a FutureCallback<Void>).
143 void successfulCanCommit() {
144 switchState(State.CAN_COMMIT_COMPLETE).onSuccess(null);
// Fails the canCommit phase: moves the cohort to FAILED and reports cause through the stored
// callback's onFailure.
147 void failedCanCommit(final Exception cause) {
148 switchState(State.FAILED).onFailure(cause);
152 * Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
153 * any failure to validate is propagated before we record the transaction.
155 * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
156 * @throws ExecutionException if the operation fails
157 * @throws TimeoutException if the operation times out
159 // FIXME: this should be asynchronous
160 void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
// The user cohorts first get to validate the candidate, then to pre-commit, one call after the other.
// The declared exceptions presumably originate from these two calls - confirm against CompositeDataTreeCohort.
161 userCohorts.canCommit(dataTreeCandidate);
162 userCohorts.preCommit();
// Completes the preCommit phase successfully: records the prepared candidate (verified non-null),
// moves to PRE_COMMIT_COMPLETE and passes the candidate to the stored callback's onSuccess.
165 void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
// The trace message logs the DataTreeModification itself rather than the transaction identifier.
166 LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
167 this.candidate = Verify.verifyNotNull(dataTreeCandidate);
168 switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
// Fails the preCommit phase: logs the cause, moves the cohort to FAILED and reports cause through the
// stored callback's onFailure.
// NOTE(review): lines between the two log statements and before switchState() are not visible in this
// extract; the error log is presumably the else branch of the trace check - confirm against the full file.
171 void failedPreCommit(final Exception cause) {
// With trace enabled the whole DataTreeModification is logged; the error message carries the identifier.
172 if (LOG.isTraceEnabled()) {
173 LOG.trace("Transaction {} failed to prepare", transaction, cause);
175 LOG.error("Transaction {} failed to prepare", transactionId, cause);
179 switchState(State.FAILED).onFailure(cause);
// Completes the commit phase: runs the user cohorts' commit, then moves to COMMITTED and passes
// journalIndex to the stored callback's onSuccess.
182 void successfulCommit(final UnsignedLong journalIndex) {
// NOTE(review): the opening of the try block is not visible in this extract.
184 userCohorts.commit();
185 } catch (TimeoutException | ExecutionException e) {
// A user-cohort failure is only logged here; as far as is visible, the commit is still reported
// as successful below.
186 // We are probably dead, depending on what the cohorts end up doing
187 LOG.error("User cohorts failed to commit", e);
190 switchState(State.COMMITTED).onSuccess(journalIndex);
// Fails the commit phase: logs the cause, moves the cohort to FAILED and reports cause through the
// stored callback's onFailure.
// NOTE(review): lines between the two log statements and before switchState() are not visible in this
// extract; the error log is presumably the else branch of the trace check - confirm against the full file.
193 void failedCommit(final Exception cause) {
194 if (LOG.isTraceEnabled()) {
195 LOG.trace("Transaction {} failed to commit", transaction, cause);
// NOTE(review): unlike the error message in failedPreCommit(), this one does not include transactionId,
// so the failing transaction cannot be identified from the log unless trace is enabled.
197 LOG.error("Transaction failed to commit", cause);
201 switchState(State.FAILED).onFailure(cause);
// Accessor for the cohort's current phase.
// NOTE(review): the method body is not visible in this extract; presumably it returns the state
// field - confirm against the full file.
205 public State getState() {
// Records a failure (must be non-null) without changing the state. It takes effect later: preCommit()
// calls failedPreCommit() with it, and isFailed() returns true once it is set.
209 void reportFailure(final Exception cause) {
210 this.nextFailure = Preconditions.checkNotNull(cause);
// True when the cohort has reached state FAILED, or when a failure has been recorded through
// reportFailure() but not yet acted upon.
214 public boolean isFailed() {
215 return state == State.FAILED || nextFailure != null;