2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.primitives.UnsignedLong;
14 import com.google.common.util.concurrent.FutureCallback;
15 import java.util.Collection;
16 import java.util.Optional;
17 import javax.annotation.concurrent.NotThreadSafe;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
21 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
23 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
25 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
26 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
28 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
29 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
30 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
31 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
32 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
33 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
34 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
35 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
36 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
38 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
39 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
40 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
41 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.RequestException;
43 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
44 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
45 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * Frontend read-write transaction state as observed by the shard leader.
55 * @author Robert Varga
58 final class FrontendReadWriteTransaction extends FrontendTransaction {
// Stages of the commit pipeline, tracked in Ready.stage. The constants themselves are not visible in this
// listing; the rest of the file references READY, CAN_COMMIT_PENDING, CAN_COMMIT_COMPLETE,
// PRE_COMMIT_PENDING, PRE_COMMIT_COMPLETE and COMMIT_PENDING.
59 private enum CommitStage {
// Base type of the objects stored in this transaction's 'state' field. Concrete states in this file are
// Failed, Open, Ready, Sealed, Retired and the anonymous ABORTED/ABORTING/COMMITTED markers.
68 private abstract static class State {
// Forces every state to describe itself; the result is used in log and precondition messages and is
// captured by Retired.
70 public abstract String toString();
// State entered by failTransaction(). It retains the failure cause so that throwIfFailed() can rethrow it
// for subsequent requests.
73 private static final class Failed extends State {
// The exception that caused the failure; never null.
74 final RequestException cause;
76 Failed(final RequestException cause) {
// Reject a null cause up front: throwIfFailed() throws this object directly.
77 this.cause = requireNonNull(cause);
81 public String toString() {
82 return "FAILED (" + cause.getMessage() + ")";
// State of a transaction that is still open: it wraps the backing read-write shard transaction, which
// checkOpen() hands out for reads, modifications, readying and aborting.
86 private static final class Open extends State {
// The open backing transaction; never null.
87 final ReadWriteShardDataTreeTransaction openTransaction;
89 Open(final ReadWriteShardDataTreeTransaction openTransaction) {
90 this.openTransaction = requireNonNull(openTransaction);
// NOTE(review): the body of toString() is not visible in this listing.
94 public String toString() {
// State of a transaction that has been readied for commit. It pairs the commit cohort with a mutable
// 'stage' marker which the commit handlers advance as canCommit/preCommit/commit progress.
// NOTE(review): the declaration of the 'stage' field is not visible in this listing.
99 private static final class Ready extends State {
// Cohort driving canCommit/preCommit/commit/abort for this transaction; never null.
100 final ShardDataTreeCohort readyCohort;
103 Ready(final ShardDataTreeCohort readyCohort) {
104 this.readyCohort = requireNonNull(readyCohort);
// Every Ready state starts at the READY stage.
105 this.stage = CommitStage.READY;
109 public String toString() {
110 return "READY (" + stage + ")";
// State of a transaction constructed from an already-built modification (see createReady()). The
// modification is handed out by checkSealed() when a CommitLocalTransactionRequest arrives.
114 private static final class Sealed extends State {
// The modification to be committed; never null.
115 final DataTreeModification sealedModification;
117 Sealed(final DataTreeModification sealedModification) {
118 this.sealedModification = requireNonNull(sealedModification);
// NOTE(review): the body of toString() is not visible in this listing.
122 public String toString() {
128 * Retired state, needed to catch and suppress callbacks after we have removed associated state.
// Terminal marker state. The successful*() callbacks and failTransaction() check for it and suppress their
// work once the transaction has been retired.
130 private static final class Retired extends State {
// Only the textual form of the previous state is kept, not the state object itself.
131 private final String prevStateString;
133 Retired(final State prevState) {
// Captured eagerly, so the description reflects the state at the moment of retirement.
134 prevStateString = prevState.toString();
138 public String toString() {
139 return "RETIRED (in " + prevStateString + ")";
// Class-wide logger.
143 private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
// Shared marker states without any payload. ABORTED and ABORTING are compared against the current state
// in handleTransactionAbort() and handleModifyTransaction().
// NOTE(review): the toString() bodies of all three markers are not visible in this listing.
144 private static final State ABORTED = new State() {
146 public String toString() {
150 private static final State ABORTING = new State() {
152 public String toString() {
// NOTE(review): no use of COMMITTED is visible in this listing; presumably it is assigned once a commit
// completes. Confirm against the full file.
156 private static final State COMMITTED = new State() {
158 public String toString() {
// Creates a transaction that starts in the Open state around the supplied backing transaction.
// NOTE(review): how 'history' and 'id' are consumed (presumably a superclass constructor call) is not
// visible in this listing.
165 private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
166 final ReadWriteShardDataTreeTransaction transaction) {
// Open's constructor rejects a null transaction.
168 this.state = new Open(transaction);
// Creates a transaction that starts in the Sealed state around an already-built modification.
// NOTE(review): how 'history' and 'id' are consumed (presumably a superclass constructor call) is not
// visible in this listing.
171 private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
172 final DataTreeModification mod) {
// Sealed's constructor rejects a null modification.
174 this.state = new Sealed(mod);
// Factory for a transaction in the Open state. The transaction identifier is taken from the backing
// transaction itself.
177 static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
178 final ReadWriteShardDataTreeTransaction transaction) {
179 return new FrontendReadWriteTransaction(history, transaction.getIdentifier(), transaction);
// Factory for a transaction that begins in the Sealed state, holding the supplied modification until a
// CommitLocalTransactionRequest turns it into a Ready cohort (see handleCommitLocalTransaction()).
182 static FrontendReadWriteTransaction createReady(final AbstractFrontendHistory history,
183 final TransactionIdentifier id, final DataTreeModification mod) {
184 return new FrontendReadWriteTransaction(history, id, mod);
187 // Sequence has already been checked
// Dispatches a request to the matching handler based on its concrete type.
//
// Handlers for modify, exists, read and transaction-abort requests produce the return value directly.
// The commit-local, preCommit, doCommit and local-abort handlers are void and reply through the envelope.
// NOTE(review): the statements following those void calls are not visible in this listing (presumably
// 'return null'); confirm against the full file.
//
// Throws UnsupportedRequestException for any request type not listed here, and RequestException as
// propagated from the individual handlers.
189 TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
190 final long now) throws RequestException {
191 if (request instanceof ModifyTransactionRequest) {
192 return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
193 } else if (request instanceof CommitLocalTransactionRequest) {
194 handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
196 } else if (request instanceof ExistsTransactionRequest) {
197 return handleExistsTransaction((ExistsTransactionRequest) request);
198 } else if (request instanceof ReadTransactionRequest) {
199 return handleReadTransaction((ReadTransactionRequest) request);
200 } else if (request instanceof TransactionPreCommitRequest) {
201 handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
203 } else if (request instanceof TransactionDoCommitRequest) {
204 handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
206 } else if (request instanceof TransactionAbortRequest) {
// The abort handlers only need the request's sequence number, not the request itself.
207 return handleTransactionAbort(request.getSequence(), envelope, now);
208 } else if (request instanceof AbortLocalTransactionRequest) {
209 handleLocalTransactionAbort(request.getSequence(), envelope, now);
// Anything else is not understood by a read-write transaction.
212 LOG.warn("Rejecting unsupported request {}", request);
213 throw new UnsupportedRequestException(request);
219 state = new Retired(state);
// Handles the preCommit step of a coordinated commit. Requires the Ready state (checkReady() throws
// IllegalStateException otherwise) and acts according to the current commit stage:
//  - PRE_COMMIT_PENDING: only logs that preCommit is already in progress.
//  - CAN_COMMIT_COMPLETE: moves to PRE_COMMIT_PENDING and starts preCommit on the cohort; the callback
//    ends in successfulPreCommit() or failTransaction().
//  - the stages listed before the throw: IllegalStateException.
// NOTE(review): several lines of this method (statements before checkReady(), 'break' statements, further
// case labels and 'default:') are not visible in this listing.
222 private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
223 final RequestEnvelope envelope, final long now) throws RequestException {
226 final Ready ready = checkReady();
227 switch (ready.stage) {
228 case PRE_COMMIT_PENDING:
229 LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
231 case CAN_COMMIT_COMPLETE:
// Record the stage before invoking the cohort, so a repeated request takes the "already preCommitting"
// branch above.
232 ready.stage = CommitStage.PRE_COMMIT_PENDING;
233 LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
234 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
236 public void onSuccess(final DataTreeCandidate result) {
// The candidate itself is not used; only completion matters here.
237 successfulPreCommit(envelope, now);
241 public void onFailure(final Throwable failure) {
242 failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
246 case CAN_COMMIT_PENDING:
248 case PRE_COMMIT_COMPLETE:
250 throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
// Reached for any stage without an explicit case above; always throws.
252 throwUnhandledCommitStage(ready);
// Completion callback for a successful preCommit started by handleTransactionPreCommit(). Sends
// TransactionPreCommitSuccess for the originating request and advances the stage to PRE_COMMIT_COMPLETE.
// 'startTime' is the timestamp passed through from the request handler.
256 void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
// A retired transaction must not respond any more.
// NOTE(review): the remainder of this branch (presumably an early return) is not visible in this listing.
257 if (state instanceof Retired) {
258 LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
// Throws IllegalStateException if the transaction left the Ready state in the meantime.
263 final Ready ready = checkReady();
264 LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
265 recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
266 envelope.getMessage().getSequence()));
// The stage is advanced only after the success has been recorded and sent.
267 ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
// Common failure path for the commit callbacks: reports 'cause' for the originating request and moves the
// transaction to the Failed state, from which throwIfFailed() rethrows the cause.
270 void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
// A retired transaction must not respond any more.
// NOTE(review): the remainder of this branch (presumably an early return) is not visible in this listing.
271 if (state instanceof Retired) {
272 LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
276 recordAndSendFailure(envelope, now, cause);
277 state = new Failed(cause);
// 'cause' is the trailing argument beyond the two placeholders, so SLF4J logs it as the exception.
278 LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
// Handles the final commit step of a coordinated commit. Requires the Ready state (checkReady() throws
// IllegalStateException otherwise) and acts according to the current commit stage:
//  - already committing: only logs (the case label for this branch is not visible in this listing,
//    presumably COMMIT_PENDING).
//  - PRE_COMMIT_COMPLETE: moves to COMMIT_PENDING and starts commit on the cohort; the callback ends in
//    successfulCommit() or failTransaction().
//  - the stages listed before the throw: IllegalStateException.
// NOTE(review): statements before checkReady(), 'break' statements, further case labels and 'default:'
// are not visible in this listing.
281 private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
282 final long now) throws RequestException {
285 final Ready ready = checkReady();
286 switch (ready.stage) {
288 LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
290 case PRE_COMMIT_COMPLETE:
// Record the stage before invoking the cohort, so a repeated request takes the "already committing"
// branch above.
291 ready.stage = CommitStage.COMMIT_PENDING;
292 LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
293 ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
295 public void onSuccess(final UnsignedLong result) {
// The commit result value is not used; only completion matters here.
296 successfulCommit(envelope, now);
300 public void onFailure(final Throwable failure) {
301 failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
305 case CAN_COMMIT_COMPLETE:
306 case CAN_COMMIT_PENDING:
307 case PRE_COMMIT_PENDING:
309 throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
// Reached for any stage without an explicit case above; always throws.
311 throwUnhandledCommitStage(ready);
// Aborts an Open transaction on behalf of an AbortLocalTransactionRequest. checkOpen() throws
// IllegalStateException if the transaction is in any other state. The callback passed to abort() records
// and sends a TransactionAbortSuccess for the request.
// NOTE(review): the tail of the abort(...) statement is not visible in this listing.
315 private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
316 checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
// Marks the beginning of an abort.
// NOTE(review): only the log statement is visible in this listing; presumably the elided line switches
// 'state' to ABORTING. Confirm against the full file.
320 private void startAbort() {
322 LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
// Marks the completion of an abort.
// NOTE(review): only the log statement is visible in this listing; presumably the elided line switches
// 'state' to ABORTED. Confirm against the full file.
325 private void finishAbort() {
327 LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
// Handles a TransactionAbortRequest identified by 'sequence'. Behaviour depends on the current state:
//  - Open: aborts the backing transaction; the callback records and sends TransactionAbortSuccess.
//  - ABORTING: only logs that an abort is already in progress.
//  - ABORTED: logs a warning and returns a fresh TransactionAbortSuccess directly.
//  - otherwise the transaction must be Ready (checkReady() throws IllegalStateException if not) and the
//    cohort is asked to abort; its callback reports success or failure for the request.
// NOTE(review): the end of the signature, the return statements of the non-ABORTED paths and any
// startAbort()/finishAbort() calls are not visible in this listing.
330 private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
332 if (state instanceof Open) {
333 final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
335 openTransaction.abort(() -> {
336 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
// ABORTING is a singleton that does not override equals(), so this is effectively an identity comparison.
342 if (ABORTING.equals(state)) {
343 LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
346 if (ABORTED.equals(state)) {
347 // We should have recorded the reply
348 LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
349 return new TransactionAbortSuccess(getIdentifier(), sequence);
352 final Ready ready = checkReady();
354 ready.readyCohort.abort(new FutureCallback<Void>() {
356 public void onSuccess(final Void result) {
357 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
362 public void onFailure(final Throwable failure) {
// Unlike the commit callbacks, this path reports the failure directly instead of going through
// failTransaction().
363 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
364 LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
// Starts the canCommit step of a coordinated (three-phase) commit. Requires the Ready state (checkReady()
// throws IllegalStateException otherwise) and acts according to the current commit stage:
//  - CAN_COMMIT_PENDING: only logs that canCommit is already in progress.
//  - the initial stage: moves to CAN_COMMIT_PENDING and starts canCommit on the cohort; the callback ends
//    in successfulCanCommit() or failTransaction(). The case label for this branch is not visible in this
//    listing (presumably READY).
//  - the stages listed before the throw: IllegalStateException.
// NOTE(review): statements before checkReady(), 'break' statements, further case labels and 'default:'
// are not visible in this listing.
371 private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
374 final Ready ready = checkReady();
375 switch (ready.stage) {
376 case CAN_COMMIT_PENDING:
377 LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
380 ready.stage = CommitStage.CAN_COMMIT_PENDING;
381 LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
// NOTE(review): checkReady() is invoked a second time here although 'ready' is already in hand; the
// sibling handlers use 'ready.readyCohort' directly.
382 checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
384 public void onSuccess(final Void result) {
385 successfulCanCommit(envelope, now);
389 public void onFailure(final Throwable failure) {
390 failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
394 case CAN_COMMIT_COMPLETE:
396 case PRE_COMMIT_COMPLETE:
397 case PRE_COMMIT_PENDING:
398 throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
// Reached for any stage without an explicit case above; always throws.
400 throwUnhandledCommitStage(ready);
// Completion callback for a successful canCommit started by coordinatedCommit(). Sends
// TransactionCanCommitSuccess for the originating request and advances the stage to CAN_COMMIT_COMPLETE.
// 'startTime' is the timestamp passed through from the request handler.
404 void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
// A retired transaction must not respond any more.
// NOTE(review): the remainder of this branch (presumably an early return) is not visible in this listing.
405 if (state instanceof Retired) {
406 LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
// Throws IllegalStateException if the transaction left the Ready state in the meantime.
411 final Ready ready = checkReady();
412 recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
413 envelope.getMessage().getSequence()));
// The stage is advanced only after the success has been recorded and sent.
414 ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
415 LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
// Starts a direct (single-request) commit: canCommit, preCommit and commit are chained through the
// successfulDirectCanCommit() and successfulDirectPreCommit() callbacks, and the requester receives a
// single reply from successfulCommit() or failTransaction().
//
// Requires the Ready state (checkReady() throws IllegalStateException otherwise). If a commit is already
// under way (the stages listed in the first case group), the request is only logged. Otherwise the stage
// moves to CAN_COMMIT_PENDING and canCommit is started on the cohort.
// NOTE(review): statements before checkReady(), 'break' statements, some case labels (including the one
// guarding the canCommit branch, presumably READY) and 'default:' are not visible in this listing.
418 private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
421 final Ready ready = checkReady();
422 switch (ready.stage) {
423 case CAN_COMMIT_COMPLETE:
424 case CAN_COMMIT_PENDING:
426 case PRE_COMMIT_COMPLETE:
427 case PRE_COMMIT_PENDING:
428 LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
429 getIdentifier(), state, envelope);
432 ready.stage = CommitStage.CAN_COMMIT_PENDING;
433 LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
434 ready.readyCohort.canCommit(new FutureCallback<Void>() {
436 public void onSuccess(final Void result) {
// Continue straight into preCommit without replying to the requester.
437 successfulDirectCanCommit(envelope, now);
441 public void onFailure(final Throwable failure) {
442 failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
// Reached for any stage without an explicit case above; always throws.
447 throwUnhandledCommitStage(ready);
// Second link of the direct-commit chain: invoked when the canCommit started by directCommit() succeeds.
// Moves the stage to PRE_COMMIT_PENDING and starts preCommit on the cohort. No reply is sent from here.
451 void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
// A retired transaction must not continue the chain.
// NOTE(review): the remainder of this branch (presumably an early return) is not visible in this listing.
452 if (state instanceof Retired) {
453 LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
// Throws IllegalStateException if the transaction left the Ready state in the meantime.
457 final Ready ready = checkReady();
458 ready.stage = CommitStage.PRE_COMMIT_PENDING;
459 LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
460 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
462 public void onSuccess(final DataTreeCandidate result) {
// The original start time is passed along so the eventual reply covers the whole chain.
463 successfulDirectPreCommit(envelope, startTime);
467 public void onFailure(final Throwable failure) {
468 failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
// Third link of the direct-commit chain: invoked when the preCommit started by successfulDirectCanCommit()
// succeeds. Moves the stage to COMMIT_PENDING and starts the final commit on the cohort.
473 void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
// A retired transaction must not continue the chain.
// NOTE(review): the remainder of this branch (presumably an early return) is not visible in this listing.
474 if (state instanceof Retired) {
475 LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
// Throws IllegalStateException if the transaction left the Ready state in the meantime.
479 final Ready ready = checkReady();
480 ready.stage = CommitStage.COMMIT_PENDING;
481 LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
482 ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
484 public void onSuccess(final UnsignedLong result) {
// Same completion path as the coordinated doCommit.
485 successfulCommit(envelope, startTime);
489 public void onFailure(final Throwable failure) {
490 failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
// Completion callback shared by the coordinated and direct commit paths. Sends TransactionCommitSuccess
// for the originating request.
// NOTE(review): statements after the reply (presumably a switch of 'state' to COMMITTED) are not visible
// in this listing.
495 void successfulCommit(final RequestEnvelope envelope, final long startTime) {
// A retired transaction must not respond any more.
// NOTE(review): the remainder of this branch (presumably an early return) is not visible in this listing.
496 if (state instanceof Retired) {
497 LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
501 recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
502 envelope.getMessage().getSequence()));
// Handles a CommitLocalTransactionRequest for a transaction created via createReady().
//
// The transaction must be in the Sealed state (checkSealed() throws IllegalStateException otherwise) and
// the request must carry a modification equal to the sealed one; a mismatch is logged and rejected with
// UnsupportedRequestException. The state then becomes Ready, around either a failed cohort (when the
// request carries a delayed failure) or a regular ready cohort, and the commit is started in coordinated
// or direct mode as the request dictates.
506 private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
507 final RequestEnvelope envelope, final long now) throws RequestException {
508 final DataTreeModification sealedModification = checkSealed();
509 if (!sealedModification.equals(request.getModification())) {
510 LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
511 throw new UnsupportedRequestException(request);
// A delayed failure is turned into a cohort created by createFailedCohort(), so it surfaces through the
// normal commit callbacks rather than being thrown here.
514 final Optional<Exception> optFailure = request.getDelayedFailure();
515 if (optFailure.isPresent()) {
516 state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
518 state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification, Optional.empty()));
521 if (request.isCoordinated()) {
522 coordinatedCommit(envelope, now);
524 directCommit(envelope, now);
// Answers an existence query by reading the requested path from the open transaction's snapshot and
// recording an ExistsTransactionSuccess under the request's sequence. checkOpen() throws
// IllegalStateException if the transaction is not Open.
// NOTE(review): the tail of the return statement (how 'data' is mapped into the response) is not visible
// in this listing.
528 private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) {
529 final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
530 return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
// Answers a read by reading the requested path from the open transaction's snapshot and recording a
// ReadTransactionSuccess under the request's sequence. checkOpen() throws IllegalStateException if the
// transaction is not Open.
// NOTE(review): the tail of the return statement (how 'data' is passed into the response) is not visible
// in this listing.
534 private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) {
535 final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
536 return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
// Builds a ModifyTransactionSuccess for the given sequence, records it via recordSuccess() and returns
// what recordSuccess() hands back.
540 private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
541 return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
// Applies the given delete/write/merge operations, in iteration order, to the open transaction's snapshot.
// Modifications of any other type are logged at WARN and skipped.
544 private void applyModifications(final Collection<TransactionModification> modifications) {
// checkOpen() is only reached for a non-empty collection, so an empty one is accepted in any state.
545 if (!modifications.isEmpty()) {
// Throws IllegalStateException if the transaction is not Open.
546 final DataTreeModification modification = checkOpen().getSnapshot();
547 for (TransactionModification m : modifications) {
548 if (m instanceof TransactionDelete) {
549 modification.delete(m.getPath());
550 } else if (m instanceof TransactionWrite) {
551 modification.write(m.getPath(), ((TransactionWrite) m).getData());
552 } else if (m instanceof TransactionMerge) {
553 modification.merge(m.getPath(), ((TransactionMerge) m).getData());
555 LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
// Handles a ModifyTransactionRequest.
//
// Without a persistence protocol the modifications are applied and a ModifyTransactionSuccess is returned.
// With a protocol, the visible branches do the following:
//  - abort: skipped if already ABORTING, otherwise the open transaction is aborted and the callback sends
//    ModifyTransactionSuccess;
//  - ready only: ensureReady() followed by an immediate ModifyTransactionSuccess;
//  - ready followed by directCommit();
//  - ready followed by coordinatedCommit();
//  - anything else: logged and rejected with UnsupportedRequestException.
// NOTE(review): the case labels, 'default:' and the return statements of the callback-driven branches are
// not visible in this listing (presumably ABORT, READY, SIMPLE and THREE_PHASE, returning null where the
// reply is sent later, as the @Nullable return type suggests).
561 private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
562 final RequestEnvelope envelope, final long now) throws RequestException {
563 // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
564 // protocol, there is nothing for us to do.
565 final Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
566 if (!maybeProto.isPresent()) {
567 applyModifications(request.getModifications());
568 return replyModifySuccess(request.getSequence());
571 switch (maybeProto.get()) {
573 if (ABORTING.equals(state)) {
574 LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
// Throws IllegalStateException if the transaction is not Open.
577 final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
579 openTransaction.abort(() -> {
580 recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
581 request.getSequence()));
586 ensureReady(request.getModifications());
587 return replyModifySuccess(request.getSequence());
589 ensureReady(request.getModifications());
590 directCommit(envelope, now);
593 ensureReady(request.getModifications());
594 coordinatedCommit(envelope, now);
597 LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
598 throw new UnsupportedRequestException(request);
// Brings the transaction into the Ready state if it is not there yet: applies the given modifications and
// readies the open transaction, wrapping the resulting cohort in a new Ready state. When the state is
// already Ready this is only logged.
// NOTE(review): the remainder of the already-Ready branch (presumably an early return) is not visible in
// this listing.
602 private void ensureReady(final Collection<TransactionModification> modifications) {
603 // We may have a combination of READY + SIMPLE/THREE_PHASE, in which case we want to ready the transaction
605 if (state instanceof Ready) {
606 LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
610 applyModifications(modifications);
// checkOpen() throws IllegalStateException if the transaction is not Open.
611 state = new Ready(checkOpen().ready(Optional.empty()));
612 LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
// Rejects the current request if the transaction has previously failed, by rethrowing the RequestException
// stored in the Failed state. Does nothing in any other state.
615 private void throwIfFailed() throws RequestException {
616 if (state instanceof Failed) {
617 LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
618 throw ((Failed) state).cause;
// Returns the backing transaction of the Open state. Throws IllegalStateException, via Guava's
// checkState, naming the transaction and its actual state when the transaction is not Open.
622 private ReadWriteShardDataTreeTransaction checkOpen() {
623 checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(), state);
624 return ((Open) state).openTransaction;
// Returns the current state cast to Ready. Throws IllegalStateException, via Guava's checkState, naming
// the transaction and its actual state when the transaction is not Ready.
627 private Ready checkReady() {
628 checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(), state);
629 return (Ready) state;
// Returns the modification held by the Sealed state. Throws IllegalStateException, via Guava's
// checkState, naming the transaction and its actual state when the transaction is not Sealed.
632 private DataTreeModification checkSealed() {
633 checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(), state);
634 return ((Sealed) state).sealedModification;
// Always throws IllegalStateException naming the given Ready state's current stage. The commit handlers
// call it for a stage that none of their switch cases covers.
637 private static void throwUnhandledCommitStage(final Ready ready) {
638 throw new IllegalStateException("Unhandled commit stage " + ready.stage);