BUG-5280: add basic concept of ClientSnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
19 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
21 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
22 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
23 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
24 import org.opendaylight.controller.cluster.access.concepts.Response;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
27 import org.opendaylight.yangtools.concepts.Identifiable;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
34  * and the other for single transactions.
35  *
36  * @author Robert Varga
37  */
38 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
39     enum State {
40         IDLE,
41         TX_OPEN,
42         CLOSED,
43     }
44
45     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
46     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
47             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
48     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
49             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
50
51     @GuardedBy("this")
52     private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
53     @GuardedBy("this")
54     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
55
56     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
57     private final AbstractDataStoreClientBehavior client;
58     private final LocalHistoryIdentifier identifier;
59
60     // Used via NEXT_TX_UPDATER
61     @SuppressWarnings("unused")
62     private volatile long nextTx = 0;
63
64     private volatile State state = State.IDLE;
65
66     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
67         this.client = Preconditions.checkNotNull(client);
68         this.identifier = Preconditions.checkNotNull(identifier);
69         Preconditions.checkArgument(identifier.getCookie() == 0);
70     }
71
72     final State state() {
73         return state;
74     }
75
76     final void updateState(final State expected, final State next) {
77         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
78         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
79         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
80     }
81
82     @Override
83     public final LocalHistoryIdentifier getIdentifier() {
84         return identifier;
85     }
86
87     final long nextTx() {
88         return NEXT_TX_UPDATER.getAndIncrement(this);
89     }
90
91     final Long resolveShardForPath(final YangInstanceIdentifier path) {
92         return client.resolveShardForPath(path);
93     }
94
95     @Override
96     final void localAbort(final Throwable cause) {
97         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
98         if (oldState != State.CLOSED) {
99             LOG.debug("Force-closing history {}", getIdentifier(), cause);
100
101             synchronized (this) {
102                 for (AbstractClientHandle<?> t : openTransactions.values()) {
103                     t.localAbort(cause);
104                 }
105                 openTransactions.clear();
106                 readyTransactions.clear();
107             }
108         }
109     }
110
111     /**
112      * Create a new history proxy for a given shard.
113      *
114      * @throws InversibleLockException if the shard is being reconnected
115      */
116     private ProxyHistory createHistoryProxy(final Long shard) {
117         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
118         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
119             identifier.getHistoryId(), shard);
120         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
121
122         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
123
124         // Request creation of the history, if it is not the single history
125         if (ret.getIdentifier().getHistoryId() != 0) {
126             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
127                 this::createHistoryCallback);
128         }
129         return ret;
130     }
131
132     abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
133             final AbstractClientConnection<ShardBackendInfo> connection);
134
135     private void createHistoryCallback(final Response<?, ?> response) {
136         LOG.debug("Create history response {}", response);
137     }
138
139     private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
140         while (true) {
141             try {
142                 return histories.computeIfAbsent(shard, this::createHistoryProxy);
143             } catch (InversibleLockException e) {
144                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
145                 e.awaitResolution();
146                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
147             }
148         }
149     }
150
151     final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
152         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
153     }
154
155     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
156         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
157     }
158
159     private void checkNotClosed() {
160         if (state == State.CLOSED) {
161             throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
162         }
163     }
164
165     /**
166      * Allocate a new {@link ClientTransaction}.
167      *
168      * @return A new {@link ClientTransaction}
169      * @throws TransactionChainClosedException if this history is closed
170      * @throws IllegalStateException if a previous dependent transaction has not been closed
171      */
172     public final ClientTransaction createTransaction() {
173         checkNotClosed();
174
175         synchronized (this) {
176             final ClientTransaction ret = doCreateTransaction();
177             openTransactions.put(ret.getIdentifier(), ret);
178             return ret;
179         }
180     }
181
182     /**
183      * Create a new {@link ClientSnapshot}.
184      *
185      * @return A new {@link ClientSnapshot}
186      * @throws TransactionChainClosedException if this history is closed
187      * @throws IllegalStateException if a previous dependent transaction has not been closed
188      */
189     public final ClientSnapshot takeSnapshot() {
190         checkNotClosed();
191
192         synchronized (this) {
193             final ClientSnapshot ret = doCreateSnapshot();
194             openTransactions.put(ret.getIdentifier(), ret);
195             return ret;
196         }
197     }
198
199     @GuardedBy("this")
200     abstract ClientSnapshot doCreateSnapshot();
201
202     @GuardedBy("this")
203     abstract ClientTransaction doCreateTransaction();
204
205     /**
206      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
207      *
208      * @param txId Transaction identifier
209      * @param cohort Transaction commit cohort
210      */
211     synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
212             final AbstractTransactionCommitCohort cohort) {
213         final TransactionIdentifier txId = tx.getIdentifier();
214         if (openTransactions.remove(txId) == null) {
215             LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
216         }
217
218         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
219         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
220                 cohort, txId, previous);
221
222         LOG.debug("Local history {} readied transaction {}", this, txId);
223         return cohort;
224     }
225
226     /**
227      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
228      * backend.
229      *
230      * @param snapshot transaction identifier
231      */
232     synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
233         if (openTransactions.remove(snapshot.getIdentifier()) == null) {
234             LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
235         }
236     }
237
238     /**
239      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
240      * and all its state can be removed.
241      *
242      * @param txId transaction identifier
243      */
244     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
245         if (readyTransactions.remove(txId) == null) {
246             LOG.warn("Could not find completed transaction {}", txId);
247         }
248     }
249
250     HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
251         final ProxyHistory oldProxy = histories.get(newConn.cookie());
252         if (oldProxy == null) {
253             return null;
254         }
255
256         final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
257         return new HistoryReconnectCohort() {
258             @Override
259             ProxyReconnectCohort getProxy() {
260                 return proxy;
261             }
262
263             @Override
264             void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
265                 proxy.replaySuccessfulRequests(previousEntries);
266             }
267
268             @Override
269             public void close() {
270                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
271                 final ProxyHistory newProxy = proxy.finishReconnect();
272                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
273                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
274                         AbstractClientHistory.this);
275                 }
276             }
277         };
278     }
279 }