BUG-8403: guard against ConcurrentModificationException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.BiConsumer;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
28 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
33 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.Request;
35 import org.opendaylight.controller.cluster.access.concepts.RequestException;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.yangtools.concepts.Identifiable;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Per-connection representation of a local history. This class handles state replication across a single connection.
46  *
47  * @author Robert Varga
48  */
49 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
50     private abstract static class AbstractLocal extends ProxyHistory {
51         private final DataTree dataTree;
52
53         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
54             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
55             super(parent, connection, identifier);
56             this.dataTree = Preconditions.checkNotNull(dataTree);
57         }
58
59         final DataTreeSnapshot takeSnapshot() {
60             return dataTree.takeSnapshot();
61         }
62     }
63
64     private abstract static class AbstractRemote extends ProxyHistory {
65         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
66             final LocalHistoryIdentifier identifier) {
67             super(parent, connection, identifier);
68         }
69     }
70
71     private static final class Local extends AbstractLocal {
72         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
73                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
74
75         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
76         // the open one and attempts to create a new transaction again.
77         private LocalReadWriteProxyTransaction lastOpen;
78
79         private volatile LocalReadWriteProxyTransaction lastSealed;
80
81         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
82             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
83             super(parent, connection, identifier, dataTree);
84         }
85
86         @Override
87         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
88                 final TransactionIdentifier txId, final boolean snapshotOnly) {
89             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
90
91             // onTransactionCompleted() runs concurrently
92             final LocalReadWriteProxyTransaction localSealed = lastSealed;
93             final DataTreeSnapshot baseSnapshot;
94             if (localSealed != null) {
95                 baseSnapshot = localSealed.getSnapshot();
96             } else {
97                 baseSnapshot = takeSnapshot();
98             }
99
100             if (snapshotOnly) {
101                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
102             }
103
104             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
105             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
106             return lastOpen;
107         }
108
109         @Override
110         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
111             return createClient(parent(), connection, getIdentifier());
112         }
113
114         @Override
115         void onTransactionAborted(final AbstractProxyTransaction tx) {
116             if (tx.equals(lastOpen)) {
117                 lastOpen = null;
118             }
119         }
120
121         @Override
122         void onTransactionCompleted(final AbstractProxyTransaction tx) {
123             Verify.verify(tx instanceof LocalProxyTransaction);
124             if (tx instanceof LocalReadWriteProxyTransaction) {
125                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
126                     LOG.debug("Completed last sealed transaction {}", tx);
127                 }
128             }
129         }
130
131         @Override
132         void onTransactionSealed(final AbstractProxyTransaction tx) {
133             Preconditions.checkState(tx.equals(lastOpen));
134             lastSealed = lastOpen;
135             lastOpen = null;
136         }
137     }
138
139     private static final class LocalSingle extends AbstractLocal {
140         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
141             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
142             super(parent, connection, identifier, dataTree);
143         }
144
145         @Override
146         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
147                 final TransactionIdentifier txId, final boolean snapshotOnly) {
148             final DataTreeSnapshot snapshot = takeSnapshot();
149             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
150                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
151         }
152
153         @Override
154         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
155             return createSingle(parent(), connection, getIdentifier());
156         }
157     }
158
159     private static final class Remote extends AbstractRemote {
160         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
161             final LocalHistoryIdentifier identifier) {
162             super(parent, connection, identifier);
163         }
164
165         @Override
166         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
167                 final TransactionIdentifier txId, final boolean snapshotOnly) {
168             return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
169         }
170
171         @Override
172         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
173             return createClient(parent(), connection, getIdentifier());
174         }
175     }
176
177     private static final class RemoteSingle extends AbstractRemote {
178         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
179             final LocalHistoryIdentifier identifier) {
180             super(parent, connection, identifier);
181         }
182
183         @Override
184         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
185                 final TransactionIdentifier txId, final boolean snapshotOnly) {
186             return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
187         }
188
189         @Override
190         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
191             return createSingle(parent(), connection, getIdentifier());
192         }
193     }
194
195     private static final class RequestReplayException extends RequestException {
196         private static final long serialVersionUID = 1L;
197
198         RequestReplayException(final String format, final Object... args) {
199             super(String.format(format, args));
200         }
201
202         @Override
203         public boolean isRetriable() {
204             return false;
205         }
206     }
207
208     private final class ReconnectCohort extends ProxyReconnectCohort {
209         @Override
210         public LocalHistoryIdentifier getIdentifier() {
211             return identifier;
212         }
213
214         @GuardedBy("lock")
215         @Override
216         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
217             // First look for our Create message
218             Iterator<ConnectionEntry> it = previousEntries.iterator();
219             while (it.hasNext()) {
220                 final ConnectionEntry e = it.next();
221                 final Request<?, ?> req = e.getRequest();
222                 if (identifier.equals(req.getTarget())) {
223                     Verify.verify(req instanceof LocalHistoryRequest);
224                     if (req instanceof CreateLocalHistoryRequest) {
225                         successor.connection.sendRequest(req, e.getCallback());
226                         it.remove();
227                         break;
228                     }
229                 }
230             }
231
232             for (AbstractProxyTransaction t : proxies.values()) {
233                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
234                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
235                     t.isSnapshotOnly());
236                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
237                 t.replayMessages(newProxy, previousEntries);
238             }
239
240             // Now look for any finalizing messages
241             it = previousEntries.iterator();
242             while (it.hasNext()) {
243                 final ConnectionEntry e  = it.next();
244                 final Request<?, ?> req = e.getRequest();
245                 if (identifier.equals(req.getTarget())) {
246                     Verify.verify(req instanceof LocalHistoryRequest);
247                     if (req instanceof DestroyLocalHistoryRequest) {
248                         successor.connection.sendRequest(req, e.getCallback());
249                         it.remove();
250                         break;
251                     }
252                 }
253             }
254         }
255
256         @GuardedBy("lock")
257         @Override
258         ProxyHistory finishReconnect() {
259             final ProxyHistory ret = Verify.verifyNotNull(successor);
260
261             for (AbstractProxyTransaction t : proxies.values()) {
262                 t.finishReconnect();
263             }
264
265             LOG.debug("Finished reconnecting proxy history {}", this);
266             lock.unlock();
267             return ret;
268         }
269
270         @Override
271         void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
272                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
273             // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
274             //        period required to get into the queue.
275             if (request instanceof TransactionRequest) {
276                 forwardTransactionRequest((TransactionRequest<?>) request, callback);
277             } else if (request instanceof LocalHistoryRequest) {
278                 forwardTo.accept(request, callback);
279             } else {
280                 throw new IllegalArgumentException("Unhandled request " + request);
281             }
282         }
283
284         private void forwardTransactionRequest(final TransactionRequest<?> request,
285                 final Consumer<Response<?, ?>> callback) throws RequestException {
286
287             final AbstractProxyTransaction proxy;
288             lock.lock();
289             try {
290                 proxy = proxies.get(request.getTarget());
291             } finally {
292                 lock.unlock();
293             }
294             if (proxy == null) {
295                 throw new RequestReplayException("Failed to find proxy for %s", request);
296             }
297
298             proxy.forwardRequest(request, callback);
299         }
300     }
301
302     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
303
304     private final Lock lock = new ReentrantLock();
305     private final LocalHistoryIdentifier identifier;
306     private final AbstractClientConnection<ShardBackendInfo> connection;
307     private final AbstractClientHistory parent;
308
309     @GuardedBy("lock")
310     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
311     @GuardedBy("lock")
312     private ProxyHistory successor;
313
314     private ProxyHistory(final AbstractClientHistory parent,
315             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
316         this.parent = Preconditions.checkNotNull(parent);
317         this.connection = Preconditions.checkNotNull(connection);
318         this.identifier = Preconditions.checkNotNull(identifier);
319     }
320
321     static ProxyHistory createClient(final AbstractClientHistory parent,
322             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
323         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
324         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
325              : new Remote(parent, connection, identifier);
326     }
327
328     static ProxyHistory createSingle(final AbstractClientHistory parent,
329             final AbstractClientConnection<ShardBackendInfo> connection,
330             final LocalHistoryIdentifier identifier) {
331         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
332         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
333              : new RemoteSingle(parent, connection, identifier);
334     }
335
336     @Override
337     public LocalHistoryIdentifier getIdentifier() {
338         return identifier;
339     }
340
341     final long currentTime() {
342         return connection.currentTime();
343     }
344
345     final ActorRef localActor() {
346         return connection.localActor();
347     }
348
349     final AbstractClientHistory parent() {
350         return parent;
351     }
352
353     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
354             final boolean snapshotOnly) {
355         lock.lock();
356         try {
357             if (successor != null) {
358                 return successor.createTransactionProxy(txId, snapshotOnly);
359             }
360
361             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
362             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
363             proxies.put(proxyId, ret);
364             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
365             return ret;
366         } finally {
367             lock.unlock();
368         }
369     }
370
371     final void abortTransaction(final AbstractProxyTransaction tx) {
372         lock.lock();
373         try {
374             proxies.remove(tx.getIdentifier());
375             LOG.debug("Proxy {} aborting transaction {}", this, tx);
376             onTransactionAborted(tx);
377         } finally {
378             lock.unlock();
379         }
380     }
381
382     final void completeTransaction(final AbstractProxyTransaction tx) {
383         lock.lock();
384         try {
385             proxies.remove(tx.getIdentifier());
386             LOG.debug("Proxy {} completing transaction {}", this, tx);
387             onTransactionCompleted(tx);
388         } finally {
389             lock.unlock();
390         }
391     }
392
393     final void close() {
394         lock.lock();
395         try {
396             if (successor != null) {
397                 successor.close();
398                 return;
399             }
400
401             LOG.debug("Proxy {} invoking destroy", this);
402             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
403                 this::onDestroyComplete);
404         } finally {
405             lock.unlock();
406         }
407     }
408
409     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
410             final long enqueuedTicks) {
411         connection.enqueueRequest(request, callback, enqueuedTicks);
412     }
413
414     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
415         connection.sendRequest(request, callback);
416     }
417
418     @GuardedBy("lock")
419     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
420             TransactionIdentifier txId, boolean snapshotOnly);
421
422     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
423
424     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
425     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
426         lock.lock();
427         if (successor != null) {
428             lock.unlock();
429             throw new IllegalStateException("Proxy history " + this + " already has a successor");
430         }
431
432         successor = createSuccessor(newConnection);
433         LOG.debug("History {} instantiated successor {}", this, successor);
434
435         for (AbstractProxyTransaction t : proxies.values()) {
436             t.startReconnect();
437         }
438
439         return new ReconnectCohort();
440     }
441
442     private void onDestroyComplete(final Response<?, ?> response) {
443         LOG.debug("Proxy {} destroy completed with {}", this, response);
444
445         lock.lock();
446         try {
447             parent.onProxyDestroyed(this);
448             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
449                 this::onPurgeComplete);
450         } finally {
451             lock.unlock();
452         }
453     }
454
455     private void onPurgeComplete(final Response<?, ?> response) {
456         LOG.debug("Proxy {} purge completed with {}", this, response);
457     }
458
459     @GuardedBy("lock")
460     void onTransactionAborted(final AbstractProxyTransaction tx) {
461         // No-op for most implementations
462     }
463
464     @GuardedBy("lock")
465     void onTransactionCompleted(final AbstractProxyTransaction tx) {
466         // No-op for most implementations
467     }
468
469     void onTransactionSealed(final AbstractProxyTransaction tx) {
470         // No-op on most implementations
471     }
472 }