X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=10dbbc84d873ee54b5421643c00b046043e58114;hb=8e42b08cb626a60919c145b2a46d94114c3905d6;hp=23e27c9f5ff909b2c9b887aa5f7687b92d7cda10;hpb=b584e686fdeba863643f80c0894d7fbd2dcaa540;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 23e27c9f5f..10dbbc84d8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -15,11 +15,10 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.serialization.Serialization; - +import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; - import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -35,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.raft.ConfigParams; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; @@ -44,6 +45,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.HashMap; @@ -51,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * A Shard represents a portion of the logical data tree
@@ -60,6 +63,8 @@ import java.util.concurrent.Executors; */ public class Shard extends RaftActor { + private static final ConfigParams configParams = new ShardConfigParams(); + public static final String DEFAULT_NAME = "default"; private final ListeningExecutorService storeExecutor = @@ -86,7 +91,7 @@ public class Shard extends RaftActor { private final List dataChangeListeners = new ArrayList<>(); private Shard(String name, Map peerAddresses) { - super(name, peerAddresses); + super(name, peerAddresses, Optional.of(configParams)); this.name = name; @@ -123,8 +128,8 @@ public class Shard extends RaftActor { } else if(getLeader() != null){ getLeader().forward(message, getContext()); } - } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) { - registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message)); + } else if (message instanceof RegisterChangeListener) { + registerChangeListener((RegisterChangeListener) message); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); } else if (message instanceof ForwardedCommitTransaction) { @@ -143,13 +148,29 @@ public class Shard extends RaftActor { } } + private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){ + if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){ + return getContext().actorOf( + ShardTransaction.props( store.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId); + + }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){ + return getContext().actorOf( + ShardTransaction.props( store.newReadWriteTransaction(), getSelf(), schemaContext), transactionId); + + + }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){ + return getContext().actorOf( + ShardTransaction.props( store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId); + }else{ + throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ; + } + } + private void createTransaction(CreateTransaction createTransaction) { - DOMStoreReadWriteTransaction transaction = - store.newReadWriteTransaction(); + String transactionId = "shard-" + createTransaction.getTransactionId(); LOG.info("Creating transaction : {} " , transactionId); - ActorRef transactionActor = getContext().actorOf( - ShardTransaction.props(transaction, getSelf(), schemaContext), transactionId); + ActorRef transactionActor = createTypedTransactionActor(createTransaction,transactionId); getSender() .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(), @@ -257,7 +278,7 @@ public class Shard extends RaftActor { LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString()); getSender() - .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(), + .tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } @@ -304,9 +325,25 @@ public class Shard extends RaftActor { for(ActorSelection dataChangeListener : dataChangeListeners){ dataChangeListener.tell(new EnableNotification(isLeader()), getSelf()); } + + if(getLeaderId() != null){ + shardMBean.setLeader(getLeaderId()); + } + + shardMBean.setRaftState(getRaftState().name()); } @Override public String persistenceId() { return this.name; } + + + private static class ShardConfigParams extends DefaultConfigParamsImpl { + public static final FiniteDuration HEART_BEAT_INTERVAL = + new FiniteDuration(500, TimeUnit.MILLISECONDS); + + @Override public FiniteDuration getHeartBeatInterval() { + return HEART_BEAT_INTERVAL; + } + } }