-
- class IntegrationTestKit extends ShardTestKit {
-
- IntegrationTestKit(ActorSystem actorSystem) {
- super(actorSystem);
- }
-
- DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
- return setupDistributedDataStore(typeName, true, shardNames);
- }
-
- DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
- String... shardNames) {
- MockClusterWrapper cluster = new MockClusterWrapper();
- Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
- ShardStrategyFactory.setConfiguration(config);
-
- datastoreContextBuilder.dataStoreType(typeName);
-
- DatastoreContext datastoreContext = datastoreContextBuilder.build();
-
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
- config, datastoreContext);
-
- SchemaContext schemaContext = SchemaContextHelper.full();
- dataStore.onGlobalContextUpdated(schemaContext);
-
- if(waitUntilLeader) {
- for(String shardName: shardNames) {
- ActorRef shard = null;
- for(int i = 0; i < 20 * 5 && shard == null; i++) {
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
- if(shardReply.isPresent()) {
- shard = shardReply.get();
- }
- }
-
- assertNotNull("Shard was not created", shard);
-
- waitUntilLeader(shard);
- }
- }
-
- return dataStore;
- }
-
- void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
- NormalizedNode<?, ?> nodeToWrite) throws Exception {
-
- // 1. Create a write-only Tx
-
- DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
- assertNotNull("newWriteOnlyTransaction returned null", writeTx);
-
- // 2. Write some data
-
- writeTx.write(nodePath, nodeToWrite);
-
- // 3. Ready the Tx for commit
-
- DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
-
- // 4. Commit the Tx
-
- doCommit(cohort);
-
- // 5. Verify the data in the store
-
- DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
-
- Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", nodeToWrite, optional.get());
- }
-
- void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
- Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
- assertEquals("canCommit", true, canCommit);
- cohort.preCommit().get(5, TimeUnit.SECONDS);
- cohort.commit().get(5, TimeUnit.SECONDS);
- }
-
- void cleanup(DistributedDataStore dataStore) {
- dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
- }
-
- void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
- throws Exception {
- try {
- callable.call();
- fail("Expected " + expType.getSimpleName());
- } catch(Exception e) {
- assertEquals("Exception type", expType, e.getClass());
- }
- }
-
- void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
- Class<? extends Exception> expType) throws Exception {
- assertExceptionOnCall(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- txChain.newWriteOnlyTransaction();
- return null;
- }
- }, expType);
-
- assertExceptionOnCall(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- txChain.newReadWriteTransaction();
- return null;
- }
- }, expType);
-
- assertExceptionOnCall(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- txChain.newReadOnlyTransaction();
- return null;
- }
- }, expType);
- }
- }
-