Fix free-standing transaction lookup with module-based shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendClientMetadataBuilder.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.base.MoreObjects;
14 import com.google.common.collect.Collections2;
15 import java.util.HashMap;
16 import java.util.Map;
17 import javax.annotation.Nonnull;
18 import javax.annotation.concurrent.NotThreadSafe;
19 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
23 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
24 import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
25 import org.opendaylight.yangtools.concepts.Builder;
26 import org.opendaylight.yangtools.concepts.Identifiable;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 @NotThreadSafe
31 final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>, Identifiable<ClientIdentifier> {
32     private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
33
34     private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
35     private final UnsignedLongRangeSet purgedHistories;
36     private final LocalHistoryIdentifier standaloneId;
37     private final ClientIdentifier identifier;
38     private final String shardName;
39
40     FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) {
41         this.shardName = requireNonNull(shardName);
42         this.identifier = requireNonNull(identifier);
43         purgedHistories = UnsignedLongRangeSet.create();
44
45         // History for stand-alone transactions is always present
46         standaloneId = standaloneHistoryId();
47         currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
48     }
49
50     FrontendClientMetadataBuilder(final String shardName, final FrontendClientMetadata meta) {
51         this.shardName = requireNonNull(shardName);
52         this.identifier = meta.getIdentifier();
53         purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
54
55         for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
56             final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(identifier, h);
57             currentHistories.put(b.getIdentifier(), b);
58         }
59
60         // Sanity check and recovery
61         standaloneId = standaloneHistoryId();
62         if (!currentHistories.containsKey(standaloneId)) {
63             LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
64                 shardName, identifier, currentHistories);
65             currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
66         }
67     }
68
69     private LocalHistoryIdentifier standaloneHistoryId() {
70         return new LocalHistoryIdentifier(identifier, 0);
71     }
72
73     @Override
74     public FrontendClientMetadata build() {
75         return new FrontendClientMetadata(identifier, purgedHistories.toImmutable(),
76             Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
77     }
78
79     @Override
80     public ClientIdentifier getIdentifier() {
81         return identifier;
82     }
83
84     void onHistoryCreated(final LocalHistoryIdentifier historyId) {
85         final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
86         final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
87         if (oldMeta != null) {
88             // This should not be happening, warn about it
89             LOG.warn("{}: Reused local history {}", shardName, historyId);
90         } else {
91             LOG.debug("{}: Created local history {}", shardName, historyId);
92         }
93     }
94
95     void onHistoryClosed(final LocalHistoryIdentifier historyId) {
96         final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
97         if (builder != null) {
98             builder.onHistoryClosed();
99             LOG.debug("{}: Closed history {}", shardName, historyId);
100         } else {
101             LOG.warn("{}: Closed unknown history {}, ignoring", shardName, historyId);
102         }
103     }
104
105     void onHistoryPurged(final LocalHistoryIdentifier historyId) {
106         final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
107         final long historyBits = historyId.getHistoryId();
108         if (history == null) {
109             if (!purgedHistories.contains(historyBits)) {
110                 purgedHistories.add(historyBits);
111                 LOG.warn("{}: Purging unknown history {}", shardName, historyId);
112             } else {
113                 LOG.warn("{}: Duplicate purge of history {}", shardName, historyId);
114             }
115         } else {
116             purgedHistories.add(historyBits);
117             LOG.debug("{}: Purged history {}", shardName, historyId);
118         }
119     }
120
121     void onTransactionAborted(final TransactionIdentifier txId) {
122         final FrontendHistoryMetadataBuilder history = getHistory(txId);
123         if (history != null) {
124             history.onTransactionAborted(txId);
125             LOG.debug("{}: Aborted transaction {}", shardName, txId);
126         } else {
127             LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName, txId);
128         }
129     }
130
131     void onTransactionCommitted(final TransactionIdentifier txId) {
132         final FrontendHistoryMetadataBuilder history = getHistory(txId);
133         if (history != null) {
134             history.onTransactionCommitted(txId);
135             LOG.debug("{}: Committed transaction {}", shardName, txId);
136         } else {
137             LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName, txId);
138         }
139     }
140
141     void onTransactionPurged(final TransactionIdentifier txId) {
142         final FrontendHistoryMetadataBuilder history = getHistory(txId);
143         if (history != null) {
144             history.onTransactionPurged(txId);
145             LOG.debug("{}: Purged transaction {}", shardName, txId);
146         } else {
147             LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName, txId);
148         }
149     }
150
151     /**
152      * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
153      *
154      * @param shard parent shard
155      * @return Leader frontend state
156      */
157     @Nonnull LeaderFrontendState toLeaderState(@Nonnull final Shard shard) {
158         // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
159         //       interactions would get intertwined leading to inconsistencies.
160         final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
161         for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
162             if (e.getIdentifier().getHistoryId() != 0) {
163                 final AbstractFrontendHistory state = e.toLeaderState(shard);
164                 verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state);
165                 histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
166             }
167         }
168
169         final AbstractFrontendHistory singleHistory;
170         final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
171             new LocalHistoryIdentifier(identifier, 0));
172         if (singleHistoryMeta == null) {
173             final ShardDataTree tree = shard.getDataStore();
174             singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
175         } else {
176             singleHistory = singleHistoryMeta.toLeaderState(shard);
177         }
178
179         return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
180             purgedHistories.copy(), singleHistory, histories);
181     }
182
183     private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
184         LocalHistoryIdentifier historyId = txId.getHistoryId();
185         if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
186             // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
187             // needs to account for that.
188             LOG.debug("{}: looking up {} instead of {}", shardName, standaloneId, historyId);
189             historyId = standaloneId;
190         }
191
192         return currentHistories.get(historyId);
193     }
194
195     @Override
196     public String toString() {
197         return MoreObjects.toStringHelper(this).add("identifier", identifier).add("current", currentHistories)
198                 .add("purged", purgedHistories).toString();
199     }
200 }