X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=10dbbc84d873ee54b5421643c00b046043e58114;hb=8e42b08cb626a60919c145b2a46d94114c3905d6;hp=9cda3f1aa168ee43c40f715753a49ecdbace8369;hpb=9fb47da6a1e8bfb57cc56d4bf31d837b05ba72e6;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 9cda3f1aa1..10dbbc84d8 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -15,6 +15,7 @@ import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.serialization.Serialization;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -33,6 +34,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -42,6 +45,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -49,6 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
/**
* A Shard represents a portion of the logical data tree
@@ -58,6 +63,8 @@ import java.util.concurrent.Executors;
*/
public class Shard extends RaftActor {
+ private static final ConfigParams configParams = new ShardConfigParams();
+
public static final String DEFAULT_NAME = "default";
private final ListeningExecutorService storeExecutor =
@@ -84,7 +91,7 @@ public class Shard extends RaftActor {
private final List dataChangeListeners = new ArrayList<>();
private Shard(String name, Map peerAddresses) {
- super(name, peerAddresses);
+ super(name, peerAddresses, Optional.of(configParams));
this.name = name;
@@ -121,8 +128,8 @@ public class Shard extends RaftActor {
} else if(getLeader() != null){
getLeader().forward(message, getContext());
}
- } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
- registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof ForwardedCommitTransaction) {
@@ -271,7 +278,7 @@ public class Shard extends RaftActor {
LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
getSender()
- .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
+ .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
getSelf());
}
@@ -318,9 +325,25 @@ public class Shard extends RaftActor {
for(ActorSelection dataChangeListener : dataChangeListeners){
dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
}
+
+ if(getLeaderId() != null){
+ shardMBean.setLeader(getLeaderId());
+ }
+
+ shardMBean.setRaftState(getRaftState().name());
}
@Override public String persistenceId() {
return this.name;
}
+
+
+ private static class ShardConfigParams extends DefaultConfigParamsImpl {
+ public static final FiniteDuration HEART_BEAT_INTERVAL =
+ new FiniteDuration(500, TimeUnit.MILLISECONDS);
+
+ @Override public FiniteDuration getHeartBeatInterval() {
+ return HEART_BEAT_INTERVAL;
+ }
+ }
}