2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.base.VerifyException;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableList;
17 import java.util.HashMap;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
24 import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
25 import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
30 * This class is NOT thread-safe.
32 abstract sealed class FrontendClientMetadataBuilder {
33 static final class Disabled extends FrontendClientMetadataBuilder {
34 Disabled(final String shardName, final ClientIdentifier clientId) {
35 super(shardName, clientId);
39 FrontendClientMetadata build() {
40 return new FrontendClientMetadata(clientId(), ImmutableUnsignedLongSet.of(), ImmutableList.of());
44 void onHistoryCreated(final LocalHistoryIdentifier historyId) {
49 void onHistoryClosed(final LocalHistoryIdentifier historyId) {
54 void onHistoryPurged(final LocalHistoryIdentifier historyId) {
59 void onTransactionAborted(final TransactionIdentifier txId) {
64 void onTransactionCommitted(final TransactionIdentifier txId) {
69 void onTransactionPurged(final TransactionIdentifier txId) {
74 void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
79 LeaderFrontendState toLeaderState(final Shard shard) {
80 return new LeaderFrontendState.Disabled(shard.persistenceId(), clientId(), shard.getDataStore());
84 static final class Enabled extends FrontendClientMetadataBuilder {
85 private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
86 private final MutableUnsignedLongSet purgedHistories;
87 private final LocalHistoryIdentifier standaloneId;
89 Enabled(final String shardName, final ClientIdentifier clientId) {
90 super(shardName, clientId);
92 purgedHistories = MutableUnsignedLongSet.of();
94 // History for stand-alone transactions is always present
95 standaloneId = standaloneHistoryId();
96 currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
99 Enabled(final String shardName, final FrontendClientMetadata meta) {
100 super(shardName, meta.clientId());
102 purgedHistories = meta.getPurgedHistories().mutableCopy();
103 for (var historyMeta : meta.getCurrentHistories()) {
104 final var builder = new FrontendHistoryMetadataBuilder(clientId(), historyMeta);
105 currentHistories.put(builder.getIdentifier(), builder);
108 // Sanity check and recovery
109 standaloneId = standaloneHistoryId();
110 if (!currentHistories.containsKey(standaloneId)) {
111 LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
112 shardName, clientId(), currentHistories);
113 currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
118 FrontendClientMetadata build() {
119 return new FrontendClientMetadata(clientId(), purgedHistories.immutableCopy(),
120 Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
124 void onHistoryCreated(final LocalHistoryIdentifier historyId) {
125 final var newMeta = new FrontendHistoryMetadataBuilder(historyId);
126 final var oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
127 if (oldMeta != null) {
128 // This should not be happening, warn about it
129 LOG.warn("{}: Reused local history {}", shardName(), historyId);
131 LOG.debug("{}: Created local history {}", shardName(), historyId);
136 void onHistoryClosed(final LocalHistoryIdentifier historyId) {
137 final var builder = currentHistories.get(historyId);
138 if (builder != null) {
139 builder.onHistoryClosed();
140 LOG.debug("{}: Closed history {}", shardName(), historyId);
142 LOG.warn("{}: Closed unknown history {}, ignoring", shardName(), historyId);
147 void onHistoryPurged(final LocalHistoryIdentifier historyId) {
148 final var history = currentHistories.remove(historyId);
149 final long historyBits = historyId.getHistoryId();
150 if (history == null) {
151 if (!purgedHistories.contains(historyBits)) {
152 purgedHistories.add(historyBits);
153 LOG.warn("{}: Purging unknown history {}", shardName(), historyId);
155 LOG.warn("{}: Duplicate purge of history {}", shardName(), historyId);
158 purgedHistories.add(historyBits);
159 LOG.debug("{}: Purged history {}", shardName(), historyId);
164 void onTransactionAborted(final TransactionIdentifier txId) {
165 final var history = getHistory(txId);
166 if (history != null) {
167 history.onTransactionAborted(txId);
168 LOG.debug("{}: Aborted transaction {}", shardName(), txId);
170 LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName(), txId);
175 void onTransactionCommitted(final TransactionIdentifier txId) {
176 final var history = getHistory(txId);
177 if (history != null) {
178 history.onTransactionCommitted(txId);
179 LOG.debug("{}: Committed transaction {}", shardName(), txId);
181 LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName(), txId);
186 void onTransactionPurged(final TransactionIdentifier txId) {
187 final var history = getHistory(txId);
188 if (history != null) {
189 history.onTransactionPurged(txId);
190 LOG.debug("{}: Purged transaction {}", shardName(), txId);
192 LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName(), txId);
197 void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
198 final FrontendHistoryMetadataBuilder history = getHistory(historyId);
199 if (history != null) {
200 history.onTransactionsSkipped(txIds);
201 LOG.debug("{}: History {} skipped transactions {}", shardName(), historyId, txIds);
203 LOG.warn("{}: Unknown history {} for skipped transactions, ignoring", shardName(), historyId);
208 LeaderFrontendState toLeaderState(final Shard shard) {
209 // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
210 // interactions would get intertwined leading to inconsistencies.
211 final var histories = new HashMap<LocalHistoryIdentifier, LocalFrontendHistory>();
212 for (var historyMetaBuilder : currentHistories.values()) {
213 final var historyId = historyMetaBuilder.getIdentifier();
214 if (historyId.getHistoryId() != 0) {
215 final var state = historyMetaBuilder.toLeaderState(shard);
216 if (state instanceof LocalFrontendHistory localState) {
217 histories.put(historyId, localState);
219 throw new VerifyException("Unexpected state " + state);
224 final AbstractFrontendHistory singleHistory;
225 final var singleHistoryMeta = currentHistories.get(new LocalHistoryIdentifier(clientId(), 0));
226 if (singleHistoryMeta == null) {
227 final var tree = shard.getDataStore();
228 singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), clientId(), tree);
230 singleHistory = singleHistoryMeta.toLeaderState(shard);
233 return new LeaderFrontendState.Enabled(shard.persistenceId(), clientId(), shard.getDataStore(),
234 purgedHistories.mutableCopy(), singleHistory, histories);
238 ToStringHelper addToStringAttributes(final ToStringHelper helper) {
239 return super.addToStringAttributes(helper).add("current", currentHistories).add("purged", purgedHistories);
242 private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
243 return getHistory(txId.getHistoryId());
246 private FrontendHistoryMetadataBuilder getHistory(final LocalHistoryIdentifier historyId) {
247 final LocalHistoryIdentifier local;
248 if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
249 // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
250 // needs to account for that.
251 LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
252 local = standaloneId;
257 return currentHistories.get(local);
260 private LocalHistoryIdentifier standaloneHistoryId() {
261 return new LocalHistoryIdentifier(clientId(), 0);
265 private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
267 private final @NonNull ClientIdentifier clientId;
268 private final @NonNull String shardName;
270 FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier clientId) {
271 this.shardName = requireNonNull(shardName);
272 this.clientId = requireNonNull(clientId);
275 static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
276 // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
277 // either purged or active
278 return meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
279 ? new Disabled(shardName, meta.clientId()) : new Enabled(shardName, meta);
282 final ClientIdentifier clientId() {
286 final String shardName() {
290 abstract FrontendClientMetadata build();
292 abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
294 abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
296 abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
298 abstract void onTransactionAborted(TransactionIdentifier txId);
300 abstract void onTransactionCommitted(TransactionIdentifier txId);
302 abstract void onTransactionPurged(TransactionIdentifier txId);
304 abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
307 * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
309 * @param shard parent shard
310 * @return Leader frontend state
312 abstract @NonNull LeaderFrontendState toLeaderState(@NonNull Shard shard);
315 public final String toString() {
316 return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
319 ToStringHelper addToStringAttributes(final ToStringHelper helper) {
320 return helper.add("clientId", clientId);