2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.Request;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * Per-connection representation of a local history. This class handles state replication across a single connection.
43 * @author Robert Varga
45 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
// Base for history proxies that are constructed with a DataTree; snapshots are read directly from that tree.
46 private abstract static class AbstractLocal extends ProxyHistory {
// Tree that takeSnapshot() reads from; the constructor rejects null.
47 private final DataTree dataTree;
// Forwards connection and identifier to ProxyHistory and stores dataTree after a Preconditions.checkNotNull check.
49 AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
50 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
51 super(connection, identifier);
52 this.dataTree = Preconditions.checkNotNull(dataTree);
// Returns whatever dataTree.takeSnapshot() produces; declared final so subclasses share this snapshot source.
55 final DataTreeSnapshot takeSnapshot() {
56 return dataTree.takeSnapshot();
// Base for history proxies built without a DataTree; every transaction proxy it creates is a RemoteProxyTransaction.
60 private abstract static class AbstractRemote extends ProxyHistory {
// Only forwards connection and identifier to the ProxyHistory constructor.
61 AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
62 final LocalHistoryIdentifier identifier) {
63 super(connection, identifier);
// Creates a RemoteProxyTransaction for txId, passing snapshotOnly through.
// The connection argument is not used by this implementation.
67 final AbstractProxyTransaction doCreateTransactionProxy(
68 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
69 final boolean snapshotOnly) {
70 return new RemoteProxyTransaction(this, txId, snapshotOnly);
// DataTree-backed history proxy that remembers the last open and the last sealed read-write transaction,
// so a new transaction can be based on the snapshot of the last sealed one.
74 private static final class Local extends AbstractLocal {
// Atomic updater over the volatile lastSealed field; used for the compareAndSet in onTransactionCompleted().
75 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
76 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
78 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
79 // the open one and attempts to create a new transaction again.
80 private LocalReadWriteProxyTransaction lastOpen;
// Volatile because it is read in doCreateTransactionProxy() while onTransactionCompleted() may clear it.
82 private volatile LocalReadWriteProxyTransaction lastSealed;
// Passes all three arguments straight to AbstractLocal.
84 Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
85 final DataTree dataTree) {
86 super(connection, identifier, dataTree);
// Creates the next transaction proxy for this history.
// Fails with IllegalStateException (checkState) while a previous read-write transaction is still recorded in lastOpen.
// The base snapshot comes from lastSealed when it is non-null; the other visible assignment takes a fresh
// snapshot from the data tree.
// NOTE(review): the branch keywords and return statements between the visible lines are not shown in this excerpt;
// presumably snapshotOnly selects the read-only proxy and the read-write proxy stored in lastOpen is returned
// otherwise - confirm against the full file.
90 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
91 final TransactionIdentifier txId, final boolean snapshotOnly) {
92 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
94 // onTransactionCompleted() runs concurrently
// Read the volatile field once so the null check and getSnapshot() see the same reference.
95 final LocalReadWriteProxyTransaction localSealed = lastSealed;
96 final DataTreeSnapshot baseSnapshot;
97 if (localSealed != null) {
98 baseSnapshot = localSealed.getSnapshot();
100 baseSnapshot = takeSnapshot();
104 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
107 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
108 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
// The successor is built through createClient() with this history's identifier on the new connection.
113 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
114 return createClient(connection, getIdentifier());
// Acts only when the aborted transaction equals lastOpen.
// NOTE(review): the body of that branch is not visible here; presumably it clears lastOpen - confirm.
118 void onTransactionAborted(final AbstractProxyTransaction tx) {
119 if (tx.equals(lastOpen)) {
// Verifies that tx is a LocalProxyTransaction. For a read-write transaction, atomically resets lastSealed to null,
// but only if lastSealed still refers to tx.
125 void onTransactionCompleted(final AbstractProxyTransaction tx) {
126 Verify.verify(tx instanceof LocalProxyTransaction);
127 if (tx instanceof LocalReadWriteProxyTransaction) {
128 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
129 LOG.debug("Completed last sealed transaction {}", tx);
// Requires tx to equal lastOpen (IllegalStateException otherwise) and records it as lastSealed.
// NOTE(review): any statement following line 137 (e.g. resetting lastOpen) is not visible in this excerpt.
135 void onTransactionSealed(final AbstractProxyTransaction tx) {
136 Preconditions.checkState(tx.equals(lastOpen));
137 lastSealed = lastOpen;
// DataTree-backed history proxy that keeps no open/sealed state: each transaction gets a fresh snapshot.
142 private static final class LocalSingle extends AbstractLocal {
// Passes all three arguments straight to AbstractLocal.
143 LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
144 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
145 super(connection, identifier, dataTree);
// Takes a new snapshot of the data tree and wraps it in a read-only proxy when snapshotOnly is true,
// otherwise in a read-write proxy. The connection argument is not used here.
149 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
150 final TransactionIdentifier txId, final boolean snapshotOnly) {
151 final DataTreeSnapshot snapshot = takeSnapshot();
152 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
153 new LocalReadWriteProxyTransaction(this, txId, snapshot);
// The successor is built through createSingle() with this history's identifier on the new connection.
157 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
158 return createSingle(connection, getIdentifier());
// AbstractRemote variant whose successor is created via createClient().
162 private static final class Remote extends AbstractRemote {
// Only forwards connection and identifier to AbstractRemote.
163 Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
164 super(connection, identifier);
// Builds the successor through createClient() with this history's identifier on the new connection.
168 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
169 return createClient(connection, getIdentifier());
// AbstractRemote variant whose successor is created via createSingle().
173 private static final class RemoteSingle extends AbstractRemote {
// Only forwards connection and identifier to AbstractRemote.
174 RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
175 final LocalHistoryIdentifier identifier) {
176 super(connection, identifier);
// Builds the successor through createSingle() with this history's identifier on the new connection.
180 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
181 return createSingle(connection, getIdentifier());
// RequestException thrown by replayTransactionRequest() when a replayed request cannot be matched to a proxy.
185 private static final class RequestReplayException extends RequestException {
186 private static final long serialVersionUID = 1L;
// The exception message is produced with String.format(format, args), so format uses %-style placeholders.
188 RequestReplayException(final String format, final Object... args) {
189 super(String.format(format, args));
// NOTE(review): the returned value is not visible in this excerpt - confirm against the full file.
193 public boolean isRetriable() {
// Inner (non-static) cohort handed out by startReconnect(); it works on the enclosing history's identifier,
// proxies and successor fields to move state over to the successor history.
198 private final class ReconnectCohort extends ProxyReconnectCohort {
// NOTE(review): body not visible in this excerpt; presumably returns the enclosing history's identifier - confirm.
200 public LocalHistoryIdentifier getIdentifier() {
// Replays state onto the successor in three visible phases:
//   1. resend the CreateLocalHistoryRequest targeting this history, if one is among previousEntries;
//   2. create a successor transaction proxy for every tracked proxy and replay its messages;
//   3. resend LocalHistoryRequests targeting this history.
// Every request targeting this history's identifier must be a LocalHistoryRequest (Verify.verify).
206 void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
207 // First look for our Create message
208 for (ConnectionEntry e : previousEntries) {
209 final Request<?, ?> req = e.getRequest();
210 if (identifier.equals(req.getTarget())) {
211 Verify.verify(req instanceof LocalHistoryRequest);
212 if (req instanceof CreateLocalHistoryRequest) {
// Resent on the successor's connection with the entry's original callback.
213 successor.connection.sendRequest(req, e.getCallback());
// Proxies are visited in the iteration order of the LinkedHashMap declared for the proxies field.
219 for (AbstractProxyTransaction t : proxies.values()) {
220 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
// NOTE(review): the second argument of this call sits on a line not shown in this excerpt.
221 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
223 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
224 t.replayMessages(newProxy, previousEntries);
227 // Now look for any finalizing messages
// NOTE(review): as visible here, this loop resends every LocalHistoryRequest for this history, which would
// include a CreateLocalHistoryRequest already sent by the first loop - confirm whether lines not shown
// in this excerpt guard against that.
228 for (ConnectionEntry e : previousEntries) {
229 final Request<?, ?> req = e.getRequest();
230 if (identifier.equals(req.getTarget())) {
231 Verify.verify(req instanceof LocalHistoryRequest);
232 successor.connection.sendRequest(req, e.getCallback());
// Completes the reconnect: requires a successor to be present (Verify.verifyNotNull), visits every tracked
// proxy and logs completion.
// NOTE(review): the per-proxy action, any lock release and the return statement are not visible here.
239 ProxyHistory finishReconnect() {
240 final ProxyHistory ret = Verify.verifyNotNull(successor);
242 for (AbstractProxyTransaction t : proxies.values()) {
246 LOG.debug("Finished reconnecting proxy history {}", this);
// Dispatches a single replayed request by type: TransactionRequests go to replayTransactionRequest(),
// LocalHistoryRequests are handed to replayTo unchanged, anything else is rejected with IllegalArgumentException.
252 void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
253 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
254 if (request instanceof TransactionRequest) {
255 replayTransactionRequest((TransactionRequest<?>) request, callback);
256 } else if (request instanceof LocalHistoryRequest) {
257 replayTo.accept(request, callback);
259 throw new IllegalArgumentException("Unhandled request " + request);
// Looks up the proxy registered under request.getTarget() and forwards the request to it.
// Throws RequestReplayException when the lookup fails.
// NOTE(review): the failure condition and any locking around the map lookup are on lines not shown in this excerpt.
263 private void replayTransactionRequest(final TransactionRequest<?> request,
264 final Consumer<Response<?, ?>> callback) throws RequestException {
266 final AbstractProxyTransaction proxy;
269 proxy = proxies.get(request.getTarget());
274 throw new RequestReplayException("Failed to find proxy for %s", request);
277 proxy.replayRequest(request, callback);
// Logger shared by ProxyHistory and its nested classes.
281 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
// NOTE(review): the lock()/unlock() calls are not visible in this excerpt; presumably this guards proxies and successor.
283 private final Lock lock = new ReentrantLock();
// Identifier of the local history this proxy represents; never null (checked in the constructor).
284 private final LocalHistoryIdentifier identifier;
// Connection that requests are sent through; never null (checked in the constructor).
285 private final AbstractClientConnection<ShardBackendInfo> connection;
// Transaction proxies currently tracked by this history, keyed by proxy transaction identifier, in insertion order.
288 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
// History that takes over from this one; assigned in startReconnect(), null until then.
290 private ProxyHistory successor;
// Private constructor, reachable only from the nested subclasses; both arguments must be non-null
// (Preconditions.checkNotNull throws NullPointerException otherwise).
292 private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
293 final LocalHistoryIdentifier identifier) {
294 this.connection = Preconditions.checkNotNull(connection);
295 this.identifier = Preconditions.checkNotNull(identifier);
// Factory: returns a Local history when the connection's backend info exposes a DataTree, otherwise a Remote one.
// The Optional is empty if either the backend info or its DataTree is absent (flatMap).
298 static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
299 final LocalHistoryIdentifier identifier) {
300 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
301 return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
302 : new Remote(connection, identifier);
// Factory: returns a LocalSingle history when the connection's backend info exposes a DataTree,
// otherwise a RemoteSingle one. The Optional is empty if either the backend info or its DataTree is absent.
305 static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
306 final LocalHistoryIdentifier identifier) {
307 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
308 return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
309 : new RemoteSingle(connection, identifier);
// Identifiable accessor for this history.
// NOTE(review): body not visible in this excerpt; presumably returns the identifier field - confirm.
313 public LocalHistoryIdentifier getIdentifier() {
// Returns the ActorRef reported by the underlying connection's localActor().
317 final ActorRef localActor() {
318 return connection.localActor();
// Creates and registers a transaction proxy for txId.
// Once a successor history exists the call is delegated to it. Otherwise a proxy identifier is built from this
// history's identifier plus txId's transaction id, the subclass creates the proxy, and it is stored in proxies.
// NOTE(review): locking around this body and the final return are on lines not shown in this excerpt.
321 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
322 final boolean snapshotOnly) {
325 if (successor != null) {
326 return successor.createTransactionProxy(txId, snapshotOnly);
// The proxy is keyed by an identifier scoped to this history, not by the caller's txId.
329 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
330 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
331 proxies.put(proxyId, ret);
332 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// Stops tracking tx (removed from proxies by its identifier) and then notifies the subclass via onTransactionAborted().
// NOTE(review): locking around this body is not visible in this excerpt.
339 final void abortTransaction(final AbstractProxyTransaction tx) {
342 proxies.remove(tx.getIdentifier());
343 LOG.debug("Proxy {} aborting transaction {}", this, tx);
344 onTransactionAborted(tx);
// Stops tracking tx (removed from proxies by its identifier) and then notifies the subclass via onTransactionCompleted().
// NOTE(review): locking around this body is not visible in this excerpt.
350 final void completeTransaction(final AbstractProxyTransaction tx) {
353 proxies.remove(tx.getIdentifier());
354 LOG.debug("Proxy {} completing transaction {}", this, tx);
355 onTransactionCompleted(tx);
// Sends a transaction request over this history's connection; callback is passed through to the connection.
361 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
362 connection.sendRequest(request, callback);
// Subclass hook invoked by createTransactionProxy() to instantiate the actual proxy.
// txId is the history-scoped proxy identifier; snapshotOnly is forwarded from the caller.
366 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
367 TransactionIdentifier txId, boolean snapshotOnly);
// Subclass hook invoked by startReconnect() to build the history that replaces this one on the given connection.
369 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins moving this history to newConnection: creates the successor history, visits every tracked proxy
// and returns a ReconnectCohort that carries the rest of the process.
// Throws IllegalStateException if a successor was already created.
// Per the FindBugs suppression below, a lock taken here is released later through the cohort.
// NOTE(review): the lock acquisition/release and the per-proxy action are on lines not shown in this excerpt.
371 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
372 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
374 if (successor != null) {
376 throw new IllegalStateException("Proxy history " + this + " already has a successor");
379 successor = createSuccessor(newConnection);
380 LOG.debug("History {} instantiated successor {}", this, successor);
382 for (AbstractProxyTransaction t : proxies.values()) {
386 return new ReconnectCohort();
// Hook called from abortTransaction() after tx has been removed from proxies; Local overrides it.
390 void onTransactionAborted(final AbstractProxyTransaction tx) {
391 // No-op for most implementations
// Hook called from completeTransaction() after tx has been removed from proxies; Local overrides it.
395 void onTransactionCompleted(final AbstractProxyTransaction tx) {
396 // No-op for most implementations
// Hook for a transaction being sealed; Local overrides it to record the sealed transaction.
// The caller is not visible in this excerpt.
399 void onTransactionSealed(final AbstractProxyTransaction tx) {
400 // No-op on most implementations