import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
* frontend.
*
+ * <p>
* This class is not visible outside of this package because it breaks the actor containment. Services provided to
* Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
*
+ * <p>
* IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
* When touching internal state, be mindful of the execution context from which execution context, Actor
* or POJO, is the state being accessed or modified.
*
+ * <p>
* THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
* threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
* doubt, feel free to synchronize on this object.
*
+ * <p>
* PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
* performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
* for correctness and performance impact.
*
+ * <p>
* TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
* for performing work and charging applications for it. That has two positive effects:
* - CPU usage is distributed across applications, minimizing work done in the actor thread
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
- private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
private final AtomicLong nextHistoryId = new AtomicLong(1);
- private final AtomicLong nextTransactionId = new AtomicLong();
private final ModuleShardBackendResolver resolver;
private final SingleClientHistory singleHistory;
DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
super(context);
- resolver = new ModuleShardBackendResolver(actorContext);
+ resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
h.localAbort(cause);
}
histories.clear();
-
- for (ClientTransaction t : transactions.values()) {
- t.localAbort(cause);
- }
- transactions.clear();
}
private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
//
//
+ @SuppressWarnings("checkstyle:IllegalCatch")
private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
final Throwable aborted) {
Verify.verify(map.put(key, value) == null);
@Override
public ClientTransaction createTransaction() {
- final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
- nextTransactionId.getAndIncrement());
- final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId);
- LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
-
- return returnIfOperational(transactions, txId, tx, aborted);
+ return singleHistory.createTransaction();
}
@Override
return resolver;
}
- void transactionComplete(final ClientTransaction transaction) {
- transactions.remove(transaction.getIdentifier());
- }
-
void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
sendRequest(request, response -> {
completer.accept(response);