/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.datastore;

import static java.util.Objects.requireNonNull;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.UnsignedLong;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongSet;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract class for providing logical tracking of frontend local histories. This class is specialized for
 * standalone transactions and chained transactions.
 *
 * @author Robert Varga
 */
abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdentifier> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class);

    private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
    private final UnsignedLongSet purgedTransactions;
    private final String persistenceId;
    private final ShardDataTree tree;

    /**
     * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or
     * aborted (false). We only ever shrink these.
     */
    private Map<UnsignedLong, Boolean> closedTransactions;

    AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree,
            final Map<UnsignedLong, Boolean> closedTransactions, final UnsignedLongSet purgedTransactions) {
        this.persistenceId = requireNonNull(persistenceId);
        this.tree = requireNonNull(tree);
        this.closedTransactions = requireNonNull(closedTransactions);
        this.purgedTransactions = requireNonNull(purgedTransactions);
    }

    final String persistenceId() {
        return persistenceId;
    }

    final long readTime() {
        return tree.readTime();
    }

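    /**
     * Handle a transaction request addressed to this history. Purge requests are dispatched separately; requests
     * referring to purged or previously-closed transactions are rejected. For already-tracked transactions the
     * sequence number is checked, so a retransmitted request replays its recorded response instead of being
     * re-executed.
     *
     * @return an immediate {@link TransactionSuccess}, or null if the response will be sent asynchronously
     * @throws RequestException if the request refers to a purged or closed transaction, or is out of sequence
     */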
    final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
            final RequestEnvelope envelope, final long now) throws RequestException {
        if (request instanceof TransactionPurgeRequest) {
            return handleTransactionPurgeRequest(request, envelope, now);
        }

        final TransactionIdentifier id = request.getTarget();
        final long txidBits = id.getTransactionId();
        if (purgedTransactions.contains(txidBits)) {
            LOG.warn("{}: Request {} is contained in purged transactions {}", persistenceId, request,
                purgedTransactions);
            throw new DeadTransactionException(purgedTransactions.toRangeSet());
        }

        final Boolean closed = closedTransactions.get(UnsignedLong.fromLongBits(txidBits));
        if (closed != null) {
            final boolean successful = closed;
            LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful"
                    : "failed");
            throw new ClosedTransactionException(successful);
        }

        FrontendTransaction tx = transactions.get(id);
        if (tx == null) {
            // The transaction does not exist and we are about to create it, check sequence number
            if (request.getSequence() != 0) {
                LOG.warn("{}: no transaction state present, unexpected request {}", persistenceId(), request);
                throw new OutOfOrderRequestException(0);
            }

            tx = createTransaction(request, id);
            transactions.put(id, tx);
        } else if (!(request instanceof IncrementTransactionSequenceRequest)) {
            final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
            if (maybeReplay.isPresent()) {
                final TransactionSuccess<?> replay = maybeReplay.get();
                LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
                return replay;
            }
        }

        return tx.handleRequest(request, envelope, now);
    }

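    /**
     * Handle a request to purge a transaction's state. A retransmitted purge is answered immediately; purges of
     * transactions closed by a previous leader and of locally-tracked transactions complete asynchronously, with
     * the response sent through the envelope once the data tree has recorded the purge.
     *
     * @return an immediate {@link TransactionPurgeResponse}, or null if the response will be sent asynchronously
     */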
    private TransactionSuccess<?> handleTransactionPurgeRequest(final TransactionRequest<?> request,
            final RequestEnvelope envelope, final long now) {
        final TransactionIdentifier id = request.getTarget();
        final long txidBits = id.getTransactionId();
        if (purgedTransactions.contains(txidBits)) {
            // Retransmitted purge request: nothing to do
            LOG.debug("{}: transaction {} already purged", persistenceId, id);
            return new TransactionPurgeResponse(id, request.getSequence());
        }

        // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
        // to an ImmutableMap, which does not allow remove().
        final UnsignedLong ul = UnsignedLong.fromLongBits(txidBits);
        if (closedTransactions.containsKey(ul)) {
            tree.purgeTransaction(id, () -> {
                closedTransactions.remove(ul);
                if (closedTransactions.isEmpty()) {
                    closedTransactions = ImmutableMap.of();
                }

                purgedTransactions.add(txidBits);
                LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
                envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
            });
            return null;
        }

        final FrontendTransaction tx = transactions.get(id);
        if (tx == null) {
            // This should never happen because the purge callback removes the transaction and puts it into
            // purged transactions in one go. If it does happen, we warn about the situation and mark the
            // transaction as purged anyway.
            LOG.warn("{}: transaction {} not tracked in {} and not present in active transactions", persistenceId,
                id, purgedTransactions);
            purgedTransactions.add(txidBits);
            return new TransactionPurgeResponse(id, request.getSequence());
        }

        tree.purgeTransaction(id, () -> {
            purgedTransactions.add(txidBits);
            transactions.remove(id);
            LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
            envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
        });

        return null;
    }

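    /**
     * Close this history, sending a {@link LocalHistorySuccess} through the envelope once the data tree has closed
     * the underlying transaction chain.
     */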
    final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
        LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
        tree.closeTransactionChain(getIdentifier(),
            () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
    }

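    /**
     * Purge this history, sending a {@link LocalHistorySuccess} through the envelope once the data tree has purged
     * the underlying transaction chain.
     */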
    final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
        LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
        tree.purgeTransactionChain(getIdentifier(),
            () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
    }

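    /**
     * Retire this history: retire all tracked transactions and remove the transaction chain from the data tree.
     */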
    final void retire() {
        transactions.values().forEach(FrontendTransaction::retire);
        tree.removeTransactionChain(getIdentifier());
    }

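    /**
     * Allocate a {@link FrontendTransaction} for the initial request targeting a transaction: a ready transaction
     * for a local commit request, an open snapshot for snapshot-only reads, or an open read-write transaction
     * otherwise.
     */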
    private FrontendTransaction createTransaction(final TransactionRequest<?> request,
            final TransactionIdentifier id) {
        if (request instanceof CommitLocalTransactionRequest) {
            LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id);
            tree.getStats().incrementReadWriteTransactionCount();
            return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
        }
        if (request instanceof AbstractReadTransactionRequest
                && ((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
            LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id);
            tree.getStats().incrementReadOnlyTransactionCount();
            return createOpenSnapshot(id);
        }

        LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
        tree.getStats().incrementReadWriteTransactionCount();
        return createOpenTransaction(id);
    }

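    /**
     * Create a snapshot-only (read-only) transaction for the given identifier.
     */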
    abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id);

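    /**
     * Create an open read-write transaction for the given identifier.
     */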
    abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id);

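    /**
     * Create a transaction which has already been readied with the supplied modification.
     */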
    abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod);

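    /**
     * Create a commit cohort which reports the supplied failure rather than proceeding with the commit.
     */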
    abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
            Exception failure);

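    /**
     * Create a commit cohort for the supplied modification, optionally coordinating with the named participating
     * shards.
     */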
    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod,
            Optional<SortedSet<String>> participatingShardNames);

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
                .add("persistenceId", persistenceId).add("transactions", transactions).toString();
    }
}