From: Moiz Raja Date: Thu, 4 Sep 2014 22:47:29 +0000 (+0000) Subject: Merge "Bug 1637: Change Rpc actor calls to async" X-Git-Tag: release/helium~140 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2dc333588d0c15eb7f2df6223dcdcc15e05b077e;hp=7f8512fcbe4ac373995b7e2e370d38a01f4eaeec Merge "Bug 1637: Change Rpc actor calls to async" --- diff --git a/features/akka/pom.xml b/features/akka/pom.xml new file mode 100644 index 0000000000..f1f3017c20 --- /dev/null +++ b/features/akka/pom.xml @@ -0,0 +1,264 @@ + + + 4.0.0 + + org.opendaylight.controller + commons.opendaylight + 1.4.2-SNAPSHOT + ../../opendaylight/commons/opendaylight + + features-akka + org.opendaylight.controller + jar + + features.xml + + 1.0.0-SNAPSHOT + 1.4.2-SNAPSHOT + 3.0.1 + 0.6.2-SNAPSHOT + 1.4.2-SNAPSHOT + 2.16 + + + + + + + + + org.scala-lang + scala-library + ${scala.version}.${scala.micro.version} + + + org.scala-lang + scala-reflect + ${scala.version}.${scala.micro.version} + + + com.typesafe + config + ${typesafe.config.version} + + + com.typesafe.akka + akka-actor_${scala.version} + ${akka.version} + + + com.typesafe.akka + akka-slf4j_${scala.version} + ${akka.version} + + + com.typesafe.akka + akka-osgi_${scala.version} + ${akka.version} + + + org.uncommons.maths + uncommons-maths + ${uncommons.maths.version} + + + jfree + jcommon + + + jfree + jfreechart + + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + io.netty + netty + 3.8.0.Final + + + com.typesafe.akka + akka-remote_${scala.version} + ${akka.version} + + + com.typesafe.akka + akka-cluster_${scala.version} + ${akka.version} + + + org.iq80.leveldb + leveldb + ${leveldb.version} + + + org.fusesource.leveldbjni + leveldbjni-all + ${leveldbjni.version} + + + + + org.opendaylight.yangtools + features-test + ${feature.test.version} + test + + + + org.opendaylight.controller + opendaylight-karaf-empty + ${karaf.empty.version} + zip + + + + + + + + src/main/resources + true + + + + + org.apache.maven.plugins + maven-resources-plugin + + + filter + generate-resources + + resources + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + package + + attach-artifact + + + + + ${project.build.directory}/classes/${features.file} + xml + features + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire.version} + + + org.opendaylight.controller + opendaylight-karaf-empty + ${karaf.empty.version} + + + org.opendaylight.yangtools:features-test + + + + + + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + HEAD + https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary + + diff --git a/features/akka/src/main/resources/features.xml b/features/akka/src/main/resources/features.xml new file mode 100644 index 0000000000..182ff766e6 --- /dev/null +++ b/features/akka/src/main/resources/features.xml @@ -0,0 +1,117 @@ + + + + + + + + + odl-akka-scala + odl-akka-system + odl-akka-clustering + odl-akka-leveldb + odl-akka-persistence + + + + mvn:org.scala-lang/scala-library/${scala.version}.${scala.micro.version} + mvn:org.scala-lang/scala-reflect/${scala.version}.${scala.micro.version} + + + odl-akka-scala + mvn:com.typesafe/config/${typesafe.config.version} + mvn:com.typesafe.akka/akka-actor_${scala.version}/${akka.version} + mvn:com.typesafe.akka/akka-slf4j_${scala.version}/${akka.version} + mvn:com.typesafe.akka/akka-osgi_${scala.version}/${akka.version} + + + odl-akka-system + wrap:mvn:org.uncommons.maths/uncommons-maths/${uncommons.maths.version} + mvn:com.google.protobuf/protobuf-java/${protobuf.version} + mvn:io.netty/netty/3.8.0.Final + mvn:com.typesafe.akka/akka-remote_${scala.version}/${akka.version} + mvn:com.typesafe.akka/akka-cluster_${scala.version}/${akka.version} + + + wrap:mvn:org.iq80.leveldb/leveldb/${leveldb.version} + mvn:org.fusesource.leveldbjni/leveldbjni-all/${leveldbjni.version} + + + odl-akka-leveldb + odl-akka-system + mvn:com.typesafe.akka/akka-persistence-experimental_${scala.version}/${akka.version} + wrap:mvn:com.google.protobuf/protobuf-java/${protobuf.version}$overwrite=merge&DynamicImport-Package=org.opendaylight.controller.protobuff.messages.*;org.opendaylight.controller.cluster.raft.protobuff.client.messages.* + + + + diff --git a/features/base/pom.xml b/features/base/pom.xml index ed8e2a8c20..8fec90fd9d 100644 --- a/features/base/pom.xml +++ b/features/base/pom.xml @@ -258,47 +258,38 @@ orbit org.apache.catalina - 7.0.53.v201406061610 orbit org.apache.catalina.ha - 7.0.53.v201406070630 orbit org.apache.catalina.tribes - 7.0.53.v201406070630 orbit org.apache.coyote - 7.0.53.v201406070630 orbit org.apache.el - 7.0.53.v201406060720 orbit org.apache.jasper - 7.0.53.v201406070630 orbit org.apache.juli.extras - 7.0.53.v201406060720 orbit org.apache.tomcat.api - 7.0.53.v201406060720 orbit org.apache.tomcat.util - 7.0.53.v201406070630 org.aopalliance diff --git a/features/base/src/main/resources/features.xml b/features/base/src/main/resources/features.xml index 999cf704d2..e4c455ccca 100644 --- a/features/base/src/main/resources/features.xml +++ b/features/base/src/main/resources/features.xml @@ -114,16 +114,16 @@ odl-base-gemini-web odl-base-eclipselink-persistence - mvn:orbit/org.apache.catalina/${commons.karaf.catalina} + mvn:orbit/org.apache.catalina/${commons.catalina} mvn:geminiweb/org.eclipse.gemini.web.tomcat/${geminiweb.version} - mvn:orbit/org.apache.catalina.ha/${commons.karaf.catalina.ha} - mvn:orbit/org.apache.catalina.tribes/${commons.karaf.catalina.tribes} - mvn:orbit/org.apache.coyote/${commons.karaf.coyote} - mvn:orbit/org.apache.el/${commons.karaf.el} - mvn:orbit/org.apache.jasper/${commons.karaf.jasper} - mvn:orbit/org.apache.juli.extras/${commons.karaf.juli.version} - mvn:orbit/org.apache.tomcat.api/${commons.karaf.tomcat.api} - mvn:orbit/org.apache.tomcat.util/${commons.karaf.tomcat.util} + mvn:orbit/org.apache.catalina.ha/${commons.catalina.ha} + mvn:orbit/org.apache.catalina.tribes/${commons.catalina.tribes} + mvn:orbit/org.apache.coyote/${commons.coyote} + mvn:orbit/org.apache.el/${commons.el} + mvn:orbit/org.apache.jasper/${commons.jasper} + mvn:orbit/org.apache.juli.extras/${commons.juli.version} + mvn:orbit/org.apache.tomcat.api/${commons.tomcat.api} + mvn:orbit/org.apache.tomcat.util/${commons.tomcat.util} mvn:org.opendaylight.controller/karaf-tomcat-security/${karaf.security.version} wrap:mvn:virgomirror/org.eclipse.jdt.core.compiler.batch/${eclipse.jdt.core.compiler.batch.version} diff --git a/features/mdsal/pom.xml b/features/mdsal/pom.xml index c6856c89fb..38fe92fa82 100644 --- a/features/mdsal/pom.xml +++ b/features/mdsal/pom.xml @@ -40,6 +40,13 @@ features xml + + org.opendaylight.controller + features-akka + ${commons.opendaylight.version} + features + xml + org.opendaylight.controller sal-core-api @@ -97,6 +104,30 @@ xml config + + org.opendaylight.controller + sal-distributed-datastore + + + org.opendaylight.controller + sal-remoterpc-connector + + + org.opendaylight.controller + sal-clustering-commons + + + org.opendaylight.controller + sal-akka-raft + ${mdsal.version} + + + org.opendaylight.controller + sal-clustering-config + ${mdsal.version} + xml + config + org.opendaylight.controller sal-netconf-connector diff --git a/features/mdsal/src/main/resources/features.xml b/features/mdsal/src/main/resources/features.xml index 408be621f5..ae73c71ee4 100644 --- a/features/mdsal/src/main/resources/features.xml +++ b/features/mdsal/src/main/resources/features.xml @@ -7,11 +7,13 @@ mvn:org.opendaylight.controller/features-config/${config.version}/xml/features mvn:org.opendaylight.controller/features-config-persister/${config.version}/xml/features mvn:org.opendaylight.controller/features-config-netty/${config.version}/xml/features + mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features odl-mdsal-broker odl-mdsal-netconf-connector odl-restconf odl-mdsal-xsql + odl-mdsal-clustering odl-toaster @@ -87,4 +89,33 @@ mvn:com.sun.jersey/jersey-servlet/${jersey.version} wrap:mvn:org.json/json/${org.json.version} + + odl-mdsal-broker + odl-akka-system + odl-akka-persistence + mvn:org.opendaylight.controller/sal-clustering-commons/${project.version} + mvn:org.opendaylight.controller/sal-akka-raft/${project.version} + mvn:com.codahale.metrics/metrics-core/3.0.1 + + + odl-mdsal-broker + odl-mdsal-clustering-commons + odl-akka-clustering + mvn:org.opendaylight.controller/sal-distributed-datastore/${project.version} + + + odl-mdsal-broker + odl-mdsal-clustering-commons + odl-akka-clustering + odl-akka-leveldb + mvn:org.opendaylight.controller/sal-remoterpc-connector/${project.version} + + + odl-mdsal-remoterpc-connector + odl-mdsal-distributed-datastore + mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/config + mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/akkaconf + mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleshardconf + mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf + diff --git a/features/pom.xml b/features/pom.xml index 039060b4ae..01156cf02a 100644 --- a/features/pom.xml +++ b/features/pom.xml @@ -26,5 +26,6 @@ netconf protocol-framework adsal-compatibility + akka \ No newline at end of file diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 2e95307268..b05170ddc8 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -40,25 +40,16 @@ 0.4.2-SNAPSHOT 3.0.1 - 7.0.32.v201211201336 - 7.0.32.v201211201952 - 7.0.32.v201211201952 - 7.0.32.v201211201952 - 7.0.32.v201211081135 - 7.0.32.v201211201952 - 7.0.32.v201211081135 - 7.0.32.v201211081135 - 7.0.32.v201211201952 - 7.0.53.v201406061610 - 7.0.53.v201406070630 - 7.0.53.v201406070630 - 7.0.53.v201406070630 - 7.0.53.v201406060720 - 7.0.53.v201406070630 - 7.0.53.v201406060720 - 7.0.53.v201406060720 - 7.0.53.v201406070630 + 7.0.53.v201406061610 + 7.0.53.v201406070630 + 7.0.53.v201406070630 + 7.0.53.v201406070630 + 7.0.53.v201406060720 + 7.0.53.v201406070630 + 7.0.53.v201406060720 + 7.0.53.v201406060720 + 7.0.53.v201406070630 0.0.3-SNAPSHOT 1.2.2 @@ -77,6 +68,7 @@ 1.4 0.2.5-SNAPSHOT etc/opendaylight/karaf + 05-clustering.xml 00-netty.xml 01-mdsal.xml 04-xsql.xml @@ -208,7 +200,7 @@ 1.0.0-SNAPSHOT 0.4.2-SNAPSHOT 1.2.0 - 1.2.2 + 1.2.2a 0.4.2-SNAPSHOT 0.0.2-SNAPSHOT 0.4.2-SNAPSHOT diff --git a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties index c2ac77a5d6..a644bf6ee8 100644 --- a/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties +++ b/opendaylight/distribution/opendaylight-karaf-resources/src/main/resources/etc/custom.properties @@ -92,6 +92,11 @@ ovsdb.listenPort=6640 # default Openflow version = 1.0, we also support 1.3. # ovsdb.of.version=1.3 +# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default +# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that +# specific arp/neighDiscovery lookup. +# ovsdb.l3gateway.mac=00:00:5E:00:02:01 + # TLS configuration # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files. # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate diff --git a/opendaylight/distribution/opendaylight/pom.xml b/opendaylight/distribution/opendaylight/pom.xml index 4ae35c905f..fcb452f422 100644 --- a/opendaylight/distribution/opendaylight/pom.xml +++ b/opendaylight/distribution/opendaylight/pom.xml @@ -634,10 +634,6 @@ org.opendaylight.controller.thirdparty net.sf.jung2 - - org.opendaylight.controller.thirdparty - org.apache.catalina.filters.CorsFilter - org.opendaylight.controller.thirdparty org.openflow.openflowj @@ -1070,10 +1066,6 @@ org.opendaylight.controller sal-restconf-broker - - org.opendaylight.controller - sal-remoterpc-connector - @@ -1320,10 +1312,6 @@ jeromq 0.3.1 - - org.opendaylight.controller - sal-distributed-datastore - org.opendaylight.controller sal-clustering-config @@ -1379,8 +1367,8 @@ generate-resources ${project.build.directory}/configuration - sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config - **\/*.xml + sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config,sal-clustering-config + **\/*.xml,**/*.conf true false diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini b/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini index b2fc3cb386..530e46e14a 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini +++ b/opendaylight/distribution/opendaylight/src/main/resources/configuration/config.ini @@ -116,6 +116,11 @@ ovsdb.listenPort=6640 # default Openflow version = 1.3, we also support 1.0. ovsdb.of.version=1.3 +# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default +# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that +# specific arp/neighDiscovery lookup. +# ovsdb.l3gateway.mac=00:00:5E:00:02:01 + # TLS configuration # To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files. # The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate diff --git a/opendaylight/md-sal/sal-akka-raft/pom.xml b/opendaylight/md-sal/sal-akka-raft/pom.xml index 325005b239..98c81c267f 100644 --- a/opendaylight/md-sal/sal-akka-raft/pom.xml +++ b/opendaylight/md-sal/sal-akka-raft/pom.xml @@ -97,9 +97,8 @@ ${project.groupId}.${project.artifactId} - - - + org.opendaylight.cluster.raft + * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index cbd7ca2d70..c4ff108611 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -12,14 +12,21 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; import com.google.common.base.Optional; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; @@ -82,14 +89,63 @@ public class ExampleActor extends RaftActor { } } - @Override protected Object createSnapshot() { - return state; + @Override protected void createSnapshot() { + ByteString bs = null; + try { + bs = fromObject(state); + } catch (Exception e) { + LOG.error("Exception in creating snapshot", e); + } + getSelf().tell(new CaptureSnapshotReply(bs), null); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { state.clear(); - state.putAll((HashMap) snapshot); - LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size()); + try { + state.putAll((HashMap) toObject(snapshot)); + } catch (Exception e) { + LOG.error("Exception in applying snapshot", e); + } + LOG.debug("Snapshot applied to state :" + ((HashMap) state).size()); + } + + private ByteString fromObject(Object snapshot) throws Exception { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(snapshot); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } + + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + Object obj = null; + ByteArrayInputStream bis = null; + ObjectInputStream ois = null; + try { + bis = new ByteArrayInputStream(bs.toByteArray()); + ois = new ObjectInputStream(bis); + obj = ois.readObject(); + } finally { + if (bis != null) { + bis.close(); + } + if (ois != null) { + ois.close(); + } + } + return obj; } @Override protected void onStateChanged() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java index d11377dbcb..6192cad230 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java @@ -17,4 +17,9 @@ public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl { public long getSnapshotBatchCount() { return 50; } + + @Override + public int getSnapshotChunkSize() { + return 50; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java index fd6e192bf0..978ea91089 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -109,6 +109,8 @@ public class TestDriver { td.printState(); } else if (command.startsWith("printNodes")) { td.printNodes(); + } else { + System.out.println("Invalid command:" + command); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index b5b034afb9..b436bce500 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.protobuf.ByteString; + import java.util.ArrayList; import java.util.List; @@ -16,12 +18,18 @@ import java.util.List; */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { - protected final List journal; - protected final Object snapshot; + protected List journal; + protected ByteString snapshot; protected long snapshotIndex = -1; protected long snapshotTerm = -1; - public AbstractReplicatedLogImpl(Object state, long snapshotIndex, + // to be used for rollback during save snapshot failure + protected List snapshottedJournal; + protected ByteString previousSnapshot; + protected long previousSnapshotIndex = -1; + protected long previousSnapshotTerm = -1; + + public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex, long snapshotTerm, List unAppliedEntries) { this.snapshot = state; this.snapshotIndex = snapshotIndex; @@ -137,11 +145,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public boolean isInSnapshot(long logEntryIndex) { - return logEntryIndex <= snapshotIndex; + return logEntryIndex <= snapshotIndex && snapshotIndex != -1; } @Override - public Object getSnapshot() { + public ByteString getSnapshot() { return snapshot; } @@ -160,4 +168,68 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public abstract void removeFromAndPersist(long index); + + @Override + public void setSnapshotIndex(long snapshotIndex) { + this.snapshotIndex = snapshotIndex; + } + + @Override + public void setSnapshotTerm(long snapshotTerm) { + this.snapshotTerm = snapshotTerm; + } + + @Override + public void setSnapshot(ByteString snapshot) { + this.snapshot = snapshot; + } + + @Override + public void clear(int startIndex, int endIndex) { + journal.subList(startIndex, endIndex).clear(); + } + + @Override + public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) { + snapshottedJournal = new ArrayList<>(journal.size()); + + snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex))); + clear(0, (int) (snapshotCapturedIndex - snapshotIndex)); + + previousSnapshotIndex = snapshotIndex; + setSnapshotIndex(snapshotCapturedIndex); + + previousSnapshotTerm = snapshotTerm; + setSnapshotTerm(snapshotCapturedTerm); + + previousSnapshot = getSnapshot(); + setSnapshot(snapshot); + } + + @Override + public void snapshotCommit() { + snapshottedJournal.clear(); + snapshottedJournal = null; + previousSnapshotIndex = -1; + previousSnapshotTerm = -1; + previousSnapshot = null; + } + + @Override + public void snapshotRollback() { + snapshottedJournal.addAll(journal); + journal.clear(); + journal = snapshottedJournal; + snapshottedJournal = null; + + snapshotIndex = previousSnapshotIndex; + previousSnapshotIndex = -1; + + snapshotTerm = previousSnapshotTerm; + previousSnapshotTerm = -1; + + snapshot = previousSnapshot; + previousSnapshot = null; + + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java index 4c6434aec4..ed6439d8c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -52,4 +52,9 @@ public interface ConfigParams { * @return int */ public int getElectionTimeVariance(); + + /** + * The size (in bytes) of the snapshot chunk sent from Leader + */ + public int getSnapshotChunkSize(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 6432fa4811..75c237f503 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -25,6 +25,8 @@ public class DefaultConfigParamsImpl implements ConfigParams { */ private static final int ELECTION_TIME_MAX_VARIANCE = 100; + private final int SNAPSHOT_CHUNK_SIZE = 2048 * 1000; //2MB + /** * The interval at which a heart beat message will be sent to the remote @@ -58,4 +60,9 @@ public class DefaultConfigParamsImpl implements ConfigParams { public int getElectionTimeVariance() { return ELECTION_TIME_MAX_VARIANCE; } + + @Override + public int getSnapshotChunkSize() { + return SNAPSHOT_CHUNK_SIZE; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 988789b401..296ce2d24a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -19,10 +19,14 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.UntypedPersistentActor; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import com.google.common.base.Optional; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -31,10 +35,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import java.io.Serializable; -import java.util.List; import java.util.Map; /** @@ -98,6 +103,9 @@ public abstract class RaftActor extends UntypedPersistentActor { */ private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); + private CaptureSnapshot captureSnapshot = null; + + private volatile boolean hasSnapshotCaptureInitiated = false; public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); @@ -125,6 +133,7 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog = new ReplicatedLogImpl(snapshot); context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); LOG.debug("Applied snapshot to replicatedLog. " + "snapshotIndex={}, snapshotTerm={}, journal-size={}", @@ -132,7 +141,7 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog.size()); // Apply the snapshot to the actors state - applySnapshot(snapshot.getState()); + applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { replicatedLog.append((ReplicatedLogEntry) message); @@ -164,7 +173,17 @@ public abstract class RaftActor extends UntypedPersistentActor { applyState.getReplicatedLogEntry().getData()); } else if(message instanceof ApplySnapshot ) { - applySnapshot(((ApplySnapshot) message).getSnapshot()); + Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); + + LOG.debug("ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm()); + applySnapshot(ByteString.copyFrom(snapshot.getState())); + + //clears the followers log, sets the snapshot index to ensure adjusted-index works + replicatedLog = new ReplicatedLogImpl(snapshot); + context.setReplicatedLog(replicatedLog); + context.setLastApplied(snapshot.getLastAppliedIndex()); } else if (message instanceof FindLeader) { getSender().tell( @@ -174,13 +193,26 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof SaveSnapshotSuccess) { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; + LOG.info("SaveSnapshotSuccess received for snapshot"); + + context.getReplicatedLog().snapshotCommit(); // TODO: Not sure if we want to be this aggressive with trimming stuff trimPersistentData(success.metadata().sequenceNr()); } else if (message instanceof SaveSnapshotFailure) { + SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; + + LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString()); + LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:"); - // TODO: Handle failure in saving the snapshot + context.getReplicatedLog().snapshotRollback(); + + LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); } else if (message instanceof AddRaftPeer){ @@ -196,7 +228,25 @@ public abstract class RaftActor extends UntypedPersistentActor { RemoveRaftPeer rrp = (RemoveRaftPeer)message; context.removePeer(rrp.getName()); + } else if (message instanceof CaptureSnapshot) { + LOG.debug("CaptureSnapshot received by actor"); + CaptureSnapshot cs = (CaptureSnapshot)message; + captureSnapshot = cs; + createSnapshot(); + + } else if (message instanceof CaptureSnapshotReply){ + LOG.debug("CaptureSnapshotReply received by actor"); + CaptureSnapshotReply csr = (CaptureSnapshotReply) message; + + ByteString stateInBytes = csr.getSnapshot(); + LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + handleCaptureSnapshotReply(stateInBytes); + } else { + if (!(message instanceof AppendEntriesMessages.AppendEntries) + && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { + LOG.debug("onReceiveCommand: message:" + message.getClass()); + } RaftState state = currentBehavior.handleMessage(getSender(), message); @@ -344,7 +394,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @return The current state of the actor */ - protected abstract Object createSnapshot(); + protected abstract void createSnapshot(); /** * This method will be called by the RaftActor during recovery to @@ -356,7 +406,7 @@ public abstract class RaftActor extends UntypedPersistentActor { * * @param snapshot A snapshot of the state of the actor */ - protected abstract void applySnapshot(Object snapshot); + protected abstract void applySnapshot(ByteString snapshot); /** * This method will be called by the RaftActor when the state of the @@ -423,11 +473,39 @@ public abstract class RaftActor extends UntypedPersistentActor { return peerAddress; } + private void handleCaptureSnapshotReply(ByteString stateInBytes) { + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + + Snapshot sn = Snapshot.create(stateInBytes.toByteArray(), + context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), + captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + + saveSnapshot(sn); + + LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + + //be greedy and remove entries from in-mem journal which are in the snapshot + // and update snapshotIndex and snapshotTerm without waiting for the success, + + context.getReplicatedLog().snapshotPreCommit(stateInBytes, + captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + captureSnapshot = null; + hasSnapshotCaptureInitiated = false; + } + private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - super(snapshot.getState(), + super(ByteString.copyFrom(snapshot.getState()), snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), snapshot.getUnAppliedEntries()); } @@ -476,8 +554,10 @@ public abstract class RaftActor extends UntypedPersistentActor { persist(replicatedLogEntry, new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { - // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) { + // when a snaphsot is being taken, captureSnapshot != null + if (hasSnapshotCaptureInitiated == false && + journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) { + LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -493,26 +573,11 @@ public abstract class RaftActor extends UntypedPersistentActor { LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); - // create a snapshot object from the state provided and save it - // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(createSnapshot(), - getFrom(context.getLastApplied() + 1), - lastIndex(), lastTerm(), lastAppliedIndex, - lastAppliedTerm); - saveSnapshot(sn); - - LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); - - //be greedy and remove entries from in-mem journal which are in the snapshot - // and update snapshotIndex and snapshotTerm without waiting for the success, - // TODO: damage-recovery to be done on failure - journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear(); - snapshotIndex = lastAppliedIndex; - snapshotTerm = lastAppliedTerm; - - LOG.info("Removed in-memory snapshotted entries, " + - "adjusted snaphsotIndex:{}" + - "and term:{}", snapshotIndex, lastAppliedTerm); + // send a CaptureSnapshot to self to make the expensive operation async. + getSelf().tell(new CaptureSnapshot( + lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm), + null); + hasSnapshotCaptureInitiated = true; } // Send message for replication if (clientActor != null) { @@ -542,65 +607,6 @@ public abstract class RaftActor extends UntypedPersistentActor { } - private static class Snapshot implements Serializable { - private final Object state; - private final List unAppliedEntries; - private final long lastIndex; - private final long lastTerm; - private final long lastAppliedIndex; - private final long lastAppliedTerm; - - private Snapshot(Object state, - List unAppliedEntries, long lastIndex, - long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { - this.state = state; - this.unAppliedEntries = unAppliedEntries; - this.lastIndex = lastIndex; - this.lastTerm = lastTerm; - this.lastAppliedIndex = lastAppliedIndex; - this.lastAppliedTerm = lastAppliedTerm; - } - - - public static Snapshot create(Object state, - List entries, long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm) { - return new Snapshot(state, entries, lastIndex, lastTerm, - lastAppliedIndex, lastAppliedTerm); - } - - public Object getState() { - return state; - } - - public List getUnAppliedEntries() { - return unAppliedEntries; - } - - public long getLastTerm() { - return lastTerm; - } - - public long getLastAppliedIndex() { - return lastAppliedIndex; - } - - public long getLastAppliedTerm() { - return lastAppliedTerm; - } - - public String getLogMessage() { - StringBuilder sb = new StringBuilder(); - return sb.append("Snapshot={") - .append("lastTerm:" + this.getLastTerm() + ", ") - .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") - .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") - .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") - .toString(); - - } - } - private class ElectionTermImpl implements ElectionTerm { /** * Identifier of the actor whose election term information this is diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index e6e160bc02..c17f5448c6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.raft; +import com.google.protobuf.ByteString; + import java.util.List; /** @@ -118,7 +120,7 @@ public interface ReplicatedLog { * * @return an object representing the snapshot if it exists. null otherwise */ - Object getSnapshot(); + ByteString getSnapshot(); /** * Get the index of the snapshot @@ -134,4 +136,49 @@ public interface ReplicatedLog { * otherwise */ long getSnapshotTerm(); + + /** + * sets the snapshot index in the replicated log + * @param snapshotIndex + */ + void setSnapshotIndex(long snapshotIndex); + + /** + * sets snapshot term + * @param snapshotTerm + */ + public void setSnapshotTerm(long snapshotTerm); + + /** + * sets the snapshot in bytes + * @param snapshot + */ + public void setSnapshot(ByteString snapshot); + + /** + * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive) + * @param startIndex + * @param endIndex + */ + public void clear(int startIndex, int endIndex); + + /** + * Handles all the bookkeeping in order to perform a rollback in the + * event of SaveSnapshotFailure + * @param snapshot + * @param snapshotCapturedIndex + * @param snapshotCapturedTerm + */ + public void snapshotPreCommit(ByteString snapshot, + long snapshotCapturedIndex, long snapshotCapturedTerm); + + /** + * Sets the Replicated log to state after snapshot success. + */ + public void snapshotCommit(); + + /** + * Restores the replicated log to a state in the event of a save snapshot failure + */ + public void snapshotRollback(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java index 374e0fa9ba..2f5ba48f92 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java @@ -9,12 +9,16 @@ package org.opendaylight.controller.cluster.raft; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; public class SerializationUtils { public static Object fromSerializable(Object serializable){ if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){ return AppendEntries.fromSerializable(serializable); + + } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) { + return InstallSnapshot.fromSerializable(serializable); } return serializable; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java new file mode 100644 index 0000000000..8e0fcca9f7 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import java.io.Serializable; +import java.util.List; + + +public class Snapshot implements Serializable { + private final byte[] state; + private final List unAppliedEntries; + private final long lastIndex; + private final long lastTerm; + private final long lastAppliedIndex; + private final long lastAppliedTerm; + + private Snapshot(byte[] state, + List unAppliedEntries, long lastIndex, + long lastTerm, long lastAppliedIndex, long lastAppliedTerm) { + this.state = state; + this.unAppliedEntries = unAppliedEntries; + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + } + + + public static Snapshot create(byte[] state, + List entries, long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm) { + return new Snapshot(state, entries, lastIndex, lastTerm, + lastAppliedIndex, lastAppliedTerm); + } + + public byte[] getState() { + return state; + } + + public List getUnAppliedEntries() { + return unAppliedEntries; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + + public long getLastIndex() { + return this.lastIndex; + } + + public String getLogMessage() { + StringBuilder sb = new StringBuilder(); + return sb.append("Snapshot={") + .append("lastTerm:" + this.getLastTerm() + ", ") + .append("lastIndex:" + this.getLastIndex() + ", ") + .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") + .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") + .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") + .toString(); + + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java index 9739fb2f1b..c356804223 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java @@ -8,16 +8,21 @@ package org.opendaylight.controller.cluster.raft.base.messages; +import org.opendaylight.controller.cluster.raft.Snapshot; + import java.io.Serializable; +/** + * Internal message, issued by follower to its actor + */ public class ApplySnapshot implements Serializable { - private final Object snapshot; + private final Snapshot snapshot; - public ApplySnapshot(Object snapshot) { + public ApplySnapshot(Snapshot snapshot) { this.snapshot = snapshot; } - public Object getSnapshot() { + public Snapshot getSnapshot() { return snapshot; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java new file mode 100644 index 0000000000..bb86e1a37d --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.base.messages; + +public class CaptureSnapshot { + private long lastAppliedIndex; + private long lastAppliedTerm; + private long lastIndex; + private long lastTerm; + + public CaptureSnapshot(long lastIndex, long lastTerm, + long lastAppliedIndex, long lastAppliedTerm) { + this.lastIndex = lastIndex; + this.lastTerm = lastTerm; + this.lastAppliedIndex = lastAppliedIndex; + this.lastAppliedTerm = lastAppliedTerm; + } + + public long getLastAppliedIndex() { + return lastAppliedIndex; + } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + + public long getLastIndex() { + return lastIndex; + } + + public long getLastTerm() { + return lastTerm; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java new file mode 100644 index 0000000000..96150db689 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshotReply.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.base.messages; + +import com.google.protobuf.ByteString; + +public class CaptureSnapshotReply { + private ByteString snapshot; + + public CaptureSnapshotReply(ByteString snapshot) { + this.snapshot = snapshot; + } + + public ByteString getSnapshot() { + return snapshot; + } + + public void setSnapshot(ByteString snapshot) { + this.snapshot = snapshot; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 251a13d583..7e896fed29 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -305,6 +305,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param index a log index that is known to be committed */ protected void applyLogToStateMachine(final long index) { + long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; i < index + 1; i++) { @@ -322,15 +323,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (replicatedLogEntry != null) { actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor()); + newLastApplied = i; } else { + //if one index is not present in the log, no point in looping + // around as the rest wont be present either context.getLogger().error( - "Missing index " + i + " from log. Cannot apply state."); + "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index ); + break; } } // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - context.getLogger().debug("Setting last applied to {}", index); - context.setLastApplied(index); + context.getLogger().debug("Setting last applied to {}", newLastApplied); + context.setLastApplied(newLastApplied); } protected Object fromSerializableMessage(Object serializable){ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 54e0494b9d..610fdc987f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,17 +9,22 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import java.util.ArrayList; + /** * The behavior of a RaftActor in the Follower state *

@@ -31,6 +36,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; * */ public class Follower extends AbstractRaftActorBehavior { + private ByteString snapshotChunksCollected = ByteString.EMPTY; + public Follower(RaftActorContext context) { super(context); @@ -106,6 +113,9 @@ public class Follower extends AbstractRaftActorBehavior { if (outOfSync) { // We found that the log was out of sync so just send a negative // reply and return + context.getLogger().debug("Follower is out-of-sync, " + + "so sending negative reply, lastIndex():{}, lastTerm():{}", + lastIndex(), lastTerm()); sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() @@ -191,7 +201,13 @@ public class Follower extends AbstractRaftActorBehavior { // If commitIndex > lastApplied: increment lastApplied, apply // log[lastApplied] to state machine (§5.3) - if (appendEntries.getLeaderCommit() > context.getLastApplied()) { + // check if there are any entries to be applied. last-applied can be equal to last-index + if (appendEntries.getLeaderCommit() > context.getLastApplied() && + context.getLastApplied() < lastIndex()) { + context.getLogger().debug("applyLogToStateMachine, " + + "appendEntries.getLeaderCommit():{}," + + "context.getLastApplied():{}, lastIndex():{}", + appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()); applyLogToStateMachine(appendEntries.getLeaderCommit()); } @@ -234,7 +250,7 @@ public class Follower extends AbstractRaftActorBehavior { } else if (message instanceof InstallSnapshot) { InstallSnapshot installSnapshot = (InstallSnapshot) message; - actor().tell(new ApplySnapshot(installSnapshot.getData()), actor()); + handleInstallSnapshot(sender, installSnapshot); } scheduleElection(electionDuration()); @@ -242,6 +258,47 @@ public class Follower extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } + private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { + context.getLogger().debug("InstallSnapshot received by follower " + + "datasize:{} , Chunk:{}/{}", installSnapshot.getData().size(), + installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()); + + try { + if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) { + // this is the last chunk, create a snapshot object and apply + + snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); + context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}", + snapshotChunksCollected.size()); + + Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(), + new ArrayList(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm(), + installSnapshot.getLastIncludedIndex(), + installSnapshot.getLastIncludedTerm()); + + actor().tell(new ApplySnapshot(snapshot), actor()); + + } else { + // we have more to go + snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); + context.getLogger().debug("Chunk={},snapshotChunksCollected.size:{}", + installSnapshot.getChunkIndex(), snapshotChunksCollected.size()); + } + + sender.tell(new InstallSnapshotReply( + currentTerm(), context.getId(), installSnapshot.getChunkIndex(), + true), actor()); + + } catch (Exception e) { + context.getLogger().error("Exception in InstallSnapshot of follower", e); + //send reply with success as false. The chunk will be sent again on failure + sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), + installSnapshot.getChunkIndex(), false), actor()); + } + } + @Override public void close() throws Exception { stopElection(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 234f9db664..90948ffef7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,8 +66,9 @@ import java.util.concurrent.atomic.AtomicLong; public class Leader extends AbstractRaftActorBehavior { - private final Map followerToLog = + protected final Map followerToLog = new HashMap(); + protected final Map mapFollowerToSnapshot = new HashMap<>(); private final Set followers; @@ -246,16 +249,48 @@ public class Leader extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private void handleInstallSnapshotReply(InstallSnapshotReply message) { - InstallSnapshotReply reply = message; + private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + FollowerToSnapshot followerToSnapshot = + mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot != null && + followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + + if (reply.isSuccess()) { + if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + //this was the last chunk reply + context.getLogger().debug("InstallSnapshotReply received, " + + "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", + reply.getChunkIndex(), followerId, + context.getReplicatedLog().getSnapshotIndex() + 1); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + followerLogInformation.setMatchIndex( + context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation.setNextIndex( + context.getReplicatedLog().getSnapshotIndex() + 1); + mapFollowerToSnapshot.remove(followerId); + context.getLogger().debug("followerToLog.get(followerId).getNextIndex().get()=" + + followerToLog.get(followerId).getNextIndex().get()); + + } else { + followerToSnapshot.markSendStatus(true); + } + } else { + context.getLogger().info("InstallSnapshotReply received, " + + "sending snapshot chunk failed, Will retry, Chunk:{}", + reply.getChunkIndex()); + followerToSnapshot.markSendStatus(false); + } - followerLogInformation - .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation - .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } else { + context.getLogger().error("ERROR!!" + + "FollowerId in InstallSnapshotReply not known to Leader" + + " or Chunk Index in InstallSnapshotReply not matching {} != {}", + followerToSnapshot.getChunkIndex(), reply.getChunkIndex() ); + } } private void replicate(Replicate replicate) { @@ -282,30 +317,56 @@ public class Leader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); + ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - long nextIndex = followerLogInformation.getNextIndex().get(); - + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + long followerNextIndex = followerLogInformation.getNextIndex().get(); List entries = Collections.emptyList(); - if (context.getReplicatedLog().isPresent(nextIndex)) { - // FIXME : Sending one entry at a time - entries = - context.getReplicatedLog().getFrom(nextIndex, 1); + if (mapFollowerToSnapshot.get(followerId) != null) { + if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } + + } else { + + if (context.getReplicatedLog().isPresent(followerNextIndex)) { + // FIXME : Sending one entry at a time + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + + } else { + // if the followers next index is not present in the leaders log, then snapshot should be sent + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) { + // if the follower is just not starting and leader's index + // is more than followers index + context.getLogger().debug("SendInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex); + + actor().tell(new SendInstallSnapshot(), actor()); + } else { + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } + } } - - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(nextIndex), - prevLogTerm(nextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); } } } @@ -326,21 +387,55 @@ public class Leader extends AbstractRaftActorBehavior { long nextIndex = followerLogInformation.getNextIndex().get(); - if (!context.getReplicatedLog().isPresent(nextIndex) && context - .getReplicatedLog().isInSnapshot(nextIndex)) { - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().getSnapshot() - ), - actor() - ); + if (!context.getReplicatedLog().isPresent(nextIndex) && + context.getReplicatedLog().isInSnapshot(nextIndex)) { + sendSnapshotChunk(followerActor, followerId); } } } } + /** + * Sends a snapshot chunk to a given follower + * InstallSnapshot should qualify as a heartbeat too. + */ + private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { + try { + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + getNextSnapshotChunk(followerId, + context.getReplicatedLog().getSnapshot()), + mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks() + ).toSerializable(), + actor() + ); + context.getLogger().info("InstallSnapshot sent to follower {}, Chunk: {}/{}", + followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), + mapFollowerToSnapshot.get(followerId).getTotalChunks()); + } catch (IOException e) { + context.getLogger().error("InstallSnapshot failed for Leader.", e); + } + } + + /** + * Acccepts snaphot as ByteString, enters into map for future chunks + * creates and return a ByteString chunk + */ + private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + if (followerToSnapshot == null) { + followerToSnapshot = new FollowerToSnapshot(snapshotBytes); + mapFollowerToSnapshot.put(followerId, followerToSnapshot); + } + ByteString nextChunk = followerToSnapshot.getNextChunk(); + context.getLogger().debug("Leader's snapshot nextChunk size:{}", nextChunk.size()); + + return nextChunk; + } + private RaftState sendHeartBeat() { if (followers.size() > 0) { sendAppendEntries(); @@ -410,4 +505,97 @@ public class Leader extends AbstractRaftActorBehavior { return context.getId(); } + /** + * Encapsulates the snapshot bytestring and handles the logic of sending + * snapshot chunks + */ + protected class FollowerToSnapshot { + private ByteString snapshotBytes; + private int offset = 0; + // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset + private int replyReceivedForOffset; + // if replyStatus is false, the previous chunk is attempted + private boolean replyStatus = false; + private int chunkIndex; + private int totalChunks; + + public FollowerToSnapshot(ByteString snapshotBytes) { + this.snapshotBytes = snapshotBytes; + replyReceivedForOffset = -1; + chunkIndex = 1; + int size = snapshotBytes.size(); + totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + + ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); + context.getLogger().debug("Snapshot {} bytes, total chunks to send:{}", + size, totalChunks); + } + + public ByteString getSnapshotBytes() { + return snapshotBytes; + } + + public int incrementOffset() { + if(replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + offset = offset + context.getConfigParams().getSnapshotChunkSize(); + } + return offset; + } + + public int incrementChunkIndex() { + if (replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + chunkIndex = chunkIndex + 1; + } + return chunkIndex; + } + + public int getChunkIndex() { + return chunkIndex; + } + + public int getTotalChunks() { + return totalChunks; + } + + public boolean canSendNextChunk() { + // we only send a false if a chunk is sent but we have not received a reply yet + return replyReceivedForOffset == offset; + } + + public boolean isLastChunk(int chunkIndex) { + return totalChunks == chunkIndex; + } + + public void markSendStatus(boolean success) { + if (success) { + // if the chunk sent was successful + replyReceivedForOffset = offset; + replyStatus = true; + } else { + // if the chunk sent was failure + replyReceivedForOffset = offset; + replyStatus = false; + } + } + + public ByteString getNextChunk() { + int snapshotLength = getSnapshotBytes().size(); + int start = incrementOffset(); + int size = context.getConfigParams().getSnapshotChunkSize(); + if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { + size = snapshotLength; + } else { + if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; + } + } + + context.getLogger().debug("length={}, offset={},size={}", + snapshotLength, start, size); + return getSnapshotBytes().substring(start, start + size); + + } + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java index 888854fa71..9d40fa3d9e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -8,19 +8,29 @@ package org.opendaylight.controller.cluster.raft.messages; +import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages; + public class InstallSnapshot extends AbstractRaftRPC { + public static final Class SERIALIZABLE_CLASS = InstallSnapshotMessages.InstallSnapshot.class; + private final String leaderId; private final long lastIncludedIndex; private final long lastIncludedTerm; - private final Object data; + private final ByteString data; + private final int chunkIndex; + private final int totalChunks; - public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, Object data) { + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, + long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) { super(term); this.leaderId = leaderId; this.lastIncludedIndex = lastIncludedIndex; this.lastIncludedTerm = lastIncludedTerm; this.data = data; + this.chunkIndex = chunkIndex; + this.totalChunks = totalChunks; } public String getLeaderId() { @@ -35,7 +45,38 @@ public class InstallSnapshot extends AbstractRaftRPC { return lastIncludedTerm; } - public Object getData() { + public ByteString getData() { return data; } + + public int getChunkIndex() { + return chunkIndex; + } + + public int getTotalChunks() { + return totalChunks; + } + + public Object toSerializable(){ + return InstallSnapshotMessages.InstallSnapshot.newBuilder() + .setLeaderId(this.getLeaderId()) + .setChunkIndex(this.getChunkIndex()) + .setData(this.getData()) + .setLastIncludedIndex(this.getLastIncludedIndex()) + .setLastIncludedTerm(this.getLastIncludedTerm()) + .setTotalChunks(this.getTotalChunks()).build(); + + } + + public static InstallSnapshot fromSerializable (Object o) { + InstallSnapshotMessages.InstallSnapshot from = + (InstallSnapshotMessages.InstallSnapshot) o; + + InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(), + from.getLeaderId(), from.getLastIncludedIndex(), + from.getLastIncludedTerm(), from.getData(), + from.getChunkIndex(), from.getTotalChunks()); + + return installSnapshot; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java index 85b89b70ae..d293a47c8e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java @@ -13,13 +13,26 @@ public class InstallSnapshotReply extends AbstractRaftRPC { // The followerId - this will be used to figure out which follower is // responding private final String followerId; + private final int chunkIndex; + private boolean success; - protected InstallSnapshotReply(long term, String followerId) { + public InstallSnapshotReply(long term, String followerId, int chunkIndex, + boolean success) { super(term); this.followerId = followerId; + this.chunkIndex = chunkIndex; + this.success = success; } public String getFollowerId() { return followerId; } + + public int getChunkIndex() { + return chunkIndex; + } + + public boolean isSuccess() { + return success; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java new file mode 100644 index 0000000000..e801ae1c10 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/InstallSnapshotMessages.java @@ -0,0 +1,1015 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: InstallSnapshot.proto + +package org.opendaylight.controller.cluster.raft.protobuff.messages; + +public final class InstallSnapshotMessages { + private InstallSnapshotMessages() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public interface InstallSnapshotOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional int64 term = 1; + /** + * optional int64 term = 1; + */ + boolean hasTerm(); + /** + * optional int64 term = 1; + */ + long getTerm(); + + // optional string leaderId = 2; + /** + * optional string leaderId = 2; + */ + boolean hasLeaderId(); + /** + * optional string leaderId = 2; + */ + java.lang.String getLeaderId(); + /** + * optional string leaderId = 2; + */ + com.google.protobuf.ByteString + getLeaderIdBytes(); + + // optional int64 lastIncludedIndex = 3; + /** + * optional int64 lastIncludedIndex = 3; + */ + boolean hasLastIncludedIndex(); + /** + * optional int64 lastIncludedIndex = 3; + */ + long getLastIncludedIndex(); + + // optional int64 lastIncludedTerm = 4; + /** + * optional int64 lastIncludedTerm = 4; + */ + boolean hasLastIncludedTerm(); + /** + * optional int64 lastIncludedTerm = 4; + */ + long getLastIncludedTerm(); + + // optional bytes data = 5; + /** + * optional bytes data = 5; + */ + boolean hasData(); + /** + * optional bytes data = 5; + */ + com.google.protobuf.ByteString getData(); + + // optional int32 chunkIndex = 6; + /** + * optional int32 chunkIndex = 6; + */ + boolean hasChunkIndex(); + /** + * optional int32 chunkIndex = 6; + */ + int getChunkIndex(); + + // optional int32 totalChunks = 7; + /** + * optional int32 totalChunks = 7; + */ + boolean hasTotalChunks(); + /** + * optional int32 totalChunks = 7; + */ + int getTotalChunks(); + } + /** + * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot} + */ + public static final class InstallSnapshot extends + com.google.protobuf.GeneratedMessage + implements InstallSnapshotOrBuilder { + // Use InstallSnapshot.newBuilder() to construct. + private InstallSnapshot(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private InstallSnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final InstallSnapshot defaultInstance; + public static InstallSnapshot getDefaultInstance() { + return defaultInstance; + } + + public InstallSnapshot getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private InstallSnapshot( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + term_ = input.readInt64(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + leaderId_ = input.readBytes(); + break; + } + case 24: { + bitField0_ |= 0x00000004; + lastIncludedIndex_ = input.readInt64(); + break; + } + case 32: { + bitField0_ |= 0x00000008; + lastIncludedTerm_ = input.readInt64(); + break; + } + case 42: { + bitField0_ |= 0x00000010; + data_ = input.readBytes(); + break; + } + case 48: { + bitField0_ |= 0x00000020; + chunkIndex_ = input.readInt32(); + break; + } + case 56: { + bitField0_ |= 0x00000040; + totalChunks_ = input.readInt32(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public InstallSnapshot parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new InstallSnapshot(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional int64 term = 1; + public static final int TERM_FIELD_NUMBER = 1; + private long term_; + /** + * optional int64 term = 1; + */ + public boolean hasTerm() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int64 term = 1; + */ + public long getTerm() { + return term_; + } + + // optional string leaderId = 2; + public static final int LEADERID_FIELD_NUMBER = 2; + private java.lang.Object leaderId_; + /** + * optional string leaderId = 2; + */ + public boolean hasLeaderId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string leaderId = 2; + */ + public java.lang.String getLeaderId() { + java.lang.Object ref = leaderId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + leaderId_ = s; + } + return s; + } + } + /** + * optional string leaderId = 2; + */ + public com.google.protobuf.ByteString + getLeaderIdBytes() { + java.lang.Object ref = leaderId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + leaderId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional int64 lastIncludedIndex = 3; + public static final int LASTINCLUDEDINDEX_FIELD_NUMBER = 3; + private long lastIncludedIndex_; + /** + * optional int64 lastIncludedIndex = 3; + */ + public boolean hasLastIncludedIndex() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public long getLastIncludedIndex() { + return lastIncludedIndex_; + } + + // optional int64 lastIncludedTerm = 4; + public static final int LASTINCLUDEDTERM_FIELD_NUMBER = 4; + private long lastIncludedTerm_; + /** + * optional int64 lastIncludedTerm = 4; + */ + public boolean hasLastIncludedTerm() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public long getLastIncludedTerm() { + return lastIncludedTerm_; + } + + // optional bytes data = 5; + public static final int DATA_FIELD_NUMBER = 5; + private com.google.protobuf.ByteString data_; + /** + * optional bytes data = 5; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes data = 5; + */ + public com.google.protobuf.ByteString getData() { + return data_; + } + + // optional int32 chunkIndex = 6; + public static final int CHUNKINDEX_FIELD_NUMBER = 6; + private int chunkIndex_; + /** + * optional int32 chunkIndex = 6; + */ + public boolean hasChunkIndex() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int32 chunkIndex = 6; + */ + public int getChunkIndex() { + return chunkIndex_; + } + + // optional int32 totalChunks = 7; + public static final int TOTALCHUNKS_FIELD_NUMBER = 7; + private int totalChunks_; + /** + * optional int32 totalChunks = 7; + */ + public boolean hasTotalChunks() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional int32 totalChunks = 7; + */ + public int getTotalChunks() { + return totalChunks_; + } + + private void initFields() { + term_ = 0L; + leaderId_ = ""; + lastIncludedIndex_ = 0L; + lastIncludedTerm_ = 0L; + data_ = com.google.protobuf.ByteString.EMPTY; + chunkIndex_ = 0; + totalChunks_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt64(1, term_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getLeaderIdBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, lastIncludedIndex_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeInt64(4, lastIncludedTerm_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, data_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeInt32(6, chunkIndex_); + } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeInt32(7, totalChunks_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(1, term_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getLeaderIdBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(3, lastIncludedIndex_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(4, lastIncludedTerm_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, data_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(6, chunkIndex_); + } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(7, totalChunks_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshotOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.class, org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.Builder.class); + } + + // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + term_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + leaderId_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + lastIncludedIndex_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + lastIncludedTerm_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); + data_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); + chunkIndex_ = 0; + bitField0_ = (bitField0_ & ~0x00000020); + totalChunks_ = 0; + bitField0_ = (bitField0_ & ~0x00000040); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + } + + public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot getDefaultInstanceForType() { + return org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance(); + } + + public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot build() { + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot buildPartial() { + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot result = new org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.term_ = term_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.leaderId_ = leaderId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.lastIncludedIndex_ = lastIncludedIndex_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.lastIncludedTerm_ = lastIncludedTerm_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.data_ = data_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.chunkIndex_ = chunkIndex_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.totalChunks_ = totalChunks_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) { + return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot other) { + if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot.getDefaultInstance()) return this; + if (other.hasTerm()) { + setTerm(other.getTerm()); + } + if (other.hasLeaderId()) { + bitField0_ |= 0x00000002; + leaderId_ = other.leaderId_; + onChanged(); + } + if (other.hasLastIncludedIndex()) { + setLastIncludedIndex(other.getLastIncludedIndex()); + } + if (other.hasLastIncludedTerm()) { + setLastIncludedTerm(other.getLastIncludedTerm()); + } + if (other.hasData()) { + setData(other.getData()); + } + if (other.hasChunkIndex()) { + setChunkIndex(other.getChunkIndex()); + } + if (other.hasTotalChunks()) { + setTotalChunks(other.getTotalChunks()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages.InstallSnapshot) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional int64 term = 1; + private long term_ ; + /** + * optional int64 term = 1; + */ + public boolean hasTerm() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int64 term = 1; + */ + public long getTerm() { + return term_; + } + /** + * optional int64 term = 1; + */ + public Builder setTerm(long value) { + bitField0_ |= 0x00000001; + term_ = value; + onChanged(); + return this; + } + /** + * optional int64 term = 1; + */ + public Builder clearTerm() { + bitField0_ = (bitField0_ & ~0x00000001); + term_ = 0L; + onChanged(); + return this; + } + + // optional string leaderId = 2; + private java.lang.Object leaderId_ = ""; + /** + * optional string leaderId = 2; + */ + public boolean hasLeaderId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string leaderId = 2; + */ + public java.lang.String getLeaderId() { + java.lang.Object ref = leaderId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + leaderId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string leaderId = 2; + */ + public com.google.protobuf.ByteString + getLeaderIdBytes() { + java.lang.Object ref = leaderId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + leaderId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string leaderId = 2; + */ + public Builder setLeaderId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + leaderId_ = value; + onChanged(); + return this; + } + /** + * optional string leaderId = 2; + */ + public Builder clearLeaderId() { + bitField0_ = (bitField0_ & ~0x00000002); + leaderId_ = getDefaultInstance().getLeaderId(); + onChanged(); + return this; + } + /** + * optional string leaderId = 2; + */ + public Builder setLeaderIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + leaderId_ = value; + onChanged(); + return this; + } + + // optional int64 lastIncludedIndex = 3; + private long lastIncludedIndex_ ; + /** + * optional int64 lastIncludedIndex = 3; + */ + public boolean hasLastIncludedIndex() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public long getLastIncludedIndex() { + return lastIncludedIndex_; + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public Builder setLastIncludedIndex(long value) { + bitField0_ |= 0x00000004; + lastIncludedIndex_ = value; + onChanged(); + return this; + } + /** + * optional int64 lastIncludedIndex = 3; + */ + public Builder clearLastIncludedIndex() { + bitField0_ = (bitField0_ & ~0x00000004); + lastIncludedIndex_ = 0L; + onChanged(); + return this; + } + + // optional int64 lastIncludedTerm = 4; + private long lastIncludedTerm_ ; + /** + * optional int64 lastIncludedTerm = 4; + */ + public boolean hasLastIncludedTerm() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public long getLastIncludedTerm() { + return lastIncludedTerm_; + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public Builder setLastIncludedTerm(long value) { + bitField0_ |= 0x00000008; + lastIncludedTerm_ = value; + onChanged(); + return this; + } + /** + * optional int64 lastIncludedTerm = 4; + */ + public Builder clearLastIncludedTerm() { + bitField0_ = (bitField0_ & ~0x00000008); + lastIncludedTerm_ = 0L; + onChanged(); + return this; + } + + // optional bytes data = 5; + private com.google.protobuf.ByteString data_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes data = 5; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bytes data = 5; + */ + public com.google.protobuf.ByteString getData() { + return data_; + } + /** + * optional bytes data = 5; + */ + public Builder setData(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + data_ = value; + onChanged(); + return this; + } + /** + * optional bytes data = 5; + */ + public Builder clearData() { + bitField0_ = (bitField0_ & ~0x00000010); + data_ = getDefaultInstance().getData(); + onChanged(); + return this; + } + + // optional int32 chunkIndex = 6; + private int chunkIndex_ ; + /** + * optional int32 chunkIndex = 6; + */ + public boolean hasChunkIndex() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int32 chunkIndex = 6; + */ + public int getChunkIndex() { + return chunkIndex_; + } + /** + * optional int32 chunkIndex = 6; + */ + public Builder setChunkIndex(int value) { + bitField0_ |= 0x00000020; + chunkIndex_ = value; + onChanged(); + return this; + } + /** + * optional int32 chunkIndex = 6; + */ + public Builder clearChunkIndex() { + bitField0_ = (bitField0_ & ~0x00000020); + chunkIndex_ = 0; + onChanged(); + return this; + } + + // optional int32 totalChunks = 7; + private int totalChunks_ ; + /** + * optional int32 totalChunks = 7; + */ + public boolean hasTotalChunks() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional int32 totalChunks = 7; + */ + public int getTotalChunks() { + return totalChunks_; + } + /** + * optional int32 totalChunks = 7; + */ + public Builder setTotalChunks(int value) { + bitField0_ |= 0x00000040; + totalChunks_ = value; + onChanged(); + return this; + } + /** + * optional int32 totalChunks = 7; + */ + public Builder clearTotalChunks() { + bitField0_ = (bitField0_ & ~0x00000040); + totalChunks_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot) + } + + static { + defaultInstance = new InstallSnapshot(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\025InstallSnapshot.proto\022(org.opendayligh" + + "t.controller.cluster.raft\"\235\001\n\017InstallSna" + + "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" + + "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" + + "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" + + " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" + + "light.controller.cluster.raft.protobuff." + + "messagesB\027InstallSnapshotMessagesH\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor, + new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", }); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto new file mode 100644 index 0000000000..14f821b5e2 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/resources/InstallSnapshot.proto @@ -0,0 +1,15 @@ +package org.opendaylight.controller.cluster.raft; + +option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages"; +option java_outer_classname = "InstallSnapshotMessages"; +option optimize_for = SPEED; + +message InstallSnapshot { + optional int64 term = 1; + optional string leaderId = 2; + optional int64 lastIncludedIndex = 3; + optional int64 lastIncludedTerm = 4; + optional bytes data = 5; + optional int32 chunkIndex = 6; + optional int32 totalChunks = 7; +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index ea3c9e759d..ca34a34ca4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -14,17 +14,14 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; +import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages; -import com.google.common.base.Preconditions; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; public class MockRaftActorContext implements RaftActorContext { @@ -37,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext { private final ElectionTerm electionTerm; private ReplicatedLog replicatedLog; private Map peerAddresses = new HashMap(); + private ConfigParams configParams; public MockRaftActorContext(){ electionTerm = null; @@ -79,6 +77,8 @@ public class MockRaftActorContext implements RaftActorContext { } }; + configParams = new DefaultConfigParamsImpl(); + initReplicatedLog(); } @@ -179,118 +179,21 @@ public class MockRaftActorContext implements RaftActorContext { @Override public ConfigParams getConfigParams() { - return new DefaultConfigParamsImpl(); + return configParams; } - public static class SimpleReplicatedLog implements ReplicatedLog { - private final List log = new ArrayList<>(); - - @Override public ReplicatedLogEntry get(long index) { - if(index >= log.size() || index < 0){ - return null; - } - return log.get((int) index); - } - - @Override public ReplicatedLogEntry last() { - if(log.size() == 0){ - return null; - } - return log.get(log.size()-1); - } - - @Override public long lastIndex() { - if(log.size() == 0){ - return -1; - } - - return last().getIndex(); - } - - @Override public long lastTerm() { - if(log.size() == 0){ - return -1; - } - - return last().getTerm(); - } - - @Override public void removeFrom(long index) { - if(index >= log.size() || index < 0){ - return; - } - - log.subList((int) index, log.size()).clear(); - //log.remove((int) index); - } - - @Override public void removeFromAndPersist(long index) { - removeFrom(index); - } - - @Override public void append(ReplicatedLogEntry replicatedLogEntry) { - log.add(replicatedLogEntry); - } + public void setConfigParams(ConfigParams configParams) { + this.configParams = configParams; + } + public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl { @Override public void appendAndPersist( ReplicatedLogEntry replicatedLogEntry) { append(replicatedLogEntry); } - @Override public List getFrom(long index) { - if(index >= log.size() || index < 0){ - return Collections.EMPTY_LIST; - } - List entries = new ArrayList<>(); - for(int i=(int) index ; i < log.size() ; i++) { - entries.add(get(i)); - } - return entries; - } - - @Override public List getFrom(long index, int max) { - if(index >= log.size() || index < 0){ - return Collections.EMPTY_LIST; - } - List entries = new ArrayList<>(); - int maxIndex = (int) index + max; - if(maxIndex > log.size()){ - maxIndex = log.size(); - } - - for(int i=(int) index ; i < maxIndex ; i++) { - entries.add(get(i)); - } - return entries; - - } - - @Override public long size() { - return log.size(); - } - - @Override public boolean isPresent(long index) { - if(index >= log.size() || index < 0){ - return false; - } - - return true; - } - - @Override public boolean isInSnapshot(long index) { - return false; - } - - @Override public Object getSnapshot() { - return null; - } - - @Override public long getSnapshotIndex() { - return -1; - } - - @Override public long getSnapshotTerm() { - return -1; + @Override public void removeFromAndPersist(long index) { + removeFrom(index); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index ff0ffeb271..12123db129 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -6,6 +6,7 @@ import akka.actor.Props; import akka.event.Logging; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import com.google.protobuf.ByteString; import org.junit.Test; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; @@ -39,11 +40,11 @@ public class RaftActorTest extends AbstractActorTest { Object data) { } - @Override protected Object createSnapshot() { + @Override protected void createSnapshot() { throw new UnsupportedOperationException("createSnapshot"); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { throw new UnsupportedOperationException("applySnapshot"); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index c5a81aa1c9..227d1effa7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -158,17 +158,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { createActorContext(); context.setLastApplied(100); - setLastLogEntry((MockRaftActorContext) context, 0, 0, new MockRaftActorContext.MockPayload("")); + setLastLogEntry((MockRaftActorContext) context, 1, 100, new MockRaftActorContext.MockPayload("")); + ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99); List entries = Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101, + (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101, new MockRaftActorContext.MockPayload("foo")) ); // The new commitIndex is 101 AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, entries, 101); + new AppendEntries(2, "leader-1", 100, 1, entries, 101); RaftState raftState = createBehavior(context).handleMessage(getRef(), appendEntries); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 17c22a134a..73c9f96b82 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,24 +1,40 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.testkit.JavaTestKit; -import junit.framework.Assert; +import com.google.protobuf.ByteString; +import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation; +import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.messages.InstallSnapshotMessages; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class LeaderTest extends AbstractRaftActorBehaviorTest { @@ -82,8 +98,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals("match", out); } - - }; }}; } @@ -194,18 +208,372 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals("match", out); } + }; + }}; + } + + @Test + public void testSendInstallSnapshot() { + new LeaderTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(getRef()); + actorContext.setPeerAddresses(peerAddresses); + + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshot( + toByteString(leadersSnapshot)); + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + + MockLeader leader = new MockLeader(actorContext); + // set the follower info in leader + leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex + RaftState raftState = leader.handleMessage( + senderActor, new Replicate(null, "state-id", entry)); + + assertEquals(RaftState.Leader, raftState); + + // we might receive some heartbeat messages, so wait till we SendInstallSnapshot + Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { + @Override + protected Boolean match(Object o) throws Exception { + if (o instanceof SendInstallSnapshot) { + return true; + } + return false; + } + }.get(); + + boolean sendInstallSnapshotReceived = false; + for (Boolean b: matches) { + sendInstallSnapshotReceived = b | sendInstallSnapshotReceived; + } + + assertTrue(sendInstallSnapshotReceived); + + } + }; + }}; + } + + @Test + public void testInstallSnapshot() { + new LeaderTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot)); + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + MockLeader leader = new MockLeader(actorContext); + // set the follower info in leader + leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + + // new entry + ReplicatedLogImplEntry entry = + new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + + RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot()); + + assertEquals(RaftState.Leader, raftState); + + // check if installsnapshot gets called with the correct values. + final String out = + new ExpectMsg(duration("1 seconds"), "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof InstallSnapshotMessages.InstallSnapshot) { + InstallSnapshot is = (InstallSnapshot) + SerializationUtils.fromSerializable(in); + if (is.getData() == null) { + return "InstallSnapshot data is null"; + } + if (is.getLastIncludedIndex() != snapshotIndex) { + return is.getLastIncludedIndex() + "!=" + snapshotIndex; + } + if (is.getLastIncludedTerm() != snapshotTerm) { + return is.getLastIncludedTerm() + "!=" + snapshotTerm; + } + if (is.getTerm() == currentTerm) { + return is.getTerm() + "!=" + currentTerm; + } + + return "match"; + + } else { + return "message mismatch:" + in.getClass(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + } + }; + }}; + } + + @Test + public void testHandleInstallSnapshotReplyLastChunk() { + new LeaderTestKit(getSystem()) {{ + new Within(duration("1 seconds")) { + protected void run() { + ActorRef followerActor = getTestActor(); + + Map peerAddresses = new HashMap(); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + actorContext.setPeerAddresses(peerAddresses); + + final int followersLastIndex = 2; + final int snapshotIndex = 3; + final int newEntryIndex = 4; + final int snapshotTerm = 1; + final int currentTerm = 2; + + MockLeader leader = new MockLeader(actorContext); + // set the follower info in leader + leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshot( + toByteString(leadersSnapshot)); + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + + ByteString bs = toByteString(leadersSnapshot); + leader.createFollowerToSnapshot(followerActor.path().toString(), bs); + while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) { + leader.getFollowerToSnapshot().getNextChunk(); + leader.getFollowerToSnapshot().incrementChunkIndex(); + } + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + RaftState raftState = leader.handleMessage(senderActor, + new InstallSnapshotReply(currentTerm, followerActor.path().toString(), + leader.getFollowerToSnapshot().getChunkIndex(), true)); + assertEquals(RaftState.Leader, raftState); + + assertEquals(leader.mapFollowerToSnapshot.size(), 0); + assertEquals(leader.followerToLog.size(), 1); + assertNotNull(leader.followerToLog.get(followerActor.path().toString())); + FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex, fli.getMatchIndex().get()); + assertEquals(snapshotIndex + 1, fli.getNextIndex().get()); + } }; }}; } + @Test + public void testFollowerToSnapshotLogic() { + + MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); + + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public int getSnapshotChunkSize() { + return 50; + } + }); + + MockLeader leader = new MockLeader(actorContext); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + ByteString bs = toByteString(leadersSnapshot); + byte[] barray = bs.toByteArray(); + + leader.createFollowerToSnapshot("followerId", bs); + assertEquals(bs.size(), barray.length); + + int chunkIndex=0; + for (int i=0; i < barray.length; i = i + 50) { + int j = i + 50; + chunkIndex++; + + if (i + 50 > barray.length) { + j = barray.length; + } + + ByteString chunk = leader.getFollowerToSnapshot().getNextChunk(); + assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); + assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex()); + + leader.getFollowerToSnapshot().markSendStatus(true); + if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) { + leader.getFollowerToSnapshot().incrementChunkIndex(); + } + } + + assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks()); + } + + @Override protected RaftActorBehavior createBehavior( RaftActorContext actorContext) { return new Leader(actorContext); } @Override protected RaftActorContext createActorContext() { - return new MockRaftActorContext("test", getSystem(), leaderActor); + return createActorContext(leaderActor); + } + + protected RaftActorContext createActorContext(ActorRef actorRef) { + return new MockRaftActorContext("test", getSystem(), actorRef); + } + + private ByteString toByteString(Map state) { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(state); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } catch (IOException e) { + Assert.fail("IOException in converting Hashmap to Bytestring:" + e); + } + return null; + } + + private static class LeaderTestKit extends JavaTestKit { + + private LeaderTestKit(ActorSystem actorSystem) { + super(actorSystem); + } + + protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ + // Wait for a specific log message to show up + final boolean result = + new JavaTestKit.EventFilter(logLevel + ) { + @Override + protected Boolean run() { + return true; + } + }.from(subject.path().toString()) + .message(logMessage) + .occurrences(1).exec(); + + Assert.assertEquals(true, result); + + } + } + + class MockLeader extends Leader { + + FollowerToSnapshot fts; + + public MockLeader(RaftActorContext context){ + super(context); + } + + public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) { + FollowerLogInformation followerLogInformation = + new FollowerLogInformationImpl(followerId, + new AtomicLong(nextIndex), + new AtomicLong(matchIndex)); + followerToLog.put(followerId, followerLogInformation); + } + + public FollowerToSnapshot getFollowerToSnapshot() { + return fts; + } + + public void createFollowerToSnapshot(String followerId, ByteString bs ) { + fts = new FollowerToSnapshot(bs); + mapFollowerToSnapshot.put(followerId, fts); + + } } } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java index 2d8e51cff9..d16170ba48 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java @@ -7,11 +7,10 @@ */ package org.opendaylight.controller.md.sal.binding.impl; -import java.util.Map; -import java.util.WeakHashMap; - -import javax.annotation.concurrent.GuardedBy; - +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; @@ -19,6 +18,7 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; @@ -26,20 +26,19 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.yangtools.concepts.Delegator; -import com.google.common.base.Preconditions; - class BindingTranslatedTransactionChain implements BindingTransactionChain, Delegator { private final DOMTransactionChain delegate; - - @GuardedBy("this") - private final Map, AsyncTransaction> delegateTxToBindingTx = new WeakHashMap<>(); private final BindingToNormalizedNodeCodec codec; + private final DelegateChainListener delegatingListener; + private final TransactionChainListener listener; public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory, final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) { Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null"); - this.delegate = chainFactory.createTransactionChain(new ListenerInvoker(listener)); + this.delegatingListener = new DelegateChainListener(); + this.listener = listener; + this.delegate = chainFactory.createTransactionChain(listener); this.codec = codec; } @@ -52,56 +51,79 @@ class BindingTranslatedTransactionChain implements BindingTransactionChain, Dele public ReadOnlyTransaction newReadOnlyTransaction() { DOMDataReadOnlyTransaction delegateTx = delegate.newReadOnlyTransaction(); ReadOnlyTransaction bindingTx = new BindingDataReadTransactionImpl(delegateTx, codec); - putDelegateToBinding(delegateTx, bindingTx); return bindingTx; } @Override public ReadWriteTransaction newReadWriteTransaction() { DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction(); - ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec); - putDelegateToBinding(delegateTx, bindingTx); + ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec) { + + @Override + public CheckedFuture submit() { + return listenForFailure(this,super.submit()); + } + + }; return bindingTx; } @Override public WriteTransaction newWriteOnlyTransaction() { - DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction(); - WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<>(delegateTx, codec); - putDelegateToBinding(delegateTx, bindingTx); + final DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction(); + WriteTransaction bindingTx = new BindingDataWriteTransactionImpl(delegateTx, codec) { + + @Override + public CheckedFuture submit() { + return listenForFailure(this,super.submit()); + }; + + }; return bindingTx; } - @Override - public void close() { - delegate.close(); + protected CheckedFuture listenForFailure( + final WriteTransaction tx, CheckedFuture future) { + Futures.addCallback(future, new FutureCallback() { + @Override + public void onFailure(Throwable t) { + failTransactionChain(tx,t); + } + + @Override + public void onSuccess(Void result) { + // Intentionally NOOP + } + }); + + return future; } - private synchronized void putDelegateToBinding(final AsyncTransaction domTx, - final AsyncTransaction bindingTx) { - final Object previous = delegateTxToBindingTx.put(domTx, bindingTx); - Preconditions.checkState(previous == null, "DOM Transaction %s has already associated binding transation %s",domTx,previous); + protected void failTransactionChain(WriteTransaction tx, Throwable t) { + // We asume correct state change for underlaying transaction + // chain, so we are not changing any of our internal state + // to mark that we failed. + this.delegatingListener.onTransactionChainFailed(this, tx, t); } - private synchronized AsyncTransaction getBindingTransaction(final AsyncTransaction transaction) { - return delegateTxToBindingTx.get(transaction); + @Override + public void close() { + delegate.close(); } - private final class ListenerInvoker implements TransactionChainListener { - - private final TransactionChainListener listener; - - public ListenerInvoker(final TransactionChainListener listener) { - this.listener = Preconditions.checkNotNull(listener, "Listener must not be null."); - } + private final class DelegateChainListener implements TransactionChainListener { @Override public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { - Preconditions.checkState(delegate.equals(chain), - "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain); - AsyncTransaction bindingTx = getBindingTransaction(transaction); - listener.onTransactionChainFailed(chain, bindingTx, cause); + /* + * Intentionally NOOP, callback for failure, since we + * are also listening on each transaction for failure. + * + * by listening on submit future for Binding transaction + * in order to provide Binding transaction (which was seen by client + * of this transaction chain, instead of + */ } @Override diff --git a/opendaylight/md-sal/sal-clustering-commons/pom.xml b/opendaylight/md-sal/sal-clustering-commons/pom.xml index b8980cd0be..a3619ec4d2 100644 --- a/opendaylight/md-sal/sal-clustering-commons/pom.xml +++ b/opendaylight/md-sal/sal-clustering-commons/pom.xml @@ -209,6 +209,7 @@ + org.jacoco @@ -238,7 +239,18 @@ - + + org.apache.felix + maven-bundle-plugin + true + + + ${project.groupId}.${project.artifactId} + org.opendaylight.controller.cluster.*,org.opendaylight.common.actor,org.opendaylight.common.reporting,org.opendaylight.controller.protobuff.*,org.opendaylight.controller.xml.* + * + + + + - diff --git a/opendaylight/md-sal/sal-clustering-config/pom.xml b/opendaylight/md-sal/sal-clustering-config/pom.xml index 31b658d1d7..91c0b5caa1 100644 --- a/opendaylight/md-sal/sal-clustering-config/pom.xml +++ b/opendaylight/md-sal/sal-clustering-config/pom.xml @@ -36,6 +36,21 @@ xml config + + ${project.build.directory}/classes/initial/akka.conf + xml + akkaconf + + + ${project.build.directory}/classes/initial/module-shards.conf + xml + moduleshardconf + + + ${project.build.directory}/classes/initial/modules.conf + xml + moduleconf + diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf index 05322137aa..5a2116b50f 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf @@ -21,7 +21,7 @@ odl-cluster-data { remote { log-remote-lifecycle-events = off netty.tcp { - hostname = "" + hostname = "127.0.0.1" port = 2550 maximum-frame-size = 419430400 send-buffer-size = 52428800 @@ -30,9 +30,14 @@ odl-cluster-data { } cluster { - seed-nodes = ["akka.tcp://opendaylight-cluster-data@:2550"] + seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"] auto-down-unreachable-after = 10s + + roles = [ + "member-1" + ] + } } } @@ -51,13 +56,13 @@ odl-cluster-rpc { remote { log-remote-lifecycle-events = off netty.tcp { - hostname = "" + hostname = "127.0.0.1" port = 2551 } } cluster { - seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@:2551"] + seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"] auto-down-unreachable-after = 10s } diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index dd5a7f2979..82998226b6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -45,6 +45,11 @@ akka-slf4j_${scala.version} + + com.typesafe.akka + akka-osgi_${scala.version} + + @@ -178,6 +183,7 @@ !*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;* + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java index 15c0548761..b326d61fc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java @@ -10,24 +10,55 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; import akka.actor.Props; -import com.google.common.base.Function; +import akka.osgi.BundleDelegatingClassLoader; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.osgi.framework.BundleContext; -import javax.annotation.Nullable; +import java.io.File; public class ActorSystemFactory { - private static final ActorSystem actorSystem = (new Function(){ - - @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) { - ActorSystem system = - ActorSystem.create("opendaylight-cluster-data", ConfigFactory - .load().getConfig("odl-cluster-data")); - system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); - return system; - } - }).apply(null); + + public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf"; + public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data"; + public static final String CONFIGURATION_NAME = "odl-cluster-data"; + + private static volatile ActorSystem actorSystem = null; public static final ActorSystem getInstance(){ return actorSystem; } + + /** + * This method should be called only once during initialization + * + * @param bundleContext + */ + public static final ActorSystem createInstance(final BundleContext bundleContext) { + if(actorSystem == null) { + // Create an OSGi bundle classloader for actor system + BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), + Thread.currentThread().getContextClassLoader()); + synchronized (ActorSystemFactory.class) { + // Double check + + if (actorSystem == null) { + ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME, + ConfigFactory.load(readAkkaConfiguration()).getConfig(CONFIGURATION_NAME), classLoader); + system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); + actorSystem = system; + } + } + } + + return actorSystem; + } + + + private static final Config readAkkaConfiguration(){ + File defaultConfigFile = new File(AKKA_CONF_PATH); + Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing"); + return ConfigFactory.parseFile(defaultConfigFile); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 202ced9a26..0a137e07df 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -72,6 +72,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au actorSystem, actorSystem.actorOf( ShardManager.props(type, cluster, configuration, datastoreContext). withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration); + + actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds()); } public DistributedDataStore(ActorContext actorContext) { @@ -98,8 +100,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); Object result = actorContext.executeLocalShardOperation(shardName, - new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), - ActorContext.ASK_DURATION); + new RegisterChangeListener(path, dataChangeListenerActor.path(), scope)); if (result != null) { RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index 65a39a60e6..72b593f010 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -12,12 +12,13 @@ import akka.actor.ActorSystem; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.osgi.framework.BundleContext; public class DistributedDataStoreFactory { public static DistributedDataStore createInstance(String name, SchemaService schemaService, - DistributedDataStoreProperties dataStoreProperties) { + DistributedDataStoreProperties dataStoreProperties, BundleContext bundleContext) { - ActorSystem actorSystem = ActorSystemFactory.getInstance(); + ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); final DistributedDataStore dataStore = new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java index eb6a536138..df3245ffb2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java @@ -18,21 +18,24 @@ public class DistributedDataStoreProperties { private final int maxShardDataChangeExecutorQueueSize; private final int maxShardDataChangeExecutorPoolSize; private final int shardTransactionIdleTimeoutInMinutes; + private final int operationTimeoutInSeconds; public DistributedDataStoreProperties() { maxShardDataChangeListenerQueueSize = 1000; maxShardDataChangeExecutorQueueSize = 1000; maxShardDataChangeExecutorPoolSize = 20; shardTransactionIdleTimeoutInMinutes = 10; + operationTimeoutInSeconds = 5; } public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize, int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize, - int shardTransactionIdleTimeoutInMinutes) { + int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) { this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize; this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize; this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes; + this.operationTimeoutInSeconds = operationTimeoutInSeconds; } public int getMaxShardDataChangeListenerQueueSize() { @@ -50,4 +53,8 @@ public class DistributedDataStoreProperties { public int getShardTransactionIdleTimeoutInMinutes() { return shardTransactionIdleTimeoutInMinutes; } + + public int getOperationTimeoutInSeconds() { + return operationTimeoutInSeconds; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 43a9faa3e4..6a6a181b6c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -14,6 +14,7 @@ import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; +import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.base.Optional; @@ -21,7 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; - +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -144,8 +145,19 @@ public class Shard extends RaftActor { return Props.create(new ShardCreator(name, peerAddresses, datastoreContext)); } + @Override public void onReceiveRecover(Object message) { + LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), + getSender()); + + if (message instanceof RecoveryFailure){ + LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause"); + } else { + super.onReceiveRecover(message); + } + } + @Override public void onReceiveCommand(Object message) { - LOG.debug("Received message {} from {}", message.getClass().toString(), + LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(), getSender()); if (message.getClass() @@ -228,7 +240,8 @@ public class Shard extends RaftActor { .tell(new CreateTransactionReply( Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(), - getSelf()); + getSelf() + ); } private void commit(final ActorRef sender, Object serialized) { @@ -266,9 +279,9 @@ public class Shard extends RaftActor { Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Void v) { - sender.tell(new CommitTransactionReply().toSerializable(),self); - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(new Date()); + sender.tell(new CommitTransactionReply().toSerializable(), self); + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(new Date()); } @Override @@ -365,7 +378,6 @@ public class Shard extends RaftActor { identifier, clientActor.path().toString()); } - } else { LOG.error("Unknown state received {}", data); } @@ -383,11 +395,11 @@ public class Shard extends RaftActor { } - @Override protected Object createSnapshot() { + @Override protected void createSnapshot() { throw new UnsupportedOperationException("createSnapshot"); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { throw new UnsupportedOperationException("applySnapshot"); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index c557118b1e..a5be69531d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -151,8 +151,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho ActorSelection cohort = actorContext.actorSelection(actorPath); - futureList.add(actorContext.executeRemoteOperationAsync(cohort, message, - ActorContext.ASK_DURATION)); + futureList.add(actorContext.executeRemoteOperationAsync(cohort, message)); } return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index fc1a3aad74..a8b20c030e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -353,8 +353,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { try { Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(), - ActorContext.ASK_DURATION); + new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -472,7 +471,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Send the ReadyTransaction message to the Tx actor. final Future replyFuture = actorContext.executeRemoteOperationAsync(getActor(), - new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION); + new ReadyTransaction().toSerializable()); // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined @@ -532,23 +531,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION )); + new DeleteData(path).toSerializable() )); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new MergeData(path, data, schemaContext).toSerializable(), - ActorContext.ASK_DURATION)); + new MergeData(path, data, schemaContext).toSerializable())); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", identifier, path); recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new WriteData(path, data, schemaContext).toSerializable(), - ActorContext.ASK_DURATION)); + new WriteData(path, data, schemaContext).toSerializable())); } @Override @@ -634,7 +631,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { }; Future readFuture = actorContext.executeRemoteOperationAsync(getActor(), - new ReadData(path).toSerializable(), ActorContext.ASK_DURATION); + new ReadData(path).toSerializable()); readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -715,7 +712,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { }; Future future = actorContext.executeRemoteOperationAsync(getActor(), - new DataExists(path).toSerializable(), ActorContext.ASK_DURATION); + new DataExists(path).toSerializable()); future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 818a8ca8b3..b87dc4f608 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -47,10 +47,7 @@ public class ActorContext { private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); - public static final FiniteDuration ASK_DURATION = - Duration.create(5, TimeUnit.SECONDS); - public static final Duration AWAIT_DURATION = - Duration.create(5, TimeUnit.SECONDS); + private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS); public static final String MAILBOX = "bounded-mailbox"; @@ -59,6 +56,8 @@ public class ActorContext { private final ClusterWrapper clusterWrapper; private final Configuration configuration; private volatile SchemaContext schemaContext; + private FiniteDuration operationDuration = DEFAULT_OPER_DURATION; + private Timeout operationTimeout = new Timeout(operationDuration); public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, @@ -93,6 +92,11 @@ public class ActorContext { } } + public void setOperationTimeout(int timeoutInSeconds) { + operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS); + operationTimeout = new Timeout(operationDuration); + } + public SchemaContext getSchemaContext() { return schemaContext; } @@ -117,7 +121,7 @@ public class ActorContext { */ public ActorRef findLocalShard(String shardName) { Object result = executeLocalOperation(shardManager, - new FindLocalShard(shardName), ASK_DURATION); + new FindLocalShard(shardName)); if (result instanceof LocalShardFound) { LocalShardFound found = (LocalShardFound) result; @@ -133,7 +137,7 @@ public class ActorContext { public String findPrimaryPath(String shardName) { Object result = executeLocalOperation(shardManager, - new FindPrimary(shardName).toSerializable(), ASK_DURATION); + new FindPrimary(shardName).toSerializable()); if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { PrimaryFound found = PrimaryFound.fromSerializable(result); @@ -151,16 +155,13 @@ public class ActorContext { * * @param actor * @param message - * @param duration * @return The response of the operation */ - public Object executeLocalOperation(ActorRef actor, Object message, - FiniteDuration duration) { - Future future = - ask(actor, message, new Timeout(duration)); + public Object executeLocalOperation(ActorRef actor, Object message) { + Future future = ask(actor, message, operationTimeout); try { - return Await.result(future, AWAIT_DURATION); + return Await.result(future, operationDuration); } catch (Exception e) { throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e); } @@ -171,21 +172,19 @@ public class ActorContext { * * @param actor * @param message - * @param duration * @return */ - public Object executeRemoteOperation(ActorSelection actor, Object message, - FiniteDuration duration) { + public Object executeRemoteOperation(ActorSelection actor, Object message) { LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); - Future future = - ask(actor, message, new Timeout(duration)); + Future future = ask(actor, message, operationTimeout); try { - return Await.result(future, AWAIT_DURATION); + return Await.result(future, operationDuration); } catch (Exception e) { - throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e); + throw new TimeoutException("Sending message " + message.getClass().toString() + + " to actor " + actor.toString() + " failed" , e); } } @@ -194,15 +193,13 @@ public class ActorContext { * * @param actor the ActorSelection * @param message the message to send - * @param duration the maximum amount of time to send he message * @return a Future containing the eventual result */ - public Future executeRemoteOperationAsync(ActorSelection actor, Object message, - FiniteDuration duration) { + public Future executeRemoteOperationAsync(ActorSelection actor, Object message) { LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); - return ask(actor, message, new Timeout(duration)); + return ask(actor, message, operationTimeout); } /** @@ -225,16 +222,14 @@ public class ActorContext { * * @param shardName * @param message - * @param duration * @return * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found */ - public Object executeShardOperation(String shardName, Object message, - FiniteDuration duration) { + public Object executeShardOperation(String shardName, Object message) { ActorSelection primary = findPrimary(shardName); - return executeRemoteOperation(primary, message, duration); + return executeRemoteOperation(primary, message); } /** @@ -246,19 +241,17 @@ public class ActorContext { * * @param shardName the name of the shard on which the operation needs to be executed * @param message the message that needs to be sent to the shard - * @param duration the time duration in which this operation should complete * @return the message that was returned by the local actor on which the * the operation was executed. If a local shard was not found then * null is returned * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException * if the operation does not complete in a specified time duration */ - public Object executeLocalShardOperation(String shardName, Object message, - FiniteDuration duration) { + public Object executeLocalShardOperation(String shardName, Object message) { ActorRef local = findLocalShard(shardName); if(local != null) { - return executeLocalOperation(local, message, duration); + return executeLocalOperation(local, message); } return null; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java index f5a0d3783a..e2fbacb461 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java @@ -2,9 +2,12 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_pro import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties; +import org.osgi.framework.BundleContext; public class DistributedConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule { + private BundleContext bundleContext; + public DistributedConfigDataStoreProviderModule( org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { @@ -33,9 +36,15 @@ public class DistributedConfigDataStoreProviderModule extends } return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(), - new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(), - props.getMaxShardDataChangeExecutorQueueSize(), - props.getMaxShardDataChangeListenerQueueSize(), - props.getShardTransactionIdleTimeoutInMinutes())); + new DistributedDataStoreProperties( + props.getMaxShardDataChangeExecutorPoolSize().getValue(), + props.getMaxShardDataChangeExecutorQueueSize().getValue(), + props.getMaxShardDataChangeListenerQueueSize().getValue(), + props.getShardTransactionIdleTimeoutInMinutes().getValue(), + props.getOperationTimeoutInSeconds().getValue()), bundleContext); + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModuleFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModuleFactory.java index 67bf599454..0cdaca3a15 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModuleFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModuleFactory.java @@ -8,6 +8,29 @@ * Do not modify this file unless it is present under src/main directory */ package org.opendaylight.controller.config.yang.config.distributed_datastore_provider; + +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.DynamicMBeanWithInstance; +import org.opendaylight.controller.config.spi.Module; +import org.osgi.framework.BundleContext; + public class DistributedConfigDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModuleFactory { + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) { + DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext); + module.setBundleContext(bundleContext); + return module; + } + + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, + DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception { + DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName, dependencyResolver, + old, bundleContext); + module.setBundleContext(bundleContext); + return module; + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java index 443334d11f..c185e871ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java @@ -2,9 +2,12 @@ package org.opendaylight.controller.config.yang.config.distributed_datastore_pro import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties; +import org.osgi.framework.BundleContext; public class DistributedOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule { + private BundleContext bundleContext; + public DistributedOperationalDataStoreProviderModule( org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { @@ -34,10 +37,16 @@ public class DistributedOperationalDataStoreProviderModule extends return DistributedDataStoreFactory.createInstance("operational", getOperationalSchemaServiceDependency(), - new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(), - props.getMaxShardDataChangeExecutorQueueSize(), - props.getMaxShardDataChangeListenerQueueSize(), - props.getShardTransactionIdleTimeoutInMinutes())); + new DistributedDataStoreProperties( + props.getMaxShardDataChangeExecutorPoolSize().getValue(), + props.getMaxShardDataChangeExecutorQueueSize().getValue(), + props.getMaxShardDataChangeListenerQueueSize().getValue(), + props.getShardTransactionIdleTimeoutInMinutes().getValue(), + props.getOperationTimeoutInSeconds().getValue()), bundleContext); + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModuleFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModuleFactory.java index c9965fee09..364fe62923 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModuleFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModuleFactory.java @@ -8,6 +8,27 @@ * Do not modify this file unless it is present under src/main directory */ package org.opendaylight.controller.config.yang.config.distributed_datastore_provider; + +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.DynamicMBeanWithInstance; +import org.opendaylight.controller.config.spi.Module; +import org.osgi.framework.BundleContext; + public class DistributedOperationalDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModuleFactory { + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) { + DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext); + module.setBundleContext(bundleContext); + return module; + } + + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, + DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception { + DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName, dependencyResolver, + old, bundleContext); + module.setBundleContext(bundleContext); + return module; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang index a9a8a1ad98..d50be2ca0e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang @@ -36,30 +36,48 @@ module distributed-datastore-provider { config:java-name-prefix DistributedOperationalDataStoreProvider; } + typedef non-zero-uint16-type { + type uint16 { + range "1..max"; + } + } + + typedef operation-timeout-type { + type uint16 { + range "5..max"; + } + } + grouping data-store-properties { leaf max-shard-data-change-executor-queue-size { default 1000; - type uint16; + type non-zero-uint16-type; description "The maximum queue size for each shard's data store data change notification executor."; } leaf max-shard-data-change-executor-pool-size { default 20; - type uint16; + type non-zero-uint16-type; description "The maximum thread pool size for each shard's data store data change notification executor."; } leaf max-shard-data-change-listener-queue-size { default 1000; - type uint16; + type non-zero-uint16-type; description "The maximum queue size for each shard's data store data change listeners."; } leaf shard-transaction-idle-timeout-in-minutes { default 10; - type uint16; + type non-zero-uint16-type; description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs."; } + + leaf operation-timeout-in-seconds { + default 5; + type operation-timeout-type; + description "The maximum amount of time for akka operations (remote or local) to complete before failing."; + } } // Augments the 'configuration' choice node under modules/module. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java index e653c3d371..2ed11cfbda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java @@ -82,8 +82,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest { ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration()); Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + .executeLocalOperation(actorRef, "messages"); Assert.assertNotNull(messages); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index c99a7e8c8c..3d0aaa0082 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -62,8 +62,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration()); Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + .executeLocalOperation(actorRef, "messages"); Assert.assertNotNull(messages); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java index e10570cd15..e39b9abd65 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java @@ -13,6 +13,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListeningExecutorService; @@ -185,17 +186,17 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { ).build(); + Timeout askTimeout = new Timeout(ASK_RESULT_DURATION); + //This is done so that Modification list is updated which is used during commit - Future future = - akka.pattern.Patterns.ask(shardTransaction, writeData, 3000); + Future future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout); //ready transaction creates the cohort so that we get into the //block where in commmit is done ShardTransactionMessages.ReadyTransaction readyTransaction = ShardTransactionMessages.ReadyTransaction.newBuilder().build(); - future = - akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000); + future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout); //but when the message is sent it will have the MockCommit object //so that we can simulate throwing of exception @@ -216,10 +217,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { when(mockModification.toSerializable()).thenReturn( PersistentMessages.CompositeModification.newBuilder().build()); - future = - akka.pattern.Patterns.ask(subject, - mockForwardCommitTransaction - , 3000); + future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout); Await.result(future, ASK_RESULT_DURATION); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index adb12b298e..1cd0f85fa1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.List; import java.util.concurrent.ExecutionException; @@ -93,12 +92,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { } stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class), - isA(requestType), any(FiniteDuration.class)); + isA(requestType)); } private void verifyCohortInvocations(int nCohorts, Class requestType) { verify(actorContext, times(nCohorts)).executeRemoteOperationAsync( - any(ActorSelection.class), isA(requestType), any(FiniteDuration.class)); + any(ActorSelection.class), isA(requestType)); } private void propagateExecutionExceptionCause(ListenableFuture future) throws Throwable { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index f69ae88ec8..e5392e0251 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -56,8 +56,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - import java.util.List; import java.util.concurrent.TimeUnit; @@ -216,10 +214,6 @@ public class TransactionProxyTest extends AbstractActorTest { return getSystem().actorSelection(actorRef.path()); } - private FiniteDuration anyDuration() { - return any(FiniteDuration.class); - } - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ return CreateTransactionReply.newBuilder() .setTransactionActorPath(actorRef.path().toString()) @@ -232,7 +226,7 @@ public class TransactionProxyTest extends AbstractActorTest { when(mockActorContext).actorSelection(actorRef.path().toString()); doReturn(createTransactionReply(actorRef)).when(mockActorContext). executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), - eqCreateTransaction(memberName, type), anyDuration()); + eqCreateTransaction(memberName, type)); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( anyString(), eq(actorRef.path().toString())); doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); @@ -259,7 +253,7 @@ public class TransactionProxyTest extends AbstractActorTest { READ_ONLY); doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); Optional> readOptional = transactionProxy.read( TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -269,7 +263,7 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -283,7 +277,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeRemoteOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -296,7 +290,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeRemoteOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -308,7 +302,7 @@ public class TransactionProxyTest extends AbstractActorTest { throws Throwable { doThrow(exToThrow).when(mockActorContext).executeShardOperation( - anyString(), any(), anyDuration()); + anyString(), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -348,14 +342,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(), - anyDuration()); + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -368,7 +361,7 @@ public class TransactionProxyTest extends AbstractActorTest { propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } finally { verify(mockActorContext, times(0)).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); } } @@ -379,10 +372,10 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(expectedNode)); doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -414,14 +407,14 @@ public class TransactionProxyTest extends AbstractActorTest { READ_ONLY); doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + eq(actorSelection(actorRef)), eqDataExists()); Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); assertEquals("Exists response", false, exists); doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + eq(actorSelection(actorRef)), eqDataExists()); exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); @@ -443,7 +436,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeRemoteOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -456,7 +449,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeRemoteOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -471,14 +464,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(), - anyDuration()); + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + eq(actorSelection(actorRef)), eqDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -491,7 +483,7 @@ public class TransactionProxyTest extends AbstractActorTest { propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); } finally { verify(mockActorContext, times(0)).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + eq(actorSelection(actorRef)), eqDataExists()); } } @@ -502,10 +494,10 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + eq(actorSelection(actorRef)), eqDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -556,7 +548,7 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -564,7 +556,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); verify(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), WriteDataReply.SERIALIZABLE_CLASS); @@ -599,7 +591,7 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -607,7 +599,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); verify(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), MergeDataReply.SERIALIZABLE_CLASS); @@ -618,7 +610,7 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDeleteData(), anyDuration()); + eq(actorSelection(actorRef)), eqDeleteData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -626,7 +618,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.delete(TestModel.TEST_PATH); verify(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDeleteData(), anyDuration()); + eq(actorSelection(actorRef)), eqDeleteData()); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), DeleteDataReply.SERIALIZABLE_CLASS); @@ -665,13 +657,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -700,14 +692,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), - anyDuration()); + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -736,11 +727,11 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). executeRemoteOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -763,7 +754,7 @@ public class TransactionProxyTest extends AbstractActorTest { public void testReadyWithInitialCreateTransactionFailure() throws Exception { doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( - anyString(), any(), anyDuration()); + anyString(), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -793,11 +784,11 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(Futures.successful(new Object())).when(mockActorContext). executeRemoteOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -830,7 +821,7 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index fda9ccdfdb..5d8fb8393d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -117,7 +117,7 @@ public class ActorContextTest extends AbstractActorTest{ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), mock(Configuration.class)); - Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds")); + Object out = actorContext.executeLocalShardOperation("default", "hello"); assertEquals("hello", out); @@ -144,7 +144,7 @@ public class ActorContextTest extends AbstractActorTest{ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), mock(Configuration.class)); - Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds")); + Object out = actorContext.executeLocalShardOperation("default", "hello"); assertNull(out); @@ -232,7 +232,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds")); + Object out = actorContext.executeRemoteOperation(actor, "hello"); assertEquals("hello", out); @@ -261,8 +261,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Future future = actorContext.executeRemoteOperationAsync(actor, "hello", - Duration.create(3, TimeUnit.SECONDS)); + Future future = actorContext.executeRemoteOperationAsync(actor, "hello"); try { Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index b19fd3a529..8fa3a17f90 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import scala.concurrent.duration.FiniteDuration; public class MockActorContext extends ActorContext { @@ -33,12 +32,12 @@ public class MockActorContext extends ActorContext { @Override public Object executeShardOperation(String shardName, - Object message, FiniteDuration duration) { + Object message) { return executeShardOperationResponse; } @Override public Object executeRemoteOperation(ActorSelection actor, - Object message, FiniteDuration duration) { + Object message) { return executeRemoteOperationResponse; } @@ -76,13 +75,13 @@ public class MockActorContext extends ActorContext { @Override public Object executeLocalOperation(ActorRef actor, - Object message, FiniteDuration duration) { + Object message) { return this.executeLocalOperationResponse; } @Override public Object executeLocalShardOperation(String shardName, - Object message, FiniteDuration duration) { + Object message) { return this.executeLocalShardOperationResponse; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java index 939096e7f3..4ddba2f1b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java @@ -21,8 +21,7 @@ public class TestUtils { ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf( Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration()); Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + .executeLocalOperation(actorRef, "messages"); Assert.assertNotNull(messages); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index 41cdd59d6b..8d454c4bd6 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -189,6 +189,7 @@ !org.iq80.*;!*snappy;!org.jboss.*;!com.jcraft.*;!org.fusesource.*;!*jetty*;!sun.security.*;* + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java index f1ca3ccd50..6a442c57cc 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java @@ -10,12 +10,16 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorSystem; import akka.osgi.BundleDelegatingClassLoader; -import com.typesafe.config.ConfigFactory; +import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader; import org.osgi.framework.BundleContext; public class ActorSystemFactory { - private static volatile ActorSystem actorSystem = null; + + public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-rpc"; + public static final String CONFIGURATION_NAME = "odl-cluster-rpc"; + + private static volatile ActorSystem actorSystem = null; public static final ActorSystem getInstance(){ return actorSystem; @@ -26,7 +30,7 @@ public class ActorSystemFactory { * * @param bundleContext */ - public static final void createInstance(final BundleContext bundleContext) { + public static final void createInstance(final BundleContext bundleContext, AkkaConfigurationReader akkaConfigurationReader) { if(actorSystem == null) { // Create an OSGi bundle classloader for actor system BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), @@ -34,8 +38,8 @@ public class ActorSystemFactory { synchronized (ActorSystemFactory.class) { // Double check if (actorSystem == null) { - ActorSystem system = ActorSystem.create("opendaylight-cluster-rpc", - ConfigFactory.load().getConfig("odl-cluster-rpc"), classLoader); + ActorSystem system = ActorSystem.create(ACTOR_SYSTEM_NAME, + akkaConfigurationReader.read().getConfig(CONFIGURATION_NAME), classLoader); actorSystem = system; } } @@ -43,4 +47,5 @@ public class ActorSystemFactory { throw new IllegalStateException("Actor system should be created only once. Use getInstance method to access existing actor system"); } } + } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java index fc75f7747a..0e6b795c05 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.remote.rpc; +import org.opendaylight.controller.remote.rpc.utils.DefaultAkkaConfigurationReader; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.osgi.framework.BundleContext; @@ -16,7 +17,7 @@ import org.osgi.framework.BundleContext; public class RemoteRpcProviderFactory { public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){ - ActorSystemFactory.createInstance(bundleContext); + ActorSystemFactory.createInstance(bundleContext, new DefaultAkkaConfigurationReader()); RemoteRpcProvider rpcProvider = new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker); broker.registerProvider(rpcProvider); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java new file mode 100644 index 0000000000..035ce9a203 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/AkkaConfigurationReader.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.utils; + +import com.typesafe.config.Config; + +public interface AkkaConfigurationReader { + Config read(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java new file mode 100644 index 0000000000..a44d20ca39 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/DefaultAkkaConfigurationReader.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.utils; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import java.io.File; + +public class DefaultAkkaConfigurationReader implements AkkaConfigurationReader { + public static final String AKKA_CONF_PATH = "./configuration/initial/akka.conf"; + + @Override public Config read() { + File defaultConfigFile = new File(AKKA_CONF_PATH); + Preconditions.checkState(defaultConfigFile.exists(), "akka.conf is missing"); + return ConfigFactory.parseFile(defaultConfigFile); + + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java index ed5fa6d16e..cd1cd91869 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/ActorSystemFactoryTest.java @@ -10,9 +10,11 @@ package org.opendaylight.controller.remote.rpc; import akka.actor.ActorSystem; +import com.typesafe.config.ConfigFactory; import junit.framework.Assert; import org.junit.After; import org.junit.Test; +import org.opendaylight.controller.remote.rpc.utils.AkkaConfigurationReader; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; @@ -27,13 +29,17 @@ public class ActorSystemFactoryTest { public void testActorSystemCreation(){ BundleContext context = mock(BundleContext.class); when(context.getBundle()).thenReturn(mock(Bundle.class)); - ActorSystemFactory.createInstance(context); + + AkkaConfigurationReader reader = mock(AkkaConfigurationReader.class); + when(reader.read()).thenReturn(ConfigFactory.load()); + + ActorSystemFactory.createInstance(context, reader); system = ActorSystemFactory.getInstance(); Assert.assertNotNull(system); // Check illegal state exception try { - ActorSystemFactory.createInstance(context); + ActorSystemFactory.createInstance(context, reader); fail("Illegal State exception should be thrown, while creating actor system second time"); } catch (IllegalStateException e) { } @@ -45,5 +51,4 @@ public class ActorSystemFactoryTest { system.shutdown(); } } - } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java index 52115a8f32..fe20e3a441 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java @@ -1,10 +1,12 @@ package org.opendaylight.controller.config.yang.md.sal.rest.connector; -import org.opendaylight.controller.sal.rest.impl.RestconfProviderImpl; +import org.opendaylight.controller.sal.restconf.impl.RestconfProviderImpl; public class RestConnectorModule extends org.opendaylight.controller.config.yang.md.sal.rest.connector.AbstractRestConnectorModule { + private static RestConnectorRuntimeRegistration runtimeRegistration; + public RestConnectorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } @@ -26,6 +28,14 @@ public class RestConnectorModule extends org.opendaylight.controller.config.yang instance.setWebsocketPort(getWebsocketPort()); // Register it with the Broker getDomBrokerDependency().registerProvider(instance); + + if(runtimeRegistration != null){ + runtimeRegistration.close(); + } + + runtimeRegistration = + getRootRuntimeBeanRegistratorWrapper().register(instance); + return instance; } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfApplication.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfApplication.java index a298f4b093..c9496af4c8 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfApplication.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfApplication.java @@ -14,6 +14,7 @@ import javax.ws.rs.core.Application; import org.opendaylight.controller.sal.restconf.impl.BrokerFacade; import org.opendaylight.controller.sal.restconf.impl.ControllerContext; import org.opendaylight.controller.sal.restconf.impl.RestconfImpl; +import org.opendaylight.controller.sal.restconf.impl.StatisticsRestconfServiceWrapper; public class RestconfApplication extends Application { @@ -38,7 +39,7 @@ public class RestconfApplication extends Application { restconfImpl.setControllerContext(controllerContext); singletons.add(controllerContext); singletons.add(brokerFacade); - singletons.add(restconfImpl); + singletons.add(StatisticsRestconfServiceWrapper.getInstance()); singletons.add(StructuredDataToXmlProvider.INSTANCE); singletons.add(StructuredDataToJsonProvider.INSTANCE); singletons.add(JsonToCompositeNodeProvider.INSTANCE); diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java index adad26e141..5d8c910afc 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java @@ -15,6 +15,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.math.BigInteger; import java.net.URI; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -85,6 +86,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RestconfImpl implements RestconfService { + private enum UriParameters { PRETTY_PRINT("prettyPrint"), DEPTH("depth"); @@ -101,6 +103,8 @@ public class RestconfImpl implements RestconfService { } } + + private final static RestconfImpl INSTANCE = new RestconfImpl(); private static final int NOTIFICATION_PORT = 8181; @@ -1550,4 +1554,9 @@ public class RestconfImpl implements RestconfService { } return false; } + + public BigInteger getOperationalReceived() { + // TODO Auto-generated method stub + return null; + } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java similarity index 59% rename from opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java rename to opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java index 2fa99819d5..abadbf6cb8 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java @@ -5,35 +5,41 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.rest.impl; +package org.opendaylight.controller.sal.restconf.impl; -import java.util.Collection; -import java.util.Collections; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.Put; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.RestConnectorRuntimeMXBean; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.Rpcs; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.controller.sal.rest.api.RestConnector; -import org.opendaylight.controller.sal.restconf.impl.BrokerFacade; -import org.opendaylight.controller.sal.restconf.impl.ControllerContext; import org.opendaylight.controller.sal.streams.websockets.WebSocketServer; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; -public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector { +import java.math.BigInteger; +import java.util.Collection; +import java.util.Collections; - public final static String NOT_INITALIZED_MSG = "Restconf is not initialized yet. Please try again later"; +public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector, RestConnectorRuntimeMXBean { + private final StatisticsRestconfServiceWrapper stats = StatisticsRestconfServiceWrapper.getInstance(); private ListenerRegistration listenerRegistration; private PortNumber port; + private Thread webSocketServerThread; + public void setWebsocketPort(PortNumber port) { this.port = port; } - private Thread webSocketServerThread; - @Override public void onSessionInitiated(ProviderSession session) { final DOMDataBroker domDataBroker = session.getService(DOMDataBroker.class); @@ -58,9 +64,45 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec @Override public void close() { + if (listenerRegistration != null) { listenerRegistration.close(); } + + WebSocketServer.destroyInstance(); webSocketServerThread.interrupt(); } + + @Override + public Config getConfig() { + Config config = new Config(); + Get get = new Get(); + get.setReceivedRequests(stats.getConfigGet()); + config.setGet(get); + Post post = new Post(); + post.setReceivedRequests(stats.getConfigPost()); + config.setPost(post); + Put put = new Put(); + put.setReceivedRequests(stats.getConfigPut()); + config.setPut(put); + return config; + } + + @Override + public Operational getOperational() { + BigInteger opGet = stats.getOperationalGet(); + Operational operational = new Operational(); + Get get = new Get(); + get.setReceivedRequests(opGet); + operational.setGet(get); + return operational; + } + + @Override + public Rpcs getRpcs() { + BigInteger rpcInvoke = stats.getRpc(); + Rpcs rpcs = new Rpcs(); + rpcs.setReceivedRequests(rpcInvoke); + return rpcs ; + } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java new file mode 100644 index 0000000000..eafbb81c48 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.restconf.impl; + +import java.math.BigInteger; +import java.util.concurrent.atomic.AtomicLong; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import org.opendaylight.controller.sal.rest.api.RestconfService; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.Node; + +public class StatisticsRestconfServiceWrapper implements RestconfService { + + AtomicLong operationalGet = new AtomicLong(); + AtomicLong configGet = new AtomicLong(); + AtomicLong rpc = new AtomicLong(); + AtomicLong configPost = new AtomicLong(); + AtomicLong configPut = new AtomicLong(); + AtomicLong configDelete = new AtomicLong(); + + private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance()); + + final RestconfService delegate; + + private StatisticsRestconfServiceWrapper(RestconfService delegate) { + this.delegate = delegate; + } + + public static StatisticsRestconfServiceWrapper getInstance() { + return INSTANCE; + } + + @Override + public Object getRoot() { + return delegate.getRoot(); + } + + @Override + public StructuredData getModules(UriInfo uriInfo) { + return delegate.getModules(uriInfo); + } + + @Override + public StructuredData getModules(String identifier, UriInfo uriInfo) { + return delegate.getModules(identifier, uriInfo); + } + + @Override + public StructuredData getModule(String identifier, UriInfo uriInfo) { + return delegate.getModule(identifier, uriInfo); + } + + @Override + public StructuredData getOperations(UriInfo uriInfo) { + return delegate.getOperations(uriInfo); + } + + @Override + public StructuredData getOperations(String identifier, UriInfo uriInfo) { + return delegate.getOperations(identifier, uriInfo); + } + + @Override + public StructuredData invokeRpc(String identifier, CompositeNode payload, UriInfo uriInfo) { + rpc.incrementAndGet(); + return delegate.invokeRpc(identifier, payload, uriInfo); + } + + @Override + public StructuredData invokeRpc(String identifier, String noPayload, UriInfo uriInfo) { + rpc.incrementAndGet(); + return delegate.invokeRpc(identifier, noPayload, uriInfo); + } + + @Override + public NormalizedNodeContext readConfigurationData(String identifier, UriInfo uriInfo) { + configGet.incrementAndGet(); + return delegate.readConfigurationData(identifier, uriInfo); + } + + @Override + public NormalizedNodeContext readOperationalData(String identifier, UriInfo uriInfo) { + operationalGet.incrementAndGet(); + return delegate.readOperationalData(identifier, uriInfo); + } + + @Override + public Response updateConfigurationData(String identifier, Node payload) { + configPut.incrementAndGet(); + return delegate.updateConfigurationData(identifier, payload); + } + + @Override + public Response createConfigurationData(String identifier, Node payload) { + configPost.incrementAndGet(); + return delegate.createConfigurationData(identifier, payload); + } + + @Override + public Response createConfigurationData(Node payload) { + configPost.incrementAndGet(); + return delegate.createConfigurationData(payload); + } + + @Override + public Response deleteConfigurationData(String identifier) { + return delegate.deleteConfigurationData(identifier); + } + + @Override + public Response subscribeToStream(String identifier, UriInfo uriInfo) { + return delegate.subscribeToStream(identifier, uriInfo); + } + + @Override + public StructuredData getAvailableStreams(UriInfo uriInfo) { + return delegate.getAvailableStreams(uriInfo); + } + + public BigInteger getConfigDelete() { + return BigInteger.valueOf(configDelete.get()); + } + + public BigInteger getConfigGet() { + return BigInteger.valueOf(configGet.get()); + } + + public BigInteger getConfigPost() { + return BigInteger.valueOf(configPost.get()); + } + + public BigInteger getConfigPut() { + return BigInteger.valueOf(configPut.get()); + } + + public BigInteger getOperationalGet() { + return BigInteger.valueOf(operationalGet.get()); + } + + public BigInteger getRpc() { + return BigInteger.valueOf(rpc.get()); + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java index 67ed44f84e..0a5f5f0ff0 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java @@ -16,11 +16,10 @@ import org.slf4j.LoggerFactory; public class WebSocketServer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class); - public static final String WEBSOCKET_SERVER_CONFIG_PROPERTY = "restconf.websocket.port"; public static final int DEFAULT_PORT = 8181; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; - private static WebSocketServer singleton = null; + private static WebSocketServer instance = null; private int port = DEFAULT_PORT; private WebSocketServer(int port) { @@ -35,14 +34,11 @@ public class WebSocketServer implements Runnable { * @return instance of {@link WebSocketServer} */ public static WebSocketServer createInstance(int port) { - if (singleton != null) { - throw new IllegalStateException("createInstance() has already been called"); - } - if (port < 1024) { - throw new IllegalArgumentException("Privileged port (below 1024) is not allowed"); - } - singleton = new WebSocketServer(port); - return singleton; + Preconditions.checkState(instance == null, "createInstance() has already been called"); + Preconditions.checkArgument(port > 1024, "Privileged port (below 1024) is not allowed"); + + instance = new WebSocketServer(port); + return instance; } /** @@ -58,18 +54,18 @@ public class WebSocketServer implements Runnable { * @return instance of {@link WebSocketServer} */ public static WebSocketServer getInstance() { - Preconditions.checkNotNull(singleton, "createInstance() must be called prior to getInstance()"); - return singleton; + Preconditions.checkNotNull(instance, "createInstance() must be called prior to getInstance()"); + return instance; } /** * Destroy this already created instance */ public static void destroyInstance() { - if (singleton == null) { - throw new IllegalStateException("createInstance() must be called prior to destroyInstance()"); - } - getInstance().stop(); + Preconditions.checkState(instance != null, "createInstance() must be called prior to destroyInstance()"); + + instance.stop(); + instance = null; } @Override @@ -99,9 +95,11 @@ public class WebSocketServer implements Runnable { Notificator.removeAllListeners(); if (bossGroup != null) { bossGroup.shutdownGracefully(); + bossGroup = null; } if (workerGroup != null) { workerGroup.shutdownGracefully(); + workerGroup = null; } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang b/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang index a8fc8ff4d5..6d2add6ff1 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang +++ b/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang @@ -27,6 +27,12 @@ module opendaylight-rest-connector { config:java-name-prefix RestConnector; } + grouping statistics { + leaf received-requests { + type uint64; + } + } + augment "/config:modules/config:module/config:configuration" { case rest-connector-impl { when "/config:modules/config:module/config:type = 'rest-connector-impl'"; @@ -44,4 +50,33 @@ module opendaylight-rest-connector { } } } + + augment "/config:modules/config:module/config:state" { + case rest-connector-impl { + when "/config:modules/config:module/config:type = 'rest-connector-impl'"; + container rpcs { + uses statistics; + } + + container config { + container get { + uses statistics; + } + + container post { + uses statistics; + } + + container put { + uses statistics; + } + } + + container operational { + container get { + uses statistics; + } + } + } + } } \ No newline at end of file