From: Ed Warnicke Date: Tue, 12 Aug 2014 23:09:32 +0000 (+0000) Subject: Merge "Optimizations, Monitoring and Logging" X-Git-Tag: release/helium~302 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=886fe1a50077d9dc9c4c36f938fc7c86317cb149;hp=b725fdb758008195a98f7fad0fd3804c363170aa Merge "Optimizations, Monitoring and Logging" --- diff --git a/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/logback.xml b/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/logback.xml index d1a5dcc416..ed659bf603 100644 --- a/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/logback.xml +++ b/opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/logback.xml @@ -2,7 +2,7 @@ - %date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} - %msg%n + %date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} %X{akkaSource} - %msg%n @@ -76,6 +76,10 @@ + + + + + + + + - - - prefix:inmemory-operational-datastore-provider operational-store-service @@ -97,20 +75,11 @@ config-dom-store-spi:config-dom-datastore config-store-service - - operational-dom-store-spi:operational-dom-datastore operational-store-service - - - @@ -147,17 +116,6 @@ - - @@ -168,28 +126,6 @@ - - - - - - - config-dom-store-spi:config-dom-datastore diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index a0bb004607..ae8b6fe8e3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -618,7 +618,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public void update(long currentTerm, String votedFor) { - LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor); this.currentTerm = currentTerm; this.votedFor = votedFor; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 707c5321f5..251a13d583 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -127,6 +127,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected RaftState requestVote(ActorRef sender, RequestVote requestVote) { + + context.getLogger().debug(requestVote.toString()); + boolean grantVote = false; // Reply false if term < currentTerm (§5.1) @@ -326,7 +329,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - context.getLogger().info("Setting last applied to {}", index); + context.getLogger().debug("Setting last applied to {}", index); context.setLastApplied(index); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index c125bd32b6..bb1927ef23 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -81,7 +81,7 @@ public class Candidate extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - context.getLogger().info("Candidate: Received {}", appendEntries.toString()); + context.getLogger().debug(appendEntries.toString()); return state(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index c8cd41dfa1..54e0494b9d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -42,7 +42,7 @@ public class Follower extends AbstractRaftActorBehavior { if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { context.getLogger() - .info("Follower: Received {}", appendEntries.toString()); + .debug(appendEntries.toString()); } // TODO : Refactor this method into a bunch of smaller methods diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 8b95e8b7a6..234f9db664 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -120,7 +120,7 @@ public class Leader extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - context.getLogger().info("Leader: Received {}", appendEntries.toString()); + context.getLogger().debug(appendEntries.toString()); return state(); } @@ -130,7 +130,7 @@ public class Leader extends AbstractRaftActorBehavior { if(! appendEntriesReply.isSuccess()) { context.getLogger() - .info("Leader: Received {}", appendEntriesReply.toString()); + .debug(appendEntriesReply.toString()); } // Update the FollowerLogInformation @@ -294,12 +294,7 @@ public class Leader extends AbstractRaftActorBehavior { List entries = Collections.emptyList(); if (context.getReplicatedLog().isPresent(nextIndex)) { - // TODO: Instead of sending all entries from nextIndex - // only send a fixed number of entries to each follower - // This is to avoid the situation where there are a lot of - // entries to install for a fresh follower or to a follower - // that has fallen too far behind with the log but yet is not - // eligible to receive a snapshot + // FIXME : Sending one entry at a time entries = context.getReplicatedLog().getFrom(nextIndex, 1); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 848d425bf9..648e8d23d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -40,6 +40,11 @@ akka-testkit_${scala.version} + + com.typesafe.akka + akka-slf4j_${scala.version} + + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java index ce0516064e..ac01f42a7f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java @@ -33,4 +33,12 @@ public abstract class AbstractUntypedActor extends UntypedActor { } protected abstract void handleReceive(Object message) throws Exception; + + protected void ignoreMessage(Object message){ + LOG.debug("Unhandled message {} ", message); + } + + protected void unknownMessage(Object message) throws Exception{ + unhandled(message); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java index 142aacde65..8910137ec4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java @@ -12,18 +12,31 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; +import com.google.common.base.Preconditions; public class ClusterWrapperImpl implements ClusterWrapper { private final Cluster cluster; private final String currentMemberName; public ClusterWrapperImpl(ActorSystem actorSystem){ + Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); + cluster = Cluster.get(actorSystem); + + Preconditions.checkState(cluster.getSelfRoles().size() > 0, + "No akka roles were specified\n" + + "One way to specify the member name is to pass a property on the command line like so\n" + + " -Dakka.cluster.roles.0=member-3\n" + + "member-3 here would be the name of the member" + ); + currentMemberName = (String) cluster.getSelfRoles().toArray()[0]; } public void subscribeToMemberEvents(ActorRef actorRef){ + Preconditions.checkNotNull(actorRef, "actorRef should not be null"); + cluster.subscribe(actorRef, ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java index abc69f1897..d0abb20718 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java @@ -29,7 +29,7 @@ public class CompositeModificationPayload extends Payload implements modification = null; } public CompositeModificationPayload(Object modification){ - this.modification = (PersistentMessages.CompositeModification) modification; + this.modification = (PersistentMessages.CompositeModification) Preconditions.checkNotNull(modification, "modification should not be null"); } @Override public Map encode() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java index 34590025d5..37b565d213 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigObject; @@ -34,11 +35,23 @@ public class ConfigurationImpl implements Configuration { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); + // Look up maps to speed things up + + // key = memberName, value = list of shardNames + private Map> memberShardNames = new HashMap<>(); + + // key = shardName, value = list of replicaNames (replicaNames are the same as memberNames) + private Map> shardReplicaNames = new HashMap<>(); + public ConfigurationImpl(String moduleShardsConfigPath, String modulesConfigPath){ + Preconditions.checkNotNull(moduleShardsConfigPath, "moduleShardsConfigPath should not be null"); + Preconditions.checkNotNull(modulesConfigPath, "modulesConfigPath should not be null"); + + File moduleShardsFile = new File("./configuration/initial/" + moduleShardsConfigPath); File modulesFile = new File("./configuration/initial/" + modulesConfigPath); @@ -66,6 +79,13 @@ public class ConfigurationImpl implements Configuration { } @Override public List getMemberShardNames(String memberName){ + + Preconditions.checkNotNull(memberName, "memberName should not be null"); + + if(memberShardNames.containsKey(memberName)){ + return memberShardNames.get(memberName); + } + List shards = new ArrayList(); for(ModuleShard ms : moduleShards){ for(Shard s : ms.getShards()){ @@ -76,11 +96,17 @@ public class ConfigurationImpl implements Configuration { } } } + + memberShardNames.put(memberName, shards); + return shards; } @Override public Optional getModuleNameFromNameSpace(String nameSpace) { + + Preconditions.checkNotNull(nameSpace, "nameSpace should not be null"); + for(Module m : modules){ if(m.getNameSpace().equals(nameSpace)){ return Optional.of(m.getName()); @@ -98,6 +124,9 @@ public class ConfigurationImpl implements Configuration { } @Override public List getShardNamesFromModuleName(String moduleName) { + + Preconditions.checkNotNull(moduleName, "moduleName should not be null"); + for(ModuleShard m : moduleShards){ if(m.getModuleName().equals(moduleName)){ List l = new ArrayList<>(); @@ -112,14 +141,23 @@ public class ConfigurationImpl implements Configuration { } @Override public List getMembersFromShardName(String shardName) { - List shards = new ArrayList(); + + Preconditions.checkNotNull(shardName, "shardName should not be null"); + + if(shardReplicaNames.containsKey(shardName)){ + return shardReplicaNames.get(shardName); + } + for(ModuleShard ms : moduleShards){ for(Shard s : ms.getShards()) { if(s.getName().equals(shardName)){ - return s.getReplicas(); + List replicas = s.getReplicas(); + shardReplicaNames.put(shardName, replicas); + return replicas; } } } + shardReplicaNames.put(shardName, Collections.EMPTY_LIST); return Collections.EMPTY_LIST; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index cdf04dd093..1dab285679 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Props; import akka.japi.Creator; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; @@ -27,9 +28,10 @@ public class DataChangeListener extends AbstractUntypedActor { public DataChangeListener(SchemaContext schemaContext, AsyncDataChangeListener> listener, YangInstanceIdentifier pathId) { - this.listener = listener; - this.schemaContext = schemaContext; - this.pathId = pathId; + + this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); + this.listener = Preconditions.checkNotNull(listener, "listener should not be null"); + this.pathId = Preconditions.checkNotNull(pathId, "pathId should not be null"); } @Override public void handleReceive(Object message) throws Exception { @@ -44,7 +46,7 @@ public class DataChangeListener extends AbstractUntypedActor { notificationsEnabled = message.isEnabled(); } - public void dataChanged(Object message) { + private void dataChanged(Object message) { // Do nothing if notifications are not enabled if(!notificationsEnabled){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java index a4ca456268..6d835498af 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -24,7 +25,7 @@ public class DataChangeListenerProxy implements AsyncDataChangeListener @@ -76,10 +80,11 @@ public class Shard extends RaftActor { Logging.getLogger(getContext().system(), this); // By default persistent will be true and can be turned off using the system - // property persistent + // property shard.persistent private final boolean persistent; - private final String name; + /// The name of this shard + private final ShardIdentifier name; private volatile SchemaContext schemaContext; @@ -87,8 +92,8 @@ public class Shard extends RaftActor { private final List dataChangeListeners = new ArrayList<>(); - private Shard(String name, Map peerAddresses) { - super(name, peerAddresses, Optional.of(configParams)); + private Shard(ShardIdentifier name, Map peerAddresses) { + super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams)); this.name = name; @@ -96,16 +101,32 @@ public class Shard extends RaftActor { this.persistent = !"false".equals(setting); - LOG.info("Creating shard : {} persistent : {}", name, persistent); + LOG.info("Shard created : {} persistent : {}", name, persistent); - store = InMemoryDOMDataStoreFactory.create(name, null); + store = InMemoryDOMDataStoreFactory.create(name.toString(), null); - shardMBean = ShardMBeanFactory.getShardStatsMBean(name); + shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString()); } - public static Props props(final String name, - final Map peerAddresses) { + private static Map mapPeerAddresses(Map peerAddresses){ + Map map = new HashMap<>(); + + for(Map.Entry entry : peerAddresses.entrySet()){ + map.put(entry.getKey().toString(), entry.getValue()); + } + + return map; + } + + + + + public static Props props(final ShardIdentifier name, + final Map peerAddresses) { + Preconditions.checkNotNull(name, "name should not be null"); + Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); + return Props.create(new Creator() { @Override @@ -143,39 +164,46 @@ public class Shard extends RaftActor { } } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; - setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress()); - } else{ + setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress()); + } else { super.onReceiveCommand(message); } } private ActorRef createTypedTransactionActor( - CreateTransaction createTransaction, String transactionId) { + CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) { if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { + shardMBean.incrementReadOnlyTransactionCount(); + return getContext().actorOf( ShardTransaction .props(store.newReadOnlyTransaction(), getSelf(), - schemaContext), transactionId); + schemaContext), transactionId.toString()); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { + shardMBean.incrementReadWriteTransactionCount(); + return getContext().actorOf( ShardTransaction .props(store.newReadWriteTransaction(), getSelf(), - schemaContext), transactionId); + schemaContext), transactionId.toString()); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { + shardMBean.incrementWriteOnlyTransactionCount(); + return getContext().actorOf( ShardTransaction .props(store.newWriteOnlyTransaction(), getSelf(), - schemaContext), transactionId); + schemaContext), transactionId.toString()); } else { + // FIXME: This does not seem right throw new IllegalArgumentException( "CreateTransaction message has unidentified transaction type=" + createTransaction.getTransactionType()); @@ -184,8 +212,8 @@ public class Shard extends RaftActor { private void createTransaction(CreateTransaction createTransaction) { - String transactionId = "shard-" + createTransaction.getTransactionId(); - LOG.info("Creating transaction : {} ", transactionId); + ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build(); + LOG.debug("Creating transaction : {} ", transactionId); ActorRef transactionActor = createTypedTransactionActor(createTransaction, transactionId); @@ -202,9 +230,9 @@ public class Shard extends RaftActor { DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(serialized); if (cohort == null) { - LOG.error( - "Could not find cohort for modification : {}", modification); - LOG.info("Writing modification using a new transaction"); + LOG.debug( + "Could not find cohort for modification : {}. Writing modification using a new transaction", + modification); DOMStoreReadWriteTransaction transaction = store.newReadWriteTransaction(); modification.apply(transaction); @@ -237,7 +265,6 @@ public class Shard extends RaftActor { self); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(new Date()); - } catch (InterruptedException | ExecutionException e) { shardMBean.incrementFailedTransactionsCount(); sender.tell(new akka.actor.Status.Failure(e),self); @@ -269,7 +296,7 @@ public class Shard extends RaftActor { private void registerChangeListener( RegisterChangeListener registerChangeListener) { - LOG.debug("registerDataChangeListener for " + registerChangeListener + LOG.debug("registerDataChangeListener for {}", registerChangeListener .getPath()); @@ -301,8 +328,8 @@ public class Shard extends RaftActor { DataChangeListenerRegistration.props(registration)); LOG.debug( - "registerDataChangeListener sending reply, listenerRegistrationPath = " - + listenerRegistration.path().toString()); + "registerDataChangeListener sending reply, listenerRegistrationPath = {} " + , listenerRegistration.path().toString()); getSender() .tell(new RegisterChangeListenerReply(listenerRegistration.path()), @@ -330,7 +357,9 @@ public class Shard extends RaftActor { if (modification != null) { commit(clientActor, modification); } else { - LOG.error("modification is null - this is very unexpected"); + LOG.error( + "modification is null - this is very unexpected, clientActor = {}, identifier = {}", + identifier, clientActor.path().toString()); } @@ -338,6 +367,7 @@ public class Shard extends RaftActor { LOG.error("Unknown state received {}", data); } + // Update stats ReplicatedLogEntry lastLogEntry = getLastLogEntry(); if(lastLogEntry != null){ @@ -373,7 +403,7 @@ public class Shard extends RaftActor { } @Override public String persistenceId() { - return this.name; + return this.name.toString(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 64c6821120..6162a0327c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -18,6 +18,10 @@ import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -28,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import scala.concurrent.duration.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +66,8 @@ public class ShardManager extends AbstractUntypedActor { private final Configuration configuration; + private ShardManagerInfoMBean mBean; + /** * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be * configuration or operational @@ -82,6 +89,11 @@ public class ShardManager extends AbstractUntypedActor { public static Props props(final String type, final ClusterWrapper cluster, final Configuration configuration) { + + Preconditions.checkNotNull(type, "type should not be null"); + Preconditions.checkNotNull(cluster, "cluster should not be null"); + Preconditions.checkNotNull(configuration, "configuration should not be null"); + return Props.create(new Creator() { @Override @@ -108,7 +120,7 @@ public class ShardManager extends AbstractUntypedActor { } else if(message instanceof ClusterEvent.UnreachableMember) { ignoreMessage(message); } else{ - throw new Exception ("Not recognized message received, message="+message); + unknownMessage(message); } } @@ -122,11 +134,8 @@ public class ShardManager extends AbstractUntypedActor { return; } - getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); - } - - private void ignoreMessage(Object message){ - LOG.debug("Unhandled message : " + message); + getSender().tell(new LocalShardNotFound(message.getShardName()), + getSelf()); } private void memberRemoved(ClusterEvent.MemberRemoved message) { @@ -140,7 +149,7 @@ public class ShardManager extends AbstractUntypedActor { for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardActorName(memberName, shardName), + info.updatePeerAddress(getShardIdentifier(memberName, shardName), getShardActorPath(shardName, memberName)); } } @@ -159,9 +168,6 @@ public class ShardManager extends AbstractUntypedActor { private void findPrimary(FindPrimary message) { String shardName = message.getShardName(); - List members = - configuration.getMembersFromShardName(shardName); - // First see if the there is a local replica for the shard ShardInformation info = localShards.get(shardName); if(info != null) { @@ -175,6 +181,9 @@ public class ShardManager extends AbstractUntypedActor { } } + List members = + configuration.getMembersFromShardName(shardName); + if(cluster.getCurrentMemberName() != null) { members.remove(cluster.getCurrentMemberName()); } @@ -196,9 +205,13 @@ public class ShardManager extends AbstractUntypedActor { private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { - return address.toString() + "/user/shardmanager-" + this.type + "/" - + getShardActorName( - memberName, shardName); + StringBuilder builder = new StringBuilder(); + builder.append(address.toString()) + .append("/user/") + .append(ShardManagerIdentifier.builder().type(type).build().toString()) + .append("/") + .append(getShardIdentifier(memberName, shardName)); + return builder.toString(); } return null; } @@ -211,8 +224,8 @@ public class ShardManager extends AbstractUntypedActor { * @param shardName * @return */ - private String getShardActorName(String memberName, String shardName){ - return memberName + "-shard-" + shardName + "-" + this.type; + private ShardIdentifier getShardIdentifier(String memberName, String shardName){ + return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build(); } /** @@ -225,15 +238,20 @@ public class ShardManager extends AbstractUntypedActor { List memberShardNames = this.configuration.getMemberShardNames(memberName); + List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ - String shardActorName = getShardActorName(memberName, shardName); - Map peerAddresses = getPeerAddresses(shardName); + ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + Map peerAddresses = getPeerAddresses(shardName); ActorRef actor = getContext() - .actorOf(Shard.props(shardActorName, peerAddresses), - shardActorName); + .actorOf(Shard.props(shardId, peerAddresses), + shardId.toString()); + localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); } + mBean = ShardManagerInfo + .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames); + } /** @@ -242,9 +260,9 @@ public class ShardManager extends AbstractUntypedActor { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName){ - Map peerAddresses = new HashMap<>(); + Map peerAddresses = new HashMap<>(); List members = this.configuration.getMembersFromShardName(shardName); @@ -253,16 +271,16 @@ public class ShardManager extends AbstractUntypedActor { for(String memberName : members){ if(!currentMemberName.equals(memberName)){ - String shardActorName = getShardActorName(memberName, shardName); + ShardIdentifier shardId = getShardIdentifier(memberName, + shardName); String path = getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardActorName, path); + peerAddresses.put(shardId, path); } } return peerAddresses; } - @Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create("1 minute"), @@ -280,10 +298,10 @@ public class ShardManager extends AbstractUntypedActor { private final String shardName; private final ActorRef actor; private final ActorPath actorPath; - private final Map peerAddresses; + private final Map peerAddresses; private ShardInformation(String shardName, ActorRef actor, - Map peerAddresses) { + Map peerAddresses) { this.shardName = shardName; this.actor = actor; this.actorPath = actor.path(); @@ -302,16 +320,15 @@ public class ShardManager extends AbstractUntypedActor { return actorPath; } - public Map getPeerAddresses() { - return peerAddresses; - } - - public void updatePeerAddress(String peerId, String peerAddress){ - LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); + public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + LOG.info("updatePeerAddress for peer {} with address {}", peerId, + peerAddress); if(peerAddresses.containsKey(peerId)){ peerAddresses.put(peerId, peerAddress); - LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path()); + LOG.debug( + "Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); actor .tell(new PeerAddressResolved(peerId, peerAddress), @@ -321,3 +338,6 @@ public class ShardManager extends AbstractUntypedActor { } } } + + + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 312ec9a4ff..1ffe5ca402 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -15,6 +15,7 @@ import akka.event.LoggingAdapter; import akka.japi.Creator; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; @@ -89,7 +90,6 @@ public abstract class ShardTransaction extends AbstractUntypedActor { protected ShardTransaction(DOMStoreTransactionChain transactionChain, ActorRef shardActor, SchemaContext schemaContext) { this.transactionChain = transactionChain; - //this.transaction = transaction; this.shardActor = shardActor; this.schemaContext = schemaContext; } @@ -173,7 +173,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor { getSender().tell(new GetCompositeModificationReply( new ImmutableCompositeModification(modification)), getSelf()); }else{ - throw new Exception ("ShardTransaction:handleRecieve received an unknown message"+message); + throw new UnknownMessageException(message); } } @@ -232,6 +232,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor { } protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) { + LOG.debug("deleteData at path : " + message.getPath().toString()); modification.addModification(new DeleteModification(message.getPath())); try { transaction.delete(message.getPath()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index ce63f1107d..c508255ea4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -40,23 +40,27 @@ public class ShardTransactionChain extends AbstractUntypedActor { chain.close(); getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf()); }else{ - throw new Exception("Not recognized message recieved="+message); + unknownMessage(message); } } + private ActorRef getShardActor(){ + return getContext().parent(); + } + private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){ if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){ return getContext().actorOf( - ShardTransaction.props( chain.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId); + ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), schemaContext), transactionId); }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){ return getContext().actorOf( - ShardTransaction.props( chain.newReadWriteTransaction(), getSelf(), schemaContext), transactionId); + ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), schemaContext), transactionId); }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){ return getContext().actorOf( - ShardTransaction.props( chain.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId); + ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), schemaContext), transactionId); }else{ throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index a8deb0153a..500b73ce9d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -67,7 +67,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { abort(new AbortTransaction()); } else { - throw new Exception ("Not recognized message received,message="+message); + unknownMessage(message); } } @@ -130,7 +130,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { Boolean canCommit = future.get(); sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self); } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when aborting"); + log.error(e, "An exception happened when checking canCommit"); } } }, getContext().dispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 915b13dd8b..5b447943ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -59,17 +59,22 @@ public class ThreePhaseCommitCohortProxy implements } @Override public ListenableFuture canCommit() { + LOG.debug("txn {} canCommit", transactionId); Callable call = new Callable() { @Override public Boolean call() throws Exception { for(ActorPath actorPath : cohortPaths){ + + Object message = new CanCommitTransaction().toSerializable(); + LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); + ActorSelection cohort = actorContext.actorSelection(actorPath); try { Object response = actorContext.executeRemoteOperation(cohort, - new CanCommitTransaction().toSerializable(), + message, ActorContext.ASK_DURATION); if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) { @@ -80,6 +85,7 @@ public class ThreePhaseCommitCohortProxy implements } } } catch(RuntimeException e){ + // FIXME : Need to properly handle this LOG.error("Unexpected Exception", e); return false; } @@ -93,14 +99,17 @@ public class ThreePhaseCommitCohortProxy implements } @Override public ListenableFuture preCommit() { + LOG.debug("txn {} preCommit", transactionId); return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS); } @Override public ListenableFuture abort() { + LOG.debug("txn {} abort", transactionId); return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS); } @Override public ListenableFuture commit() { + LOG.debug("txn {} commit", transactionId); return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS); } @@ -111,6 +120,8 @@ public class ThreePhaseCommitCohortProxy implements for(ActorPath actorPath : cohortPaths){ ActorSelection cohort = actorContext.actorSelection(actorPath); + LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath); + try { Object response = actorContext.executeRemoteOperation(cohort, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index fa98905a66..5f9f1f83c4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -13,11 +13,13 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListeningExecutorService; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; @@ -75,7 +77,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final TransactionType transactionType; private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); - private final String identifier; + private final TransactionIdentifier identifier; private final ListeningExecutorService executor; private final SchemaContext schemaContext; @@ -85,13 +87,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ListeningExecutorService executor, SchemaContext schemaContext ) { + this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); + this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); + this.executor = Preconditions.checkNotNull(executor, "executor should not be null"); + this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); + + String memberName = actorContext.getCurrentMemberName(); + if(memberName == null){ + memberName = "UNKNOWN-MEMBER"; + } + this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build(); - this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement(); - this.transactionType = transactionType; - this.actorContext = actorContext; - this.executor = executor; - this.schemaContext = schemaContext; - + LOG.debug("Created txn {}", identifier); } @@ -99,6 +106,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { + LOG.debug("txn {} read {}", identifier, path); + createTransactionIfMissing(actorContext, path); return transactionContext(path).readData(path); @@ -107,6 +116,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void write(YangInstanceIdentifier path, NormalizedNode data) { + LOG.debug("txn {} write {}", identifier, path); + createTransactionIfMissing(actorContext, path); transactionContext(path).writeData(path, data); @@ -115,6 +126,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void merge(YangInstanceIdentifier path, NormalizedNode data) { + LOG.debug("txn {} merge {}", identifier, path); + createTransactionIfMissing(actorContext, path); transactionContext(path).mergeData(path, data); @@ -123,6 +136,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void delete(YangInstanceIdentifier path) { + LOG.debug("txn {} delete {}", identifier, path); + createTransactionIfMissing(actorContext, path); transactionContext(path).deleteData(path); @@ -132,7 +147,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public DOMStoreThreePhaseCommitCohort ready() { List cohortPaths = new ArrayList<>(); + LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); + for(TransactionContext transactionContext : remoteTransactionPaths.values()) { + + LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); + Object result = transactionContext.readyTransaction(); if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){ @@ -143,7 +163,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor); + return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor); } @Override @@ -180,7 +200,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { try { Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(), + new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(), ActorContext.ASK_DURATION); if (response.getClass() .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { @@ -189,7 +209,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { String transactionPath = reply.getTransactionPath(); - LOG.info("Received transaction path = {}" , transactionPath ); + LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath); ActorSelection transactionActor = actorContext.actorSelection(transactionPath); @@ -200,7 +220,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionPaths.put(shardName, transactionContext); } } catch(TimeoutException | PrimaryNotFoundException e){ - LOG.error("Creating NoOpTransaction because of : {}", e.getMessage()); + LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName)); } @@ -324,35 +344,35 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override public void closeTransaction() { - LOG.error("closeTransaction called"); + LOG.warn("txn {} closeTransaction called", identifier); } @Override public Object readyTransaction() { - LOG.error("readyTransaction called"); + LOG.warn("txn {} readyTransaction called", identifier); cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class)); return new ReadyTransactionReply(cohort.path()).toSerializable(); } @Override public void deleteData(YangInstanceIdentifier path) { - LOG.error("deleteData called path = {}", path); + LOG.warn("txt {} deleteData called path = {}", identifier, path); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.error("mergeData called path = {}", path); + LOG.warn("txn {} mergeData called path = {}", identifier, path); } @Override public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { - LOG.error("readData called path = {}", path); + LOG.warn("txn {} readData called path = {}", identifier, path); return Futures.immediateCheckedFuture( Optional.>absent()); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.error("writeData called path = {}", path); + LOG.warn("txn {} writeData called path = {}", identifier, path); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/UnknownMessageException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/UnknownMessageException.java new file mode 100644 index 0000000000..f4f2524a8d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/UnknownMessageException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.exceptions; + +public class UnknownMessageException extends Exception { + private final Object message; + + public UnknownMessageException(Object message) { + this.message = message; + } + + @Override public String getMessage() { + return "Unknown message received " + " - " + message; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java new file mode 100644 index 0000000000..c692881593 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.identifiers; + +import com.google.common.base.Preconditions; + +public class ShardIdentifier { + private final String shardName; + private final String memberName; + private final String type; + + + public ShardIdentifier(String shardName, String memberName, String type) { + + Preconditions.checkNotNull(shardName, "shardName should not be null"); + Preconditions.checkNotNull(memberName, "memberName should not be null"); + Preconditions.checkNotNull(type, "type should not be null"); + + this.shardName = shardName; + this.memberName = memberName; + this.type = type; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ShardIdentifier that = (ShardIdentifier) o; + + if (!memberName.equals(that.memberName)) { + return false; + } + if (!shardName.equals(that.shardName)) { + return false; + } + if (!type.equals(that.type)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = shardName.hashCode(); + result = 31 * result + memberName.hashCode(); + result = 31 * result + type.hashCode(); + return result; + } + + @Override public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(memberName).append("-shard-").append(shardName).append("-").append(type); + return builder.toString(); + } + + public static Builder builder(){ + return new Builder(); + } + + public static class Builder { + private String shardName; + private String memberName; + private String type; + + public ShardIdentifier build(){ + return new ShardIdentifier(shardName, memberName, type); + } + + public Builder shardName(String shardName){ + this.shardName = shardName; + return this; + } + + public Builder memberName(String memberName){ + this.memberName = memberName; + return this; + } + + public Builder type(String type){ + this.type = type; + return this; + } + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifier.java new file mode 100644 index 0000000000..65bf010b0a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifier.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.identifiers; + +public class ShardManagerIdentifier { + private final String type; + + public ShardManagerIdentifier(String type) { + this.type = type; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ShardManagerIdentifier that = (ShardManagerIdentifier) o; + + if (!type.equals(that.type)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return type.hashCode(); + } + + @Override public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("shardmanager-").append(type); + return builder.toString(); + } + + public static Builder builder(){ + return new Builder(); + } + + public static class Builder { + private String type; + + public Builder type(String type){ + this.type = type; + return this; + } + + public ShardManagerIdentifier build(){ + return new ShardManagerIdentifier(this.type); + } + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java new file mode 100644 index 0000000000..77e81422e6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.identifiers; + +import com.google.common.base.Preconditions; + +public class ShardTransactionIdentifier { + private final String remoteTransactionId; + + public ShardTransactionIdentifier(String remoteTransactionId) { + this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null"); + } + + public static Builder builder(){ + return new Builder(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ShardTransactionIdentifier that = (ShardTransactionIdentifier) o; + + if (!remoteTransactionId.equals(that.remoteTransactionId)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return remoteTransactionId.hashCode(); + } + + @Override public String toString() { + final StringBuilder sb = + new StringBuilder(); + sb.append("shard-").append(remoteTransactionId); + return sb.toString(); + } + + public static class Builder { + private String remoteTransactionId; + + public Builder remoteTransactionId(String remoteTransactionId){ + this.remoteTransactionId = remoteTransactionId; + return this; + } + + public ShardTransactionIdentifier build(){ + return new ShardTransactionIdentifier(remoteTransactionId); + } + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java new file mode 100644 index 0000000000..ba2e27c69f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.identifiers; + +import com.google.common.base.Preconditions; + +public class TransactionIdentifier { + private final String memberName; + private final long counter; + + + public TransactionIdentifier(String memberName, long counter) { + this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null"); + this.counter = counter; + } + + public static Builder builder(){ + return new Builder(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TransactionIdentifier that = (TransactionIdentifier) o; + + if (counter != that.counter) { + return false; + } + if (!memberName.equals(that.memberName)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = memberName.hashCode(); + result = 31 * result + (int) (counter ^ (counter >>> 32)); + return result; + } + + @Override public String toString() { + final StringBuilder sb = + new StringBuilder(); + sb.append(memberName).append("-txn-").append(counter); + return sb.toString(); + } + + public static class Builder { + private String memberName; + private long counter; + + public TransactionIdentifier build(){ + return new TransactionIdentifier(memberName, counter); + } + + public Builder memberName(String memberName){ + this.memberName = memberName; + return this; + } + + public Builder counter(long counter){ + this.counter = counter; + return this; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java index de1ac18533..a5d7b77a64 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java @@ -34,6 +34,7 @@ public abstract class AbstractBaseMBean { public static String BASE_JMX_PREFIX = "org.opendaylight.controller:"; public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore"; public static String JMX_CATEGORY_SHARD = "Shard"; + public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager"; private static final Logger LOG = LoggerFactory .getLogger(AbstractBaseMBean.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java new file mode 100644 index 0000000000..0c609b459e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager; + +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean; + +import java.util.List; + +public class ShardManagerInfo extends AbstractBaseMBean implements + ShardManagerInfoMBean { + + private final String name; + private final List localShards; + + public ShardManagerInfo(String name, List localShards) { + this.name = name; + this.localShards = localShards; + } + + + @Override protected String getMBeanName() { + return name; + } + + @Override protected String getMBeanType() { + return JMX_TYPE_DISTRIBUTED_DATASTORE; + } + + @Override protected String getMBeanCategory() { + return JMX_CATEGORY_SHARD_MANAGER; + } + + public static ShardManagerInfo createShardManagerMBean(String name, List localShards){ + ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, + localShards); + + shardManagerInfo.registerMBean(); + + return shardManagerInfo; + } + + @Override public List getLocalShards() { + return localShards; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java new file mode 100644 index 0000000000..28ccc4f0b3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager; + +import java.util.List; + +public interface ShardManagerInfoMBean { + List getLocalShards(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java index 8c2543e486..346519ed5a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java @@ -8,16 +8,18 @@ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; + public class PeerAddressResolved { - private final String peerId; + private final ShardIdentifier peerId; private final String peerAddress; - public PeerAddressResolved(String peerId, String peerAddress) { + public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) { this.peerId = peerId; this.peerAddress = peerAddress; } - public String getPeerId() { + public ShardIdentifier getPeerId() { return peerId; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf index 6178f4903e..8af9bd07d7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf @@ -1,6 +1,7 @@ odl-cluster-data { akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] cluster { roles = [ "member-1" @@ -39,6 +40,7 @@ odl-cluster-data { odl-cluster-rpc { akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] actor { provider = "akka.cluster.ClusterActorRefProvider" diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java index 6599bd8eeb..319451f8f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -14,8 +14,8 @@ import akka.actor.ActorSelection; import akka.actor.Props; import akka.event.Logging; import akka.testkit.JavaTestKit; -import junit.framework.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; @@ -37,6 +37,8 @@ import scala.concurrent.duration.FiniteDuration; import java.util.Collections; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; public class BasicIntegrationTest extends AbstractActorTest { @@ -52,7 +54,11 @@ public class BasicIntegrationTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ - final Props props = Shard.props("config", Collections.EMPTY_MAP); + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + final Props props = Shard.props(identifier, Collections.EMPTY_MAP); final ActorRef shard = getSystem().actorOf(props); new Within(duration("5 seconds")) { @@ -95,7 +101,7 @@ public class BasicIntegrationTest extends AbstractActorTest { } }.get(); // this extracts the received message - Assert.assertNotNull(transactionChain); + assertNotNull(transactionChain); System.out.println("Successfully created transaction chain"); @@ -116,7 +122,7 @@ public class BasicIntegrationTest extends AbstractActorTest { } }.get(); // this extracts the received message - Assert.assertNotNull(transaction); + assertNotNull(transaction); System.out.println("Successfully created transaction"); @@ -135,7 +141,7 @@ public class BasicIntegrationTest extends AbstractActorTest { } }.get(); // this extracts the received message - Assert.assertTrue(writeDone); + assertTrue(writeDone); System.out.println("Successfully wrote data"); @@ -158,7 +164,7 @@ public class BasicIntegrationTest extends AbstractActorTest { } }.get(); // this extracts the received message - Assert.assertNotNull(cohort); + assertNotNull(cohort); System.out.println("Successfully readied the transaction"); @@ -177,7 +183,7 @@ public class BasicIntegrationTest extends AbstractActorTest { } }.get(); // this extracts the received message - Assert.assertTrue(preCommitDone); + assertTrue(preCommitDone); System.out.println("Successfully pre-committed the transaction"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java index 56fd3c568a..17329611b0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java @@ -8,6 +8,8 @@ import org.junit.Test; import java.io.File; import java.util.List; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class ConfigurationImplTest { @@ -31,6 +33,49 @@ public class ConfigurationImplTest { assertTrue(memberShardNames.contains("people-1")); assertTrue(memberShardNames.contains("cars-1")); + + // Retrieve once again to hit cache + + memberShardNames = + configuration.getMemberShardNames("member-1"); + + assertTrue(memberShardNames.contains("people-1")); + assertTrue(memberShardNames.contains("cars-1")); + + } + + @Test + public void testGetMembersFromShardName(){ + List members = + configuration.getMembersFromShardName("default"); + + assertEquals(3, members.size()); + + assertTrue(members.contains("member-1")); + assertTrue(members.contains("member-2")); + assertTrue(members.contains("member-3")); + + assertFalse(members.contains("member-26")); + + // Retrieve once again to hit cache + members = + configuration.getMembersFromShardName("default"); + + assertEquals(3, members.size()); + + assertTrue(members.contains("member-1")); + assertTrue(members.contains("member-2")); + assertTrue(members.contains("member-3")); + + assertFalse(members.contains("member-26")); + + + // Try to find a shard which is not present + + members = + configuration.getMembersFromShardName("foobar"); + + assertEquals(0, members.size()); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index d1beab9049..406f0ffd9e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -1,8 +1,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Props; -import junit.framework.Assert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; @@ -21,13 +24,20 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + public class DistributedDataStoreTest extends AbstractActorTest{ private DistributedDataStore distributedDataStore; private MockActorContext mockActorContext; private ActorRef doNothingActorRef; - @org.junit.Before + @Before public void setUp() throws Exception { ShardStrategyFactory.setConfiguration(new MockConfiguration()); final Props props = Props.create(DoNothingActor.class); @@ -35,7 +45,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{ doNothingActorRef = getSystem().actorOf(props); mockActorContext = new MockActorContext(getSystem(), doNothingActorRef); - distributedDataStore = new DistributedDataStore(mockActorContext, "config"); + distributedDataStore = new DistributedDataStore(mockActorContext); distributedDataStore.onGlobalContextUpdated( TestModel.createTestContext()); @@ -48,12 +58,22 @@ public class DistributedDataStoreTest extends AbstractActorTest{ .build()); } - @org.junit.After + @After public void tearDown() throws Exception { } - @org.junit.Test + @Test + public void testConstructor(){ + ActorSystem actorSystem = mock(ActorSystem.class); + + new DistributedDataStore(actorSystem, "config", + mock(ClusterWrapper.class), mock(Configuration.class)); + + verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config")); + } + + @Test public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception { ListenerRegistration registration = @@ -65,12 +85,12 @@ public class DistributedDataStoreTest extends AbstractActorTest{ }, AsyncDataBroker.DataChangeScope.BASE); // Since we do not expect the shard to be local registration will return a NoOpRegistration - Assert.assertTrue(registration instanceof NoOpDataChangeListenerRegistration); + assertTrue(registration instanceof NoOpDataChangeListenerRegistration); - Assert.assertNotNull(registration); + assertNotNull(registration); } - @org.junit.Test + @Test public void testRegisterChangeListenerWhenShardIsLocal() throws Exception { mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path())); @@ -83,33 +103,33 @@ public class DistributedDataStoreTest extends AbstractActorTest{ } }, AsyncDataBroker.DataChangeScope.BASE); - Assert.assertTrue(registration instanceof DataChangeListenerRegistrationProxy); + assertTrue(registration instanceof DataChangeListenerRegistrationProxy); - Assert.assertNotNull(registration); + assertNotNull(registration); } - @org.junit.Test + @Test public void testCreateTransactionChain() throws Exception { final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain(); - Assert.assertNotNull(transactionChain); + assertNotNull(transactionChain); } - @org.junit.Test + @Test public void testNewReadOnlyTransaction() throws Exception { final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction(); - Assert.assertNotNull(transaction); + assertNotNull(transaction); } - @org.junit.Test + @Test public void testNewWriteOnlyTransaction() throws Exception { final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction(); - Assert.assertNotNull(transaction); + assertNotNull(transaction); } - @org.junit.Test + @Test public void testNewReadWriteTransaction() throws Exception { final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction(); - Assert.assertNotNull(transaction); + assertNotNull(transaction); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 431a266b14..0d86ffb844 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -6,6 +6,7 @@ import akka.event.Logging; import akka.testkit.JavaTestKit; import junit.framework.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; @@ -35,7 +36,11 @@ public class ShardTest extends AbstractActorTest { @Test public void testOnReceiveCreateTransactionChain() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = Shard.props("config", Collections.EMPTY_MAP); + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + final Props props = Shard.props(identifier, Collections.EMPTY_MAP); final ActorRef subject = getSystem().actorOf(props, "testCreateTransactionChain"); @@ -87,7 +92,11 @@ public class ShardTest extends AbstractActorTest { @Test public void testOnReceiveRegisterListener() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = Shard.props("config", Collections.EMPTY_MAP); + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + final Props props = Shard.props(identifier, Collections.EMPTY_MAP); final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener"); @@ -141,7 +150,11 @@ public class ShardTest extends AbstractActorTest { @Test public void testCreateTransaction(){ new JavaTestKit(getSystem()) {{ - final Props props = Shard.props("config", Collections.EMPTY_MAP); + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + final Props props = Shard.props(identifier, Collections.EMPTY_MAP); final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction"); @@ -196,9 +209,14 @@ public class ShardTest extends AbstractActorTest { @Test public void testPeerAddressResolved(){ new JavaTestKit(getSystem()) {{ - Map peerAddresses = new HashMap<>(); - peerAddresses.put("member-2", null); - final Props props = Shard.props("config", peerAddresses); + Map peerAddresses = new HashMap<>(); + + final ShardIdentifier identifier = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + peerAddresses.put(identifier, null); + final Props props = Shard.props(identifier, peerAddresses); final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved"); @@ -206,7 +224,7 @@ public class ShardTest extends AbstractActorTest { protected void run() { subject.tell( - new PeerAddressResolved("member-2", "akka://foobar"), + new PeerAddressResolved(identifier, "akka://foobar"), getRef()); expectNoMsg(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 86016a677b..0f9e771ab4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -15,9 +15,8 @@ import akka.actor.Props; import akka.testkit.TestActorRef; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import org.junit.Assert; import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; @@ -30,7 +29,6 @@ import scala.concurrent.duration.Duration; import java.util.Collections; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -48,45 +46,21 @@ public class ShardTransactionFailureTest extends AbstractActorTest { private static final SchemaContext testSchemaContext = TestModel.createTestContext(); + private static final ShardIdentifier SHARD_IDENTIFIER = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + static { store.onGlobalContextUpdated(testSchemaContext); } - @Test - public void testNegativePerformingWriteOperationOnReadTransaction() - throws Exception { - try { - - final ActorRef - shard = getSystem() - .actorOf(Shard.props("config", Collections.EMPTY_MAP)); - final Props props = - ShardTransaction - .props(store.newReadOnlyTransaction(), shard, TestModel - .createTestContext()); - final TestActorRef subject = TestActorRef.apply(props, getSystem()); - - subject - .receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), - ActorRef.noSender()); - Assert.assertFalse(true); - - - } catch (Exception cs) { - assertEquals(cs.getClass().getSimpleName(), - Exception.class.getSimpleName()); - assertTrue(cs.getMessage().startsWith( - "ShardTransaction:handleRecieve received an unknown message")); - } - } - @Test(expected = ReadFailedException.class) public void testNegativeReadWithReadOnlyTransactionClosed() throws Throwable { final ActorRef shard = - getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext()); @@ -121,7 +95,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = - getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -150,13 +124,12 @@ public class ShardTransactionFailureTest extends AbstractActorTest { } - @Test(expected = IllegalStateException.class) public void testNegativeWriteWithTransactionReady() throws Exception { final ActorRef shard = - getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext()); @@ -195,7 +168,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -233,7 +206,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -271,7 +244,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -300,4 +273,6 @@ public class ShardTransactionFailureTest extends AbstractActorTest { } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 6fe5154d55..a2273f61f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -9,6 +9,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; @@ -46,6 +48,11 @@ public class ShardTransactionTest extends AbstractActorTest { private static final SchemaContext testSchemaContext = TestModel.createTestContext(); + private static final ShardIdentifier SHARD_IDENTIFIER = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + static { store.onGlobalContextUpdated(testSchemaContext); } @@ -53,7 +60,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext); final ActorRef subject = getSystem().actorOf(props, "testReadData"); @@ -95,7 +102,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadDataWhenDataNotFound() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext); final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound"); @@ -173,7 +180,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveWriteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -213,7 +220,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveMergeData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext); final ActorRef subject = @@ -254,7 +261,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDeleteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -293,7 +300,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -331,7 +338,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); final ActorRef subject = @@ -386,7 +393,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { try { - final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP)); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext()); final TestActorRef subject = TestActorRef.apply(props,getSystem()); @@ -396,8 +403,8 @@ public class ShardTransactionTest extends AbstractActorTest { } catch (Exception cs) { - assertEquals(cs.getClass().getSimpleName(), Exception.class.getSimpleName()); - assertTrue(cs.getMessage().startsWith("ShardTransaction:handleRecieve received an unknown message")); + assertEquals(UnknownMessageException.class.getSimpleName(), cs.getClass().getSimpleName()); + assertTrue(cs.getMessage(), cs.getMessage().startsWith("Unknown message received ")); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java new file mode 100644 index 0000000000..afcd045434 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java @@ -0,0 +1,18 @@ +package org.opendaylight.controller.cluster.datastore.identifiers; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ShardIdentifierTest { + + @Test + public void testBasic(){ + ShardIdentifier id = ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + assertEquals("member-1-shard-inventory-config", id.toString()); + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifierTest.java new file mode 100644 index 0000000000..44bb4b3528 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifierTest.java @@ -0,0 +1,14 @@ +package org.opendaylight.controller.cluster.datastore.identifiers; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + public class ShardManagerIdentifierTest { + + @Test + public void testIdentifier(){ + assertEquals("shardmanager-operational", ShardManagerIdentifier.builder().type("operational").build().toString()); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index eda1c304e4..27b0374bac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -1,5 +1,6 @@ akka { - loggers = [akka.testkit.TestEventListener] + loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] + actor { serializers { java = "akka.serialization.JavaSerializer"