<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
- <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<appender name="opendaylight.log" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Web modules -->
<logger name="org.opendaylight.controller.web" level="INFO"/>
+ <!-- Clustering -->
+ <logger name="org.opendaylight.controller.cluster" level="INFO"/>
+ <logger name="org.opendaylight.controller.cluster.datastore.node" level="INFO"/>
+
<!--
Unsynchronized controller startup causes models to crop up in random
order, which results in temporary inability to fully resolve a model,
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
- <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<appender name="opendaylight.log" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Web modules -->
<logger name="org.opendaylight.controller.web" level="INFO"/>
+ <!-- Clustering -->
+ <logger name="org.opendaylight.controller.cluster" level="INFO"/>
+ <logger name="org.opendaylight.controller.cluster.datastore.node" level="INFO"/>
+
<!--
Unsynchronized controller startup causes models to crop up in random
order, which results in temporary inability to fully resolve a model,
</schema-service>
</module>
- <!-- DISTRIBUTED_DATA_STORE -->
- <!-- Enable the following modules if you want to use the Distributed Data Store instead of the InMemoryDataStore -->
- <!--
- <module>
- <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-operational-datastore-provider</type>
- <name>distributed-operational-store-module</name>
- <operational-schema-service>
- <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
- <name>yang-schema-service</name>
- </operational-schema-service>
- </module>
-
- <module>
- <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-config-datastore-provider</type>
- <name>distributed-config-store-module</name>
- <configschema-service>
- <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
- <name>yang-schema-service</name>
- </config-schema-service>
- </module>
- -->
-
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:inmemory-datastore-provider">prefix:inmemory-operational-datastore-provider</type>
<name>operational-store-service</name>
<config-data-store>
<type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
<name>config-store-service</name>
- <!-- DISTRIBUTED_DATA_STORE -->
- <!--
- <name>distributed-config-store-service</name>
- -->
</config-data-store>
<operational-data-store>
<type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
<name>operational-store-service</name>
- <!-- DISTRIBUTED_DATA_STORE -->
- <!--
- <name>distributed-operational-store-service</name>
- -->
-
</operational-data-store>
</module>
<module>
</binding-mapping-service>
</binding-forwarded-data-broker>
</module>
- <!-- Cluster RPC -->
- <!-- Enable the following module if you want to use remote rpc connector
- <module>
- <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">prefix:remote-rpc-connector</type>
- <name>remote-rpc-connector</name>
- <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">
- <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
- <name>dom-broker</name>
- </dom-broker>
- </module>
- -->
</modules>
<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
<service>
</instance>
</service>
- <!-- DISTRIBUTED_DATA_STORE -->
- <!-- Enable the following if you want to use the Distributed Data Store instead of the InMemory Data Store -->
- <!-- Note that you MUST delete the InMemoryDataStore related services which provide config-dom-datastore and operational-dom-datastore -->
- <!--
- <service>
- <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
- <instance>
- <name>distributed-config-store-service</name>
- <provider>/modules/module[type='distributed-config-datastore-provider'][name='distributed-config-store-module']</provider>
- </instance>
- </service>
- <service>
- <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
- <instance>
- <name>distributed-operational-store-service</name>
- <provider>/modules/module[type='distributed-operational-datastore-provider'][name='distributed-operational-store-module']</provider>
- </instance>
- </service>
- -->
-
- <!-- DISTRIBUTED_DATA_STORE -->
- <!-- Delete the following two services (config-store-service and operational-store-service) if you want to use the distributed data store instead -->
<service>
<type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
<instance>
}
@Override public void update(long currentTerm, String votedFor) {
- LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
this.currentTerm = currentTerm;
this.votedFor = votedFor;
protected RaftState requestVote(ActorRef sender,
RequestVote requestVote) {
+
+ context.getLogger().debug(requestVote.toString());
+
boolean grantVote = false;
// Reply false if term < currentTerm (§5.1)
}
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- context.getLogger().info("Setting last applied to {}", index);
+ context.getLogger().debug("Setting last applied to {}", index);
context.setLastApplied(index);
}
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
- context.getLogger().info("Candidate: Received {}", appendEntries.toString());
+ context.getLogger().debug(appendEntries.toString());
return state();
}
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
context.getLogger()
- .info("Follower: Received {}", appendEntries.toString());
+ .debug(appendEntries.toString());
}
// TODO : Refactor this method into a bunch of smaller methods
@Override protected RaftState handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
- context.getLogger().info("Leader: Received {}", appendEntries.toString());
+ context.getLogger().debug(appendEntries.toString());
return state();
}
if(! appendEntriesReply.isSuccess()) {
context.getLogger()
- .info("Leader: Received {}", appendEntriesReply.toString());
+ .debug(appendEntriesReply.toString());
}
// Update the FollowerLogInformation
List<ReplicatedLogEntry> entries = Collections.emptyList();
if (context.getReplicatedLog().isPresent(nextIndex)) {
- // TODO: Instead of sending all entries from nextIndex
- // only send a fixed number of entries to each follower
- // This is to avoid the situation where there are a lot of
- // entries to install for a fresh follower or to a follower
- // that has fallen too far behind with the log but yet is not
- // eligible to receive a snapshot
+ // FIXME : Sending one entry at a time
entries =
context.getReplicatedLog().getFrom(nextIndex, 1);
}
<artifactId>akka-testkit_${scala.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-slf4j_${scala.version}</artifactId>
+ </dependency>
+
<!-- SAL Dependencies -->
<dependency>
}
protected abstract void handleReceive(Object message) throws Exception;
+
+ protected void ignoreMessage(Object message){
+ LOG.debug("Unhandled message {} ", message);
+ }
+
+ protected void unknownMessage(Object message) throws Exception{
+ unhandled(message);
+ }
}
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
+import com.google.common.base.Preconditions;
public class ClusterWrapperImpl implements ClusterWrapper {
private final Cluster cluster;
private final String currentMemberName;
public ClusterWrapperImpl(ActorSystem actorSystem){
+ Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
+
cluster = Cluster.get(actorSystem);
+
+ Preconditions.checkState(cluster.getSelfRoles().size() > 0,
+ "No akka roles were specified\n" +
+ "One way to specify the member name is to pass a property on the command line like so\n" +
+ " -Dakka.cluster.roles.0=member-3\n" +
+ "member-3 here would be the name of the member"
+ );
+
currentMemberName = (String) cluster.getSelfRoles().toArray()[0];
}
public void subscribeToMemberEvents(ActorRef actorRef){
+ Preconditions.checkNotNull(actorRef, "actorRef should not be null");
+
cluster.subscribe(actorRef, ClusterEvent.initialStateAsEvents(),
ClusterEvent.MemberEvent.class,
ClusterEvent.UnreachableMember.class);
modification = null;
}
public CompositeModificationPayload(Object modification){
- this.modification = (PersistentMessages.CompositeModification) modification;
+ this.modification = (PersistentMessages.CompositeModification) Preconditions.checkNotNull(modification, "modification should not be null");
}
@Override public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigObject;
private static final Logger
LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ // Look up maps to speed things up
+
+ // key = memberName, value = list of shardNames
+ private Map<String, List<String>> memberShardNames = new HashMap<>();
+
+ // key = shardName, value = list of replicaNames (replicaNames are the same as memberNames)
+ private Map<String, List<String>> shardReplicaNames = new HashMap<>();
+
public ConfigurationImpl(String moduleShardsConfigPath,
String modulesConfigPath){
+ Preconditions.checkNotNull(moduleShardsConfigPath, "moduleShardsConfigPath should not be null");
+ Preconditions.checkNotNull(modulesConfigPath, "modulesConfigPath should not be null");
+
+
File moduleShardsFile = new File("./configuration/initial/" + moduleShardsConfigPath);
File modulesFile = new File("./configuration/initial/" + modulesConfigPath);
}
@Override public List<String> getMemberShardNames(String memberName){
+
+ Preconditions.checkNotNull(memberName, "memberName should not be null");
+
+ if(memberShardNames.containsKey(memberName)){
+ return memberShardNames.get(memberName);
+ }
+
List<String> shards = new ArrayList();
for(ModuleShard ms : moduleShards){
for(Shard s : ms.getShards()){
}
}
}
+
+ memberShardNames.put(memberName, shards);
+
return shards;
}
@Override public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+
+ Preconditions.checkNotNull(nameSpace, "nameSpace should not be null");
+
for(Module m : modules){
if(m.getNameSpace().equals(nameSpace)){
return Optional.of(m.getName());
}
@Override public List<String> getShardNamesFromModuleName(String moduleName) {
+
+ Preconditions.checkNotNull(moduleName, "moduleName should not be null");
+
for(ModuleShard m : moduleShards){
if(m.getModuleName().equals(moduleName)){
List<String> l = new ArrayList<>();
}
@Override public List<String> getMembersFromShardName(String shardName) {
- List<String> shards = new ArrayList();
+
+ Preconditions.checkNotNull(shardName, "shardName should not be null");
+
+ if(shardReplicaNames.containsKey(shardName)){
+ return shardReplicaNames.get(shardName);
+ }
+
for(ModuleShard ms : moduleShards){
for(Shard s : ms.getShards()) {
if(s.getName().equals(shardName)){
- return s.getReplicas();
+ List<String> replicas = s.getReplicas();
+ shardReplicaNames.put(shardName, replicas);
+ return replicas;
}
}
}
+ shardReplicaNames.put(shardName, Collections.EMPTY_LIST);
return Collections.EMPTY_LIST;
}
import akka.actor.Props;
import akka.japi.Creator;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
public DataChangeListener(SchemaContext schemaContext,
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
- this.listener = listener;
- this.schemaContext = schemaContext;
- this.pathId = pathId;
+
+ this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
+ this.pathId = Preconditions.checkNotNull(pathId, "pathId should not be null");
}
@Override public void handleReceive(Object message) throws Exception {
notificationsEnabled = message.isEnabled();
}
- public void dataChanged(Object message) {
+ private void dataChanged(Object message) {
// Do nothing if notifications are not enabled
if(!notificationsEnabled){
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
private final SchemaContext schemaContext;
public DataChangeListenerProxy(SchemaContext schemaContext,ActorSelection dataChangeListenerActor) {
- this.dataChangeListenerActor = dataChangeListenerActor;
+ this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
this.schemaContext = schemaContext;
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
"mdsal.dist-datastore-executor-queue.size";
private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
- private final String type;
private final ActorContext actorContext;
private SchemaContext schemaContext;
DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
- this(new ActorContext(actorSystem, actorSystem
+ Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
+ Preconditions.checkNotNull(type, "type should not be null");
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+
+ String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
+
+ LOG.info("Creating ShardManager : {}", shardManagerId);
+
+ this.actorContext = new ActorContext(actorSystem, actorSystem
.actorOf(ShardManager.props(type, cluster, configuration),
- "shardmanager-" + type), cluster, configuration), type);
+ shardManagerId ), cluster, configuration);
}
- public DistributedDataStore(ActorContext actorContext, String type) {
- this.type = type;
- this.actorContext = actorContext;
+ public DistributedDataStore(ActorContext actorContext) {
+ this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
}
YangInstanceIdentifier path, L listener,
AsyncDataBroker.DataChangeScope scope) {
+ Preconditions.checkNotNull(path, "path should not be null");
+ Preconditions.checkNotNull(listener, "listener should not be null");
+
+
+ LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+
ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
DataChangeListener.props(schemaContext,listener,path ));
import akka.japi.Creator;
import akka.serialization.Serialization;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
public static final String DEFAULT_NAME = "default";
+ // The state of this Shard
private final InMemoryDOMDataStore store;
private final Map<Object, DOMStoreThreePhaseCommitCohort>
Logging.getLogger(getContext().system(), this);
// By default persistent will be true and can be turned off using the system
- // property persistent
+ // property shard.persistent
private final boolean persistent;
- private final String name;
+ /// The name of this shard
+ private final ShardIdentifier name;
private volatile SchemaContext schemaContext;
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
- private Shard(String name, Map<String, String> peerAddresses) {
- super(name, peerAddresses, Optional.of(configParams));
+ private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses) {
+ super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
this.name = name;
this.persistent = !"false".equals(setting);
- LOG.info("Creating shard : {} persistent : {}", name, persistent);
+ LOG.info("Shard created : {} persistent : {}", name, persistent);
- store = InMemoryDOMDataStoreFactory.create(name, null);
+ store = InMemoryDOMDataStoreFactory.create(name.toString(), null);
- shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
+ shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
}
- public static Props props(final String name,
- final Map<String, String> peerAddresses) {
+ private static Map<String, String> mapPeerAddresses(Map<ShardIdentifier, String> peerAddresses){
+ Map<String , String> map = new HashMap<>();
+
+ for(Map.Entry<ShardIdentifier, String> entry : peerAddresses.entrySet()){
+ map.put(entry.getKey().toString(), entry.getValue());
+ }
+
+ return map;
+ }
+
+
+
+
+ public static Props props(final ShardIdentifier name,
+ final Map<ShardIdentifier, String> peerAddresses) {
+ Preconditions.checkNotNull(name, "name should not be null");
+ Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+
return Props.create(new Creator<Shard>() {
@Override
}
} else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
- } else{
+ setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
+ } else {
super.onReceiveCommand(message);
}
}
private ActorRef createTypedTransactionActor(
- CreateTransaction createTransaction, String transactionId) {
+ CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) {
if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+
shardMBean.incrementReadOnlyTransactionCount();
+
return getContext().actorOf(
ShardTransaction
.props(store.newReadOnlyTransaction(), getSelf(),
- schemaContext), transactionId);
+ schemaContext), transactionId.toString());
} else if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+
shardMBean.incrementReadWriteTransactionCount();
+
return getContext().actorOf(
ShardTransaction
.props(store.newReadWriteTransaction(), getSelf(),
- schemaContext), transactionId);
+ schemaContext), transactionId.toString());
} else if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+
shardMBean.incrementWriteOnlyTransactionCount();
+
return getContext().actorOf(
ShardTransaction
.props(store.newWriteOnlyTransaction(), getSelf(),
- schemaContext), transactionId);
+ schemaContext), transactionId.toString());
} else {
+ // FIXME: This does not seem right
throw new IllegalArgumentException(
"CreateTransaction message has unidentified transaction type="
+ createTransaction.getTransactionType());
private void createTransaction(CreateTransaction createTransaction) {
- String transactionId = "shard-" + createTransaction.getTransactionId();
- LOG.info("Creating transaction : {} ", transactionId);
+ ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build();
+ LOG.debug("Creating transaction : {} ", transactionId);
ActorRef transactionActor =
createTypedTransactionActor(createTransaction, transactionId);
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
- LOG.error(
- "Could not find cohort for modification : {}", modification);
- LOG.info("Writing modification using a new transaction");
+ LOG.debug(
+ "Could not find cohort for modification : {}. Writing modification using a new transaction",
+ modification);
DOMStoreReadWriteTransaction transaction =
store.newReadWriteTransaction();
modification.apply(transaction);
self);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(new Date());
-
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
sender.tell(new akka.actor.Status.Failure(e),self);
private void registerChangeListener(
RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for " + registerChangeListener
+ LOG.debug("registerDataChangeListener for {}", registerChangeListener
.getPath());
DataChangeListenerRegistration.props(registration));
LOG.debug(
- "registerDataChangeListener sending reply, listenerRegistrationPath = "
- + listenerRegistration.path().toString());
+ "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
+ , listenerRegistration.path().toString());
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
if (modification != null) {
commit(clientActor, modification);
} else {
- LOG.error("modification is null - this is very unexpected");
+ LOG.error(
+ "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ identifier, clientActor.path().toString());
}
LOG.error("Unknown state received {}", data);
}
+ // Update stats
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if(lastLogEntry != null){
}
@Override public String persistenceId() {
- return this.name;
+ return this.name.toString();
}
import akka.japi.Creator;
import akka.japi.Function;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import scala.concurrent.duration.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private final Configuration configuration;
+ private ShardManagerInfoMBean mBean;
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
public static Props props(final String type,
final ClusterWrapper cluster,
final Configuration configuration) {
+
+ Preconditions.checkNotNull(type, "type should not be null");
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+
return Props.create(new Creator<ShardManager>() {
@Override
} else if(message instanceof ClusterEvent.UnreachableMember) {
ignoreMessage(message);
} else{
- throw new Exception ("Not recognized message received, message="+message);
+ unknownMessage(message);
}
}
return;
}
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
- }
-
- private void ignoreMessage(Object message){
- LOG.debug("Unhandled message : " + message);
+ getSender().tell(new LocalShardNotFound(message.getShardName()),
+ getSelf());
}
private void memberRemoved(ClusterEvent.MemberRemoved message) {
for(ShardInformation info : localShards.values()){
String shardName = info.getShardName();
- info.updatePeerAddress(getShardActorName(memberName, shardName),
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName),
getShardActorPath(shardName, memberName));
}
}
private void findPrimary(FindPrimary message) {
String shardName = message.getShardName();
- List<String> members =
- configuration.getMembersFromShardName(shardName);
-
// First see if the there is a local replica for the shard
ShardInformation info = localShards.get(shardName);
if(info != null) {
}
}
+ List<String> members =
+ configuration.getMembersFromShardName(shardName);
+
if(cluster.getCurrentMemberName() != null) {
members.remove(cluster.getCurrentMemberName());
}
private String getShardActorPath(String shardName, String memberName) {
Address address = memberNameToAddress.get(memberName);
if(address != null) {
- return address.toString() + "/user/shardmanager-" + this.type + "/"
- + getShardActorName(
- memberName, shardName);
+ StringBuilder builder = new StringBuilder();
+ builder.append(address.toString())
+ .append("/user/")
+ .append(ShardManagerIdentifier.builder().type(type).build().toString())
+ .append("/")
+ .append(getShardIdentifier(memberName, shardName));
+ return builder.toString();
}
return null;
}
* @param shardName
* @return
*/
- private String getShardActorName(String memberName, String shardName){
- return memberName + "-shard-" + shardName + "-" + this.type;
+ private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
}
/**
List<String> memberShardNames =
this.configuration.getMemberShardNames(memberName);
+ List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
- String shardActorName = getShardActorName(memberName, shardName);
- Map<String, String> peerAddresses = getPeerAddresses(shardName);
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardActorName, peerAddresses),
- shardActorName);
+ .actorOf(Shard.props(shardId, peerAddresses),
+ shardId.toString());
+ localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
+ mBean = ShardManagerInfo
+ .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
+
}
/**
* @param shardName
* @return
*/
- private Map<String, String> getPeerAddresses(String shardName){
+ private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
- Map<String, String> peerAddresses = new HashMap<>();
+ Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
List<String> members =
this.configuration.getMembersFromShardName(shardName);
for(String memberName : members){
if(!currentMemberName.equals(memberName)){
- String shardActorName = getShardActorName(memberName, shardName);
+ ShardIdentifier shardId = getShardIdentifier(memberName,
+ shardName);
String path =
getShardActorPath(shardName, currentMemberName);
- peerAddresses.put(shardActorName, path);
+ peerAddresses.put(shardId, path);
}
}
return peerAddresses;
}
-
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(10, Duration.create("1 minute"),
private final String shardName;
private final ActorRef actor;
private final ActorPath actorPath;
- private final Map<String, String> peerAddresses;
+ private final Map<ShardIdentifier, String> peerAddresses;
private ShardInformation(String shardName, ActorRef actor,
- Map<String, String> peerAddresses) {
+ Map<ShardIdentifier, String> peerAddresses) {
this.shardName = shardName;
this.actor = actor;
this.actorPath = actor.path();
return actorPath;
}
- public Map<String, String> getPeerAddresses() {
- return peerAddresses;
- }
-
- public void updatePeerAddress(String peerId, String peerAddress){
- LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+ public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId,
+ peerAddress);
if(peerAddresses.containsKey(peerId)){
peerAddresses.put(peerId, peerAddress);
- LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
+ LOG.debug(
+ "Sending PeerAddressResolved for peer {} with address {} to {}",
+ peerId, peerAddress, actor.path());
actor
.tell(new PeerAddressResolved(peerId, peerAddress),
}
}
}
+
+
+
import akka.japi.Creator;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
protected ShardTransaction(DOMStoreTransactionChain transactionChain,
ActorRef shardActor, SchemaContext schemaContext) {
this.transactionChain = transactionChain;
- //this.transaction = transaction;
this.shardActor = shardActor;
this.schemaContext = schemaContext;
}
getSender().tell(new GetCompositeModificationReply(
new ImmutableCompositeModification(modification)), getSelf());
}else{
- throw new Exception ("ShardTransaction:handleRecieve received an unknown message"+message);
+ throw new UnknownMessageException(message);
}
}
}
protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+ LOG.debug("deleteData at path : " + message.getPath().toString());
modification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
chain.close();
getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
}else{
- throw new Exception("Not recognized message recieved="+message);
+ unknownMessage(message);
}
}
+ private ActorRef getShardActor(){
+ return getContext().parent();
+ }
+
private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
return getContext().actorOf(
- ShardTransaction.props( chain.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId);
+ ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), schemaContext), transactionId);
}else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
return getContext().actorOf(
- ShardTransaction.props( chain.newReadWriteTransaction(), getSelf(), schemaContext), transactionId);
+ ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), schemaContext), transactionId);
}else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
return getContext().actorOf(
- ShardTransaction.props( chain.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId);
+ ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), schemaContext), transactionId);
}else{
throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
}
} else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
abort(new AbortTransaction());
} else {
- throw new Exception ("Not recognized message received,message="+message);
+ unknownMessage(message);
}
}
Boolean canCommit = future.get();
sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
} catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when aborting");
+ log.error(e, "An exception happened when checking canCommit");
}
}
}, getContext().dispatcher());
}
@Override public ListenableFuture<Boolean> canCommit() {
+ LOG.debug("txn {} canCommit", transactionId);
Callable<Boolean> call = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
for(ActorPath actorPath : cohortPaths){
+
+ Object message = new CanCommitTransaction().toSerializable();
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
ActorSelection cohort = actorContext.actorSelection(actorPath);
try {
Object response =
actorContext.executeRemoteOperation(cohort,
- new CanCommitTransaction().toSerializable(),
+ message,
ActorContext.ASK_DURATION);
if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
}
}
} catch(RuntimeException e){
+ // FIXME : Need to properly handle this
LOG.error("Unexpected Exception", e);
return false;
}
}
@Override public ListenableFuture<Void> preCommit() {
+ LOG.debug("txn {} preCommit", transactionId);
return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> abort() {
+ LOG.debug("txn {} abort", transactionId);
return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
}
@Override public ListenableFuture<Void> commit() {
+ LOG.debug("txn {} commit", transactionId);
return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
}
for(ActorPath actorPath : cohortPaths){
ActorSelection cohort = actorContext.actorSelection(actorPath);
+ LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
try {
Object response =
actorContext.executeRemoteOperation(cohort,
import akka.actor.ActorSelection;
import akka.actor.Props;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
private final TransactionType transactionType;
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
- private final String identifier;
+ private final TransactionIdentifier identifier;
private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
ListeningExecutorService executor,
SchemaContext schemaContext
) {
+ this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
+ this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
+ this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+
+ String memberName = actorContext.getCurrentMemberName();
+ if(memberName == null){
+ memberName = "UNKNOWN-MEMBER";
+ }
+ this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
- this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
- this.transactionType = transactionType;
- this.actorContext = actorContext;
- this.executor = executor;
- this.schemaContext = schemaContext;
-
+ LOG.debug("Created txn {}", identifier);
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
+ LOG.debug("txn {} read {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
return transactionContext(path).readData(path);
@Override
public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} write {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).writeData(path, data);
@Override
public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} merge {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).mergeData(path, data);
@Override
public void delete(YangInstanceIdentifier path) {
+ LOG.debug("txn {} delete {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).deleteData(path);
public DOMStoreThreePhaseCommitCohort ready() {
List<ActorPath> cohortPaths = new ArrayList<>();
+ LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+
+ LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+
Object result = transactionContext.readyTransaction();
if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
}
@Override
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(),
+ new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
ActorContext.ASK_DURATION);
if (response.getClass()
.equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
String transactionPath = reply.getTransactionPath();
- LOG.info("Received transaction path = {}" , transactionPath );
+ LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
ActorSelection transactionActor =
actorContext.actorSelection(transactionPath);
remoteTransactionPaths.put(shardName, transactionContext);
}
} catch(TimeoutException | PrimaryNotFoundException e){
- LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
+ LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
remoteTransactionPaths.put(shardName,
new NoOpTransactionContext(shardName));
}
}
@Override public void closeTransaction() {
- LOG.error("closeTransaction called");
+ LOG.warn("txn {} closeTransaction called", identifier);
}
@Override public Object readyTransaction() {
- LOG.error("readyTransaction called");
+ LOG.warn("txn {} readyTransaction called", identifier);
cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
return new ReadyTransactionReply(cohort.path()).toSerializable();
}
@Override public void deleteData(YangInstanceIdentifier path) {
- LOG.error("deleteData called path = {}", path);
+ LOG.warn("txt {} deleteData called path = {}", identifier, path);
}
@Override public void mergeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("mergeData called path = {}", path);
+ LOG.warn("txn {} mergeData called path = {}", identifier, path);
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.error("readData called path = {}", path);
+ LOG.warn("txn {} readData called path = {}", identifier, path);
return Futures.immediateCheckedFuture(
Optional.<NormalizedNode<?, ?>>absent());
}
@Override public void writeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("writeData called path = {}", path);
+ LOG.warn("txn {} writeData called path = {}", identifier, path);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class UnknownMessageException extends Exception {
+ private final Object message;
+
+ public UnknownMessageException(Object message) {
+ this.message = message;
+ }
+
+ @Override public String getMessage() {
+ return "Unknown message received " + " - " + message;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import com.google.common.base.Preconditions;
+
+public class ShardIdentifier {
+ private final String shardName;
+ private final String memberName;
+ private final String type;
+
+
+ public ShardIdentifier(String shardName, String memberName, String type) {
+
+ Preconditions.checkNotNull(shardName, "shardName should not be null");
+ Preconditions.checkNotNull(memberName, "memberName should not be null");
+ Preconditions.checkNotNull(type, "type should not be null");
+
+ this.shardName = shardName;
+ this.memberName = memberName;
+ this.type = type;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ShardIdentifier that = (ShardIdentifier) o;
+
+ if (!memberName.equals(that.memberName)) {
+ return false;
+ }
+ if (!shardName.equals(that.shardName)) {
+ return false;
+ }
+ if (!type.equals(that.type)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = shardName.hashCode();
+ result = 31 * result + memberName.hashCode();
+ result = 31 * result + type.hashCode();
+ return result;
+ }
+
+ @Override public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(memberName).append("-shard-").append(shardName).append("-").append(type);
+ return builder.toString();
+ }
+
+ public static Builder builder(){
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String shardName;
+ private String memberName;
+ private String type;
+
+ public ShardIdentifier build(){
+ return new ShardIdentifier(shardName, memberName, type);
+ }
+
+ public Builder shardName(String shardName){
+ this.shardName = shardName;
+ return this;
+ }
+
+ public Builder memberName(String memberName){
+ this.memberName = memberName;
+ return this;
+ }
+
+ public Builder type(String type){
+ this.type = type;
+ return this;
+ }
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+public class ShardManagerIdentifier {
+ private final String type;
+
+ public ShardManagerIdentifier(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ShardManagerIdentifier that = (ShardManagerIdentifier) o;
+
+ if (!type.equals(that.type)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return type.hashCode();
+ }
+
+ @Override public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("shardmanager-").append(type);
+ return builder.toString();
+ }
+
+ public static Builder builder(){
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String type;
+
+ public Builder type(String type){
+ this.type = type;
+ return this;
+ }
+
+ public ShardManagerIdentifier build(){
+ return new ShardManagerIdentifier(this.type);
+ }
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import com.google.common.base.Preconditions;
+
+public class ShardTransactionIdentifier {
+ private final String remoteTransactionId;
+
+ public ShardTransactionIdentifier(String remoteTransactionId) {
+ this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null");
+ }
+
+ public static Builder builder(){
+ return new Builder();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ShardTransactionIdentifier that = (ShardTransactionIdentifier) o;
+
+ if (!remoteTransactionId.equals(that.remoteTransactionId)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return remoteTransactionId.hashCode();
+ }
+
+ @Override public String toString() {
+ final StringBuilder sb =
+ new StringBuilder();
+ sb.append("shard-").append(remoteTransactionId);
+ return sb.toString();
+ }
+
+ public static class Builder {
+ private String remoteTransactionId;
+
+ public Builder remoteTransactionId(String remoteTransactionId){
+ this.remoteTransactionId = remoteTransactionId;
+ return this;
+ }
+
+ public ShardTransactionIdentifier build(){
+ return new ShardTransactionIdentifier(remoteTransactionId);
+ }
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import com.google.common.base.Preconditions;
+
+public class TransactionIdentifier {
+ private final String memberName;
+ private final long counter;
+
+
+ public TransactionIdentifier(String memberName, long counter) {
+ this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
+ this.counter = counter;
+ }
+
+ public static Builder builder(){
+ return new Builder();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TransactionIdentifier that = (TransactionIdentifier) o;
+
+ if (counter != that.counter) {
+ return false;
+ }
+ if (!memberName.equals(that.memberName)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = memberName.hashCode();
+ result = 31 * result + (int) (counter ^ (counter >>> 32));
+ return result;
+ }
+
+ @Override public String toString() {
+ final StringBuilder sb =
+ new StringBuilder();
+ sb.append(memberName).append("-txn-").append(counter);
+ return sb.toString();
+ }
+
+ public static class Builder {
+ private String memberName;
+ private long counter;
+
+ public TransactionIdentifier build(){
+ return new TransactionIdentifier(memberName, counter);
+ }
+
+ public Builder memberName(String memberName){
+ this.memberName = memberName;
+ return this;
+ }
+
+ public Builder counter(long counter){
+ this.counter = counter;
+ return this;
+ }
+ }
+}
public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
public static String JMX_CATEGORY_SHARD = "Shard";
+ public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
private static final Logger LOG = LoggerFactory
.getLogger(AbstractBaseMBean.class);
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
+
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+
+import java.util.List;
+
+public class ShardManagerInfo extends AbstractBaseMBean implements
+ ShardManagerInfoMBean {
+
+ private final String name;
+ private final List<String> localShards;
+
+ public ShardManagerInfo(String name, List<String> localShards) {
+ this.name = name;
+ this.localShards = localShards;
+ }
+
+
+ @Override protected String getMBeanName() {
+ return name;
+ }
+
+ @Override protected String getMBeanType() {
+ return JMX_TYPE_DISTRIBUTED_DATASTORE;
+ }
+
+ @Override protected String getMBeanCategory() {
+ return JMX_CATEGORY_SHARD_MANAGER;
+ }
+
+ public static ShardManagerInfo createShardManagerMBean(String name, List<String> localShards){
+ ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name,
+ localShards);
+
+ shardManagerInfo.registerMBean();
+
+ return shardManagerInfo;
+ }
+
+ @Override public List<String> getLocalShards() {
+ return localShards;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
+
+import java.util.List;
+
+public interface ShardManagerInfoMBean {
+ List<String> getLocalShards();
+}
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+
public class PeerAddressResolved {
- private final String peerId;
+ private final ShardIdentifier peerId;
private final String peerAddress;
- public PeerAddressResolved(String peerId, String peerAddress) {
+ public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) {
this.peerId = peerId;
this.peerAddress = peerAddress;
}
- public String getPeerId() {
+ public ShardIdentifier getPeerId() {
return peerId;
}
odl-cluster-data {
akka {
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
cluster {
roles = [
"member-1"
odl-cluster-rpc {
akka {
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
import akka.actor.Props;
import akka.event.Logging;
import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import java.util.Collections;
import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
public class BasicIntegrationTest extends AbstractActorTest {
new JavaTestKit(getSystem()) {{
- final Props props = Shard.props("config", Collections.EMPTY_MAP);
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
final ActorRef shard = getSystem().actorOf(props);
new Within(duration("5 seconds")) {
}
}.get(); // this extracts the received message
- Assert.assertNotNull(transactionChain);
+ assertNotNull(transactionChain);
System.out.println("Successfully created transaction chain");
}
}.get(); // this extracts the received message
- Assert.assertNotNull(transaction);
+ assertNotNull(transaction);
System.out.println("Successfully created transaction");
}
}.get(); // this extracts the received message
- Assert.assertTrue(writeDone);
+ assertTrue(writeDone);
System.out.println("Successfully wrote data");
}
}.get(); // this extracts the received message
- Assert.assertNotNull(cohort);
+ assertNotNull(cohort);
System.out.println("Successfully readied the transaction");
}
}.get(); // this extracts the received message
- Assert.assertTrue(preCommitDone);
+ assertTrue(preCommitDone);
System.out.println("Successfully pre-committed the transaction");
import java.io.File;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ConfigurationImplTest {
assertTrue(memberShardNames.contains("people-1"));
assertTrue(memberShardNames.contains("cars-1"));
+
+ // Retrieve once again to hit cache
+
+ memberShardNames =
+ configuration.getMemberShardNames("member-1");
+
+ assertTrue(memberShardNames.contains("people-1"));
+ assertTrue(memberShardNames.contains("cars-1"));
+
+ }
+
+ @Test
+ public void testGetMembersFromShardName(){
+ List<String> members =
+ configuration.getMembersFromShardName("default");
+
+ assertEquals(3, members.size());
+
+ assertTrue(members.contains("member-1"));
+ assertTrue(members.contains("member-2"));
+ assertTrue(members.contains("member-3"));
+
+ assertFalse(members.contains("member-26"));
+
+ // Retrieve once again to hit cache
+ members =
+ configuration.getMembersFromShardName("default");
+
+ assertEquals(3, members.size());
+
+ assertTrue(members.contains("member-1"));
+ assertTrue(members.contains("member-2"));
+ assertTrue(members.contains("member-3"));
+
+ assertFalse(members.contains("member-26"));
+
+
+ // Try to find a shard which is not present
+
+ members =
+ configuration.getMembersFromShardName("foobar");
+
+ assertEquals(0, members.size());
}
@Test
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import akka.actor.Props;
-import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
public class DistributedDataStoreTest extends AbstractActorTest{
private DistributedDataStore distributedDataStore;
private MockActorContext mockActorContext;
private ActorRef doNothingActorRef;
- @org.junit.Before
+ @Before
public void setUp() throws Exception {
ShardStrategyFactory.setConfiguration(new MockConfiguration());
final Props props = Props.create(DoNothingActor.class);
doNothingActorRef = getSystem().actorOf(props);
mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
- distributedDataStore = new DistributedDataStore(mockActorContext, "config");
+ distributedDataStore = new DistributedDataStore(mockActorContext);
distributedDataStore.onGlobalContextUpdated(
TestModel.createTestContext());
.build());
}
- @org.junit.After
+ @After
public void tearDown() throws Exception {
}
- @org.junit.Test
+ @Test
+ public void testConstructor(){
+ ActorSystem actorSystem = mock(ActorSystem.class);
+
+ new DistributedDataStore(actorSystem, "config",
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
+ }
+
+ @Test
public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
ListenerRegistration registration =
}, AsyncDataBroker.DataChangeScope.BASE);
// Since we do not expect the shard to be local registration will return a NoOpRegistration
- Assert.assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
+ assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
- Assert.assertNotNull(registration);
+ assertNotNull(registration);
}
- @org.junit.Test
+ @Test
public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()));
}
}, AsyncDataBroker.DataChangeScope.BASE);
- Assert.assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
+ assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
- Assert.assertNotNull(registration);
+ assertNotNull(registration);
}
- @org.junit.Test
+ @Test
public void testCreateTransactionChain() throws Exception {
final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
- Assert.assertNotNull(transactionChain);
+ assertNotNull(transactionChain);
}
- @org.junit.Test
+ @Test
public void testNewReadOnlyTransaction() throws Exception {
final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction();
- Assert.assertNotNull(transaction);
+ assertNotNull(transaction);
}
- @org.junit.Test
+ @Test
public void testNewWriteOnlyTransaction() throws Exception {
final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction();
- Assert.assertNotNull(transaction);
+ assertNotNull(transaction);
}
- @org.junit.Test
+ @Test
public void testNewReadWriteTransaction() throws Exception {
final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction();
- Assert.assertNotNull(transaction);
+ assertNotNull(transaction);
}
}
import akka.testkit.JavaTestKit;
import junit.framework.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
@Test
public void testOnReceiveCreateTransactionChain() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = Shard.props("config", Collections.EMPTY_MAP);
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransactionChain");
@Test
public void testOnReceiveRegisterListener() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = Shard.props("config", Collections.EMPTY_MAP);
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
final ActorRef subject =
getSystem().actorOf(props, "testRegisterChangeListener");
@Test
public void testCreateTransaction(){
new JavaTestKit(getSystem()) {{
- final Props props = Shard.props("config", Collections.EMPTY_MAP);
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
final ActorRef subject =
getSystem().actorOf(props, "testCreateTransaction");
@Test
public void testPeerAddressResolved(){
new JavaTestKit(getSystem()) {{
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("member-2", null);
- final Props props = Shard.props("config", peerAddresses);
+ Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+
+ final ShardIdentifier identifier =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ peerAddresses.put(identifier, null);
+ final Props props = Shard.props(identifier, peerAddresses);
final ActorRef subject =
getSystem().actorOf(props, "testPeerAddressResolved");
protected void run() {
subject.tell(
- new PeerAddressResolved("member-2", "akka://foobar"),
+ new PeerAddressResolved(identifier, "akka://foobar"),
getRef());
expectNoMsg();
import akka.testkit.TestActorRef;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Assert;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import java.util.Collections;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
private static final SchemaContext testSchemaContext =
TestModel.createTestContext();
+ private static final ShardIdentifier SHARD_IDENTIFIER =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
static {
store.onGlobalContextUpdated(testSchemaContext);
}
- @Test
- public void testNegativePerformingWriteOperationOnReadTransaction()
- throws Exception {
- try {
-
- final ActorRef
- shard = getSystem()
- .actorOf(Shard.props("config", Collections.EMPTY_MAP));
- final Props props =
- ShardTransaction
- .props(store.newReadOnlyTransaction(), shard, TestModel
- .createTestContext());
- final TestActorRef subject = TestActorRef.apply(props, getSystem());
-
- subject
- .receive(new DeleteData(TestModel.TEST_PATH).toSerializable(),
- ActorRef.noSender());
- Assert.assertFalse(true);
-
-
- } catch (Exception cs) {
- assertEquals(cs.getClass().getSimpleName(),
- Exception.class.getSimpleName());
- assertTrue(cs.getMessage().startsWith(
- "ShardTransaction:handleRecieve received an unknown message"));
- }
- }
-
@Test(expected = ReadFailedException.class)
public void testNegativeReadWithReadOnlyTransactionClosed()
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard,
TestModel.createTestContext());
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
}
-
@Test(expected = IllegalStateException.class)
public void testNegativeWriteWithTransactionReady() throws Exception {
final ActorRef shard =
- getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
}
+
+
}
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
private static final SchemaContext testSchemaContext = TestModel.createTestContext();
+ private static final ShardIdentifier SHARD_IDENTIFIER =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+
static {
store.onGlobalContextUpdated(testSchemaContext);
}
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
@Test
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext);
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext);
final ActorRef subject =
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext());
final ActorRef subject =
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
try {
- final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
final Props props =
ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext());
final TestActorRef subject = TestActorRef.apply(props,getSystem());
} catch (Exception cs) {
- assertEquals(cs.getClass().getSimpleName(), Exception.class.getSimpleName());
- assertTrue(cs.getMessage().startsWith("ShardTransaction:handleRecieve received an unknown message"));
+ assertEquals(UnknownMessageException.class.getSimpleName(), cs.getClass().getSimpleName());
+ assertTrue(cs.getMessage(), cs.getMessage().startsWith("Unknown message received "));
}
}
}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ShardIdentifierTest {
+
+ @Test
+ public void testBasic(){
+ ShardIdentifier id = ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ assertEquals("member-1-shard-inventory-config", id.toString());
+ }
+
+
+}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+ public class ShardManagerIdentifierTest {
+
+ @Test
+ public void testIdentifier(){
+ assertEquals("shardmanager-operational", ShardManagerIdentifier.builder().type("operational").build().toString());
+ }
+
+}
akka {
- loggers = [akka.testkit.TestEventListener]
+ loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
+
actor {
serializers {
java = "akka.serialization.JavaSerializer"