Merge "Fixed for bug : 1171 - issue while creating subnet"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
index 4401104a85971c77c1b9a9333c727348f5398656..2ef8e5f449f8df564ae42943c6597224b832bcb4 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -22,7 +23,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
@@ -44,6 +45,9 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private final String type;
     private final ActorContext actorContext;
 
+    private SchemaContext schemaContext;
+
+
 
     /**
      * Executor used to run FutureTask's
@@ -56,8 +60,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private final ExecutorService executor =
         Executors.newFixedThreadPool(10);
 
-    public DistributedDataStore(ActorSystem actorSystem, String type) {
-        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type), "shardmanager-" + type)), type);
+    public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
+        this(new ActorContext(actorSystem, actorSystem
+            .actorOf(ShardManager.props(type, cluster, configuration),
+                "shardmanager-" + type), cluster, configuration), type);
     }
 
     public DistributedDataStore(ActorContext actorContext, String type) {
@@ -67,20 +73,22 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
 
     @Override
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
-        InstanceIdentifier path, L listener,
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+        YangInstanceIdentifier path, L listener,
         AsyncDataBroker.DataChangeScope scope) {
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props(listener));
+            DataChangeListener.props(schemaContext,listener,path ));
+
+        String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
+        Object result = actorContext.executeShardOperation(shardName,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
-                AsyncDataBroker.DataChangeScope.BASE),
+                scope).toSerializable(),
             ActorContext.ASK_DURATION
         );
 
-        RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+        RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result);
         return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
     }
 
@@ -88,28 +96,29 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext, executor);
+        return new TransactionChainProxy(actorContext, executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
-            executor);
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
-            executor);
+            executor, schemaContext);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
-            executor);
+            executor, schemaContext);
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
         actorContext.getShardManager().tell(
             new UpdateSchemaContext(schemaContext), null);
     }