2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import java.util.HashMap;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.eclipse.jdt.annotation.Nullable;
18 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
19 import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
20 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
21 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
22 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
23 import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
24 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
25 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
27 import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException;
28 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
33 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
34 import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
35 import org.opendaylight.yangtools.concepts.Identifiable;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
41 * in the frontend/backend conversation. This class is NOT thread-safe.
43 abstract sealed class LeaderFrontendState implements Identifiable<ClientIdentifier> {
/**
 * Frontend state variant that services nothing: both request handlers below unconditionally
 * throw {@link UnsupportedRequestException} for the request they are given.
 */
44 static final class Disabled extends LeaderFrontendState {
// Passes all three arguments straight through to the LeaderFrontendState constructor.
45 Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
46 super(persistenceId, clientId, tree);
// Local history requests are always rejected; envelope and now are not consulted.
50 LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
51 final RequestEnvelope envelope, final long now) throws RequestException {
52 throw new UnsupportedRequestException(request);
// Transaction requests are always rejected; envelope and now are not consulted.
56 TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
57 final RequestEnvelope envelope, final long now) throws RequestException {
58 throw new UnsupportedRequestException(request);
62 static final class Enabled extends LeaderFrontendState {
63 // Histories which have not been purged
// Entries are added by handleCreateHistory() and removed by handlePurgeHistory().
64 private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
66 // UnsignedLongSet performs automatic merging, hence we keep minimal state tracking information
// Holds the numeric IDs of histories that handlePurgeHistory() has purged.
67 private final MutableUnsignedLongSet purgedHistories;
69 // Used for all standalone transactions
70 private final AbstractFrontendHistory standaloneHistory;
// Envelope sequence number the next request must carry: compared in checkRequestSequence() and
// incremented in expectNextRequest().
72 private long expectedTxSequence;
// Largest history ID (unsigned comparison) seen by handleCreateHistory(), null until the first one.
// It is passed to UnknownHistoryException when a transaction targets an unknown history.
73 private Long lastSeenHistory = null;
/**
 * Convenience constructor delegating to the six-argument constructor with a
 * {@code MutableUnsignedLongSet.of()} purged set (presumably empty -- confirm), a standalone history created
 * for the same persistence ID, client and tree, and a new empty {@link HashMap} of local histories.
 */
75 Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
76 this(persistenceId, clientId, tree, MutableUnsignedLongSet.of(),
77 StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
/**
 * Full constructor. The purged set, standalone history and local history map are null-checked and stored
 * by reference (no copies are taken), so the caller's instances are the ones mutated later.
 *
 * @throws NullPointerException if purgedHistories, standaloneHistory or localHistories is null
 */
80 Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
81 final MutableUnsignedLongSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
82 final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
83 super(persistenceId, clientId, tree);
84 this.purgedHistories = requireNonNull(purgedHistories);
85 this.standaloneHistory = requireNonNull(standaloneHistory);
86 this.localHistories = requireNonNull(localHistories);
/**
 * Dispatches a local history request after verifying the envelope's sequence number.
 *
 * <p>Create, destroy and purge requests are routed to their dedicated handlers; any other request type is
 * logged at WARN level and rejected with {@link UnsupportedRequestException}.
 *
 * <p>NOTE(review): the original line numbering jumps inside this method, so some statements (for example the
 * place where expectNextRequest() is invoked) are not visible in this excerpt.
 *
 * @throws RequestException if the sequence check fails, the selected handler fails, or the request type is
 *         not supported
 */
90 @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
91 final RequestEnvelope envelope, final long now) throws RequestException {
// Reject out-of-order envelopes before looking at the request itself
92 checkRequestSequence(envelope);
95 if (request instanceof CreateLocalHistoryRequest req) {
96 return handleCreateHistory(req, envelope, now);
97 } else if (request instanceof DestroyLocalHistoryRequest req) {
98 return handleDestroyHistory(req, envelope, now);
99 } else if (request instanceof PurgeLocalHistoryRequest req) {
100 return handlePurgeHistory(req, envelope, now);
// None of the three known request types matched
102 LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
103 throw new UnsupportedRequestException(request);
/**
 * Routes a transaction request to the history it targets, after verifying the envelope's sequence number.
 *
 * <p>A non-zero history ID is looked up in {@code localHistories}. If it is absent, the request fails with
 * {@link DeadHistoryException} when the ID is in {@code purgedHistories}, otherwise with
 * {@link UnknownHistoryException} carrying {@code lastSeenHistory}. The request is finally delegated to the
 * selected history's own handleTransactionRequest().
 *
 * <p>NOTE(review): the branch keyword in front of the standaloneHistory assignment is not visible in this
 * excerpt; presumably it is the else-branch for history ID 0 -- confirm.
 */
111 @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
112 final RequestEnvelope envelope, final long now) throws RequestException {
113 checkRequestSequence(envelope);
// The target is a transaction identifier; its history identifier selects the history to use
116 final var lhId = request.getTarget().getHistoryId();
117 final AbstractFrontendHistory history;
119 if (lhId.getHistoryId() != 0) {
120 history = localHistories.get(lhId);
121 if (history == null) {
// Distinguish a history that was purged from one that was never seen
122 if (purgedHistories.contains(lhId.getHistoryId())) {
123 LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request);
124 throw new DeadHistoryException(purgedHistories.toRangeSet());
127 LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request);
128 throw new UnknownHistoryException(lastSeenHistory);
131 history = standaloneHistory;
134 return history.handleTransactionRequest(request, envelope, now);
// Restart envelope sequencing: the next request is expected to carry sequence 0.
// NOTE(review): the header of the enclosing method is not visible in this excerpt.
142 expectedTxSequence = 0;
// NOTE(review): the header of the enclosing method is not visible in this excerpt.
150 // Clear out all transaction chains
// Every tracked local history is retired before the map is emptied; the standalone history is retired last.
151 localHistories.values().forEach(AbstractFrontendHistory::retire);
152 localHistories.clear();
153 standaloneHistory.retire();
// Appends purgedHistories after whatever attributes the superclass contributes.
157 ToStringHelper addToStringAttributes(final ToStringHelper helper) {
158 return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories);
/**
 * Handles a request to create a local history.
 *
 * <p>A history already present in {@code localHistories} is reported as success straight away. A history ID
 * contained in {@code purgedHistories} is rejected with {@link DeadHistoryException}. Otherwise
 * {@code lastSeenHistory} is advanced if the new ID is larger (unsigned comparison), a transaction chain is
 * obtained from the tree and the new history is registered in {@code localHistories}. In that case the success
 * reply is sent from the callback handed to {@code ensureTransactionChain}.
 *
 * <p>NOTE(review): the return statement of the newly-created path is not visible in this excerpt; presumably
 * it returns null because the reply goes out through the envelope -- confirm.
 */
161 private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
162 final RequestEnvelope envelope, final long now) throws RequestException {
163 final var historyId = request.getTarget();
164 final var existing = localHistories.get(historyId);
165 if (existing != null) {
166 // History already exists: report success
167 LOG.debug("{}: history {} already exists", persistenceId(), historyId);
168 return new LocalHistorySuccess(historyId, request.getSequence());
171 // We have not found the history. Before we create it we need to check history ID sequencing so that we do
172 // not end up resurrecting a purged history.
173 if (purgedHistories.contains(historyId.getHistoryId())) {
174 LOG.debug("{}: rejecting purged request {}", persistenceId(), request);
175 throw new DeadHistoryException(purgedHistories.toRangeSet());
178 // Update last history we have seen
// History IDs are compared as unsigned longs, so lastSeenHistory only ever grows
179 if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
180 lastSeenHistory = historyId.getHistoryId();
183 // We have to send the response only after persistence has completed
184 final var chain = tree().ensureTransactionChain(historyId, () -> {
185 LOG.debug("{}: persisted history {}", persistenceId(), historyId);
// The second argument is the time elapsed between 'now' and the tree's current read time
186 envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()),
187 tree().readTime() - now);
// The history is registered immediately, without waiting for the callback above
190 localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain));
191 LOG.debug("{}: created history {}", persistenceId(), historyId);
/**
 * Handles a request to destroy a local history.
 *
 * <p>If the target is not in {@code localHistories} there is nothing to do and success is returned
 * immediately. Otherwise the work is delegated to the history's destroy(), which receives the request
 * sequence, the envelope and the start time. The entry is not removed from {@code localHistories} here.
 *
 * <p>NOTE(review): the return statement following the delegation is not visible in this excerpt.
 */
195 private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
196 final RequestEnvelope envelope, final long now) {
197 final var id = request.getTarget();
198 final var existing = localHistories.get(id);
199 if (existing == null) {
200 // History does not exist: report success
201 LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id);
202 return new LocalHistorySuccess(id, request.getSequence());
205 existing.destroy(request.getSequence(), envelope, now);
/**
 * Handles a request to purge a local history.
 *
 * <p>The target is removed from {@code localHistories} up front. If nothing was removed, success is returned
 * immediately and {@code purgedHistories} is left untouched. Otherwise the history ID is recorded in
 * {@code purgedHistories} and the work is delegated to the removed history's purge().
 *
 * <p>NOTE(review): the return statement following the delegation is not visible in this excerpt.
 */
209 private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
210 final RequestEnvelope envelope, final long now) {
211 final var id = request.getTarget();
// remove() both looks the history up and stops tracking it
212 final var existing = localHistories.remove(id);
213 if (existing == null) {
214 LOG.debug("{}: history {} has already been purged", persistenceId(), id);
215 return new LocalHistorySuccess(id, request.getSequence());
218 LOG.debug("{}: purging history {}", persistenceId(), id);
// Remember the ID so later create/transaction requests for it fail with DeadHistoryException
219 purgedHistories.add(id.getHistoryId());
220 existing.purge(request.getSequence(), envelope, now);
/**
 * Verifies that the envelope carries exactly the sequence number this state expects next.
 *
 * @throws OutOfSequenceEnvelopeException carrying the expected sequence, if the envelope's differs
 */
224 private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
225 if (expectedTxSequence != envelope.getTxSequence()) {
226 throw new OutOfSequenceEnvelopeException(expectedTxSequence);
// Advances the expected envelope sequence by one.
230 private void expectNextRequest() {
231 expectedTxSequence++;
// Shared by the nested Enabled class, which logs through this logger as well
235 private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
// The three references below are null-checked in the constructor and never reassigned
237 private final @NonNull ClientIdentifier clientId;
238 private final @NonNull String persistenceId;
239 private final @NonNull ShardDataTree tree;
// Both tick values are samples of tree.readTime(); toString() reports their difference under the name
// "nanosAgo", so the unit is presumably nanoseconds -- confirm against ShardDataTree.
// lastConnectTicks is not assigned in the constructor, so it starts at the default 0.
241 private long lastConnectTicks;
// Initialised at construction time and refreshed wherever lastSeenTicks is reassigned
242 private long lastSeenTicks;
244 // TODO: explicit failover notification
245 // Record the ActorRef for the originating actor and when we switch to being a leader send a notification
246 // to the frontend client -- that way it can immediately start sending requests
248 // TODO: add statistics:
249 // - number of requests processed
250 // - number of histories processed
251 // - per-RequestException throw counters
/**
 * Stores the persistence ID, client identifier and data tree, and records the tree's current read time as
 * the initial last-seen timestamp.
 *
 * @throws NullPointerException if any argument is null
 */
253 LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
254 this.persistenceId = requireNonNull(persistenceId);
255 this.clientId = requireNonNull(clientId);
256 this.tree = requireNonNull(tree);
257 lastSeenTicks = tree.readTime();
// Identifiable<ClientIdentifier> accessor.
// NOTE(review): the body is not visible in this excerpt; presumably it returns clientId -- confirm.
261 public final ClientIdentifier getIdentifier() {
// Returns the persistence ID supplied at construction; the log statements in this file use it as their prefix.
265 final String persistenceId() {
266 return persistenceId;
// Returns the most recently recorded lastConnectTicks value (0 if it has never been assigned).
269 final long getLastConnectTicks() {
270 return lastConnectTicks;
// Returns the most recently recorded lastSeenTicks value, first set in the constructor.
273 final long getLastSeenTicks() {
274 return lastSeenTicks;
// Accessor used by the nested Enabled class to reach the data tree.
// NOTE(review): the body is not visible in this excerpt; presumably it returns the tree field -- confirm.
277 final ShardDataTree tree() {
// Refresh the last-seen timestamp from tree.readTime().
// NOTE(review): the header of the enclosing method is not visible in this excerpt.
282 lastSeenTicks = tree.readTime();
/**
 * Handles a request addressed to a local history.
 *
 * @param request the request to handle
 * @param envelope the envelope the request arrived in
 * @param now timestamp associated with the request; the Enabled implementation subtracts it from
 *        {@code tree().readTime()} when replying
 * @return a success response, or possibly null as permitted by the {@code @Nullable} annotation
 * @throws RequestException if the request is rejected
 */
285 abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest<?> request,
286 RequestEnvelope envelope, long now) throws RequestException;
/**
 * Handles a request addressed to a transaction.
 *
 * @param request the request to handle
 * @param envelope the envelope the request arrived in
 * @param now timestamp associated with the request, passed on to the history that handles it
 * @return a success response, or possibly null as permitted by the {@code @Nullable} annotation
 * @throws RequestException if the request is rejected
 */
288 abstract @Nullable TransactionSuccess<?> handleTransactionRequest(TransactionRequest<?> request,
289 RequestEnvelope envelope, long now) throws RequestException;
// Record the tree's current read time as the last-connect timestamp.
// NOTE(review): the header of the enclosing method is not visible in this excerpt.
292 lastConnectTicks = tree.readTime();
// NOTE(review): the enclosing method header and the statements that follow each log call are not visible in
// this excerpt, so only the selection logic is documented here.
296 // Hunt down any transactions associated with this frontend
297 final var it = tree.cohortIterator();
298 while (it.hasNext()) {
299 final var cohort = it.next();
// Only cohorts whose transaction belongs to this client are considered
300 if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
// Anything not in COMMIT_PENDING state is logged as being retired ...
301 if (cohort.getState() != State.COMMIT_PENDING) {
302 LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
// ... whereas a COMMIT_PENDING cohort is logged as being left alone
305 LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
306 cohort.getIdentifier());
// Final so that subclasses customise the output only through addToStringAttributes().
313 public final String toString() {
314 return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
// Contributes the client ID and the time elapsed since lastSeenTicks, computed against tree.readTime() at
// call time. The nested Enabled class overrides this and appends purgedHistories.
317 ToStringHelper addToStringAttributes(final ToStringHelper helper) {
318 return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks);