2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
15 import akka.actor.ActorRef;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.Collection;
18 import java.util.Iterator;
19 import java.util.LinkedHashMap;
21 import java.util.Optional;
22 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import java.util.function.Consumer;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.checkerframework.checker.lock.qual.Holding;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
30 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
31 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
32 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
33 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
34 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
35 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
36 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
37 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
38 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
39 import org.opendaylight.controller.cluster.access.concepts.Request;
40 import org.opendaylight.controller.cluster.access.concepts.RequestException;
41 import org.opendaylight.controller.cluster.access.concepts.Response;
42 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
43 import org.opendaylight.yangtools.concepts.Identifiable;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
45 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Per-connection representation of a local history. This class handles state replication across a single connection.
52 * @author Robert Varga
54 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
55 private abstract static class AbstractLocal extends ProxyHistory {
56 private final ReadOnlyDataTree dataTree;
58 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
59 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
60 super(parent, connection, identifier);
61 this.dataTree = requireNonNull(dataTree);
64 final DataTreeSnapshot takeSnapshot() {
65 return dataTree.takeSnapshot();
69 private abstract static class AbstractRemote extends ProxyHistory {
70 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
71 final LocalHistoryIdentifier identifier) {
72 super(parent, connection, identifier);
76 private static final class Local extends AbstractLocal {
77 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
78 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
80 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
81 // the open one and attempts to create a new transaction again.
82 private LocalReadWriteProxyTransaction lastOpen;
84 private volatile LocalReadWriteProxyTransaction lastSealed;
86 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
87 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
88 super(parent, connection, identifier, dataTree);
92 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
93 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
94 checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
97 // Done transactions do not register on our radar on should not have any state associated.
98 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
99 : new LocalReadWriteProxyTransaction(this, txId);
102 // onTransactionCompleted() runs concurrently
103 final LocalReadWriteProxyTransaction localSealed = lastSealed;
104 final DataTreeSnapshot baseSnapshot;
105 if (localSealed != null) {
106 baseSnapshot = localSealed.getSnapshot();
108 baseSnapshot = takeSnapshot();
112 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
115 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
116 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
121 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
122 return createClient(parent(), connection, getIdentifier());
126 void onTransactionAborted(final AbstractProxyTransaction tx) {
127 if (tx.equals(lastOpen)) {
133 void onTransactionCompleted(final AbstractProxyTransaction tx) {
134 verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx);
135 if (tx instanceof LocalReadWriteProxyTransaction
136 && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
137 LOG.debug("Completed last sealed transaction {}", tx);
142 void onTransactionSealed(final AbstractProxyTransaction tx) {
143 checkState(tx.equals(lastOpen));
144 lastSealed = lastOpen;
149 private static final class LocalSingle extends AbstractLocal {
150 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
151 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
152 super(parent, connection, identifier, dataTree);
156 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
157 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
158 final DataTreeSnapshot snapshot = takeSnapshot();
159 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
160 new LocalReadWriteProxyTransaction(this, txId, snapshot);
164 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
165 return createSingle(parent(), connection, getIdentifier());
169 private static final class Remote extends AbstractRemote {
170 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
171 final LocalHistoryIdentifier identifier) {
172 super(parent, connection, identifier);
176 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
177 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
178 return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
182 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
183 return createClient(parent(), connection, getIdentifier());
187 private static final class RemoteSingle extends AbstractRemote {
188 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
189 final LocalHistoryIdentifier identifier) {
190 super(parent, connection, identifier);
194 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
195 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
196 return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
200 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
201 return createSingle(parent(), connection, getIdentifier());
205 private static final class RequestReplayException extends RequestException {
206 private static final long serialVersionUID = 1L;
208 RequestReplayException(final String format, final Object... args) {
209 super(String.format(format, args));
213 public boolean isRetriable() {
218 private final class ReconnectCohort extends ProxyReconnectCohort {
220 public LocalHistoryIdentifier getIdentifier() {
226 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
227 // First look for our Create message
228 Iterator<ConnectionEntry> it = previousEntries.iterator();
229 while (it.hasNext()) {
230 final ConnectionEntry e = it.next();
231 final Request<?, ?> req = e.getRequest();
232 if (identifier.equals(req.getTarget())) {
233 verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
234 if (req instanceof CreateLocalHistoryRequest) {
235 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
242 for (AbstractProxyTransaction t : proxies.values()) {
243 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
244 t.replayMessages(successor, previousEntries);
247 // Now look for any finalizing messages
248 it = previousEntries.iterator();
249 while (it.hasNext()) {
250 final ConnectionEntry e = it.next();
251 final Request<?, ?> req = e.getRequest();
252 if (identifier.equals(req.getTarget())) {
253 verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
254 if (req instanceof DestroyLocalHistoryRequest) {
255 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
265 ProxyHistory finishReconnect() {
266 final ProxyHistory ret = verifyNotNull(successor);
268 for (AbstractProxyTransaction t : proxies.values()) {
272 LOG.debug("Finished reconnecting proxy history {}", this);
278 void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
279 throws RequestException {
280 final Request<?, ?> request = entry.getRequest();
281 if (request instanceof TransactionRequest) {
282 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
283 entry.getEnqueuedTicks());
284 } else if (request instanceof LocalHistoryRequest) {
285 replayTo.accept(entry);
287 throw new IllegalArgumentException("Unhandled request " + request);
292 void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
293 throws RequestException {
294 final Request<?, ?> request = entry.getRequest();
295 if (request instanceof TransactionRequest) {
296 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
297 } else if (request instanceof LocalHistoryRequest) {
298 forwardTo.accept(entry);
300 throw new IllegalArgumentException("Unhandled request " + request);
304 private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
305 throws RequestReplayException {
306 final AbstractProxyTransaction proxy;
309 proxy = proxies.get(request.getTarget());
317 throw new RequestReplayException("Failed to find proxy for %s", request);
321 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
323 private final Lock lock = new ReentrantLock();
324 private final @NonNull LocalHistoryIdentifier identifier;
325 private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
326 private final @NonNull AbstractClientHistory parent;
329 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
331 private ProxyHistory successor;
333 private ProxyHistory(final AbstractClientHistory parent,
334 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
335 this.parent = requireNonNull(parent);
336 this.connection = requireNonNull(connection);
337 this.identifier = requireNonNull(identifier);
340 static ProxyHistory createClient(final AbstractClientHistory parent,
341 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
342 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
343 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
344 : new Remote(parent, connection, identifier);
347 static ProxyHistory createSingle(final AbstractClientHistory parent,
348 final AbstractClientConnection<ShardBackendInfo> connection,
349 final LocalHistoryIdentifier identifier) {
350 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
351 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
352 : new RemoteSingle(parent, connection, identifier);
356 // Non-final for mocking
357 public LocalHistoryIdentifier getIdentifier() {
361 final ClientActorContext context() {
362 return connection.context();
365 final long currentTime() {
366 return connection.currentTime();
369 final ActorRef localActor() {
370 return connection.localActor();
373 final AbstractClientHistory parent() {
377 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
378 final boolean snapshotOnly) {
379 return createTransactionProxy(txId, snapshotOnly, false);
382 // Non-final for mocking
383 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
384 final boolean isDone) {
387 if (successor != null) {
388 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
391 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
392 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
393 proxies.put(proxyId, ret);
394 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
401 final void abortTransaction(final AbstractProxyTransaction tx) {
404 // Removal will be completed once purge completes
405 LOG.debug("Proxy {} aborted transaction {}", this, tx);
406 onTransactionAborted(tx);
412 final void completeTransaction(final AbstractProxyTransaction tx) {
415 // Removal will be completed once purge completes
416 LOG.debug("Proxy {} completing transaction {}", this, tx);
417 onTransactionCompleted(tx);
423 final void purgeTransaction(final AbstractProxyTransaction tx) {
426 proxies.remove(tx.getIdentifier());
427 LOG.debug("Proxy {} purged transaction {}", this, tx);
436 if (successor != null) {
441 LOG.debug("Proxy {} invoking destroy", this);
442 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
443 this::onDestroyComplete);
449 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
450 final long enqueuedTicks) {
451 connection.enqueueRequest(request, callback, enqueuedTicks);
454 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
455 connection.sendRequest(request, callback);
459 @SuppressWarnings("checkstyle:hiddenField")
460 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
461 TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
464 @SuppressWarnings("checkstyle:hiddenField")
465 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
467 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
468 final ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
470 if (successor != null) {
472 throw new IllegalStateException("Proxy history " + this + " already has a successor");
475 successor = createSuccessor(newConnection);
476 LOG.debug("History {} instantiated successor {}", this, successor);
478 for (AbstractProxyTransaction t : proxies.values()) {
482 return new ReconnectCohort();
485 private void onDestroyComplete(final Response<?, ?> response) {
486 LOG.debug("Proxy {} destroy completed with {}", this, response);
490 parent.onProxyDestroyed(this);
491 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
492 this::onPurgeComplete);
498 private void onPurgeComplete(final Response<?, ?> response) {
499 LOG.debug("Proxy {} purge completed with {}", this, response);
503 void onTransactionAborted(final AbstractProxyTransaction tx) {
504 // No-op for most implementations
508 void onTransactionCompleted(final AbstractProxyTransaction tx) {
509 // No-op for most implementations
513 void onTransactionSealed(final AbstractProxyTransaction tx) {
514 // No-op on most implementations