Slice front-end request messages
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorBehavior.java
index 79d7eceb148d8d255ab1c9785e7349ebb5c72522..3f6515cbb8b20ce22183b857eccacd6acb50223c 100644 (file)
@@ -36,7 +36,7 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
@@ -151,10 +151,14 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             return this;
         }
 
+        if (context().messageSlicer().handleMessage(command)) {
+            return this;
+        }
+
         return onCommand(command);
     }
 
-    private static long extractCookie(final WritableIdentifier id) {
+    private static long extractCookie(final Identifier id) {
         if (id instanceof TransactionIdentifier) {
             return ((TransactionIdentifier) id).getHistoryId().getCookie();
         } else if (id instanceof LocalHistoryIdentifier) {
@@ -245,6 +249,8 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         } finally {
             connectionsLock.unlockWrite(stamp);
         }
+
+        context().messageSlicer().close();
     }
 
     /**
@@ -367,6 +373,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 }
             } else {
                 LOG.info("{}: removed connection {}", persistenceId(), conn);
+                cancelSlicing(conn.cookie());
             }
         } finally {
             connectionsLock.unlockWrite(stamp);
@@ -390,6 +397,8 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
                 } else {
                     LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn);
                 }
+            } else {
+                cancelSlicing(oldConn.cookie());
             }
         } finally {
             connectionsLock.unlockWrite(stamp);
@@ -404,6 +413,17 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             }));
     }
 
+    private void cancelSlicing(final Long cookie) {
+        context().messageSlicer().cancelSlicing(id -> {
+            try {
+                return cookie.equals(extractCookie(id));
+            } catch (IllegalArgumentException e) {
+                LOG.debug("extractCookie failed while cancelling slicing for cookie {}: {}", cookie, e);
+                return false;
+            }
+        });
+    }
+
     private ConnectingClientConnection<T> createConnection(final Long shard) {
         final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
         resolveConnection(shard, conn);