NormalizedNodeAggregator should also report empty
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LocalThreePhaseCommitCohort.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSelection;
13 import akka.dispatch.Futures;
14 import akka.dispatch.OnComplete;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.Optional;
17 import java.util.SortedSet;
18 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
22 import org.opendaylight.mdsal.common.api.CommitInfo;
23 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction;
25 import org.opendaylight.yangtools.yang.common.Empty;
26 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
30
31 /**
32  * Fake {@link DOMStoreThreePhaseCommitCohort} instantiated for local transactions to conform with the DOM
33  * transaction APIs. It is only used to hold the data from a local DOM transaction ready operation and to
34  * initiate direct or coordinated commits from the front-end by sending the ReadyLocalTransaction message.
35  * It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods
36  * are no-ops.
37  */
38 class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
39     private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class);
40
41     private final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction;
42     private final DataTreeModification modification;
43     private final ActorUtils actorUtils;
44     private final ActorSelection leader;
45     private final Exception operationError;
46
47     protected LocalThreePhaseCommitCohort(final ActorUtils actorUtils, final ActorSelection leader,
48             final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
49             final DataTreeModification modification,
50             final Exception operationError) {
51         this.actorUtils = requireNonNull(actorUtils);
52         this.leader = requireNonNull(leader);
53         this.transaction = requireNonNull(transaction);
54         this.modification = requireNonNull(modification);
55         this.operationError = operationError;
56     }
57
58     protected LocalThreePhaseCommitCohort(final ActorUtils actorUtils, final ActorSelection leader,
59             final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final Exception operationError) {
60         this.actorUtils = requireNonNull(actorUtils);
61         this.leader = requireNonNull(leader);
62         this.transaction = requireNonNull(transaction);
63         this.operationError = requireNonNull(operationError);
64         modification = null;
65     }
66
67     private Future<Object> initiateCommit(final boolean immediate,
68             final Optional<SortedSet<String>> participatingShardNames) {
69         if (operationError != null) {
70             return Futures.failed(operationError);
71         }
72
73         final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier(),
74                 modification, immediate, participatingShardNames);
75         return actorUtils.executeOperationAsync(leader, message, actorUtils.getTransactionCommitOperationTimeout());
76     }
77
78     Future<ActorSelection> initiateCoordinatedCommit(final Optional<SortedSet<String>> participatingShardNames) {
79         final Future<Object> messageFuture = initiateCommit(false, participatingShardNames);
80         final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorUtils,
81                 transaction.getIdentifier());
82         ret.onComplete(new OnComplete<ActorSelection>() {
83             @Override
84             public void onComplete(final Throwable failure, final ActorSelection success) {
85                 if (failure != null) {
86                     LOG.warn("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
87                     transactionAborted(transaction);
88                     return;
89                 }
90
91                 LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success);
92             }
93         }, actorUtils.getClientDispatcher());
94
95         return ret;
96     }
97
98     Future<Object> initiateDirectCommit() {
99         final Future<Object> messageFuture = initiateCommit(true, Optional.empty());
100         messageFuture.onComplete(new OnComplete<>() {
101             @Override
102             public void onComplete(final Throwable failure, final Object message) {
103                 if (failure != null) {
104                     LOG.warn("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
105                     transactionAborted(transaction);
106                 } else if (CommitTransactionReply.isSerializedType(message)) {
107                     LOG.debug("Transaction {} committed successfully", transaction.getIdentifier());
108                     transactionCommitted(transaction);
109                 } else {
110                     LOG.error("Transaction {} resulted in unhandled message type {}, aborting",
111                         transaction.getIdentifier(), message.getClass());
112                     transactionAborted(transaction);
113                 }
114             }
115         }, actorUtils.getClientDispatcher());
116
117         return messageFuture;
118     }
119
120     @Override
121     public final ListenableFuture<Boolean> canCommit() {
122         // Intended no-op
123         throw new UnsupportedOperationException();
124     }
125
126     @Override
127     public final ListenableFuture<Empty> preCommit() {
128         // Intended no-op
129         throw new UnsupportedOperationException();
130     }
131
132     @Override
133     public final ListenableFuture<Empty> abort() {
134         // Intended no-op
135         throw new UnsupportedOperationException();
136     }
137
138     @Override
139     public final ListenableFuture<CommitInfo> commit() {
140         // Intended no-op
141         throw new UnsupportedOperationException();
142     }
143
144     protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> aborted) {
145     }
146
147     protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> comitted) {
148     }
149 }