Fix eos entity lookups with YangInstanceIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
20 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21 import java.util.concurrent.locks.StampedLock;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
27 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
29 import org.opendaylight.controller.cluster.access.concepts.Response;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
39  * and the other for single transactions.
40  *
41  * @author Robert Varga
42  */
43 public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
44     enum State {
45         IDLE,
46         TX_OPEN,
47         CLOSED,
48     }
49
50     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
51     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
52             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
53     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
54             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
55
56     @GuardedBy("this")
57     private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
58     @GuardedBy("this")
59     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
60
61     @GuardedBy("lock")
62     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
63     private final StampedLock lock = new StampedLock();
64
65     private final AbstractDataStoreClientBehavior client;
66     private final LocalHistoryIdentifier identifier;
67
68     // Used via NEXT_TX_UPDATER
69     @SuppressWarnings("unused")
70     private volatile long nextTx = 0;
71
72     private volatile State state = State.IDLE;
73
74     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
75         this.client = requireNonNull(client);
76         this.identifier = requireNonNull(identifier);
77         checkArgument(identifier.getCookie() == 0);
78     }
79
80     final State state() {
81         return state;
82     }
83
84     final void updateState(final State expected, final State next) {
85         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
86         checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
87         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
88     }
89
90     final synchronized void doClose() {
91         final State local = state;
92         if (local != State.CLOSED) {
93             checkState(local == State.IDLE, "Local history %s has an open transaction", this);
94             histories.values().forEach(ProxyHistory::close);
95             updateState(local, State.CLOSED);
96         }
97     }
98
99     final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
100         histories.remove(proxyHistory.getIdentifier().getCookie());
101         LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
102     }
103
104     @Override
105     public LocalHistoryIdentifier getIdentifier() {
106         return identifier;
107     }
108
109     final long nextTx() {
110         return NEXT_TX_UPDATER.getAndIncrement(this);
111     }
112
113     final Long resolveShardForPath(final YangInstanceIdentifier path) {
114         return client.resolveShardForPath(path);
115     }
116
117     @Override
118     final void localAbort(final Throwable cause) {
119         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
120         if (oldState != State.CLOSED) {
121             LOG.debug("Force-closing history {}", getIdentifier(), cause);
122
123             synchronized (this) {
124                 for (AbstractClientHandle<?> t : openTransactions.values()) {
125                     t.localAbort(cause);
126                 }
127                 openTransactions.clear();
128                 readyTransactions.clear();
129             }
130         }
131     }
132
133     /**
134      * Create a new history proxy for a given shard.
135      *
136      * @throws InversibleLockException if the shard is being reconnected
137      */
138     @GuardedBy("lock")
139     private ProxyHistory createHistoryProxy(final Long shard) {
140         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
141         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
142             identifier.getHistoryId(), shard);
143         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
144
145         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
146
147         // Request creation of the history, if it is not the single history
148         if (ret.getIdentifier().getHistoryId() != 0) {
149             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
150                 this::createHistoryCallback);
151         }
152         return ret;
153     }
154
155     abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
156             AbstractClientConnection<ShardBackendInfo> connection);
157
158     private void createHistoryCallback(final Response<?, ?> response) {
159         LOG.debug("Create history response {}", response);
160     }
161
162     private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
163         while (true) {
164             try {
165                 // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
166                 // see comments in startReconnect() for details.
167                 final long stamp = lock.readLock();
168                 try {
169                     return histories.computeIfAbsent(shard, this::createHistoryProxy);
170                 } finally {
171                     lock.unlockRead(stamp);
172                 }
173             } catch (InversibleLockException e) {
174                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
175                 e.awaitResolution();
176                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
177             }
178         }
179     }
180
181     final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
182         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
183     }
184
185     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
186         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
187     }
188
189     private void checkNotClosed() {
190         if (state == State.CLOSED) {
191             throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier));
192         }
193     }
194
195     /**
196      * Allocate a new {@link ClientTransaction}.
197      *
198      * @return A new {@link ClientTransaction}
199      * @throws DOMTransactionChainClosedException if this history is closed
200      * @throws IllegalStateException if a previous dependent transaction has not been closed
201      */
202     public ClientTransaction createTransaction() {
203         checkNotClosed();
204
205         synchronized (this) {
206             final ClientTransaction ret = doCreateTransaction();
207             openTransactions.put(ret.getIdentifier(), ret);
208             return ret;
209         }
210     }
211
212     /**
213      * Create a new {@link ClientSnapshot}.
214      *
215      * @return A new {@link ClientSnapshot}
216      * @throws DOMTransactionChainClosedException if this history is closed
217      * @throws IllegalStateException if a previous dependent transaction has not been closed
218      */
219     public ClientSnapshot takeSnapshot() {
220         checkNotClosed();
221
222         synchronized (this) {
223             final ClientSnapshot ret = doCreateSnapshot();
224             openTransactions.put(ret.getIdentifier(), ret);
225             return ret;
226         }
227     }
228
229     @GuardedBy("this")
230     abstract ClientSnapshot doCreateSnapshot();
231
232     @GuardedBy("this")
233     abstract ClientTransaction doCreateTransaction();
234
235     /**
236      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
237      *
238      * @param txId Transaction identifier
239      * @param cohort Transaction commit cohort
240      */
241     synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
242             final AbstractTransactionCommitCohort cohort) {
243         final TransactionIdentifier txId = tx.getIdentifier();
244         if (openTransactions.remove(txId) == null) {
245             LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
246         }
247
248         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
249         checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous);
250
251         LOG.debug("Local history {} readied transaction {}", this, txId);
252         return cohort;
253     }
254
255     /**
256      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
257      * backend.
258      *
259      * @param snapshot transaction identifier
260      */
261     synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
262         if (openTransactions.remove(snapshot.getIdentifier()) == null) {
263             LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
264         }
265     }
266
267     /**
268      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
269      * and all its state can be removed.
270      *
271      * @param txId transaction identifier
272      */
273     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
274         if (readyTransactions.remove(txId) == null) {
275             LOG.warn("Could not find completed transaction {}", txId);
276         }
277     }
278
279     HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
280         /*
281          * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
282          *
283          * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
284          * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
285          * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
286          * the history map.
287          *
288          * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
289          * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
290          * requests are established to happen either before or after the reconnect attempt.
291          */
292         final ProxyHistory oldProxy;
293         final long stamp = lock.writeLock();
294         try {
295             oldProxy = histories.get(newConn.cookie());
296         } finally {
297             lock.unlockWrite(stamp);
298         }
299
300         if (oldProxy == null) {
301             return null;
302         }
303
304         final ProxyReconnectCohort proxy = verifyNotNull(oldProxy.startReconnect(newConn));
305         return new HistoryReconnectCohort() {
306             @Override
307             ProxyReconnectCohort getProxy() {
308                 return proxy;
309             }
310
311             @Override
312             void replayRequests(final Collection<ConnectionEntry> previousEntries) {
313                 proxy.replayRequests(previousEntries);
314             }
315
316             @Override
317             public void close() {
318                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
319                 final ProxyHistory newProxy = proxy.finishReconnect();
320                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
321                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
322                         AbstractClientHistory.this);
323                 }
324             }
325         };
326     }
327
328 }