Minor sal-distributed-datastore cleanups
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import java.util.HashMap;
15 import java.util.Iterator;
16 import java.util.Map;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
20 import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
21 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
22 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
23 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
24 import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
25 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
28 import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException;
29 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
31 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
32 import org.opendaylight.controller.cluster.access.concepts.RequestException;
33 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
34 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
35 import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
42  * in the frontend/backend conversation. This class is NOT thread-safe.
43  *
44  * @author Robert Varga
45  */
46 abstract class LeaderFrontendState implements Identifiable<ClientIdentifier> {
47     static final class Disabled extends LeaderFrontendState {
48         Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
49             super(persistenceId, clientId, tree);
50         }
51
52         @Override
53         LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
54                 final RequestEnvelope envelope, final long now) throws RequestException {
55             throw new UnsupportedRequestException(request);
56         }
57
58         @Override
59         TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
60                 final RequestEnvelope envelope, final long now) throws RequestException {
61             throw new UnsupportedRequestException(request);
62         }
63     }
64
65     static final class Enabled extends LeaderFrontendState {
66         // Histories which have not been purged
67         private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
68
69         // UnsignedLongSet performs automatic merging, hence we keep minimal state tracking information
70         private final MutableUnsignedLongSet purgedHistories;
71
72         // Used for all standalone transactions
73         private final AbstractFrontendHistory standaloneHistory;
74
75         private long expectedTxSequence;
76         private Long lastSeenHistory = null;
77
78         Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
79             this(persistenceId, clientId, tree, MutableUnsignedLongSet.of(),
80                 StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
81         }
82
83         Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
84                 final MutableUnsignedLongSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
85                 final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
86             super(persistenceId, clientId, tree);
87             this.purgedHistories = requireNonNull(purgedHistories);
88             this.standaloneHistory = requireNonNull(standaloneHistory);
89             this.localHistories = requireNonNull(localHistories);
90         }
91
92         @Override
93         @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
94                 final RequestEnvelope envelope, final long now) throws RequestException {
95             checkRequestSequence(envelope);
96
97             try {
98                 if (request instanceof CreateLocalHistoryRequest) {
99                     return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
100                 } else if (request instanceof DestroyLocalHistoryRequest) {
101                     return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
102                 } else if (request instanceof PurgeLocalHistoryRequest) {
103                     return handlePurgeHistory((PurgeLocalHistoryRequest) request, envelope, now);
104                 } else {
105                     LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
106                     throw new UnsupportedRequestException(request);
107                 }
108             } finally {
109                 expectNextRequest();
110             }
111         }
112
113         @Override
114         @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
115                 final RequestEnvelope envelope, final long now) throws RequestException {
116             checkRequestSequence(envelope);
117
118             try {
119                 final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
120                 final AbstractFrontendHistory history;
121
122                 if (lhId.getHistoryId() != 0) {
123                     history = localHistories.get(lhId);
124                     if (history == null) {
125                         if (purgedHistories.contains(lhId.getHistoryId())) {
126                             LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request);
127                             throw new DeadHistoryException(purgedHistories.toRangeSet());
128                         }
129
130                         LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request);
131                         throw new UnknownHistoryException(lastSeenHistory);
132                     }
133                 } else {
134                     history = standaloneHistory;
135                 }
136
137                 return history.handleTransactionRequest(request, envelope, now);
138             } finally {
139                 expectNextRequest();
140             }
141         }
142
143         @Override
144         void reconnect() {
145             expectedTxSequence = 0;
146             super.reconnect();
147         }
148
149         @Override
150         void retire() {
151             super.retire();
152
153             // Clear out all transaction chains
154             localHistories.values().forEach(AbstractFrontendHistory::retire);
155             localHistories.clear();
156             standaloneHistory.retire();
157         }
158
159         @Override
160         ToStringHelper addToStringAttributes(final ToStringHelper helper) {
161             return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories);
162         }
163
164         private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
165                 final RequestEnvelope envelope, final long now) throws RequestException {
166             final LocalHistoryIdentifier historyId = request.getTarget();
167             final AbstractFrontendHistory existing = localHistories.get(historyId);
168             if (existing != null) {
169                 // History already exists: report success
170                 LOG.debug("{}: history {} already exists", persistenceId(), historyId);
171                 return new LocalHistorySuccess(historyId, request.getSequence());
172             }
173
174             // We have not found the history. Before we create it we need to check history ID sequencing so that we do
175             // not end up resurrecting a purged history.
176             if (purgedHistories.contains(historyId.getHistoryId())) {
177                 LOG.debug("{}: rejecting purged request {}", persistenceId(), request);
178                 throw new DeadHistoryException(purgedHistories.toRangeSet());
179             }
180
181             // Update last history we have seen
182             if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
183                 lastSeenHistory = historyId.getHistoryId();
184             }
185
186             // We have to send the response only after persistence has completed
187             final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> {
188                 LOG.debug("{}: persisted history {}", persistenceId(), historyId);
189                 envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()),
190                     tree().readTime() - now);
191             });
192
193             localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain));
194             LOG.debug("{}: created history {}", persistenceId(), historyId);
195             return null;
196         }
197
198         private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
199                 final RequestEnvelope envelope, final long now) {
200             final LocalHistoryIdentifier id = request.getTarget();
201             final LocalFrontendHistory existing = localHistories.get(id);
202             if (existing == null) {
203                 // History does not exist: report success
204                 LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id);
205                 return new LocalHistorySuccess(id, request.getSequence());
206             }
207
208             existing.destroy(request.getSequence(), envelope, now);
209             return null;
210         }
211
212         private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
213                 final RequestEnvelope envelope, final long now) {
214             final LocalHistoryIdentifier id = request.getTarget();
215             final LocalFrontendHistory existing = localHistories.remove(id);
216             if (existing == null) {
217                 LOG.debug("{}: history {} has already been purged", persistenceId(), id);
218                 return new LocalHistorySuccess(id, request.getSequence());
219             }
220
221             LOG.debug("{}: purging history {}", persistenceId(), id);
222             purgedHistories.add(id.getHistoryId());
223             existing.purge(request.getSequence(), envelope, now);
224             return null;
225         }
226
227         private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
228             if (expectedTxSequence != envelope.getTxSequence()) {
229                 throw new OutOfSequenceEnvelopeException(expectedTxSequence);
230             }
231         }
232
233         private void expectNextRequest() {
234             expectedTxSequence++;
235         }
236     }
237
238     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
239
240     private final @NonNull ClientIdentifier clientId;
241     private final @NonNull String persistenceId;
242     private final @NonNull ShardDataTree tree;
243
244     private long lastConnectTicks;
245     private long lastSeenTicks;
246
247     // TODO: explicit failover notification
248     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
249     //       to the frontend client -- that way it can immediately start sending requests
250
251     // TODO: add statistics:
252     // - number of requests processed
253     // - number of histories processed
254     // - per-RequestException throw counters
255
256     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
257         this.persistenceId = requireNonNull(persistenceId);
258         this.clientId = requireNonNull(clientId);
259         this.tree = requireNonNull(tree);
260         lastSeenTicks = tree.readTime();
261     }
262
263     @Override
264     public final ClientIdentifier getIdentifier() {
265         return clientId;
266     }
267
268     final String persistenceId() {
269         return persistenceId;
270     }
271
272     final long getLastConnectTicks() {
273         return lastConnectTicks;
274     }
275
276     final long getLastSeenTicks() {
277         return lastSeenTicks;
278     }
279
280     final ShardDataTree tree() {
281         return tree;
282     }
283
284     final void touch() {
285         lastSeenTicks = tree.readTime();
286     }
287
288     abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest<?> request,
289             RequestEnvelope envelope, long now) throws RequestException;
290
291     abstract @Nullable TransactionSuccess<?> handleTransactionRequest(TransactionRequest<?> request,
292             RequestEnvelope envelope, long now) throws RequestException;
293
294     void reconnect() {
295         lastConnectTicks = tree.readTime();
296     }
297
298     void retire() {
299         // Hunt down any transactions associated with this frontend
300         final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
301         while (it.hasNext()) {
302             final SimpleShardDataTreeCohort cohort = it.next();
303             if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
304                 if (cohort.getState() != State.COMMIT_PENDING) {
305                     LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
306                     it.remove();
307                 } else {
308                     LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
309                         cohort.getIdentifier());
310                 }
311             }
312         }
313     }
314
315     @Override
316     public final String toString() {
317         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
318     }
319
320     ToStringHelper addToStringAttributes(final ToStringHelper helper) {
321         return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks);
322     }
323 }