<artifactId>sal-test-model</artifactId>
<version>${mdsal.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-distributed-datastore</artifactId>
+ <version>${mdsal.version}</version>
+ </dependency>
<!-- SAL Extension bundles -->
<dependency>
<artifactId>jeromq</artifactId>
<version>0.3.1</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-distributed-datastore</artifactId>
+ </dependency>
+
</dependencies>
</profile>
<profile>
LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
}
+ @Override protected void onStateChanged() {
+
+ }
+
@Override public void onReceiveRecover(Object message) {
super.onReceiveRecover(message);
}
*/
protected abstract void applySnapshot(Object snapshot);
+ /**
+ * This method will be called by the RaftActor when the state of the
+ * RaftActor changes. The derived actor can then use methods like
+ * isLeader or getLeader to do something useful
+ */
+ protected abstract void onStateChanged();
+
private RaftActorBehavior switchBehavior(RaftState state) {
if (currentBehavior != null) {
if (currentBehavior.state() == state) {
} else {
behavior = new Leader(context);
}
+
+ onStateChanged();
+
return behavior;
}
--- /dev/null
+ODLCluster{
+ akka {
+ actor {
+ serialize-messages = on
+
+ provider = "akka.cluster.ClusterActorRefProvider"
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ "com.google.protobuf.GeneratedMessage" = proto
+ "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+ "com.google.protobuf.FieldSet" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2552
+ maximum-frame-size = 2097152
+ send-buffer-size = 52428800
+ receive-buffer-size = 52428800
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
--- /dev/null
+
+ODLCluster{
+ akka {
+ actor {
+ serialize-messages = on
+
+ provider = "akka.cluster.ClusterActorRefProvider"
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ "com.google.protobuf.GeneratedMessage" = proto
+ "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
+ "com.google.protobuf.FieldSet" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2550
+ maximum-frame-size = 2097152
+ send-buffer-size = 52428800
+ receive-buffer-size = 52428800
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
PersistentMessages.CompositeModification modification = payload
.getExtension(
org.opendaylight.controller.mdsal.CompositeModificationPayload.modification);
- payload.getExtension(KeyValueMessages.value);
+
+
+
+ // The extension was put in the unknown field.
+ // This is because extensions need to be registered
+ // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+ // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+ // If that is not done then on the other end the extension shows up as an unknown field
+ // Need to figure out a better way to do this
+ if(payload.getUnknownFields().hasField(2)){
+ UnknownFieldSet.Field field =
+ payload.getUnknownFields().getField(2);
+
+ try {
+ modification =
+ PersistentMessages.CompositeModification
+ .parseFrom(field.getLengthDelimitedList().get(0));
+ } catch (InvalidProtocolBufferException e) {
+
+ }
+ }
+
return new CompositeModificationPayload(modification);
}
import akka.japi.Creator;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
private final SchemaContext schemaContext;
private final YangInstanceIdentifier pathId;
+ private boolean notificationsEnabled = false;
public DataChangeListener(SchemaContext schemaContext,
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
@Override public void handleReceive(Object message) throws Exception {
if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){
- DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
- change = reply.getChange();
- this.listener.onDataChanged(change);
+ dataChanged(message);
+ } else if(message instanceof EnableNotification){
+ enableNotification((EnableNotification) message);
+ }
+ }
- if(getSender() != null){
- getSender().tell(new DataChangedReply().toSerializable(), getSelf());
- }
+ private void enableNotification(EnableNotification message) {
+ notificationsEnabled = message.isEnabled();
+ }
+
+ public void dataChanged(Object message) {
+
+ // Do nothing if notifications are not enabled
+ if(!notificationsEnabled){
+ return;
+ }
+
+ DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
+ change = reply.getChange();
+ this.listener.onDataChanged(change);
+ if(getSender() != null){
+ getSender().tell(new DataChangedReply().toSerializable(), getSelf());
}
}
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
- Object result = actorContext.executeShardOperation(shardName,
+ Object result = actorContext.executeLocalShardOperation(shardName,
new RegisterChangeListener(path, dataChangeListenerActor.path(),
scope).toSerializable(),
ActorContext.ASK_DURATION
);
- RegisterChangeListenerReply reply = RegisterChangeListenerReply.fromSerializable(actorContext.getActorSystem(),result);
- return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
+ if (result != null) {
+ RegisterChangeListenerReply reply = RegisterChangeListenerReply
+ .fromSerializable(actorContext.getActorSystem(), result);
+ return new DataChangeListenerRegistrationProxy(actorContext
+ .actorSelection(reply.getListenerRegistrationPath()), listener,
+ dataChangeListenerActor);
+ }
+
+ LOG.debug(
+ "No local shard for shardName {} was found so returning a noop registration",
+ shardName);
+ return new NoOpDataChangeListenerRegistration(listener);
}
+
+
@Override
public DOMStoreTransactionChain createTransactionChain() {
return new TransactionChainProxy(actorContext, executor, schemaContext);
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * When a consumer registers a data change listener and no local shard is
+ * available to register that listener with then we return an instance of
+ * NoOpDataChangeListenerRegistration
+ *
+ * <p>
+ *
+ * The NoOpDataChangeListenerRegistration as it's name suggests does
+ * nothing when an operation is invoked on it
+ */
+public class NoOpDataChangeListenerRegistration
+ implements ListenerRegistration {
+
+ private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
+ listener;
+
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+
+ this.listener = listener;
+ }
+
+ @Override
+ public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+ return listener;
+ }
+
+ @Override public void close() {
+
+ }
+}
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
private final ShardStats shardMBean;
+ private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+
private Shard(String name, Map<String, String> peerAddresses) {
super(name, peerAddresses);
modificationToCohort.remove(serialized);
if (cohort == null) {
LOG.error(
- "Could not find cohort for modification : " + modification);
+ "Could not find cohort for modification : {}", modification);
LOG.info("Writing modification using a new transaction");
- modification.apply(store.newReadWriteTransaction());
- return;
+ DOMStoreReadWriteTransaction transaction =
+ store.newReadWriteTransaction();
+ modification.apply(transaction);
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ ListenableFuture<Void> future =
+ commitCohort.preCommit();
+ try {
+ future.get();
+ future = commitCohort.commit();
+ future.get();
+ } catch (InterruptedException e) {
+ LOG.error("Failed to commit", e);
+ } catch (ExecutionException e) {
+ LOG.error("Failed to commit", e);
+ }
}
final ListenableFuture<Void> future = cohort.commit();
.system().actorSelection(
registerChangeListener.getDataChangeListenerPath());
+
+ // Notify the listener if notifications should be enabled or not
+ // If this shard is the leader then it will enable notifications else
+ // it will not
+ dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
+
+ // Now store a reference to the data change listener so it can be notified
+ // at a later point if notifications should be enabled or disabled
+ dataChangeListeners.add(dataChangeListenerPath);
+
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
if(data instanceof CompositeModificationPayload){
Object modification =
((CompositeModificationPayload) data).getModification();
- commit(clientActor, modification);
+
+ if(modification != null){
+ commit(clientActor, modification);
+ } else {
+ LOG.error("modification is null - this is very unexpected");
+ }
+
+
} else {
LOG.error("Unknown state received {}", data);
}
throw new UnsupportedOperationException("applySnapshot");
}
+ @Override protected void onStateChanged() {
+ for(ActorSelection dataChangeListener : dataChangeListeners){
+ dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
+ }
+ }
+
@Override public String persistenceId() {
return this.name;
}
import akka.japi.Creator;
import akka.japi.Function;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
/**
* The ShardManager has the following jobs,
- * <p>
+ * <ul>
* <li> Create all the local shard replicas that belong on this cluster member
+ * <li> Find the address of the local shard
* <li> Find the primary replica for any given shard
- * <li> Engage in shard replica elections which decide which replica should be the primary
- * </p>
- * <p/>
- * <h3>>Creation of Shard replicas</h3
- * <p>
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- * </p>
- * <p/>
- * <h3> Replica Elections </h3>
- * <p/>
- * <p>
- * The Shard Manager uses multiple cues to initiate election.
- * <li> When a member of the cluster dies
- * <li> When a local shard replica dies
- * <li> When a local shard replica comes alive
- * </p>
+ * <li> Monitor the cluster members and store their addresses
+ * <ul>
*/
public class ShardManager extends AbstractUntypedActor {
// Stores a mapping between a member name and the address of the member
+ // Member names look like "member-1", "member-2" etc and are as specified
+ // in configuration
private final Map<String, Address> memberNameToAddress = new HashMap<>();
+ // Stores a mapping between a shard name and it's corresponding information
+ // Shard names look like inventory, topology etc and are as specified in
+ // configuration
private final Map<String, ShardInformation> localShards = new HashMap<>();
-
+ // The type of a ShardManager reflects the type of the datastore itself
+ // A data store could be of type config/operational
private final String type;
private final ClusterWrapper cluster;
if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
findPrimary(
FindPrimary.fromSerializable(message));
-
+ } else if(message instanceof FindLocalShard){
+ findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
} else if (message instanceof ClusterEvent.MemberUp){
}
+ private void findLocalShard(FindLocalShard message) {
+ ShardInformation shardInformation =
+ localShards.get(message.getShardName());
+
+ if(shardInformation != null){
+ getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+ return;
+ }
+
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ }
+
private void ignoreMessage(Object message){
LOG.debug("Unhandled message : " + message);
}
}
}
+ /**
+ * Notifies all the local shards of a change in the schema context
+ *
+ * @param message
+ */
private void updateSchemaContext(Object message) {
for(ShardInformation info : localShards.values()){
info.getActor().tell(message,getSelf());
getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
}
- private String
-
-
- getShardActorPath(String shardName, String memberName) {
+ private String getShardActorPath(String shardName, String memberName) {
Address address = memberNameToAddress.get(memberName);
if(address != null) {
return address.toString() + "/user/shardmanager-" + this.type + "/"
return null;
}
+ /**
+ * Construct the name of the shard actor given the name of the member on
+ * which the shard resides and the name of the shard
+ *
+ * @param memberName
+ * @param shardName
+ * @return
+ */
private String getShardActorName(String memberName, String shardName){
return memberName + "-shard-" + shardName + "-" + this.type;
}
- // Create the shards that are local to this member
+ /**
+ * Create shards that are local to the member on which the ShardManager
+ * runs
+ *
+ */
private void createLocalShards() {
String memberName = this.cluster.getCurrentMemberName();
List<String> memberShardNames =
}
+ /**
+ * Given the name of the shard find the addresses of all it's peers
+ *
+ * @param shardName
+ * @return
+ */
private Map<String, String> getPeerAddresses(String shardName){
Map<String, String> peerAddresses = new HashMap<>();
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
-
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
-
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
remoteTransactionPaths.put(shardName, transactionContext);
}
- } catch(TimeoutException e){
- LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e);
- remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
+ } catch(TimeoutException | PrimaryNotFoundException e){
+ LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
+ remoteTransactionPaths.put(shardName,
+ new NoOpTransactionContext(shardName));
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class EnableNotification {
+ private final boolean enabled;
+
+ public EnableNotification(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * FindLocalShard is a message that should be sent to the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when we need to find a reference to a LocalShard
+ */
+public class FindLocalShard {
+ private final String shardName;
+
+ public FindLocalShard(String shardName) {
+ this.shardName = shardName;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+
+/**
+ * LocalShardFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when it finds a shard with the specified name in it's local shard registry
+ */
+public class LocalShardFound {
+ private final ActorRef path;
+
+ public LocalShardFound(ActorRef path) {
+ this.path = path;
+ }
+
+ public ActorRef getPath() {
+ return path;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * LocalShardNotFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.ShardManager}
+ * when it cannot locate a shard in it's local registry with the shardName specified
+ */
+public class LocalShardNotFound {
+ private final String shardName;
+
+ /**
+ *
+ * @param shardName the name of the shard that could not be found
+ */
+ public LocalShardNotFound(String shardName) {
+ this.shardName = shardName;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+}
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
return actorSystem.actorSelection(path);
}
+ /**
+ * Finds a local shard given it's shard name and return it's ActorRef
+ *
+ * @param shardName the name of the local shard that needs to be found
+ * @return a reference to a local shard actor which represents the shard
+ * specified by the shardName
+ */
+ public ActorRef findLocalShard(String shardName) {
+ Object result = executeLocalOperation(shardManager,
+ new FindLocalShard(shardName), ASK_DURATION);
+
+ if (result instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound) result;
+
+ LOG.debug("Local shard found {}", found.getPath());
+
+ return found.getPath();
+ }
+
+ return null;
+ }
+
+
public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
new FindPrimary(shardName).toSerializable(), ASK_DURATION);
return executeRemoteOperation(primary, message, duration);
}
+ /**
+ * Execute an operation on the the local shard only
+ * <p>
+ * This method first finds the address of the local shard if any. It then
+ * executes the operation on it.
+ * </p>
+ *
+ * @param shardName the name of the shard on which the operation needs to be executed
+ * @param message the message that needs to be sent to the shard
+ * @param duration the time duration in which this operation should complete
+ * @return the message that was returned by the local actor on which the
+ * the operation was executed. If a local shard was not found then
+ * null is returned
+ * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
+ * if the operation does not complete in a specified time duration
+ */
+ public Object executeLocalShardOperation(String shardName, Object message,
+ FiniteDuration duration) {
+ ActorRef local = findLocalShard(shardName);
+
+ if(local != null) {
+ return executeLocalOperation(local, message, duration);
+ }
+
+ return null;
+ }
+
+
public void shutdown() {
shardManager.tell(PoisonPill.getInstance(), null);
actorSystem.shutdown();
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CompositeModificationPayloadTest {
+
+
+ private static final String SERIALIZE_OUT = "serialize.out";
+
+ @After
+ public void shutDown(){
+ File f = new File(SERIALIZE_OUT);
+ if(f.exists()){
+ f.delete();
+ }
+ }
+
+ @Test
+ public void testBasic() throws IOException {
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+ entries.add(0, new ReplicatedLogEntry() {
+ @Override public Payload getData() {
+ WriteModification writeModification =
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext());
+
+ MutableCompositeModification compositeModification =
+ new MutableCompositeModification();
+
+ compositeModification.addModification(writeModification);
+
+ return new CompositeModificationPayload(compositeModification.toSerializable());
+ }
+
+ @Override public long getTerm() {
+ return 1;
+ }
+
+ @Override public long getIndex() {
+ return 1;
+ }
+ });
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "member-1", 0, 100, entries, 1);
+
+ AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable();
+
+ o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT));
+
+ AppendEntriesMessages.AppendEntries appendEntries2 =
+ AppendEntriesMessages.AppendEntries
+ .parseDelimitedFrom(new FileInputStream(SERIALIZE_OUT));
+
+ AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2);
+
+ Payload data = appendEntries1.getEntries().get(0).getData();
+
+
+ Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString()));
+
+ }
+
+}
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
}
@Test
- public void testDataChanged(){
+ public void testDataChangedWhenNotificationsAreEnabled(){
new JavaTestKit(getSystem()) {{
final MockDataChangeListener listener = new MockDataChangeListener();
final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
final ActorRef subject =
- getSystem().actorOf(props, "testDataChanged");
+ getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
new Within(duration("1 seconds")) {
protected void run() {
+ // Let the DataChangeListener know that notifications should
+ // be enabled
+ subject.tell(new EnableNotification(true), getRef());
+
subject.tell(
new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
getRef());
- final Boolean out = new ExpectMsg<Boolean>("dataChanged") {
+ final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
// do not put code outside this method, will run afterwards
protected Boolean match(Object in) {
- if (in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
+ if (in != null && in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
return true;
} else {
assertTrue(out);
assertTrue(listener.gotIt());
assertNotNull(listener.getChange().getCreatedData());
- // Will wait for the rest of the 3 seconds
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testDataChangedWhenNotificationsAreDisabled(){
+ new JavaTestKit(getSystem()) {{
+ final MockDataChangeListener listener = new MockDataChangeListener();
+ final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+ final ActorRef subject =
+ getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(
+ new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
+ getRef());
+
expectNoMsg();
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
-
+import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
- // This sleep is fragile - test can fail intermittently if all Shards aren't updated with
- // the SchemaContext in time. Is there any way we can make this deterministic?
- Thread.sleep(2000);
+ Thread.sleep(1500);
DOMStoreReadWriteTransaction transaction =
distributedDataStore.newReadWriteTransaction();
Optional<NormalizedNode<?, ?>> optional = future.get();
+ Assert.assertTrue(optional.isPresent());
+
NormalizedNode<?, ?> normalizedNode = optional.get();
assertEquals(TestModel.TEST_QNAME, normalizedNode.getNodeType());
}
@org.junit.Test
- public void testRegisterChangeListener() throws Exception {
- mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+ public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
+
ListenerRegistration registration =
distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
@Override
}
}, AsyncDataBroker.DataChangeScope.BASE);
+ // Since we do not expect the shard to be local registration will return a NoOpRegistration
+ Assert.assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
+
+ Assert.assertNotNull(registration);
+ }
+
+ @org.junit.Test
+ public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
+
+ mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+
+ ListenerRegistration registration =
+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ throw new UnsupportedOperationException("onDataChanged");
+ }
+ }, AsyncDataBroker.DataChangeScope.BASE);
+
+ Assert.assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
+
Assert.assertNotNull(registration);
}
+
@org.junit.Test
public void testCreateTransactionChain() throws Exception {
final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import scala.concurrent.duration.Duration;
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class ShardManagerTest {
private static ActorSystem system;
expectMsgEquals(Duration.Zero(),
new PrimaryNotFound("inventory").toSerializable());
- // Will wait for the rest of the 3 seconds
expectNoMsg();
}
};
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
- // the run() method needs to finish within 3 seconds
new Within(duration("1 seconds")) {
protected void run() {
}};
}
+ @Test
+ public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration());
+ final TestActorRef<ShardManager> subject =
+ TestActorRef.create(system, props);
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new FindLocalShard("inventory"), getRef());
+
+ final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+ protected String match(Object in) {
+ if (in instanceof LocalShardNotFound) {
+ return ((LocalShardNotFound) in).getShardName();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("inventory", out);
+
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
+
+ final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper();
+
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", mockClusterWrapper,
+ new MockConfiguration());
+ final TestActorRef<ShardManager> subject =
+ TestActorRef.create(system, props);
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+
+ final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+ protected ActorRef match(Object in) {
+ if (in instanceof LocalShardFound) {
+ return ((LocalShardFound) in).getPath();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertTrue(out.path().toString(), out.path().toString().contains("member-1-shard-default-config"));
+
+
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
@Test
public void testOnReceiveMemberUp() throws Exception {
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import java.util.HashMap;
import java.util.Map;
+import static junit.framework.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
getRef());
+ final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if(in instanceof EnableNotification){
+ return ((EnableNotification) in).isEnabled();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertFalse(notificationEnabled);
+
final String out = new ExpectMsg<String>("match hint") {
// do not put code outside this method, will run afterwards
protected String match(Object in) {
assertTrue(out.matches(
"akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
- // Will wait for the rest of the 3 seconds
- expectNoMsg();
}
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.Executors;
+import static junit.framework.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class TransactionProxyTest extends AbstractActorTest {
private final Configuration configuration = new MockConfiguration();
Assert.assertFalse(normalizedNodeOptional.isPresent());
}
+ @Test
+ public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
+ final ActorContext actorContext = mock(ActorContext.class);
+
+ when(actorContext.executeShardOperation(anyString(), any(), any(
+ FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ Assert.assertFalse(read.get().isPresent());
+
+ }
+
+
+ @Test
+ public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
+ final ActorContext actorContext = mock(ActorContext.class);
+
+ when(actorContext.executeShardOperation(anyString(), any(), any(
+ FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ Assert.assertFalse(read.get().isPresent());
+
+ }
+
+ @Test
+ public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
+ final ActorContext actorContext = mock(ActorContext.class);
+
+ when(actorContext.executeShardOperation(anyString(), any(), any(
+ FiniteDuration.class))).thenThrow(new NullPointerException());
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
+
+
+ try {
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+ transactionProxy.read(TestModel.TEST_PATH);
+ fail("A null pointer exception was expected");
+ } catch(NullPointerException e){
+
+ }
+ }
+
+
+
@Test
public void testWrite() throws Exception {
final Props props = Props.create(MessageCollectorActor.class);
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.testkit.JavaTestKit;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
public class ActorContextTest extends AbstractActorTest{
System.out.println(actorContext
.actorFor("akka://system/user/shardmanager/shard/transaction"));
}
+
+
+ private static class MockShardManager extends UntypedActor {
+
+ private final boolean found;
+ private final ActorRef actorRef;
+
+ private MockShardManager(boolean found, ActorRef actorRef){
+
+ this.found = found;
+ this.actorRef = actorRef;
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(found){
+ getSender().tell(new LocalShardFound(actorRef), getSelf());
+ } else {
+ getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
+ }
+ }
+
+ private static Props props(final boolean found, final ActorRef actorRef){
+ return Props.create(new Creator<MockShardManager>() {
+
+ @Override public MockShardManager create()
+ throws Exception {
+ return new MockShardManager(found,
+ actorRef);
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testExecuteLocalShardOperationWithShardFound(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+
+ assertEquals("hello", out);
+
+
+ expectNoMsg();
+ }
+ };
+ }};
+
+ }
+
+ @Test
+ public void testExecuteLocalShardOperationWithShardNotFound(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(false, null));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+
+ assertNull(out);
+
+
+ expectNoMsg();
+ }
+ };
+ }};
+
+ }
+
+
+ @Test
+ public void testFindLocalShardWithShardFound(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ Object out = actorContext.findLocalShard("default");
+
+ assertEquals(shardActorRef, out);
+
+
+ expectNoMsg();
+ }
+ };
+ }};
+
+ }
+
+ @Test
+ public void testFindLocalShardWithShardNotFound(){
+ new JavaTestKit(getSystem()) {{
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(false, null));
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ Object out = actorContext.findLocalShard("default");
+
+ assertNull(out);
+
+
+ expectNoMsg();
+ }
+ };
+ }};
+
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.UntypedActor;
+
+/**
+ * The EchoActor simply responds back with the same message that it receives
+ */
+public class EchoActor extends UntypedActor{
+
+ @Override public void onReceive(Object message) throws Exception {
+ getSender().tell(message, getSelf());
+ }
+}
private Object executeShardOperationResponse;
private Object executeRemoteOperationResponse;
private Object executeLocalOperationResponse;
+ private Object executeLocalShardOperationResponse;
public MockActorContext(ActorSystem actorSystem) {
super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
this.executeLocalOperationResponse = executeLocalOperationResponse;
}
+ public void setExecuteLocalShardOperationResponse(
+ Object executeLocalShardOperationResponse) {
+ this.executeLocalShardOperationResponse = executeLocalShardOperationResponse;
+ }
+
@Override public Object executeLocalOperation(ActorRef actor,
Object message, FiniteDuration duration) {
return this.executeLocalOperationResponse;
}
+
+ @Override public Object executeLocalShardOperation(String shardName,
+ Object message, FiniteDuration duration) {
+ return this.executeLocalShardOperationResponse;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Client {
+
+ private static ActorSystem actorSystem;
+
+ public static class ClientActor extends UntypedActor {
+
+ @Override public void onReceive(Object message) throws Exception {
+
+ }
+ }
+
+ public static void main(String[] args){
+ actorSystem = ActorSystem.create("appendentries", ConfigFactory
+ .load().getConfig("ODLCluster"));
+
+ ActorSelection actorSelection = actorSystem.actorSelection(
+ "akka.tcp://appendentries@127.0.0.1:2550/user/server");
+
+ AppendEntries appendEntries = modificationAppendEntries();
+
+ Payload data = appendEntries.getEntries().get(0).getData();
+ if(data instanceof CompositeModificationPayload) {
+ System.out.println(
+ "Sending : " + ((CompositeModificationPayload) data)
+ .getModification());
+ } else {
+ System.out.println(
+ "Sending : " + ((KeyValue) data)
+ .getKey());
+
+ }
+
+ actorSelection.tell(appendEntries.toSerializable(), null);
+
+
+
+
+ actorSystem.actorOf(Props.create(ClientActor.class), "client");
+ }
+
+ public static AppendEntries modificationAppendEntries() {
+ List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+ modification.add(0, new ReplicatedLogEntry() {
+ @Override public Payload getData() {
+ WriteModification writeModification =
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext()
+ );
+
+ MutableCompositeModification compositeModification =
+ new MutableCompositeModification();
+
+ compositeModification.addModification(writeModification);
+
+ return new CompositeModificationPayload(
+ compositeModification.toSerializable());
+ }
+
+ @Override public long getTerm() {
+ return 1;
+ }
+
+ @Override public long getIndex() {
+ return 1;
+ }
+ });
+
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ }
+
+ public static AppendEntries keyValueAppendEntries() {
+ List<ReplicatedLogEntry> modification = new ArrayList<>();
+
+ modification.add(0, new ReplicatedLogEntry() {
+ @Override public Payload getData() {
+ return new KeyValue("moiz", "test");
+ }
+
+ @Override public long getTerm() {
+ return 1;
+ }
+
+ @Override public long getIndex() {
+ return 1;
+ }
+ });
+
+ return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.programs.appendentries;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.typesafe.config.ConfigFactory;
+import org.opendaylight.controller.cluster.datastore.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+public class Server {
+
+ private static ActorSystem actorSystem;
+
+ public static class ServerActor extends UntypedActor {
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(AppendEntries.SERIALIZABLE_CLASS.equals(message.getClass())){
+ AppendEntries appendEntries =
+ AppendEntries.fromSerializable(message);
+
+ Payload data = appendEntries.getEntries()
+ .get(0).getData();
+ if(data instanceof KeyValue){
+ System.out.println("Received : " + ((KeyValue) appendEntries.getEntries().get(0).getData()).getKey());
+ } else {
+ System.out.println("Received :" +
+ ((CompositeModificationPayload) appendEntries
+ .getEntries()
+ .get(0).getData()).getModification().toString());
+ }
+ } else if(message instanceof String){
+ System.out.println(message);
+ }
+ }
+ }
+
+ public static void main(String[] args){
+ actorSystem = ActorSystem.create("appendentries", ConfigFactory
+ .load().getConfig("ODLCluster"));
+
+ actorSystem.actorOf(Props.create(ServerActor.class), "server");
+ }
+}