import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
- private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
private final AtomicLong nextHistoryId = new AtomicLong(1);
- private final AtomicLong nextTransactionId = new AtomicLong();
private final ModuleShardBackendResolver resolver;
private final SingleClientHistory singleHistory;
h.localAbort(cause);
}
histories.clear();
-
- for (ClientTransaction t : transactions.values()) {
- t.localAbort(cause);
- }
- transactions.clear();
}
private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
@Override
public ClientTransaction createTransaction() {
- final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
- nextTransactionId.getAndIncrement());
- final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
- LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
-
- return returnIfOperational(transactions, txId, tx, aborted);
+ return singleHistory.createTransaction();
}
@Override
return resolver;
}
- void transactionComplete(final ClientTransaction transaction) {
- transactions.remove(transaction.getIdentifier());
- }
-
void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
sendRequest(request, response -> {
completer.accept(response);