import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
* @author Thomas Pantelis
*/
class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+ private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
+ private static final FrontendType SNAPSHOT_READ = FrontendType.forName("snapshot-read");
+
private final ShardTransactionActorFactory transactionActorFactory;
+ private final LocalHistoryIdentifier applyHistoryId;
+ private final LocalHistoryIdentifier readHistoryId;
private final ShardDataTree store;
private final String logId;
private final Logger log;
- private int createSnapshotTransactionCounter;
+ private long applyCounter;
+ private long readCounter;
- ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
+ ShardSnapshotCohort(MemberName memberName, ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
Logger log, String logId) {
- this.transactionActorFactory = transactionActorFactory;
+ this.transactionActorFactory = Preconditions.checkNotNull(transactionActorFactory);
this.store = Preconditions.checkNotNull(store);
this.log = log;
this.logId = logId;
+
+ this.applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+ FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
+ this.readHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
+ FrontendIdentifier.create(memberName, SNAPSHOT_READ), 0), 0);
}
@Override
// so that this actor does not get block building the snapshot. THe transaction actor will
// after processing the CreateSnapshot message.
- ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
- "createSnapshot" + ++createSnapshotTransactionCounter);
-
ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
- TransactionType.READ_ONLY, transactionID, "");
+ TransactionType.READ_ONLY, new TransactionIdentifier(readHistoryId, readCounter++));
createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
}
log.info("{}: Applying snapshot", logId);
try {
- ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
+ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction(
+ new TransactionIdentifier(applyHistoryId, applyCounter++));
NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);