2a1537bd28ab1ca8f91b23da68ce81581af24cf5
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import java.util.HashMap;
14 import java.util.Iterator;
15 import java.util.Map;
16 import org.eclipse.jdt.annotation.Nullable;
17 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
18 import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
19 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
20 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
21 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
22 import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
23 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
24 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
25 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
26 import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException;
27 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
28 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
30 import org.opendaylight.controller.cluster.access.concepts.RequestException;
31 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
32 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
33 import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
40  * in the frontend/backend conversation. This class is NOT thread-safe.
41  *
42  * @author Robert Varga
43  */
44 final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
45     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
46
47     // Histories which have not been purged
48     private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
49
50     // RangeSet performs automatic merging, hence we keep minimal state tracking information
51     private final UnsignedLongRangeSet purgedHistories;
52
53     // Used for all standalone transactions
54     private final AbstractFrontendHistory standaloneHistory;
55     private final ShardDataTree tree;
56     private final ClientIdentifier clientId;
57     private final String persistenceId;
58
59     private long lastConnectTicks;
60     private long lastSeenTicks;
61     private long expectedTxSequence;
62     private Long lastSeenHistory = null;
63
64     // TODO: explicit failover notification
65     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
66     //       to the frontend client -- that way it can immediately start sending requests
67
68     // TODO: add statistics:
69     // - number of requests processed
70     // - number of histories processed
71     // - per-RequestException throw counters
72
73     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
74         this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
75             StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
76     }
77
78     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
79         final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
80         final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
81         this.persistenceId = requireNonNull(persistenceId);
82         this.clientId = requireNonNull(clientId);
83         this.tree = requireNonNull(tree);
84         this.purgedHistories = requireNonNull(purgedHistories);
85         this.standaloneHistory = requireNonNull(standaloneHistory);
86         this.localHistories = requireNonNull(localHistories);
87         this.lastSeenTicks = tree.readTime();
88     }
89
90     @Override
91     public ClientIdentifier getIdentifier() {
92         return clientId;
93     }
94
95     private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
96         if (expectedTxSequence != envelope.getTxSequence()) {
97             throw new OutOfSequenceEnvelopeException(expectedTxSequence);
98         }
99     }
100
101     private void expectNextRequest() {
102         expectedTxSequence++;
103     }
104
105     @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
106             final RequestEnvelope envelope, final long now) throws RequestException {
107         checkRequestSequence(envelope);
108
109         try {
110             if (request instanceof CreateLocalHistoryRequest) {
111                 return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
112             } else if (request instanceof DestroyLocalHistoryRequest) {
113                 return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
114             } else if (request instanceof PurgeLocalHistoryRequest) {
115                 return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
116             } else {
117                 LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
118                 throw new UnsupportedRequestException(request);
119             }
120         } finally {
121             expectNextRequest();
122         }
123     }
124
125     private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
126             final RequestEnvelope envelope, final long now) throws RequestException {
127         final LocalHistoryIdentifier historyId = request.getTarget();
128         final AbstractFrontendHistory existing = localHistories.get(historyId);
129         if (existing != null) {
130             // History already exists: report success
131             LOG.debug("{}: history {} already exists", persistenceId, historyId);
132             return new LocalHistorySuccess(historyId, request.getSequence());
133         }
134
135         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
136         // end up resurrecting a purged history.
137         if (purgedHistories.contains(historyId.getHistoryId())) {
138             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
139             throw new DeadHistoryException(purgedHistories.toImmutable());
140         }
141
142         // Update last history we have seen
143         if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
144             lastSeenHistory = historyId.getHistoryId();
145         }
146
147         // We have to send the response only after persistence has completed
148         final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
149             LOG.debug("{}: persisted history {}", persistenceId, historyId);
150             envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
151         });
152
153         localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
154         LOG.debug("{}: created history {}", persistenceId, historyId);
155         return null;
156     }
157
158     private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
159             final RequestEnvelope envelope, final long now) {
160         final LocalHistoryIdentifier id = request.getTarget();
161         final LocalFrontendHistory existing = localHistories.get(id);
162         if (existing == null) {
163             // History does not exist: report success
164             LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id);
165             return new LocalHistorySuccess(id, request.getSequence());
166         }
167
168         existing.destroy(request.getSequence(), envelope, now);
169         return null;
170     }
171
172     private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
173             final RequestEnvelope envelope, final long now) {
174         final LocalHistoryIdentifier id = request.getTarget();
175         final LocalFrontendHistory existing = localHistories.remove(id);
176         if (existing == null) {
177             LOG.debug("{}: history {} has already been purged", persistenceId, id);
178             return new LocalHistorySuccess(id, request.getSequence());
179         }
180
181         LOG.debug("{}: purging history {}", persistenceId, id);
182         purgedHistories.add(id.getHistoryId());
183         existing.purge(request.getSequence(), envelope, now);
184         return null;
185     }
186
187     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
188             final RequestEnvelope envelope, final long now) throws RequestException {
189         checkRequestSequence(envelope);
190
191         try {
192             final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
193             final AbstractFrontendHistory history;
194
195             if (lhId.getHistoryId() != 0) {
196                 history = localHistories.get(lhId);
197                 if (history == null) {
198                     if (purgedHistories.contains(lhId.getHistoryId())) {
199                         LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
200                         throw new DeadHistoryException(purgedHistories.toImmutable());
201                     }
202
203                     LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
204                     throw new UnknownHistoryException(lastSeenHistory);
205                 }
206             } else {
207                 history = standaloneHistory;
208             }
209
210             return history.handleTransactionRequest(request, envelope, now);
211         } finally {
212             expectNextRequest();
213         }
214     }
215
216     void reconnect() {
217         expectedTxSequence = 0;
218         lastConnectTicks = tree.readTime();
219     }
220
221     void retire() {
222         // Hunt down any transactions associated with this frontend
223         final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
224         while (it.hasNext()) {
225             final SimpleShardDataTreeCohort cohort = it.next();
226             if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
227                 if (cohort.getState() != State.COMMIT_PENDING) {
228                     LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
229                     it.remove();
230                 } else {
231                     LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
232                         cohort.getIdentifier());
233                 }
234             }
235         }
236
237         // Clear out all transaction chains
238         localHistories.values().forEach(AbstractFrontendHistory::retire);
239         localHistories.clear();
240         standaloneHistory.retire();
241     }
242
243     long getLastConnectTicks() {
244         return lastConnectTicks;
245     }
246
247     long getLastSeenTicks() {
248         return lastSeenTicks;
249     }
250
251     void touch() {
252         this.lastSeenTicks = tree.readTime();
253     }
254
255     @Override
256     public String toString() {
257         return MoreObjects.toStringHelper(LeaderFrontendState.class)
258                 .add("clientId", clientId)
259                 .add("nanosAgo", tree.readTime() - lastSeenTicks)
260                 .add("purgedHistories", purgedHistories)
261                 .toString();
262     }
263 }