Skip to content

Commit

Permalink
Reworked reorder.c (#3181)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdowle committed Dec 4, 2018
1 parent a0b496d commit e2a95fb
Showing 1 changed file with 42 additions and 59 deletions.
101 changes: 42 additions & 59 deletions src/reorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ SEXP reorder(SEXP x, SEXP order)
// 'order' must strictly be a permutation of 1:n (i.e. no repeats, zeros or NAs)
// If only a small subset in the middle is reordered the ends are moved in: [start,end].
// x may be a vector, or a list of same-length vectors such as data.table

R_len_t nrow, ncol;
int maxSize = 0;
size_t maxSize = 0;
if (isNewList(x)) {
nrow = length(VECTOR_ELT(x,0));
ncol = length(x);
Expand All @@ -20,7 +19,7 @@ SEXP reorder(SEXP x, SEXP order)
error("Column %d is length %d which differs from length of column 1 (%d). Invalid data.table.", i+1, length(v), nrow);
if (SIZEOF(v) > maxSize)
maxSize=SIZEOF(v);
if (ALTREP(v)) SET_VECTOR_ELT(x,i,duplicate(v)); // expand compact vector in place, ready for reordering by reference
if (ALTREP(v)) SET_VECTOR_ELT(x,i,duplicate(v)); // expand compact vector in place ready for reordering by reference
}
} else {
if (SIZEOF(x)!=4 && SIZEOF(x)!=8)
Expand All @@ -35,75 +34,59 @@ SEXP reorder(SEXP x, SEXP order)
int nprotect = 0;
if (ALTREP(order)) { order=PROTECT(duplicate(order)); nprotect++; } // TODO: how to fetch range of ALTREP compact vector

R_len_t start = 0;
while (start<nrow && INTEGER(order)[start] == start+1) start++;

const int *restrict idx = INTEGER(order);
int i=0;
while (i<nrow && idx[i] == i+1) i++;
const int start = i;
if (start==nrow) { UNPROTECT(nprotect); return(R_NilValue); } // input is 1:n, nothing to do
R_len_t end = nrow-1;
while (INTEGER(order)[end] == end+1) end--;
for (R_len_t i=start; i<=end; i++) {
int itmp = INTEGER(order)[i]-1;
i = nrow-1;
while (idx[i] == i+1) i--;
const int end = i;
for (int i=start; i<=end; i++) {
int itmp = idx[i]-1;
if (itmp<start || itmp>end) error("order is not a permutation of 1:nrow[%d]", nrow);
}
// Creorder is for internal use (so we should get the input right!), but the check above seems sensible, otherwise
// would be segfault below. The for loop above should run in neglible time (sequential) and will also catch NAs.
// It won't catch duplicates in order, but that's ok. Checking that would be going too far given this is for internal use only.
// Creorder is for internal use (so we should get the input right!), but the check above seems sensible. The for loop above should run
// in neglible time (sequential with prefetch). It will catch NAs anywhere but won't catch duplicates. But doing so would be going too
// far given this is for internal use only and we only use this function internally when we're sure it's a permutation.

// Enough working ram for one column of the largest type, for every thread.
// Up to a limit of 1GB total. It's system dependent how to find out the truly free RAM - TODO.
// Without a limit it could easily start swapping and not work at all.
int nth = MIN(getDTthreads(), ncol);
size_t oneTmpSize = (end-start+1)*(size_t)maxSize;
size_t totalLimit = 1024*1024*(size_t)1024; // 1GB
nth = MIN(totalLimit/oneTmpSize, nth);
if (nth==0) nth=1; // if one column's worth is very big, we'll just have to try
char *tmp[nth]; // VLA ok because small; limited to max getDTthreads() not ncol which could be > 1e6
int ok=0; for (; ok<nth; ok++) {
tmp[ok] = malloc(oneTmpSize);
if (tmp[ok] == NULL) break;
}
if (ok==0) error("unable to allocate %d * %d bytes of working memory for reordering data.table", end-start+1, maxSize);
nth = ok; // as many threads for which we have a successful malloc
// So we can still reorder a 10GB table in 16GB of RAM, as long as we have at least one column's worth of tmp
char *TMP = malloc(nrow * maxSize);
// enough RAM for a copy of one column (of largest type). Writes into the [start,end] subset. Outside [start,end] is wasted in that rarer case
// to save a "-start" in the deep loop below in all cases.
if (!TMP) error("Unable to allocate %d * %d bytes of working memory for reordering data.table", end-start+1, maxSize);
const int nth = getDTthreads();

#pragma omp parallel for schedule(dynamic) num_threads(nth)
for (int i=0; i<ncol; i++) {
const SEXP v = isNewList(x) ? VECTOR_ELT(x,i) : x;
const int size = SIZEOF(v);
const int me = omp_get_thread_num();
const int *vi = INTEGER(order)+start;
const size_t size = SIZEOF(v); // size_t, otherwise #5305 (integer overflow in memcpy)
if (size==4) {
const int *vd = (const int *)DATAPTR(v);
int *tmpp = (int *)tmp[me];
for (int j=start; j<=end; j++) {
*tmpp++ = vd[*vi++ -1]; // just copies 4 bytes, including pointers on 32bit
const int *restrict vd = INTEGER(v); // just DATAPTR really. Could use REAL too or any of the other types. It's the LHS's type being 4 byte that is important.
int *restrict tmp = (int *)TMP;
#pragma omp parallel for num_threads(nth)
for (int i=start; i<=end; i++) {
tmp[i] = vd[idx[i]-1]; // copies 4 bytes including pointers on 32bit (STRSXP and VECSXP)
}
memcpy(INTEGER(v)+start, tmp+start, (end-start+1)*size);
} else {
const double *vd = (const double *)DATAPTR(v);
double *tmpp = (double *)tmp[me];
for (int j=start; j<=end; j++) {
*tmpp++ = vd[*vi++ -1]; // just copies 8 bytes, pointers too including STRSXP and VECSXP
const double *restrict vd = REAL(v); // similarly, could use INTEGER() here. It's the LHS's type being 8 bytes that's important.
double *restrict tmp = (double *)TMP;
#pragma omp parallel for num_threads(nth)
for (int i=start; i<=end; i++) {
tmp[i] = vd[idx[i]-1]; // copies 8 bytes including pointers on 64bit (STRSXP and VECSXP)
}
memcpy(REAL(v)+start, tmp+start, (end-start+1)*size);
}
// How is this possible to not only ignore the write barrier but in parallel too?
// Only because this reorder() function accepts and checks a unique permutation of 1:nrow. It
// performs an in-place shuffle. This operation in the end does not change gcgen, mark or
// named/refcnt. They all stay the same even for STRSXP and VECSXP because it's just a data shuffle.
//
// Theory:
// The write to tmp is contiguous and io efficient (so less threads should not help that)
// The read from vd is as io efficient as order is ordered (the more threads the better when close
// to ordered but less threads may help when not very ordered).
// TODO large data benchmark to confirm theory and auto tune.
// io probably limits too much but at least this is our best shot (e.g. branchless) in finding out
// on other platforms with faster bus, perhaps

// copy the reordered data back into the original vector
memcpy((char *)DATAPTR(v) + start*(size_t)size,
tmp[me],
(end-start+1)*(size_t)size);
// size_t, otherwise #5305 (integer overflow in memcpy)
}
for (int i=0; i<nth; i++) free(tmp[i]);
// It's ok to ignore write barrier, and in parallel too, only because this reorder() function accepts and checks
// a unique permutation of 1:nrow. It performs an in-place shuffle. This operation in the end does not change gcgen, mark or
// named/refcnt. They all stay the same even for STRSXP and VECSXP because it's just a data shuffle.
//
// Theory:
// The write to TMP is contiguous, so sync between cpus of written-cache-lines should not be an issue.
// The read from vd is random, but at least the column has a good chance of being all in cache as parallelism is within column.
// As idx approaches being ordered (e.g. moving blocks around) then it should automatically be more read cache efficient.
free(TMP);
UNPROTECT(nprotect);
return(R_NilValue);
}
Expand Down

0 comments on commit e2a95fb

Please sign in to comment.