BUG-5280: Remove PersistentMessages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LocalThreePhaseCommitCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
17 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Future;
26
27 /**
28  * Fake {@link DOMStoreThreePhaseCommitCohort} instantiated for local transactions to conform with the DOM
29  * transaction APIs. It is only used to hold the data from a local DOM transaction ready operation and to
30  * initiate direct or coordinated commits from the front-end by sending the ReadyLocalTransaction message.
31  * It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods
32  * are no-ops.
33  */
34 class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
35     private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class);
36
37     private final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction;
38     private final DataTreeModification modification;
39     private final ActorContext actorContext;
40     private final ActorSelection leader;
41     private final Exception operationError;
42
43     protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
44             final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
45         this.actorContext = Preconditions.checkNotNull(actorContext);
46         this.leader = Preconditions.checkNotNull(leader);
47         this.transaction = Preconditions.checkNotNull(transaction);
48         this.modification = Preconditions.checkNotNull(modification);
49         this.operationError = null;
50     }
51
52     protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
53             final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final Exception operationError) {
54         this.actorContext = Preconditions.checkNotNull(actorContext);
55         this.leader = Preconditions.checkNotNull(leader);
56         this.transaction = Preconditions.checkNotNull(transaction);
57         this.operationError = Preconditions.checkNotNull(operationError);
58         this.modification = null;
59     }
60
61     private Future<Object> initiateCommit(final boolean immediate) {
62         if(operationError != null) {
63             return Futures.failed(operationError);
64         }
65
66         final ReadyLocalTransaction message = new ReadyLocalTransaction(
67             TransactionIdentifierUtils.actorNameFor(transaction.getIdentifier()), modification, immediate);
68         return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
69     }
70
71     Future<ActorSelection> initiateCoordinatedCommit() {
72         final Future<Object> messageFuture = initiateCommit(false);
73         final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
74                 transaction.getIdentifier());
75         ret.onComplete(new OnComplete<ActorSelection>() {
76             @Override
77             public void onComplete(final Throwable failure, final ActorSelection success) throws Throwable {
78                 if (failure != null) {
79                     LOG.info("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
80                     transactionAborted(transaction);
81                     return;
82                 }
83
84                 LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success);
85             }
86         }, actorContext.getClientDispatcher());
87
88         return ret;
89     }
90
91     Future<Object> initiateDirectCommit() {
92         final Future<Object> messageFuture = initiateCommit(true);
93         messageFuture.onComplete(new OnComplete<Object>() {
94             @Override
95             public void onComplete(final Throwable failure, final Object message) throws Throwable {
96                 if (failure != null) {
97                     LOG.error("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
98                     transactionAborted(transaction);
99                 } else if (CommitTransactionReply.isSerializedType(message)) {
100                     LOG.debug("Transaction {} committed successfully", transaction.getIdentifier());
101                     transactionCommitted(transaction);
102                 } else {
103                     LOG.error("Transaction {} resulted in unhandled message type {}, aborting", message.getClass());
104                     transactionAborted(transaction);
105                 }
106             }
107         }, actorContext.getClientDispatcher());
108
109         return messageFuture;
110     }
111
112     @Override
113     public final ListenableFuture<Boolean> canCommit() {
114         // Intended no-op
115         throw new UnsupportedOperationException();
116     }
117
118     @Override
119     public final ListenableFuture<Void> preCommit() {
120         // Intended no-op
121         throw new UnsupportedOperationException();
122     }
123
124     @Override
125     public final ListenableFuture<Void> abort() {
126         // Intended no-op
127         throw new UnsupportedOperationException();
128     }
129
130     @Override
131     public final ListenableFuture<Void> commit() {
132         // Intended no-op
133         throw new UnsupportedOperationException();
134     }
135
136     protected void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
137     }
138
139     protected void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
140     }
141 }