<properties>
- <akka.version>2.3.9</akka.version>
+ <akka.version>2.3.10</akka.version>
<appauth.version>0.6.0-SNAPSHOT</appauth.version>
<archetype-app-northbound>0.2.0-SNAPSHOT</archetype-app-northbound>
<arphandler.version>0.7.0-SNAPSHOT</arphandler.version>
if (LocalServerChannel.class.equals(channelClass) == false) {
// makes no sense for LocalServer and produces warning
b.childOption(ChannelOption.SO_KEEPALIVE, true);
+ b.childOption(ChannelOption.TCP_NODELAY , true);
}
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
customizeBootstrap(b);
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
if(roleChangeNotifier.isPresent()) {
- roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+ roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
}
onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
}
}
+ protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+ return new LeaderStateChanged(memberId, leaderId);
+ }
+
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
package org.opendaylight.controller.sal.binding.api;
import java.util.EventListener;
-
import org.opendaylight.yangtools.yang.binding.Notification;
/**
* capture of this interface.
*
* @param <T> the interested notification type
+ * @deprecated Deprecated unused API.
*/
+@Deprecated
public interface NotificationListener<T extends Notification> extends EventListener {
/**
* Invoked to deliver a notification.
import java.util.EventListener;
import java.util.concurrent.ExecutorService;
-
import org.opendaylight.controller.md.sal.common.api.notify.NotificationPublishService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.Notification;
* Interface for a notification service that provides publish/subscribe capabilities for YANG
* modeled notifications. This interface is a combination of the {@link NotificationService} and
* {@link NotificationPublishService} interfaces.
+ *
+ * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationPublishService}.
*/
+@Deprecated
public interface NotificationProviderService extends NotificationService, NotificationPublishService<Notification> {
/**
* </pre>
* The <code>onStart</code> method will be invoked when someone publishes a <code>Start</code> notification and
* the <code>onStop</code> method will be invoked when someone publishes a <code>Stop</code> notification.
+ *
+ * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationService} instead.
*/
+@Deprecated
public interface NotificationService extends BindingAwareService {
/**
* Registers a generic listener implementation for a specified notification type.
*/
package org.opendaylight.controller.cluster.notifications;
-import java.io.Serializable;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
/**
- * A message initiated internally from the RaftActor when some state of a leader has changed
+ * A local message initiated internally from the RaftActor when some state of a leader has changed.
*
* @author Thomas Pantelis
*/
-public class LeaderStateChanged implements Serializable {
- private static final long serialVersionUID = 1L;
-
+public class LeaderStateChanged {
private final String memberId;
private final String leaderId;
- public LeaderStateChanged(String memberId, String leaderId) {
- this.memberId = memberId;
+ public LeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId) {
+ this.memberId = Preconditions.checkNotNull(memberId);
this.leaderId = leaderId;
}
- public String getMemberId() {
+ public @Nonnull String getMemberId() {
return memberId;
}
- public String getLeaderId() {
+ public @Nullable String getLeaderId() {
return leaderId;
}
]
}
+
+ persistence {
+ # By default the snapshots/journal directories live in KARAF_HOME. You can choose to put it somewhere else by
+ # modifying the following two properties. The directory location specified may be a relative or absolute path.
+ # The relative path is always relative to KARAF_HOME.
+
+ # snapshot-store.local.dir = "target/snapshots"
+ # journal.leveldb.dir = "target/journal"
+
+ }
}
}
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
return roleChangeNotifier;
}
+ @Override
+ protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+ return new ShardLeaderStateChanged(memberId, leaderId,
+ isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent());
+ }
+
private void onDatastoreContext(DatastoreContext context) {
datastoreContext = context;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
} else if(message instanceof ShardNotInitializedTimeout) {
onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof LeaderStateChanged) {
- onLeaderStateChanged((LeaderStateChanged)message);
+ } else if(message instanceof ShardLeaderStateChanged) {
+ onLeaderStateChanged((ShardLeaderStateChanged)message);
} else {
unknownMessage(message);
}
}
- private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+ private void checkReady(){
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+ }
+
+ private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
+ shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
- if (isReadyWithLeaderId()) {
- LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
- persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
- waitTillReadyCountdownLatch.countDown();
- }
-
+ checkReady();
} else {
LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
}
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
-
- if (isReadyWithLeaderId()) {
- LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
- persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
- waitTillReadyCountdownLatch.countDown();
- }
-
+ checkReady();
mBean.setSyncStatus(isInSync());
}
}
info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
getShardActorPath(shardName, memberName), getSelf());
}
+
+ checkReady();
}
private void onDatastoreContext(DatastoreContext context) {
LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
final String shardName = message.getShardName();
+ final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- Object found = new PrimaryFound(info.getSerializedLeaderActor());
+ String primaryPath = info.getSerializedLeaderActor();
+ Object found = canReturnLocalShardState && info.isLeader() ?
+ new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ new RemotePrimaryShardFound(primaryPath);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
shardName, path);
- getContext().actorSelection(path).forward(message, getContext());
+ getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+ message.isWaitUntilReady()), getContext());
return;
}
}
private ActorRef actor;
private ActorPath actorPath;
private final Map<String, String> peerAddresses;
+ private Optional<DataTree> localShardDataTree;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
return shardId;
}
+ void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+ this.localShardDataTree = localShardDataTree;
+ }
+
+ Optional<DataTree> getLocalShardDataTree() {
+ return localShardDataTree;
+ }
+
Map<String, String> getPeerAddresses() {
return peerAddresses;
}
}
boolean isShardReadyWithLeaderId() {
- return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+ return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
}
boolean isShardInitialized() {
private final String shardName;
private final boolean waitUntilReady;
- public FindPrimary(String shardName, boolean waitUntilReady){
+ public FindPrimary(String shardName, boolean waitUntilReady) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
- .append("]");
+ builder.append(getClass().getName()).append(" [shardName=").append(shardName).append(", waitUntilReady=")
+ .append(waitUntilReady).append("]");
return builder.toString();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.apache.commons.lang3.ObjectUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Local message sent in reply to FindPrimaryShard to indicate the primary shard is local to the caller.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalPrimaryShardFound {
+
+ private final String primaryPath;
+ private final DataTree localShardDataTree;
+
+ public LocalPrimaryShardFound(@Nonnull String primaryPath, @Nonnull DataTree localShardDataTree) {
+ this.primaryPath = Preconditions.checkNotNull(primaryPath);
+ this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+ }
+
+ public @Nonnull String getPrimaryPath() {
+ return primaryPath;
+ }
+
+ public @Nonnull DataTree getLocalShardDataTree() {
+ return localShardDataTree;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("LocalPrimaryShardFound [primaryPath=").append(primaryPath).append(", localShardDataTree=")
+ .append(ObjectUtils.identityToString(localShardDataTree)).append("]");
+ return builder.toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * A remote message sent to locate the primary shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoteFindPrimary extends FindPrimary {
+ private static final long serialVersionUID = 1L;
+
+ public RemoteFindPrimary(String shardName, boolean waitUntilReady) {
+ super(shardName, waitUntilReady);
+ }
+}
import java.io.Serializable;
-public class PrimaryFound implements Serializable {
+/**
+ * Local or remote message sent in reply to FindPrimaryShard to indicate the primary shard is remote to the caller.
+ */
+public class RemotePrimaryShardFound implements Serializable {
private static final long serialVersionUID = 1L;
private final String primaryPath;
- public PrimaryFound(final String primaryPath) {
+ public RemotePrimaryShardFound(final String primaryPath) {
this.primaryPath = primaryPath;
}
return primaryPath;
}
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PrimaryFound that = (PrimaryFound) o;
-
- if (!primaryPath.equals(that.primaryPath)) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return primaryPath.hashCode();
- }
-
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+ builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]");
return builder.toString();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * A local message derived from LeaderStateChanged containing additional Shard-specific info that is sent
+ * when some state of the shard leader has changed. This message is used by the ShardManager to maintain
+ * current Shard information.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardLeaderStateChanged extends LeaderStateChanged {
+
+ private final Optional<DataTree> localShardDataTree;
+
+ public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId,
+ @Nonnull Optional<DataTree> localShardDataTree) {
+ super(memberId, leaderId);
+ this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+ }
+
+ public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+ return localShardDataTree;
+ }
+}
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
return future.transform(new Mapper<Object, PrimaryShardInfo>() {
@Override
public PrimaryShardInfo checkedApply(Object response) throws Exception {
- if(response instanceof PrimaryFound) {
- PrimaryFound found = (PrimaryFound)response;
-
- LOG.debug("Primary found {}", found.getPrimaryPath());
- ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
- PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
- primaryShardInfoCache.put(shardName, Futures.successful(info));
- return info;
+ if(response instanceof RemotePrimaryShardFound) {
+ LOG.debug("findPrimaryShardAsync received: {}", response);
+ return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ LOG.debug("findPrimaryShardAsync received: {}", response);
+ LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
} else if(response instanceof NotInitializedException) {
throw (NotInitializedException)response;
} else if(response instanceof PrimaryNotFoundException) {
}, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
}
+ private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
+ DataTree localShardDataTree) {
+ ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
+ PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
+ primaryShardInfoCache.put(shardName, Futures.successful(info));
+ return info;
+ }
+
/**
* Finds a local shard given its shard name and return it's ActorRef
*
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+ DataTree mockDataTree = mock(DataTree.class);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef());
MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+ assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId1,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
}};
}
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
- shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent()), mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-2-shard-default"));
}};
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
- shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+ DataTree mockDataTree = mock(DataTree.class);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+ assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
}};
}
expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
- shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+ DataTree mockDataTree = mock(DataTree.class);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
- PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+ assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
}};
shardManager2.tell(new ActorInitialized(), mockShardActor2);
String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
- shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+ shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
+ Optional.of(mock(DataTree.class))), mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
shardManager1.tell(new FindPrimary("astronauts", false), getRef());
- PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+ RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
}
@Test
- public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
+ public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
verify(ready, never()).countDown();
- shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+ shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+ Optional.of(mock(DataTree.class))));
verify(ready, times(1)).countDown();
}
@Test
- public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+ public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
verify(ready, never()).countDown();
- shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+ shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+
+ shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
+ "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
verify(ready, times(1)).countDown();
}};
}
+ @Test
+ public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
+ new JavaTestKit(getSystem()) {
+ {
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ memberId, null, RaftState.Follower.name()));
+
+ verify(ready, never()).countDown();
+
+ shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
+ "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
+
+ shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+
+ verify(ready, times(1)).countDown();
+
+ }};
+ }
@Test
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
shard.tell(new RegisterRoleChangeListener(), listener);
- // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
- // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
- // sleep.
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
- List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
+ ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
+ ShardLeaderStateChanged.class);
+ assertEquals("getLocalShardDataTree present", true,
+ leaderStateChanged.getLocalShardDataTree().isPresent());
+ assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
+ leaderStateChanged.getLocalShardDataTree().get());
- assertEquals(1, allMatching.size());
+ MessageCollectorActor.clearMessages(listener);
+
+ // Force a leader change
+
+ shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
+
+ leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
+ ShardLeaderStateChanged.class);
+ assertEquals("getLocalShardDataTree present", false,
+ leaderStateChanged.getLocalShardDataTree().isPresent());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
};
}
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
}
@Test
- public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+ public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
+ return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
}
};
-
Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+ final DataTree mockDataTree = Mockito.mock(DataTree.class);
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+ }
+ };
+
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+ assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+ assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedInfo, actual);
+
+ // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ assertNull(cached);
}
@Test
TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
- shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
- shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
Configuration mockConfig = mock(Configuration.class);
}
- private static ClusterEvent.MemberUp createMemberUp(String memberName, String address) {
+ public static ClusterEvent.MemberUp createMemberUp(String memberName, String address) {
akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
AddressFromURIString.parse(address), 55);
LOG.debug("Create rpc registry and broker actors");
rpcRegistry =
- getContext().actorOf(Props.create(RpcRegistry.class).
+ getContext().actorOf(RpcRegistry.props().
withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
rpcBroker =
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+
import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
}
}
+ public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRoutes() {
+ return table.keySet();
+ }
+
public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
table.put(routeId, System.currentTimeMillis());
}
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
getLocalBucket().setData(new RoutingTable());
}
+ public static Props props() {
+ return Props.create(new RpcRegistryCreator());
+ }
+
@Override
protected void handleReceive(Object message) throws Exception {
//TODO: if sender is remote, reject message
}
}
}
+
+ private static class RpcRegistryCreator implements Creator<RpcRegistry> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public RpcRegistry create() throws Exception {
+ RpcRegistry registry = new RpcRegistry();
+ RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
+ return registry;
+ }
+ }
}
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ClusterActorRefProvider;
-import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
}
}
- protected BucketImpl<T> getLocalBucket() {
+ public BucketImpl<T> getLocalBucket() {
return localBucket;
}
versions.put(selfAddress, localBucket.getVersion());
}
- protected Map<Address, Bucket<T>> getRemoteBuckets() {
+ public Map<Address, Bucket<T>> getRemoteBuckets() {
return remoteBuckets;
}
- @VisibleForTesting
- Map<Address, Long> getVersions() {
+ public Map<Address, Long> getVersions() {
return versions;
}
}
--- /dev/null
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * JMX bean to check remote rpc registry
+ */
+
+public interface RemoteRpcRegistryMXBean {
+
+ Set<String> getGlobalRpc();
+
+ String getBucketVersions();
+
+ Set<String> getLocalRegisteredRoutedRpc();
+
+ Map<String,String> findRpcByName(String name);
+
+ Map<String,String> findRpcByRoute(String route);
+}
--- /dev/null
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import akka.actor.Address;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final String NULL_CONSTANT = "null";
+
+ private final String LOCAL_CONSTANT = "local";
+
+ private final String ROUTE_CONSTANT = "route:";
+
+ private final String NAME_CONSTANT = " | name:";
+
+ private final RpcRegistry rpcRegistry;
+
+ public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) {
+ super("RemoteRpcRegistry", "RemoteRpcBroker", null);
+ this.rpcRegistry = rpcRegistry;
+ registerMBean();
+ }
+
+ @Override
+ public Set<String> getGlobalRpc() {
+ RoutingTable table = rpcRegistry.getLocalBucket().getData();
+ Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
+ for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+ if(route.getRoute() == null) {
+ globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT);
+ }
+ }
+ if(log.isDebugEnabled()) {
+ log.debug("Locally registered global RPCs {}", globalRpc);
+ }
+ return globalRpc;
+ }
+
+ @Override
+ public Set<String> getLocalRegisteredRoutedRpc() {
+ RoutingTable table = rpcRegistry.getLocalBucket().getData();
+ Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
+ for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+ if(route.getRoute() != null) {
+ StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+ builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null ?
+ route.getType().toString() : NULL_CONSTANT);
+ routedRpc.add(builder.toString());
+ }
+ }
+ if(log.isDebugEnabled()) {
+ log.debug("Locally registered routed RPCs {}", routedRpc);
+ }
+ return routedRpc;
+ }
+
+ @Override
+ public Map<String, String> findRpcByName(final String name) {
+ RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+ // Get all RPCs from local bucket
+ Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
+
+ // Get all RPCs from remote bucket
+ Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+ for(Address address : buckets.keySet()) {
+ RoutingTable table = buckets.get(address).getData();
+ rpcMap.putAll(getRpcMemberMapByName(table, name, address.toString()));
+ }
+ if(log.isDebugEnabled()) {
+ log.debug("list of RPCs {} searched by name {}", rpcMap, name);
+ }
+ return rpcMap;
+ }
+
+ @Override
+ public Map<String, String> findRpcByRoute(String routeId) {
+ RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+ Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
+
+ Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+ for(Address address : buckets.keySet()) {
+ RoutingTable table = buckets.get(address).getData();
+ rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, address.toString()));
+
+ }
+ if(log.isDebugEnabled()) {
+ log.debug("list of RPCs {} searched by route {}", rpcMap, routeId);
+ }
+ return rpcMap;
+ }
+
+ /**
+ * Search if the routing table route String contains routeName
+ */
+
+ private Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
+ final String address) {
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+ Map<String, String> rpcMap = new HashMap<>(routes.size());
+ for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+ if(route.getRoute() != null) {
+ String routeString = route.getRoute().toString();
+ if(routeString.contains(routeName)) {
+ StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+ builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null ?
+ route.getType().toString() : NULL_CONSTANT);
+ rpcMap.put(builder.toString(), address);
+ }
+ }
+ }
+ return rpcMap;
+ }
+
+ /**
+ * Search if the routing table route type contains name
+ */
+ private Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
+ final String address) {
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+ Map<String, String> rpcMap = new HashMap<>(routes.size());
+ for(RpcRouter.RouteIdentifier<?, ?, ?> route : routes){
+ if(route.getType() != null) {
+ String type = route.getType().toString();
+ if(type.contains(name)) {
+ StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+ builder.append(route.getRoute() != null ? route.getRoute().toString(): NULL_CONSTANT)
+ .append(NAME_CONSTANT).append(type);
+ rpcMap.put(builder.toString(), address);
+ }
+ }
+ }
+ return rpcMap;
+ }
+
+
+
+ @Override
+ public String getBucketVersions() {
+ return rpcRegistry.getVersions().toString();
+ }
+
+}
\ No newline at end of file
result = partialResult;
}
return new NormalizedNodeContext(path,result);
+ } catch (final RestconfDocumentedException e) {
+ throw e;
} catch (final Exception e) {
LOG.debug("Error parsing json input", e);
final NormalizedNode<?, ?> result = parse(path,doc);
return new NormalizedNodeContext(path,result);
+ } catch (final RestconfDocumentedException e){
+ throw e;
} catch (final Exception e) {
LOG.debug("Error parsing xml input", e);
final InstanceIdentifierBuilder builder = YangInstanceIdentifier.builder();
final Module latestModule = globalSchema.findModuleByName(startModule, null);
+
+ if (latestModule == null) {
+ throw new RestconfDocumentedException("The module named '" + startModule + "' does not exist.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
+ }
+
final InstanceIdentifierContext<?> iiWithSchemaNode = collectPathArguments(builder, pathArgs, latestModule, null,
toMountPointIdentifier);
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Delete;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post;
@Override
public Config getConfig() {
final Config config = new Config();
+
final Get get = new Get();
get.setReceivedRequests(stats.getConfigGet());
+ get.setSuccessfulResponses(stats.getSuccessGetConfig());
+ get.setFailedResponses(stats.getFailureGetConfig());
config.setGet(get);
+
final Post post = new Post();
post.setReceivedRequests(stats.getConfigPost());
+ post.setSuccessfulResponses(stats.getSuccessPost());
+ post.setFailedResponses(stats.getFailurePost());
config.setPost(post);
+
final Put put = new Put();
put.setReceivedRequests(stats.getConfigPut());
+ put.setSuccessfulResponses(stats.getSuccessPut());
+ put.setFailedResponses(stats.getFailurePut());
config.setPut(put);
+
+ final Delete delete = new Delete();
+ delete.setReceivedRequests(stats.getConfigDelete());
+ delete.setSuccessfulResponses(stats.getSuccessDelete());
+ delete.setFailedResponses(stats.getFailureDelete());
+ config.setDelete(delete);
+
return config;
}
final Operational operational = new Operational();
final Get get = new Get();
get.setReceivedRequests(opGet);
+ get.setSuccessfulResponses(stats.getSuccessGetOperational());
+ get.setFailedResponses(stats.getFailureGetOperational());
operational.setGet(get);
return operational;
}
final BigInteger rpcInvoke = stats.getRpc();
final Rpcs rpcs = new Rpcs();
rpcs.setReceivedRequests(rpcInvoke);
- return rpcs ;
+ return rpcs;
}
-}
+}
\ No newline at end of file
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import org.opendaylight.controller.sal.rest.api.RestconfService;
AtomicLong configPost = new AtomicLong();
AtomicLong configPut = new AtomicLong();
AtomicLong configDelete = new AtomicLong();
+ AtomicLong successGetConfig = new AtomicLong();
+ AtomicLong successGetOperational = new AtomicLong();
+ AtomicLong successPost = new AtomicLong();
+ AtomicLong successPut = new AtomicLong();
+ AtomicLong successDelete = new AtomicLong();
+ AtomicLong failureGetConfig = new AtomicLong();
+ AtomicLong failureGetOperational = new AtomicLong();
+ AtomicLong failurePost = new AtomicLong();
+ AtomicLong failurePut = new AtomicLong();
+ AtomicLong failureDelete = new AtomicLong();
private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance());
@Override
public NormalizedNodeContext readConfigurationData(final String identifier, final UriInfo uriInfo) {
configGet.incrementAndGet();
- return delegate.readConfigurationData(identifier, uriInfo);
+ NormalizedNodeContext normalizedNodeContext = null;
+ try {
+ normalizedNodeContext = delegate.readConfigurationData(identifier, uriInfo);
+ if (normalizedNodeContext.getData() != null) {
+ successGetConfig.incrementAndGet();
+ }
+ else {
+ failureGetConfig.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failureGetConfig.incrementAndGet();
+ throw e;
+ }
+ return normalizedNodeContext;
}
@Override
public NormalizedNodeContext readOperationalData(final String identifier, final UriInfo uriInfo) {
operationalGet.incrementAndGet();
- return delegate.readOperationalData(identifier, uriInfo);
+ NormalizedNodeContext normalizedNodeContext = null;
+ try {
+ normalizedNodeContext = delegate.readOperationalData(identifier, uriInfo);
+ if (normalizedNodeContext.getData() != null) {
+ successGetOperational.incrementAndGet();
+ }
+ else {
+ failureGetOperational.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failureGetOperational.incrementAndGet();
+ throw e;
+ }
+ return normalizedNodeContext;
}
@Override
public Response updateConfigurationData(final String identifier, final NormalizedNodeContext payload) {
configPut.incrementAndGet();
- return delegate.updateConfigurationData(identifier, payload);
+ Response response = null;
+ try {
+ response = delegate.updateConfigurationData(identifier, payload);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successPut.incrementAndGet();
+ }
+ else {
+ failurePut.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failurePut.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public Response createConfigurationData(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) {
configPost.incrementAndGet();
- return delegate.createConfigurationData(identifier, payload, uriInfo);
+ Response response = null;
+ try {
+ response = delegate.createConfigurationData(identifier, payload, uriInfo);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successPost.incrementAndGet();
+ }
+ else {
+ failurePost.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failurePost.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public Response createConfigurationData(final NormalizedNodeContext payload, final UriInfo uriInfo) {
configPost.incrementAndGet();
- return delegate.createConfigurationData(payload, uriInfo);
+ Response response = null;
+ try {
+ response = delegate.createConfigurationData(payload, uriInfo);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successPost.incrementAndGet();
+ }
+ else {
+ failurePost.incrementAndGet();
+ }
+ }catch (Exception e) {
+ failurePost.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public Response deleteConfigurationData(final String identifier) {
- return delegate.deleteConfigurationData(identifier);
+ configDelete.incrementAndGet();
+ Response response = null;
+ try {
+ response = delegate.deleteConfigurationData(identifier);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successDelete.incrementAndGet();
+ }
+ else {
+ failureDelete.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failureDelete.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public BigInteger getRpc() {
return BigInteger.valueOf(rpc.get());
}
-}
+
+ public BigInteger getSuccessGetConfig() {
+ return BigInteger.valueOf(successGetConfig.get());
+ }
+
+ public BigInteger getSuccessGetOperational() {
+ return BigInteger.valueOf(successGetOperational.get());
+ }
+
+ public BigInteger getSuccessPost() {
+ return BigInteger.valueOf(successPost.get());
+ }
+
+ public BigInteger getSuccessPut() {
+ return BigInteger.valueOf(successPut.get());
+ }
+
+ public BigInteger getSuccessDelete() {
+ return BigInteger.valueOf(successDelete.get());
+ }
+
+ public BigInteger getFailureGetConfig() {
+ return BigInteger.valueOf(failureGetConfig.get());
+ }
+
+ public BigInteger getFailureGetOperational() {
+ return BigInteger.valueOf(failureGetOperational.get());
+ }
+
+ public BigInteger getFailurePost() {
+ return BigInteger.valueOf(failurePost.get());
+ }
+
+ public BigInteger getFailurePut() {
+ return BigInteger.valueOf(failurePut.get());
+ }
+
+ public BigInteger getFailureDelete() {
+ return BigInteger.valueOf(failureDelete.get());
+ }
+}
\ No newline at end of file
leaf received-requests {
type uint64;
}
+
+ leaf successful-responses {
+ type uint64;
+ }
+
+ leaf failed-responses {
+ type uint64;
+ }
}
augment "/config:modules/config:module/config:configuration" {
container put {
uses statistics;
}
+
+ container delete {
+ uses statistics;
+ }
}
container operational {
}
public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
- final SoftReference<YangStoreSnapshot> yangStoreSnapshotSoftReference = ref.get();
- YangStoreContext ret = yangStoreSnapshotSoftReference != null ? yangStoreSnapshotSoftReference.get() : null;
- if(ret == null) {
- ret = getYangStoreSnapshot();
+ YangStoreContext context = ref.get().get();
+
+ if(context == null) {
+ context = getYangStoreSnapshot();
}
this.listeners.add(listener);
- listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(ret));
+ listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(context));
return new AutoCloseable() {
@Override
tx.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(NetconfState.class), state);
// FIXME first attempt (right after we register to binding broker) always fails
// Is it due to the fact that we are writing from the onSessionInitiated callback ?
- final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
-
- Futures.addCallback(submit, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void aVoid) {
- LOG.debug("Netconf state updated successfully");
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Unable to update netconf state", throwable);
- }
- });
+ try {
+ tx.submit().checkedGet();
+ LOG.debug("Netconf state updated successfully");
+ } catch (TransactionCommitFailedException e) {
+ LOG.warn("Unable to update netconf state", e);
+ }
}
@Override