2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.base.MoreObjects.ToStringHelper;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableList;
17 import java.util.HashMap;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
24 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
25 import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
26 import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
27 import org.opendaylight.yangtools.concepts.Builder;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * This class is NOT thread-safe.
35 abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>,
36 Identifiable<ClientIdentifier> {
37 static final class Disabled extends FrontendClientMetadataBuilder {
38 Disabled(final String shardName, final ClientIdentifier identifier) {
39 super(shardName, identifier);
43 public FrontendClientMetadata build() {
44 return new FrontendClientMetadata(getIdentifier(), ImmutableUnsignedLongSet.of(), ImmutableList.of());
48 void onHistoryCreated(final LocalHistoryIdentifier historyId) {
53 void onHistoryClosed(final LocalHistoryIdentifier historyId) {
58 void onHistoryPurged(final LocalHistoryIdentifier historyId) {
63 void onTransactionAborted(final TransactionIdentifier txId) {
68 void onTransactionCommitted(final TransactionIdentifier txId) {
73 void onTransactionPurged(final TransactionIdentifier txId) {
78 void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
83 LeaderFrontendState toLeaderState(final Shard shard) {
84 return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore());
88 static final class Enabled extends FrontendClientMetadataBuilder {
89 private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
90 private final MutableUnsignedLongSet purgedHistories;
91 private final LocalHistoryIdentifier standaloneId;
93 Enabled(final String shardName, final ClientIdentifier identifier) {
94 super(shardName, identifier);
96 purgedHistories = MutableUnsignedLongSet.of();
98 // History for stand-alone transactions is always present
99 standaloneId = standaloneHistoryId();
100 currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
103 Enabled(final String shardName, final FrontendClientMetadata meta) {
104 super(shardName, meta.getIdentifier());
106 purgedHistories = meta.getPurgedHistories().mutableCopy();
107 for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
108 final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h);
109 currentHistories.put(b.getIdentifier(), b);
112 // Sanity check and recovery
113 standaloneId = standaloneHistoryId();
114 if (!currentHistories.containsKey(standaloneId)) {
115 LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
116 shardName, getIdentifier(), currentHistories);
117 currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
122 public FrontendClientMetadata build() {
123 return new FrontendClientMetadata(getIdentifier(), purgedHistories.immutableCopy(),
124 Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
128 void onHistoryCreated(final LocalHistoryIdentifier historyId) {
129 final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
130 final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
131 if (oldMeta != null) {
132 // This should not be happening, warn about it
133 LOG.warn("{}: Reused local history {}", shardName(), historyId);
135 LOG.debug("{}: Created local history {}", shardName(), historyId);
140 void onHistoryClosed(final LocalHistoryIdentifier historyId) {
141 final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
142 if (builder != null) {
143 builder.onHistoryClosed();
144 LOG.debug("{}: Closed history {}", shardName(), historyId);
146 LOG.warn("{}: Closed unknown history {}, ignoring", shardName(), historyId);
151 void onHistoryPurged(final LocalHistoryIdentifier historyId) {
152 final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
153 final long historyBits = historyId.getHistoryId();
154 if (history == null) {
155 if (!purgedHistories.contains(historyBits)) {
156 purgedHistories.add(historyBits);
157 LOG.warn("{}: Purging unknown history {}", shardName(), historyId);
159 LOG.warn("{}: Duplicate purge of history {}", shardName(), historyId);
162 purgedHistories.add(historyBits);
163 LOG.debug("{}: Purged history {}", shardName(), historyId);
168 void onTransactionAborted(final TransactionIdentifier txId) {
169 final FrontendHistoryMetadataBuilder history = getHistory(txId);
170 if (history != null) {
171 history.onTransactionAborted(txId);
172 LOG.debug("{}: Aborted transaction {}", shardName(), txId);
174 LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName(), txId);
179 void onTransactionCommitted(final TransactionIdentifier txId) {
180 final FrontendHistoryMetadataBuilder history = getHistory(txId);
181 if (history != null) {
182 history.onTransactionCommitted(txId);
183 LOG.debug("{}: Committed transaction {}", shardName(), txId);
185 LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName(), txId);
190 void onTransactionPurged(final TransactionIdentifier txId) {
191 final FrontendHistoryMetadataBuilder history = getHistory(txId);
192 if (history != null) {
193 history.onTransactionPurged(txId);
194 LOG.debug("{}: Purged transaction {}", shardName(), txId);
196 LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName(), txId);
201 void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
202 final FrontendHistoryMetadataBuilder history = getHistory(historyId);
203 if (history != null) {
204 history.onTransactionsSkipped(txIds);
205 LOG.debug("{}: History {} skipped transactions {}", shardName(), historyId, txIds);
207 LOG.warn("{}: Unknown history {} for skipped transactions, ignoring", shardName(), historyId);
212 LeaderFrontendState toLeaderState(final Shard shard) {
213 // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
214 // interactions would get intertwined leading to inconsistencies.
215 final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
216 for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
217 if (e.getIdentifier().getHistoryId() != 0) {
218 final AbstractFrontendHistory state = e.toLeaderState(shard);
219 verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state);
220 histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
224 final AbstractFrontendHistory singleHistory;
225 final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
226 new LocalHistoryIdentifier(getIdentifier(), 0));
227 if (singleHistoryMeta == null) {
228 final ShardDataTree tree = shard.getDataStore();
229 singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
231 singleHistory = singleHistoryMeta.toLeaderState(shard);
234 return new LeaderFrontendState.Enabled(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
235 purgedHistories.mutableCopy(), singleHistory, histories);
239 ToStringHelper addToStringAttributes(final ToStringHelper helper) {
240 return super.addToStringAttributes(helper).add("current", currentHistories).add("purged", purgedHistories);
243 private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
244 return getHistory(txId.getHistoryId());
247 private FrontendHistoryMetadataBuilder getHistory(final LocalHistoryIdentifier historyId) {
248 final LocalHistoryIdentifier local;
249 if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
250 // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
251 // needs to account for that.
252 LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
253 local = standaloneId;
258 return currentHistories.get(local);
261 private LocalHistoryIdentifier standaloneHistoryId() {
262 return new LocalHistoryIdentifier(getIdentifier(), 0);
266 private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
268 private final @NonNull ClientIdentifier identifier;
269 private final @NonNull String shardName;
271 FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) {
272 this.shardName = requireNonNull(shardName);
273 this.identifier = requireNonNull(identifier);
276 static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
277 // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
278 // either purged or active
279 return meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
280 ? new Disabled(shardName, meta.getIdentifier()) : new Enabled(shardName, meta);
284 public final ClientIdentifier getIdentifier() {
288 final String shardName() {
292 abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
294 abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
296 abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
298 abstract void onTransactionAborted(TransactionIdentifier txId);
300 abstract void onTransactionCommitted(TransactionIdentifier txId);
302 abstract void onTransactionPurged(TransactionIdentifier txId);
304 abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
307 * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
309 * @param shard parent shard
310 * @return Leader frontend state
312 abstract @NonNull LeaderFrontendState toLeaderState(@NonNull Shard shard);
315 public final String toString() {
316 return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
319 ToStringHelper addToStringAttributes(final ToStringHelper helper) {
320 return helper.add("identifier", identifier);