Skip to content

Commit

Permalink
[r] Connect re-indexer to blockwise iterator (#2742) (#2748)
Browse files Browse the repository at this point in the history
Connect the re-indexer to the blockwise iterator, allowing reads to be re-indexed on-the-fly. This PR parallels #1792 and completes #2152 and #2637; in addition, provides new shorthand for `reindex_disable_on_axis`:
 - `TRUE`: disable re-indexing on all axes
 - `FALSE`: re-index on all axes
 - `NA`: re-index only on the major axis; disable re-indexing on all minor axes (default)

`BlockwiseTableReadIter$concat()` and `BlockwiseSparseReadIter$concat()` are disabled when re-indexing is requested (paralleling Python)

`BlockwiseSparseReadIter` now accepts `repr = "R"` or `repr = "C"` under certain circumstances:
 - axis 0 (`soma_dim_0`) must be re-indexed to allow `repr = "R"`
 - axis 1 (`soma_dim_1`) must be re-indexed to allow `repr = "C"`

`repr` of `"T"` is allowed in all circumstances and continues to be the default

Two new fields are available to blockwise iterators:
 - `$axes_to_reindex`: a vector of minor axes slated to be re-indexed
 - `$reindexable`: status indicator stating if _any_ axis (major or minor) is slated to be re-indexed

resolves #2671

Co-authored-by: Paul Hoffman <[email protected]>
  • Loading branch information
github-actions[bot] and mojaveazure authored Jun 17, 2024
1 parent c8194db commit f4207f4
Show file tree
Hide file tree
Showing 11 changed files with 464 additions and 121 deletions.
161 changes: 141 additions & 20 deletions apis/r/R/BlockwiseIter.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ BlockwiseReadIterBase <- R6::R6Class(
coords,
axis,
...,
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
super$initialize(sr)
stopifnot(
Expand Down Expand Up @@ -55,6 +55,16 @@ BlockwiseReadIterBase <- R6::R6Class(
}
private$.coords <- coords
# Check reindex_disable_on_axis
if (is_scalar_logical(reindex_disable_on_axis)) {
reindex_disable_on_axis <- if (isTRUE(reindex_disable_on_axis)) { # TRUE
bit64::seq.integer64(0L, ndim)
} else if (isFALSE(reindex_disable_on_axis)) { # FALSE
NULL
} else { # NA
ax <- bit64::seq.integer64(0L, ndim)
ax[ax != self$axis]
}
}
if (!is.null(reindex_disable_on_axis)) {
stopifnot(
"'reindex_disable_on_axis' must be a vector of integers" = (
Expand All @@ -63,11 +73,27 @@ BlockwiseReadIterBase <- R6::R6Class(
),
"'reindex_disable_on_axis' must be finite" = is.finite(reindex_disable_on_axis),
"'reindex_disable_on_axis' must be within the range of dimensions of the array" = all(
reindex_disable_on_axis >= 0 && reindex_disable_on_axis <= ndim
reindex_disable_on_axis >= 0 & reindex_disable_on_axis <= ndim
)
)
reindex_disable_on_axis <- unique(bit64::as.integer64(reindex_disable_on_axis))
}
private$.reindex_disable_on_axis <- reindex_disable_on_axis
axes_to_reindex <- self$axes_to_reindex
private$.reindexers <- vector("list", length = length(axes_to_reindex))
shape <- self$array$shape()
dnames <- self$array$dimnames()
for (i in seq_along(axes_to_reindex)) {
ax <- as.numeric(axes_to_reindex[i]) + 1L
coords <- as.list(CoordsStrider$new(start = 0L, end = shape[ax] - 1L))
coords <- if (length(coords) == 1L) {
coords[[1L]]
} else {
unlist64(coords)
}
private$.reindexers[[i]] <- IntIndexer$new(coords)
names(private$.reindexers)[i] <- dnames[ax]
}
},
#' @description Check if the iterated read is complete or not
#'
Expand All @@ -90,8 +116,8 @@ BlockwiseReadIterBase <- R6::R6Class(
}
private$reset()
dimnam <- self$array$dimnames()[self$axis + 1L]
nextelems <- self$coords_axis$next_element()
private$set_dim_points(dimnam, nextelems)
private$.nextelems <- self$coords_axis$next_element()
private$set_dim_points(dimnam, private$.nextelems)
return(private$.read_next())
}
),
Expand All @@ -102,6 +128,19 @@ BlockwiseReadIterBase <- R6::R6Class(
#' @field axis The axis to iterate over in a blockwise fashion
#'
axis = function() private$.axis,
#' @field axes_to_reindex The axes to re-index
#'
axes_to_reindex = function() {
  # Start from every zero-based axis index of the array
  candidates <- bit64::seq.integer64(0L, self$array$ndim() - 1L)
  # Drop the axes on which re-indexing has been disabled
  disabled <- candidates %in% self$reindex_disable_on_axis
  remaining <- candidates[!disabled]
  # The iterated axis is never reported here; only compare when there is
  # something left to compare
  if (length(remaining)) {
    remaining <- remaining[remaining != self$axis]
  }
  # An empty result is reported as NULL rather than a zero-length vector
  if (length(remaining)) remaining else NULL
},
#' @field coords A list of \code{\link{CoordsStrider}} objects
#'
coords = function() private$.coords,
Expand All @@ -113,13 +152,26 @@ BlockwiseReadIterBase <- R6::R6Class(
},
#' @field reindex_disable_on_axis Additional axes that will not be re-indexed
#'
reindex_disable_on_axis = function() private$.reindex_disable_on_axis
reindex_disable_on_axis = function() private$.reindex_disable_on_axis,
#' @field reindexable Shorthand to see if this iterator is poised to be
#' re-indexed or not
#'
reindexable = function() {
  # TRUE when at least one minor axis is slated for re-indexing, or when the
  # iterated axis itself is not listed in `reindex_disable_on_axis`
  has_minor_axes <- length(self$axes_to_reindex) > 0L
  has_minor_axes ||
    !bit64::as.integer64(self$axis) %in% self$reindex_disable_on_axis
}
),
private = list(
.array = NULL,
.coords = list(),
.axis = integer(1L),
.nextelems = NULL,
.reindex_disable_on_axis = NULL,
.reindexers = list(),
# @description Throw an error saying that re-indexed
# iterators are not concatenatable
.notConcatenatable = function() {
  # Build a classed condition so callers can catch it by class
  cond <- errorCondition(
    message = "Re-indexed blockwise iterators are not concatenatable",
    class = "notConcatenatableError"
  )
  stop(cond)
},
# @description Reset internal state of SOMA Reader while keeping array open
reset = function() {
if (is.null(private$soma_reader_pointer)) {
Expand All @@ -128,6 +180,48 @@ BlockwiseReadIterBase <- R6::R6Class(
sr_reset(private$soma_reader_pointer)
return(invisible(NULL))
},
# @description Re-index an Arrow table
#
# The column for the iterated axis is passed through a fresh `IntIndexer`
# built from the coordinates of the current block (`private$.nextelems`),
# unless that axis is listed in `reindex_disable_on_axis`. Every axis with
# a prepared indexer in `private$.reindexers` is passed through that
# indexer. The iterator's coordinates, with the iterated axis replaced by a
# strider over the current block, are attached as the "coords" attribute.
#
# @param tbl An Arrow table holding a column for the iterated axis and for
# every axis that has a prepared re-indexer
#
# @return `tbl` with re-indexed coordinate columns and a "coords" attribute
#
reindex_arrow_table = function(tbl) {
  stopifnot(
    "'tbl' must be an Arrow table" = R6::is.R6(tbl) && inherits(tbl, "Table")
  )
  dname <- self$array$dimnames()[self$axis + 1L]
  if (!dname %in% names(tbl)) {
    stop(
      "Cannot find ",
      sQuote(dname),
      " in the provided Arrow table",
      call. = FALSE
    )
  }
  # Turn off Arrow's int64 downcasting while columns are pulled out with
  # `as_vector()`; the previous option value is restored on exit
  op <- options(arrow.int64_downcast = FALSE)
  on.exit(options(op), add = TRUE, after = FALSE)
  # Record the coordinates of this block: same as the iterator's coordinates,
  # except that the iterated axis only covers the elements just read
  coords <- self$coords
  coords[[dname]] <- CoordsStrider$new(
    private$.nextelems,
    stride = coords[[dname]]$stride
  )
  # Iterated axis: index against the current block, unless disabled
  if (!bit64::as.integer64(self$axis) %in% self$reindex_disable_on_axis) {
    indexer <- IntIndexer$new(private$.nextelems)
    tbl[[dname]] <- indexer$get_indexer(
      tbl[[dname]]$as_vector(),
      nomatch_na = TRUE
    )
    rm(indexer)
  }
  # Remaining axes: use the indexers prepared at initialization
  for (minor in names(private$.reindexers)) {
    # Fail with a clear message instead of falling through to an obscure
    # error when the expected column is absent
    if (!minor %in% names(tbl)) {
      stop(
        "Cannot find ",
        sQuote(minor),
        " in the provided Arrow table",
        call. = FALSE
      )
    }
    indexer <- private$.reindexers[[minor]]
    tbl[[minor]] <- indexer$get_indexer(
      tbl[[minor]]$as_vector(),
      nomatch_na = TRUE
    )
  }
  attr(tbl, "coords") <- coords
  return(tbl)
},
# @description Set dimension selection on given axis
set_dim_points = function(dimname, points) {
stopifnot(
Expand Down Expand Up @@ -156,14 +250,22 @@ BlockwiseTableReadIter <- R6::R6Class(
classname = "BlockwiseTableReadIter",
inherit = BlockwiseReadIterBase,
public = list(
#' @description ...
#' @description Concatenate the remainder of the blockwise iterator
#'
#' @return ...
#' @return An Arrow Table with the remainder of the iterator
#'
concat = function() soma_array_to_arrow_table_concat(self)
concat = function() {
  # Concatenation is refused whenever this iterator is set to re-index
  blocked <- self$reindexable
  if (blocked) {
    private$.notConcatenatable()
  }
  soma_array_to_arrow_table_concat(self)
}
),
private = list(
soma_reader_transform = function(x) soma_array_to_arrow_table(x)
soma_reader_transform = function(x) {
  # Convert the raw read to an Arrow table, then re-index it
  private$reindex_arrow_table(soma_array_to_arrow_table(x))
}
)
)

Expand Down Expand Up @@ -194,7 +296,7 @@ BlockwiseSparseReadIter <- R6::R6Class(
axis,
...,
repr = "T",
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
super$initialize(
sr,
Expand All @@ -204,14 +306,28 @@ BlockwiseSparseReadIter <- R6::R6Class(
...,
reindex_disable_on_axis = reindex_disable_on_axis
)
private$.repr <- match.arg(repr)
stopifnot(
"Sparse reads only work with two-dimensional arrays" = self$array$ndim() == 2L
)
reprs <- c(
'T',
if (!bit64::as.integer64(0L) %in% self$reindex_disable_on_axis)'R',
if (!bit64::as.integer64(1L) %in% self$reindex_disable_on_axis) 'C'
)
private$.repr <- match.arg(repr, choices = reprs)
private$.shape <- sapply(coords, length)
},
#' @description ...
#' @description Concatenate the remainder of the blockwise iterator
#'
#' @return ...
#' @return A sparse matrix (determined by \code{self$repr}) with
#' the remainder of the iterator
#'
concat = function() soma_array_to_sparse_matrix_concat(self, private$.zero_based)
concat = function() {
  # Concatenation is refused whenever this iterator is set to re-index
  blocked <- self$reindexable
  if (blocked) {
    private$.notConcatenatable()
  }
  soma_array_to_sparse_matrix_concat(self, private$.zero_based)
}
),
active = list(
#' @field repr Representation of the sparse matrix to return
Expand All @@ -222,11 +338,16 @@ BlockwiseSparseReadIter <- R6::R6Class(
.repr = character(1L),
.shape = NULL,
.zero_based = FALSE,
soma_reader_transform = function(x) arrow_table_to_sparse(
soma_array_to_arrow_table(x),
repr = self$repr,
shape = private$.shape,
zero_based = private$.zero_based
)
soma_reader_transform = function(x) {
  # Raw read -> Arrow table -> re-indexed Arrow table
  arrow_tbl <- soma_array_to_arrow_table(x)
  reindexed <- private$reindex_arrow_table(arrow_tbl)
  # Build the sparse matrix in the requested representation
  sparse <- arrow_table_to_sparse(
    reindexed,
    repr = self$repr,
    shape = private$.shape,
    zero_based = private$.zero_based
  )
  # Carry the block's coordinates over from the table to the matrix
  attr(sparse, "coords") <- attr(reindexed, "coords", exact = TRUE)
  sparse
}
)
)
5 changes: 3 additions & 2 deletions apis/r/R/SOMASparseNDArrayRead.R
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ SOMASparseNDArrayRead <- R6::R6Class(
axis,
...,
size = NULL,
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
return(SOMASparseNDArrayBlockwiseRead$new(
self$sr,
Expand Down Expand Up @@ -210,14 +210,15 @@ SOMASparseNDArrayBlockwiseRead <- R6::R6Class(
axis,
...,
size,
reindex_disable_on_axis = NULL
reindex_disable_on_axis = NA
) {
super$initialize(sr, array, coords)
stopifnot(
"'size' must be a single integer value" = is.null(size) ||
rlang::is_integerish(size, 1L, finite = TRUE) ||
(inherits(size, 'integer64') && length(size) == 1L && is.finite(size)),
"'reindex_disable_on_axis' must be a vector of integers" = is.null(reindex_disable_on_axis) ||
is_scalar_logical(reindex_disable_on_axis) ||
rlang::is_integerish(reindex_disable_on_axis, finite = TRUE) ||
(inherits(reindex_disable_on_axis, 'integer64') && all(is.finite(reindex_disable_on_axis)))
)
Expand Down
16 changes: 14 additions & 2 deletions apis/r/man/BlockwiseReadIterBase.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 20 additions & 5 deletions apis/r/man/BlockwiseSparseReadIter.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apis/r/man/BlockwiseTableReadIter.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f4207f4

Please sign in to comment.