Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import java.util.HashMap;
15 import java.util.Map;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.eclipse.jdt.annotation.Nullable;
18 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
19 import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
20 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
21 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
22 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
23 import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
24 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
25 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
27 import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException;
28 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
33 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
34 import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
35 import org.opendaylight.yangtools.concepts.Identifiable;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
41  * in the frontend/backend conversation. This class is NOT thread-safe.
42  */
43 abstract sealed class LeaderFrontendState implements Identifiable<ClientIdentifier> {
44     static final class Disabled extends LeaderFrontendState {
45         Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
46             super(persistenceId, clientId, tree);
47         }
48
49         @Override
50         LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
51                 final RequestEnvelope envelope, final long now) throws RequestException {
52             throw new UnsupportedRequestException(request);
53         }
54
55         @Override
56         TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
57                 final RequestEnvelope envelope, final long now) throws RequestException {
58             throw new UnsupportedRequestException(request);
59         }
60     }
61
62     static final class Enabled extends LeaderFrontendState {
63         // Histories which have not been purged
64         private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
65
66         // UnsignedLongSet performs automatic merging, hence we keep minimal state tracking information
67         private final MutableUnsignedLongSet purgedHistories;
68
69         // Used for all standalone transactions
70         private final AbstractFrontendHistory standaloneHistory;
71
72         private long expectedTxSequence;
73         private Long lastSeenHistory = null;
74
75         Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
76             this(persistenceId, clientId, tree, MutableUnsignedLongSet.of(),
77                 StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
78         }
79
80         Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
81                 final MutableUnsignedLongSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
82                 final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
83             super(persistenceId, clientId, tree);
84             this.purgedHistories = requireNonNull(purgedHistories);
85             this.standaloneHistory = requireNonNull(standaloneHistory);
86             this.localHistories = requireNonNull(localHistories);
87         }
88
89         @Override
90         @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
91                 final RequestEnvelope envelope, final long now) throws RequestException {
92             checkRequestSequence(envelope);
93
94             try {
95                 if (request instanceof CreateLocalHistoryRequest req) {
96                     return handleCreateHistory(req, envelope, now);
97                 } else if (request instanceof DestroyLocalHistoryRequest req) {
98                     return handleDestroyHistory(req, envelope, now);
99                 } else if (request instanceof PurgeLocalHistoryRequest req) {
100                     return handlePurgeHistory(req, envelope, now);
101                 } else {
102                     LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
103                     throw new UnsupportedRequestException(request);
104                 }
105             } finally {
106                 expectNextRequest();
107             }
108         }
109
110         @Override
111         @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
112                 final RequestEnvelope envelope, final long now) throws RequestException {
113             checkRequestSequence(envelope);
114
115             try {
116                 final var lhId = request.getTarget().getHistoryId();
117                 final AbstractFrontendHistory history;
118
119                 if (lhId.getHistoryId() != 0) {
120                     history = localHistories.get(lhId);
121                     if (history == null) {
122                         if (purgedHistories.contains(lhId.getHistoryId())) {
123                             LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request);
124                             throw new DeadHistoryException(purgedHistories.toRangeSet());
125                         }
126
127                         LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request);
128                         throw new UnknownHistoryException(lastSeenHistory);
129                     }
130                 } else {
131                     history = standaloneHistory;
132                 }
133
134                 return history.handleTransactionRequest(request, envelope, now);
135             } finally {
136                 expectNextRequest();
137             }
138         }
139
140         @Override
141         void reconnect() {
142             expectedTxSequence = 0;
143             super.reconnect();
144         }
145
146         @Override
147         void retire() {
148             super.retire();
149
150             // Clear out all transaction chains
151             localHistories.values().forEach(AbstractFrontendHistory::retire);
152             localHistories.clear();
153             standaloneHistory.retire();
154         }
155
156         @Override
157         ToStringHelper addToStringAttributes(final ToStringHelper helper) {
158             return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories);
159         }
160
161         private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
162                 final RequestEnvelope envelope, final long now) throws RequestException {
163             final var historyId = request.getTarget();
164             final var existing = localHistories.get(historyId);
165             if (existing != null) {
166                 // History already exists: report success
167                 LOG.debug("{}: history {} already exists", persistenceId(), historyId);
168                 return new LocalHistorySuccess(historyId, request.getSequence());
169             }
170
171             // We have not found the history. Before we create it we need to check history ID sequencing so that we do
172             // not end up resurrecting a purged history.
173             if (purgedHistories.contains(historyId.getHistoryId())) {
174                 LOG.debug("{}: rejecting purged request {}", persistenceId(), request);
175                 throw new DeadHistoryException(purgedHistories.toRangeSet());
176             }
177
178             // Update last history we have seen
179             if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
180                 lastSeenHistory = historyId.getHistoryId();
181             }
182
183             // We have to send the response only after persistence has completed
184             final var chain = tree().ensureTransactionChain(historyId, () -> {
185                 LOG.debug("{}: persisted history {}", persistenceId(), historyId);
186                 envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()),
187                     tree().readTime() - now);
188             });
189
190             localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain));
191             LOG.debug("{}: created history {}", persistenceId(), historyId);
192             return null;
193         }
194
195         private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
196                 final RequestEnvelope envelope, final long now) {
197             final var id = request.getTarget();
198             final var existing = localHistories.get(id);
199             if (existing == null) {
200                 // History does not exist: report success
201                 LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id);
202                 return new LocalHistorySuccess(id, request.getSequence());
203             }
204
205             existing.destroy(request.getSequence(), envelope, now);
206             return null;
207         }
208
209         private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
210                 final RequestEnvelope envelope, final long now) {
211             final var id = request.getTarget();
212             final var existing = localHistories.remove(id);
213             if (existing == null) {
214                 LOG.debug("{}: history {} has already been purged", persistenceId(), id);
215                 return new LocalHistorySuccess(id, request.getSequence());
216             }
217
218             LOG.debug("{}: purging history {}", persistenceId(), id);
219             purgedHistories.add(id.getHistoryId());
220             existing.purge(request.getSequence(), envelope, now);
221             return null;
222         }
223
224         private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
225             if (expectedTxSequence != envelope.getTxSequence()) {
226                 throw new OutOfSequenceEnvelopeException(expectedTxSequence);
227             }
228         }
229
230         private void expectNextRequest() {
231             expectedTxSequence++;
232         }
233     }
234
235     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
236
237     private final @NonNull ClientIdentifier clientId;
238     private final @NonNull String persistenceId;
239     private final @NonNull ShardDataTree tree;
240
241     private long lastConnectTicks;
242     private long lastSeenTicks;
243
244     // TODO: explicit failover notification
245     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
246     //       to the frontend client -- that way it can immediately start sending requests
247
248     // TODO: add statistics:
249     // - number of requests processed
250     // - number of histories processed
251     // - per-RequestException throw counters
252
253     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
254         this.persistenceId = requireNonNull(persistenceId);
255         this.clientId = requireNonNull(clientId);
256         this.tree = requireNonNull(tree);
257         lastSeenTicks = tree.readTime();
258     }
259
260     @Override
261     public final ClientIdentifier getIdentifier() {
262         return clientId;
263     }
264
265     final String persistenceId() {
266         return persistenceId;
267     }
268
269     final long getLastConnectTicks() {
270         return lastConnectTicks;
271     }
272
273     final long getLastSeenTicks() {
274         return lastSeenTicks;
275     }
276
277     final ShardDataTree tree() {
278         return tree;
279     }
280
281     final void touch() {
282         lastSeenTicks = tree.readTime();
283     }
284
285     abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest<?> request,
286             RequestEnvelope envelope, long now) throws RequestException;
287
288     abstract @Nullable TransactionSuccess<?> handleTransactionRequest(TransactionRequest<?> request,
289             RequestEnvelope envelope, long now) throws RequestException;
290
291     void reconnect() {
292         lastConnectTicks = tree.readTime();
293     }
294
295     void retire() {
296         // Hunt down any transactions associated with this frontend
297         final var it = tree.cohortIterator();
298         while (it.hasNext()) {
299             final var cohort = it.next();
300             final var transactionId = cohort.transactionId();
301             if (clientId.equals(transactionId.getHistoryId().getClientId())) {
302                 if (cohort.getState() != State.COMMIT_PENDING) {
303                     LOG.debug("{}: Retiring transaction {}", persistenceId, transactionId);
304                     it.remove();
305                 } else {
306                     LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, transactionId);
307                 }
308             }
309         }
310     }
311
312     @Override
313     public final String toString() {
314         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
315     }
316
317     ToStringHelper addToStringAttributes(final ToStringHelper helper) {
318         return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks);
319     }
320 }