2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorRef;
14 import com.google.common.base.Verify;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.LinkedHashMap;
20 import java.util.Optional;
21 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.function.Consumer;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
29 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
30 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
31 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
34 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
35 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
36 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
37 import org.opendaylight.controller.cluster.access.concepts.Request;
38 import org.opendaylight.controller.cluster.access.concepts.RequestException;
39 import org.opendaylight.controller.cluster.access.concepts.Response;
40 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
41 import org.opendaylight.yangtools.concepts.Identifiable;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Per-connection representation of a local history. This class handles state replication across a single connection.
50 * @author Robert Varga
52 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
53 private abstract static class AbstractLocal extends ProxyHistory {
54 private final ReadOnlyDataTree dataTree;
56 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
57 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
58 super(parent, connection, identifier);
59 this.dataTree = requireNonNull(dataTree);
62 final DataTreeSnapshot takeSnapshot() {
63 return dataTree.takeSnapshot();
67 private abstract static class AbstractRemote extends ProxyHistory {
68 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
69 final LocalHistoryIdentifier identifier) {
70 super(parent, connection, identifier);
74 private static final class Local extends AbstractLocal {
75 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
76 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
78 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
79 // the open one and attempts to create a new transaction again.
80 private LocalReadWriteProxyTransaction lastOpen;
82 private volatile LocalReadWriteProxyTransaction lastSealed;
84 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
85 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
86 super(parent, connection, identifier, dataTree);
90 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
91 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
92 checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
95 // Done transactions do not register on our radar on should not have any state associated.
96 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
97 : new LocalReadWriteProxyTransaction(this, txId);
100 // onTransactionCompleted() runs concurrently
101 final LocalReadWriteProxyTransaction localSealed = lastSealed;
102 final DataTreeSnapshot baseSnapshot;
103 if (localSealed != null) {
104 baseSnapshot = localSealed.getSnapshot();
106 baseSnapshot = takeSnapshot();
110 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
113 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
114 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
119 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
120 return createClient(parent(), connection, getIdentifier());
124 void onTransactionAborted(final AbstractProxyTransaction tx) {
125 if (tx.equals(lastOpen)) {
131 void onTransactionCompleted(final AbstractProxyTransaction tx) {
132 Verify.verify(tx instanceof LocalProxyTransaction);
133 if (tx instanceof LocalReadWriteProxyTransaction
134 && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
135 LOG.debug("Completed last sealed transaction {}", tx);
140 void onTransactionSealed(final AbstractProxyTransaction tx) {
141 checkState(tx.equals(lastOpen));
142 lastSealed = lastOpen;
147 private static final class LocalSingle extends AbstractLocal {
148 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
149 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
150 super(parent, connection, identifier, dataTree);
154 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
155 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
156 final DataTreeSnapshot snapshot = takeSnapshot();
157 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
158 new LocalReadWriteProxyTransaction(this, txId, snapshot);
162 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
163 return createSingle(parent(), connection, getIdentifier());
167 private static final class Remote extends AbstractRemote {
168 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
169 final LocalHistoryIdentifier identifier) {
170 super(parent, connection, identifier);
174 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
175 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
176 return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
180 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
181 return createClient(parent(), connection, getIdentifier());
185 private static final class RemoteSingle extends AbstractRemote {
186 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
187 final LocalHistoryIdentifier identifier) {
188 super(parent, connection, identifier);
192 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
193 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
194 return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
198 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
199 return createSingle(parent(), connection, getIdentifier());
203 private static final class RequestReplayException extends RequestException {
204 private static final long serialVersionUID = 1L;
206 RequestReplayException(final String format, final Object... args) {
207 super(String.format(format, args));
211 public boolean isRetriable() {
216 private final class ReconnectCohort extends ProxyReconnectCohort {
218 public LocalHistoryIdentifier getIdentifier() {
224 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
225 // First look for our Create message
226 Iterator<ConnectionEntry> it = previousEntries.iterator();
227 while (it.hasNext()) {
228 final ConnectionEntry e = it.next();
229 final Request<?, ?> req = e.getRequest();
230 if (identifier.equals(req.getTarget())) {
231 Verify.verify(req instanceof LocalHistoryRequest);
232 if (req instanceof CreateLocalHistoryRequest) {
233 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
240 for (AbstractProxyTransaction t : proxies.values()) {
241 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
242 t.replayMessages(successor, previousEntries);
245 // Now look for any finalizing messages
246 it = previousEntries.iterator();
247 while (it.hasNext()) {
248 final ConnectionEntry e = it.next();
249 final Request<?, ?> req = e.getRequest();
250 if (identifier.equals(req.getTarget())) {
251 Verify.verify(req instanceof LocalHistoryRequest);
252 if (req instanceof DestroyLocalHistoryRequest) {
253 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
263 ProxyHistory finishReconnect() {
264 final ProxyHistory ret = Verify.verifyNotNull(successor);
266 for (AbstractProxyTransaction t : proxies.values()) {
270 LOG.debug("Finished reconnecting proxy history {}", this);
276 void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
277 throws RequestException {
278 final Request<?, ?> request = entry.getRequest();
279 if (request instanceof TransactionRequest) {
280 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
281 entry.getEnqueuedTicks());
282 } else if (request instanceof LocalHistoryRequest) {
283 replayTo.accept(entry);
285 throw new IllegalArgumentException("Unhandled request " + request);
290 void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
291 throws RequestException {
292 final Request<?, ?> request = entry.getRequest();
293 if (request instanceof TransactionRequest) {
294 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
295 } else if (request instanceof LocalHistoryRequest) {
296 forwardTo.accept(entry);
298 throw new IllegalArgumentException("Unhandled request " + request);
302 private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
303 throws RequestReplayException {
304 final AbstractProxyTransaction proxy;
307 proxy = proxies.get(request.getTarget());
315 throw new RequestReplayException("Failed to find proxy for %s", request);
319 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
321 private final Lock lock = new ReentrantLock();
322 private final LocalHistoryIdentifier identifier;
323 private final AbstractClientConnection<ShardBackendInfo> connection;
324 private final AbstractClientHistory parent;
327 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
329 private ProxyHistory successor;
331 private ProxyHistory(final AbstractClientHistory parent,
332 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
333 this.parent = requireNonNull(parent);
334 this.connection = requireNonNull(connection);
335 this.identifier = requireNonNull(identifier);
338 static ProxyHistory createClient(final AbstractClientHistory parent,
339 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
340 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
341 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
342 : new Remote(parent, connection, identifier);
345 static ProxyHistory createSingle(final AbstractClientHistory parent,
346 final AbstractClientConnection<ShardBackendInfo> connection,
347 final LocalHistoryIdentifier identifier) {
348 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
349 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
350 : new RemoteSingle(parent, connection, identifier);
354 public LocalHistoryIdentifier getIdentifier() {
358 final ClientActorContext context() {
359 return connection.context();
362 final long currentTime() {
363 return connection.currentTime();
366 final ActorRef localActor() {
367 return connection.localActor();
370 final AbstractClientHistory parent() {
374 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
375 final boolean snapshotOnly) {
376 return createTransactionProxy(txId, snapshotOnly, false);
379 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
380 final boolean isDone) {
383 if (successor != null) {
384 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
387 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
388 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
389 proxies.put(proxyId, ret);
390 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
397 final void abortTransaction(final AbstractProxyTransaction tx) {
400 // Removal will be completed once purge completes
401 LOG.debug("Proxy {} aborted transaction {}", this, tx);
402 onTransactionAborted(tx);
408 final void completeTransaction(final AbstractProxyTransaction tx) {
411 // Removal will be completed once purge completes
412 LOG.debug("Proxy {} completing transaction {}", this, tx);
413 onTransactionCompleted(tx);
419 void purgeTransaction(final AbstractProxyTransaction tx) {
422 proxies.remove(tx.getIdentifier());
423 LOG.debug("Proxy {} purged transaction {}", this, tx);
432 if (successor != null) {
437 LOG.debug("Proxy {} invoking destroy", this);
438 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
439 this::onDestroyComplete);
445 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
446 final long enqueuedTicks) {
447 connection.enqueueRequest(request, callback, enqueuedTicks);
450 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
451 connection.sendRequest(request, callback);
455 @SuppressWarnings("checkstyle:hiddenField")
456 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
457 TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
459 @SuppressWarnings("checkstyle:hiddenField")
460 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
462 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
463 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
465 if (successor != null) {
467 throw new IllegalStateException("Proxy history " + this + " already has a successor");
470 successor = createSuccessor(newConnection);
471 LOG.debug("History {} instantiated successor {}", this, successor);
473 for (AbstractProxyTransaction t : proxies.values()) {
477 return new ReconnectCohort();
480 private void onDestroyComplete(final Response<?, ?> response) {
481 LOG.debug("Proxy {} destroy completed with {}", this, response);
485 parent.onProxyDestroyed(this);
486 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
487 this::onPurgeComplete);
493 private void onPurgeComplete(final Response<?, ?> response) {
494 LOG.debug("Proxy {} purge completed with {}", this, response);
498 void onTransactionAborted(final AbstractProxyTransaction tx) {
499 // No-op for most implementations
503 void onTransactionCompleted(final AbstractProxyTransaction tx) {
504 // No-op for most implementations
507 void onTransactionSealed(final AbstractProxyTransaction tx) {
508 // No-op on most implementations