NormalizedNodeAggregator should also report empty
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractFrontendHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.primitives.UnsignedLong;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.SortedSet;
20 import org.eclipse.jdt.annotation.Nullable;
21 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
23 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
24 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
25 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
26 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
27 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
28 import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
29 import org.opendaylight.controller.cluster.access.commands.SkipTransactionsResponse;
30 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
32 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
34 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Abstract class for providing logical tracking of frontend local histories. This class is specialized for
46  * standalone transactions and chained transactions.
47  *
48  * @author Robert Varga
49  */
50 abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdentifier> {
51     private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class);
52
53     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
54     private final MutableUnsignedLongSet purgedTransactions;
55     private final String persistenceId;
56     private final ShardDataTree tree;
57
58     /**
59      * Transactions closed by the previous leader. Boolean indicates whether the transaction was committed (true) or
60      * aborted (false). We only ever shrink these.
61      */
62     private Map<UnsignedLong, Boolean> closedTransactions;
63
64     AbstractFrontendHistory(final String persistenceId, final ShardDataTree tree,
65             final Map<UnsignedLong, Boolean> closedTransactions, final MutableUnsignedLongSet purgedTransactions) {
66         this.persistenceId = requireNonNull(persistenceId);
67         this.tree = requireNonNull(tree);
68         this.closedTransactions = requireNonNull(closedTransactions);
69         this.purgedTransactions = requireNonNull(purgedTransactions);
70     }
71
72     final String persistenceId() {
73         return persistenceId;
74     }
75
76     final long readTime() {
77         return tree.readTime();
78     }
79
80     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
81             final RequestEnvelope envelope, final long now) throws RequestException {
82         if (request instanceof TransactionPurgeRequest) {
83             return handleTransactionPurgeRequest((TransactionPurgeRequest) request, envelope, now);
84         } else if (request instanceof SkipTransactionsRequest) {
85             return handleSkipTransactionsRequest((SkipTransactionsRequest) request, envelope, now);
86         }
87
88         final TransactionIdentifier id = request.getTarget();
89         final long txidBits = id.getTransactionId();
90         if (purgedTransactions.contains(txidBits)) {
91             LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions);
92             throw new DeadTransactionException(purgedTransactions.toRangeSet());
93         }
94
95         final Boolean closed = closedTransactions.get(UnsignedLong.fromLongBits(txidBits));
96         if (closed != null) {
97             final boolean successful = closed;
98             LOG.debug("{}: Request {} refers to a {} transaction", persistenceId, request, successful ? "successful"
99                     : "failed");
100             throw new ClosedTransactionException(successful);
101         }
102
103         FrontendTransaction tx = transactions.get(id);
104         if (tx == null) {
105             // The transaction does not exist and we are about to create it, check sequence number
106             if (request.getSequence() != 0) {
107                 LOG.warn("{}: no transaction state present, unexpected request {}", persistenceId(), request);
108                 throw new OutOfOrderRequestException(0);
109             }
110
111             tx = createTransaction(request, id);
112             transactions.put(id, tx);
113         } else if (!(request instanceof IncrementTransactionSequenceRequest)) {
114             final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
115             if (maybeReplay.isPresent()) {
116                 final TransactionSuccess<?> replay = maybeReplay.get();
117                 LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
118                 return replay;
119             }
120         }
121
122         return tx.handleRequest(request, envelope, now);
123     }
124
125     private TransactionPurgeResponse handleTransactionPurgeRequest(final TransactionPurgeRequest request,
126             final RequestEnvelope envelope, final long now) {
127         final TransactionIdentifier id = request.getTarget();
128         final long txidBits = id.getTransactionId();
129         if (purgedTransactions.contains(txidBits)) {
130             // Retransmitted purge request: nothing to do
131             LOG.debug("{}: transaction {} already purged", persistenceId, id);
132             return new TransactionPurgeResponse(id, request.getSequence());
133         }
134
135         // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
136         // to an ImmutableMap, which does not allow remove().
137         final UnsignedLong ul = UnsignedLong.fromLongBits(txidBits);
138         if (closedTransactions.containsKey(ul)) {
139             tree.purgeTransaction(id, () -> {
140                 closedTransactions.remove(ul);
141                 if (closedTransactions.isEmpty()) {
142                     closedTransactions = ImmutableMap.of();
143                 }
144
145                 purgedTransactions.add(txidBits);
146                 LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
147                 envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
148             });
149             return null;
150         }
151
152         final FrontendTransaction tx = transactions.get(id);
153         if (tx == null) {
154             // This should never happen because the purge callback removes the transaction and puts it into
155             // purged transactions in one go. If it does, we warn about the situation and
156             LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
157                 id, purgedTransactions);
158             purgedTransactions.add(txidBits);
159             return new TransactionPurgeResponse(id, request.getSequence());
160         }
161
162         tree.purgeTransaction(id, () -> {
163             purgedTransactions.add(txidBits);
164             transactions.remove(id);
165             LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
166             envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
167         });
168
169         return null;
170     }
171
172     private SkipTransactionsResponse handleSkipTransactionsRequest(final SkipTransactionsRequest request,
173             final RequestEnvelope envelope, final long now) throws RequestException {
174         final var first = request.getTarget();
175         final var others = request.getOthers();
176         final var ids = new ArrayList<UnsignedLong>(others.size() + 1);
177         ids.add(UnsignedLong.fromLongBits(first.getTransactionId()));
178         ids.addAll(others);
179
180         final var it = ids.iterator();
181         while (it.hasNext()) {
182             final var id = it.next();
183             final long bits = id.longValue();
184             if (purgedTransactions.contains(bits)) {
185                 LOG.warn("{}: history {} tracks {} as purged", persistenceId(), getIdentifier(), id);
186                 it.remove();
187             } else if (transactions.containsKey(new TransactionIdentifier(getIdentifier(), bits))) {
188                 LOG.warn("{}: history {} tracks {} as open", persistenceId(), getIdentifier(), id);
189                 it.remove();
190             }
191         }
192
193         if (ids.isEmpty()) {
194             LOG.debug("{}: history {} completing empty skip request", persistenceId(), getIdentifier());
195             return new SkipTransactionsResponse(first, now);
196         }
197
198         final var transactionIds = MutableUnsignedLongSet.of(ids.stream().mapToLong(UnsignedLong::longValue).toArray())
199             .immutableCopy();
200         LOG.debug("{}: history {} skipping transactions {}", persistenceId(), getIdentifier(), transactionIds.ranges());
201
202         tree.skipTransactions(getIdentifier(), transactionIds, () -> {
203             purgedTransactions.addAll(transactionIds);
204             envelope.sendSuccess(new TransactionPurgeResponse(first, request.getSequence()), readTime() - now);
205         });
206         return null;
207     }
208
209     final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
210         LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
211         tree.closeTransactionChain(getIdentifier(),
212             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
213     }
214
215     final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
216         LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
217         tree.purgeTransactionChain(getIdentifier(),
218             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
219     }
220
221     final void retire() {
222         transactions.values().forEach(FrontendTransaction::retire);
223         tree.removeTransactionChain(getIdentifier());
224     }
225
226     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id) {
227         if (request instanceof CommitLocalTransactionRequest) {
228             LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id);
229             tree.getStats().incrementReadWriteTransactionCount();
230             return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
231         }
232         if (request instanceof AbstractReadTransactionRequest
233                 && ((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
234             LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id);
235             tree.getStats().incrementReadOnlyTransactionCount();
236             return createOpenSnapshot(id);
237         }
238
239         LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
240         tree.getStats().incrementReadWriteTransactionCount();
241         return createOpenTransaction(id);
242     }
243
244     abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id);
245
246     abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id);
247
248     abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod);
249
250     abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
251             Exception failure);
252
253     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod,
254             Optional<SortedSet<String>> participatingShardNames);
255
256     @Override
257     public final String toString() {
258         return MoreObjects.toStringHelper(this).omitNullValues()
259             .add("identifier", getIdentifier())
260             .add("persistenceId", persistenceId)
261             .add("transactions", transactions)
262             .toString();
263     }
264 }