/*
* Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.md.sal.dom.broker.impl.legacy.sharded.adapter;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Map;
import java.util.Queue;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* Read/write transaction that delegates write and initial read to
* {@link org.opendaylight.mdsal.dom.broker.ShardedDOMWriteTransactionAdapter}
* and {@link org.opendaylight.mdsal.dom.broker.ShardedDOMReadTransactionAdapter}
* respectively. These two in turn rely on shard aware implementation of
* {@link org.opendaylight.mdsal.dom.api.DOMDataTreeService}.
*
*
* Since reading data distributed on different subshards is not guaranteed to
* return all relevant data, best effort is to try to operate only on single
* subtree in conceptual data tree. We define this subtree by first write
* operation performed on transaction. All next read and write operations
* should be performed just in this initial subtree.
*/
// FIXME explicitly enforce just one subtree requirement
@NotThreadSafe
class ShardedDOMDataBrokerDelegatingReadWriteTransaction implements DOMDataReadWriteTransaction {
private final DOMDataReadOnlyTransaction readTxDelegate;
private final DOMDataWriteTransaction writeTxDelegate;
private final Object txIdentifier;
private final ImmutableMap> modificationHistoryMap;
private final ImmutableMap snapshotMap;
private final Map>>> initialReadMap;
private YangInstanceIdentifier root = null;
ShardedDOMDataBrokerDelegatingReadWriteTransaction(final Object readWriteTxId, final SchemaContext ctx,
final DOMDataReadOnlyTransaction readTxDelegate,
final DOMDataWriteTransaction writeTxDelegate) {
this.readTxDelegate = requireNonNull(readTxDelegate);
this.writeTxDelegate = requireNonNull(writeTxDelegate);
this.txIdentifier = requireNonNull(readWriteTxId);
this.initialReadMap = Maps.newEnumMap(LogicalDatastoreType.class);
final InMemoryDataTreeFactory treeFactory = new InMemoryDataTreeFactory();
final ImmutableMap.Builder snapshotMapBuilder = ImmutableMap.builder();
final ImmutableMap.Builder> modificationHistoryMapBuilder
= ImmutableMap.builder();
for (final LogicalDatastoreType store : LogicalDatastoreType.values()) {
final DataTree tree = treeFactory.create(treeConfigForStore(store));
tree.setSchemaContext(ctx);
snapshotMapBuilder.put(store, tree.takeSnapshot());
modificationHistoryMapBuilder.put(store, Lists.newLinkedList());
}
modificationHistoryMap = modificationHistoryMapBuilder.build();
snapshotMap = snapshotMapBuilder.build();
}
@Override
public boolean cancel() {
readTxDelegate.close();
return writeTxDelegate.cancel();
}
@Override
public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
if (root == null) {
initialRead(path);
}
modificationHistoryMap.get(store).add(new Modification(Modification.Operation.DELETE, path, null));
writeTxDelegate.delete(store, path);
}
@Override
public FluentFuture extends CommitInfo> commit() {
return writeTxDelegate.commit();
}
@Override
public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store,
final YangInstanceIdentifier path) {
checkState(root != null,
"A modify operation (put, merge or delete) must be performed prior to a read operation");
final SettableFuture>> readResult = SettableFuture.create();
final Queue currentHistory = Lists.newLinkedList(modificationHistoryMap.get(store));
Futures.addCallback(initialReadMap.get(store), new FutureCallback>>() {
@Override
public void onSuccess(final Optional> result) {
final DataTreeModification mod = snapshotMap.get(store).newModification();
if (result.isPresent()) {
mod.write(path, result.get());
}
applyModificationHistoryToSnapshot(mod, currentHistory);
readResult.set(Optional.fromJavaUtil(mod.readNode(path)));
}
@Override
public void onFailure(final Throwable throwable) {
readResult.setException(throwable);
}
}, MoreExecutors.directExecutor());
return Futures.makeChecked(readResult, ReadFailedException.MAPPER);
}
@Override
public CheckedFuture exists(final LogicalDatastoreType store,
final YangInstanceIdentifier path) {
checkState(root != null,
"A modify operation (put, merge or delete) must be performed prior to an exists operation");
return Futures.makeChecked(Futures.transform(read(store, path), Optional::isPresent,
MoreExecutors.directExecutor()), ReadFailedException.MAPPER);
}
@Override
public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
final NormalizedNode, ?> data) {
if (root == null) {
initialRead(path);
}
modificationHistoryMap.get(store).add(new Modification(Modification.Operation.WRITE, path, data));
writeTxDelegate.put(store, path, data);
}
@Override
public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
final NormalizedNode, ?> data) {
if (root == null) {
initialRead(path);
}
modificationHistoryMap.get(store).add(new Modification(Modification.Operation.MERGE, path, data));
writeTxDelegate.merge(store, path, data);
}
@Override
public Object getIdentifier() {
return txIdentifier;
}
private void initialRead(final YangInstanceIdentifier path) {
root = path;
for (final LogicalDatastoreType store : LogicalDatastoreType.values()) {
initialReadMap.put(store, readTxDelegate.read(store, path));
}
}
private static DataTreeConfiguration treeConfigForStore(final LogicalDatastoreType store) {
return store == LogicalDatastoreType.CONFIGURATION ? DataTreeConfiguration.DEFAULT_CONFIGURATION
: DataTreeConfiguration.DEFAULT_OPERATIONAL;
}
private static void applyModificationHistoryToSnapshot(final DataTreeModification dataTreeModification,
final Queue modificationHistory) {
while (!modificationHistory.isEmpty()) {
final Modification modification = modificationHistory.poll();
switch (modification.getOperation()) {
case WRITE:
dataTreeModification.write(modification.getPath(), modification.getData());
break;
case MERGE:
dataTreeModification.merge(modification.getPath(), modification.getData());
break;
case DELETE:
dataTreeModification.delete(modification.getPath());
break;
default:
// NOOP
}
}
}
static class Modification {
enum Operation {
WRITE, MERGE, DELETE
}
private final NormalizedNode, ?> data;
private final YangInstanceIdentifier path;
private final Operation operation;
Modification(final Operation operation, final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
this.data = data;
this.path = requireNonNull(path);
this.operation = requireNonNull(operation);
}
Operation getOperation() {
return operation;
}
YangInstanceIdentifier getPath() {
return path;
}
NormalizedNode, ?> getData() {
return data;
}
}
}