2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.BiConsumer;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
28 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
33 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.Request;
35 import org.opendaylight.controller.cluster.access.concepts.RequestException;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.yangtools.concepts.Identifiable;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Per-connection representation of a local history. This class handles state replication across a single connection.
47 * @author Robert Varga
49 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
50 private abstract static class AbstractLocal extends ProxyHistory {
51 private final DataTree dataTree;
53 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
54 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
55 super(parent, connection, identifier);
56 this.dataTree = Preconditions.checkNotNull(dataTree);
59 final DataTreeSnapshot takeSnapshot() {
60 return dataTree.takeSnapshot();
64 private abstract static class AbstractRemote extends ProxyHistory {
65 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
66 final LocalHistoryIdentifier identifier) {
67 super(parent, connection, identifier);
71 private static final class Local extends AbstractLocal {
72 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
73 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
75 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
76 // the open one and attempts to create a new transaction again.
77 private LocalReadWriteProxyTransaction lastOpen;
79 private volatile LocalReadWriteProxyTransaction lastSealed;
81 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
82 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
83 super(parent, connection, identifier, dataTree);
87 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
88 final TransactionIdentifier txId, final boolean snapshotOnly) {
89 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
91 // onTransactionCompleted() runs concurrently
92 final LocalReadWriteProxyTransaction localSealed = lastSealed;
93 final DataTreeSnapshot baseSnapshot;
94 if (localSealed != null) {
95 baseSnapshot = localSealed.getSnapshot();
97 baseSnapshot = takeSnapshot();
101 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
104 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
105 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
110 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
111 return createClient(parent(), connection, getIdentifier());
115 void onTransactionAborted(final AbstractProxyTransaction tx) {
116 if (tx.equals(lastOpen)) {
122 void onTransactionCompleted(final AbstractProxyTransaction tx) {
123 Verify.verify(tx instanceof LocalProxyTransaction);
124 if (tx instanceof LocalReadWriteProxyTransaction) {
125 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
126 LOG.debug("Completed last sealed transaction {}", tx);
132 void onTransactionSealed(final AbstractProxyTransaction tx) {
133 Preconditions.checkState(tx.equals(lastOpen));
134 lastSealed = lastOpen;
139 private static final class LocalSingle extends AbstractLocal {
140 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
141 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
142 super(parent, connection, identifier, dataTree);
146 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
147 final TransactionIdentifier txId, final boolean snapshotOnly) {
148 final DataTreeSnapshot snapshot = takeSnapshot();
149 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
150 new LocalReadWriteProxyTransaction(this, txId, snapshot);
154 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
155 return createSingle(parent(), connection, getIdentifier());
159 private static final class Remote extends AbstractRemote {
160 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
161 final LocalHistoryIdentifier identifier) {
162 super(parent, connection, identifier);
166 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
167 final TransactionIdentifier txId, final boolean snapshotOnly) {
168 return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
172 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
173 return createClient(parent(), connection, getIdentifier());
177 private static final class RemoteSingle extends AbstractRemote {
178 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
179 final LocalHistoryIdentifier identifier) {
180 super(parent, connection, identifier);
184 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
185 final TransactionIdentifier txId, final boolean snapshotOnly) {
186 return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
190 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
191 return createSingle(parent(), connection, getIdentifier());
195 private static final class RequestReplayException extends RequestException {
196 private static final long serialVersionUID = 1L;
198 RequestReplayException(final String format, final Object... args) {
199 super(String.format(format, args));
203 public boolean isRetriable() {
208 private final class ReconnectCohort extends ProxyReconnectCohort {
210 public LocalHistoryIdentifier getIdentifier() {
216 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
217 // First look for our Create message
218 Iterator<ConnectionEntry> it = previousEntries.iterator();
219 while (it.hasNext()) {
220 final ConnectionEntry e = it.next();
221 final Request<?, ?> req = e.getRequest();
222 if (identifier.equals(req.getTarget())) {
223 Verify.verify(req instanceof LocalHistoryRequest);
224 if (req instanceof CreateLocalHistoryRequest) {
225 successor.connection.sendRequest(req, e.getCallback());
232 for (AbstractProxyTransaction t : proxies.values()) {
233 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
234 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
236 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
237 t.replayMessages(newProxy, previousEntries);
240 // Now look for any finalizing messages
241 it = previousEntries.iterator();
242 while (it.hasNext()) {
243 final ConnectionEntry e = it.next();
244 final Request<?, ?> req = e.getRequest();
245 if (identifier.equals(req.getTarget())) {
246 Verify.verify(req instanceof LocalHistoryRequest);
247 if (req instanceof DestroyLocalHistoryRequest) {
248 successor.connection.sendRequest(req, e.getCallback());
258 ProxyHistory finishReconnect() {
259 final ProxyHistory ret = Verify.verifyNotNull(successor);
261 for (AbstractProxyTransaction t : proxies.values()) {
265 LOG.debug("Finished reconnecting proxy history {}", this);
271 void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
272 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
273 // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
274 // period required to get into the queue.
275 if (request instanceof TransactionRequest) {
276 forwardTransactionRequest((TransactionRequest<?>) request, callback);
277 } else if (request instanceof LocalHistoryRequest) {
278 forwardTo.accept(request, callback);
280 throw new IllegalArgumentException("Unhandled request " + request);
284 private void forwardTransactionRequest(final TransactionRequest<?> request,
285 final Consumer<Response<?, ?>> callback) throws RequestException {
287 final AbstractProxyTransaction proxy;
290 proxy = proxies.get(request.getTarget());
295 throw new RequestReplayException("Failed to find proxy for %s", request);
298 proxy.forwardRequest(request, callback);
302 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
304 private final Lock lock = new ReentrantLock();
305 private final LocalHistoryIdentifier identifier;
306 private final AbstractClientConnection<ShardBackendInfo> connection;
307 private final AbstractClientHistory parent;
310 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
312 private ProxyHistory successor;
314 private ProxyHistory(final AbstractClientHistory parent,
315 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
316 this.parent = Preconditions.checkNotNull(parent);
317 this.connection = Preconditions.checkNotNull(connection);
318 this.identifier = Preconditions.checkNotNull(identifier);
321 static ProxyHistory createClient(final AbstractClientHistory parent,
322 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
323 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
324 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
325 : new Remote(parent, connection, identifier);
328 static ProxyHistory createSingle(final AbstractClientHistory parent,
329 final AbstractClientConnection<ShardBackendInfo> connection,
330 final LocalHistoryIdentifier identifier) {
331 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
332 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
333 : new RemoteSingle(parent, connection, identifier);
337 public LocalHistoryIdentifier getIdentifier() {
341 final long currentTime() {
342 return connection.currentTime();
345 final ActorRef localActor() {
346 return connection.localActor();
349 final AbstractClientHistory parent() {
353 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
354 final boolean snapshotOnly) {
357 if (successor != null) {
358 return successor.createTransactionProxy(txId, snapshotOnly);
361 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
362 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
363 proxies.put(proxyId, ret);
364 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
371 final void abortTransaction(final AbstractProxyTransaction tx) {
374 proxies.remove(tx.getIdentifier());
375 LOG.debug("Proxy {} aborting transaction {}", this, tx);
376 onTransactionAborted(tx);
382 final void completeTransaction(final AbstractProxyTransaction tx) {
385 proxies.remove(tx.getIdentifier());
386 LOG.debug("Proxy {} completing transaction {}", this, tx);
387 onTransactionCompleted(tx);
396 if (successor != null) {
401 LOG.debug("Proxy {} invoking destroy", this);
402 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
403 this::onDestroyComplete);
409 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
410 final long enqueuedTicks) {
411 connection.enqueueRequest(request, callback, enqueuedTicks);
414 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
415 connection.sendRequest(request, callback);
419 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
420 TransactionIdentifier txId, boolean snapshotOnly);
422 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
424 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
425 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
427 if (successor != null) {
429 throw new IllegalStateException("Proxy history " + this + " already has a successor");
432 successor = createSuccessor(newConnection);
433 LOG.debug("History {} instantiated successor {}", this, successor);
435 for (AbstractProxyTransaction t : proxies.values()) {
439 return new ReconnectCohort();
442 private void onDestroyComplete(final Response<?, ?> response) {
443 LOG.debug("Proxy {} destroy completed with {}", this, response);
447 parent.onProxyDestroyed(this);
448 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
449 this::onPurgeComplete);
455 private void onPurgeComplete(final Response<?, ?> response) {
456 LOG.debug("Proxy {} purge completed with {}", this, response);
460 void onTransactionAborted(final AbstractProxyTransaction tx) {
461 // No-op for most implementations
465 void onTransactionCompleted(final AbstractProxyTransaction tx) {
466 // No-op for most implementations
469 void onTransactionSealed(final AbstractProxyTransaction tx) {
470 // No-op on most implementations