import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.opendaylight.yangtools.concepts.Identifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
return this;
}
+ if (context().messageSlicer().handleMessage(command)) {
+ return this;
+ }
+
return onCommand(command);
}
- private static long extractCookie(final WritableIdentifier id) {
+ private static long extractCookie(final Identifier id) {
if (id instanceof TransactionIdentifier) {
return ((TransactionIdentifier) id).getHistoryId().getCookie();
} else if (id instanceof LocalHistoryIdentifier) {
} finally {
connectionsLock.unlockWrite(stamp);
}
+
+ context().messageSlicer().close();
}
/**
}
} else {
LOG.info("{}: removed connection {}", persistenceId(), conn);
+ cancelSlicing(conn.cookie());
}
} finally {
connectionsLock.unlockWrite(stamp);
} else {
LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn);
}
+ } else {
+ cancelSlicing(oldConn.cookie());
}
} finally {
connectionsLock.unlockWrite(stamp);
}));
}
+ private void cancelSlicing(final Long cookie) {
+ context().messageSlicer().cancelSlicing(id -> {
+ try {
+ return cookie.equals(extractCookie(id));
+ } catch (IllegalArgumentException e) {
+ LOG.debug("extractCookie failed while cancelling slicing for cookie {}: {}", cookie, e);
+ return false;
+ }
+ });
+ }
+
private ConnectingClientConnection<T> createConnection(final Long shard) {
final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
resolveConnection(shard, conn);