Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworked reorder.c #3181

Merged
merged 1 commit into from
Dec 4, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 42 additions & 59 deletions src/reorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ SEXP reorder(SEXP x, SEXP order)
// 'order' must strictly be a permutation of 1:n (i.e. no repeats, zeros or NAs)
// If only a small subset in the middle is reordered the ends are moved in: [start,end].
// x may be a vector, or a list of same-length vectors such as data.table

R_len_t nrow, ncol;
int maxSize = 0;
size_t maxSize = 0;
if (isNewList(x)) {
nrow = length(VECTOR_ELT(x,0));
ncol = length(x);
Expand All @@ -20,7 +19,7 @@ SEXP reorder(SEXP x, SEXP order)
error("Column %d is length %d which differs from length of column 1 (%d). Invalid data.table.", i+1, length(v), nrow);
if (SIZEOF(v) > maxSize)
maxSize=SIZEOF(v);
if (ALTREP(v)) SET_VECTOR_ELT(x,i,duplicate(v)); // expand compact vector in place, ready for reordering by reference
if (ALTREP(v)) SET_VECTOR_ELT(x,i,duplicate(v)); // expand compact vector in place ready for reordering by reference
}
} else {
if (SIZEOF(x)!=4 && SIZEOF(x)!=8)
Expand All @@ -35,75 +34,59 @@ SEXP reorder(SEXP x, SEXP order)
int nprotect = 0;
if (ALTREP(order)) { order=PROTECT(duplicate(order)); nprotect++; } // TODO: how to fetch range of ALTREP compact vector

R_len_t start = 0;
while (start<nrow && INTEGER(order)[start] == start+1) start++;

const int *restrict idx = INTEGER(order);
int i=0;
while (i<nrow && idx[i] == i+1) i++;
const int start = i;
if (start==nrow) { UNPROTECT(nprotect); return(R_NilValue); } // input is 1:n, nothing to do
R_len_t end = nrow-1;
while (INTEGER(order)[end] == end+1) end--;
for (R_len_t i=start; i<=end; i++) {
int itmp = INTEGER(order)[i]-1;
i = nrow-1;
while (idx[i] == i+1) i--;
const int end = i;
for (int i=start; i<=end; i++) {
int itmp = idx[i]-1;
if (itmp<start || itmp>end) error("order is not a permutation of 1:nrow[%d]", nrow);
}
// Creorder is for internal use (so we should get the input right!), but the check above seems sensible, otherwise
// would be segfault below. The for loop above should run in neglible time (sequential) and will also catch NAs.
// It won't catch duplicates in order, but that's ok. Checking that would be going too far given this is for internal use only.
// Creorder is for internal use (so we should get the input right!), but the check above seems sensible. The for loop above should run
// in neglible time (sequential with prefetch). It will catch NAs anywhere but won't catch duplicates. But doing so would be going too
// far given this is for internal use only and we only use this function internally when we're sure it's a permutation.

// Enough working ram for one column of the largest type, for every thread.
// Up to a limit of 1GB total. It's system dependent how to find out the truly free RAM - TODO.
// Without a limit it could easily start swapping and not work at all.
int nth = MIN(getDTthreads(), ncol);
size_t oneTmpSize = (end-start+1)*(size_t)maxSize;
size_t totalLimit = 1024*1024*(size_t)1024; // 1GB
nth = MIN(totalLimit/oneTmpSize, nth);
if (nth==0) nth=1; // if one column's worth is very big, we'll just have to try
char *tmp[nth]; // VLA ok because small; limited to max getDTthreads() not ncol which could be > 1e6
int ok=0; for (; ok<nth; ok++) {
tmp[ok] = malloc(oneTmpSize);
if (tmp[ok] == NULL) break;
}
if (ok==0) error("unable to allocate %d * %d bytes of working memory for reordering data.table", end-start+1, maxSize);
nth = ok; // as many threads for which we have a successful malloc
// So we can still reorder a 10GB table in 16GB of RAM, as long as we have at least one column's worth of tmp
char *TMP = malloc(nrow * maxSize);
// enough RAM for a copy of one column (of largest type). Writes into the [start,end] subset. Outside [start,end] is wasted in that rarer case
// to save a "-start" in the deep loop below in all cases.
if (!TMP) error("Unable to allocate %d * %d bytes of working memory for reordering data.table", end-start+1, maxSize);
const int nth = getDTthreads();

#pragma omp parallel for schedule(dynamic) num_threads(nth)
for (int i=0; i<ncol; i++) {
const SEXP v = isNewList(x) ? VECTOR_ELT(x,i) : x;
const int size = SIZEOF(v);
const int me = omp_get_thread_num();
const int *vi = INTEGER(order)+start;
const size_t size = SIZEOF(v); // size_t, otherwise #5305 (integer overflow in memcpy)
if (size==4) {
const int *vd = (const int *)DATAPTR(v);
int *tmpp = (int *)tmp[me];
for (int j=start; j<=end; j++) {
*tmpp++ = vd[*vi++ -1]; // just copies 4 bytes, including pointers on 32bit
const int *restrict vd = INTEGER(v); // just DATAPTR really. Could use REAL too or any of the other types. It's the LHS's type being 4 byte that is important.
int *restrict tmp = (int *)TMP;
#pragma omp parallel for num_threads(nth)
for (int i=start; i<=end; i++) {
tmp[i] = vd[idx[i]-1]; // copies 4 bytes including pointers on 32bit (STRSXP and VECSXP)
}
memcpy(INTEGER(v)+start, tmp+start, (end-start+1)*size);
} else {
const double *vd = (const double *)DATAPTR(v);
double *tmpp = (double *)tmp[me];
for (int j=start; j<=end; j++) {
*tmpp++ = vd[*vi++ -1]; // just copies 8 bytes, pointers too including STRSXP and VECSXP
const double *restrict vd = REAL(v); // similarly, could use INTEGER() here. It's the LHS's type being 8 bytes that's important.
double *restrict tmp = (double *)TMP;
#pragma omp parallel for num_threads(nth)
for (int i=start; i<=end; i++) {
tmp[i] = vd[idx[i]-1]; // copies 8 bytes including pointers on 64bit (STRSXP and VECSXP)
}
memcpy(REAL(v)+start, tmp+start, (end-start+1)*size);
}
// How is this possible to not only ignore the write barrier but in parallel too?
// Only because this reorder() function accepts and checks a unique permutation of 1:nrow. It
// performs an in-place shuffle. This operation in the end does not change gcgen, mark or
// named/refcnt. They all stay the same even for STRSXP and VECSXP because it's just a data shuffle.
//
// Theory:
// The write to tmp is contiguous and io efficient (so less threads should not help that)
// The read from vd is as io efficient as order is ordered (the more threads the better when close
// to ordered but less threads may help when not very ordered).
// TODO large data benchmark to confirm theory and auto tune.
// io probably limits too much but at least this is our best shot (e.g. branchless) in finding out
// on other platforms with faster bus, perhaps

// copy the reordered data back into the original vector
memcpy((char *)DATAPTR(v) + start*(size_t)size,
tmp[me],
(end-start+1)*(size_t)size);
// size_t, otherwise #5305 (integer overflow in memcpy)
}
for (int i=0; i<nth; i++) free(tmp[i]);
// It's ok to ignore write barrier, and in parallel too, only because this reorder() function accepts and checks
// a unique permutation of 1:nrow. It performs an in-place shuffle. This operation in the end does not change gcgen, mark or
// named/refcnt. They all stay the same even for STRSXP and VECSXP because it's just a data shuffle.
//
// Theory:
// The write to TMP is contiguous, so sync between cpus of written-cache-lines should not be an issue.
// The read from vd is random, but at least the column has a good chance of being all in cache as parallelism is within column.
// As idx approaches being ordered (e.g. moving blocks around) then it should automatically be more read cache efficient.
free(TMP);
UNPROTECT(nprotect);
return(R_NilValue);
}
Expand Down