X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStore.java;h=29fc259bb75935d25f13dcb8f7a0865f74f145cd;hb=9c0508d8b591e356f145d5d1a277c10965a647bb;hp=8d5b0c2f4a65f49d188786ad9f5927f29939166e;hpb=5213be60d33416fe7da6081cab65308d42765e41;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 8d5b0c2f4a..29fc259bb7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -8,6 +8,15 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.util.Timeout; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -18,17 +27,93 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +import static akka.pattern.Patterns.ask; /** * */ -public class DistributedDataStore implements DOMStore { +public class DistributedDataStore implements DOMStore, SchemaContextListener { + + private static final Logger + LOG = LoggerFactory.getLogger(DistributedDataStore.class); + + final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS); + final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS); + + private final ActorRef shardManager; + private final ActorSystem actorSystem; + private final String type; + + + public DistributedDataStore(ActorSystem actorSystem, String type) { + this.actorSystem = actorSystem; + this.type = type; + shardManager = actorSystem.actorOf(ShardManager.props(type)); + } @Override - public >> ListenerRegistration registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { - return new ListenerRegistrationProxy(); + public >> ListenerRegistration registerChangeListener( + InstanceIdentifier path, L listener, + AsyncDataBroker.DataChangeScope scope) { + + ActorSelection primary = findPrimary(); + + ActorRef dataChangeListenerActor = actorSystem.actorOf(DataChangeListener.props()); + + Object result = + getResult(primary, new RegisterChangeListener(path, dataChangeListenerActor.path(), + AsyncDataBroker.DataChangeScope.BASE), ASK_DURATION); + + RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; + return new ListenerRegistrationProxy(reply.getListenerRegistrationPath()); } + private ActorSelection findPrimary() { + Object result = getResult(shardManager, new FindPrimary(Shard.DEFAULT_NAME), ASK_DURATION); + + if(result instanceof PrimaryFound){ + PrimaryFound found = (PrimaryFound) result; + LOG.error("Primary found {}", found.getPrimaryPath()); + + return actorSystem.actorSelection(found.getPrimaryPath()); + } + throw new RuntimeException("primary was not found"); + } + + private Object getResult(ActorRef actor, Object message, FiniteDuration duration){ + Future future = + ask(actor, message, new Timeout(duration)); + + try { + return Await.result(future, AWAIT_DURATION); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object getResult(ActorSelection actor, Object message, FiniteDuration duration){ + Future future = + ask(actor, message, new Timeout(duration)); + + try { + return Await.result(future, AWAIT_DURATION); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override public DOMStoreTransactionChain createTransactionChain() { return new TransactionChainProxy(); @@ -48,4 +133,8 @@ public class DistributedDataStore implements DOMStore { public DOMStoreReadWriteTransaction newReadWriteTransaction() { return new TransactionProxy(); } + + @Override public void onGlobalContextUpdated(SchemaContext schemaContext) { + shardManager.tell(new UpdateSchemaContext(schemaContext), null); + } }