Skip to content

Commit

Permalink
(build) split directory.c into srv/cl/supp
Browse files Browse the repository at this point in the history
Before this gets messier than it already is, differentiate the role
and implementation for directory mode now.
  • Loading branch information
letoram committed Jul 12, 2023
1 parent a8cbf3e commit 9085fd6
Show file tree
Hide file tree
Showing 7 changed files with 539 additions and 305 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -28,6 +28,7 @@

## Net
* allow h264 passthrough, sidestepping local encode
* re-add afsrv\_net, have it support directory, source and sink access modes.

## Terminal
* SGR reset fix, add CNL / CPL
Expand Down
4 changes: 3 additions & 1 deletion src/a12/net/CMakeLists.txt
Expand Up @@ -6,7 +6,9 @@ set(SOURCES
a12_helper_cl.c
a12_helper_srv.c
net.c
directory.c
dir_cl.c
dir_srv.c
dir_supp.c
${ARCAN_SRC}/frameserver/util/anet_helper.c
${ARCAN_SRC}/frameserver/util/anet_keystore_naive.c
)
Expand Down
303 changes: 0 additions & 303 deletions src/a12/net/directory.c → src/a12/net/dir_cl.c
Expand Up @@ -29,254 +29,6 @@
#include <fcntl.h>
#include <poll.h>

static bool g_shutdown;

static struct a12_bhandler_res srv_bevent(
struct a12_state* S, struct a12_bhandler_meta M, void* tag);

static FILE* cmd_to_membuf(const char* cmd, char** out, size_t* out_sz)
{
FILE* applin = popen(cmd, "r");
if (!applin)
return NULL;

FILE* applbuf = open_memstream(out, out_sz);
if (!applbuf){
pclose(applin);
return NULL;
}

char buf[4096];
size_t nr;
bool ok = true;

while ((nr = fread(buf, 1, 4096, applin))){
if (1 != fwrite(buf, nr, 1, applbuf)){
ok = false;
break;
}
}

pclose(applin);
if (!ok){
fclose(applbuf);
return NULL;
}

/* actually keep both in order to allow appending elsewhere */
fflush(applbuf);
return applbuf;
}

/* This part is much more PoC - we'd need a nicer cache / store (sqlite?) so
* that each time we start up, we don't have to rescan and the other end don't
* have to redownload if nothing's changed. */
static size_t scan_appdir(int fd, struct appl_meta* dst)
{
lseek(fd, 0, SEEK_SET);
DIR* dir = fdopendir(fd);
struct dirent* ent;
size_t count = 0;

while (dir && (ent = readdir(dir))){
if (
strlen(ent->d_name) >= 18 ||
strcmp(ent->d_name, "..") == 0 || strcmp(ent->d_name, ".") == 0){
continue;
}
fchdir(fd);

/* just want directories */
struct stat sbuf;
if (-1 == stat(ent->d_name, &sbuf) || (sbuf.st_mode & S_IFMT) != S_IFDIR)
continue;

chdir(ent->d_name);

struct appl_meta* new = malloc(sizeof(struct appl_meta));
if (!new)
break;

*dst = (struct appl_meta){0};
size_t buf_sz;
dst->handle = cmd_to_membuf("tar cf - .", &dst->buf, &buf_sz);
dst->buf_sz = buf_sz;
fchdir(fd);

if (!dst->handle){
free(new);
continue;
}
dst->identifier = count++;

blake3_hasher temp;
blake3_hasher_init(&temp);
blake3_hasher_update(&temp, dst->buf, dst->buf_sz);
blake3_hasher_finalize(&temp, dst->hash, 4);
snprintf(dst->applname, 18, "%s", ent->d_name);

*new = (struct appl_meta){0};
dst->next = new;
dst = new;
}

closedir(dir);
return count;
}

static void on_srv_event(
struct arcan_shmif_cont* cont, int chid, struct arcan_event* ev, void* tag)
{
struct directory_meta* cbt = tag;

/* the only concerns here are BCHUNK matching our directory IDs */
if (ev->ext.kind == EVENT_EXTERNAL_BCHUNKSTATE){
/* sweep the directory, and when found: */
if (!isdigit(ev->ext.bchunk.extensions[0])){
a12int_trace(A12_TRACE_DIRECTORY, "event=bchunkstate:error=invalid_id");
return;
}

uint16_t extid = (uint16_t)
strtoul((char*)ev->ext.bchunk.extensions, NULL, 10);

struct appl_meta* meta = cbt->dir;
while (meta){
if (extid == meta->identifier){
/* we have the applname, and the Kpub of the other end -
* use that to determine if we have any state block to send first */
int fd = a12_access_state(cbt->S, meta->applname, "r", 0);
if (-1 != fd){
a12int_trace(A12_TRACE_DIRECTORY,
"event=bchunkstate:send_state=%s", meta->applname);
a12_enqueue_bstream(
cbt->S, fd, A12_BTYPE_STATE, meta->identifier, false, 0);
close(fd);
}

a12int_trace(A12_TRACE_DIRECTORY,
"event=bchunkstate:send=%s", meta->applname);
a12_enqueue_blob(cbt->S, meta->buf, meta->buf_sz, meta->identifier);
return;
}
meta = meta->next;
}
a12int_trace(A12_TRACE_DIRECTORY,
"event=bchunkstate:error=no_match:id=%"PRIu16, extid);
}
else
a12int_trace(A12_TRACE_DIRECTORY,
"event=%s", arcan_shmif_eventstr(ev, NULL, 0));
}

void anet_directory_ioloop
(struct a12_state* S, void* tag,
int fdin, int fdout,
int usrfd,
void (*on_event)(struct arcan_shmif_cont* cont, int chid, struct arcan_event*, void*),
bool (*on_directory)(struct a12_state* S, struct appl_meta* dir, void*),
void (*on_userfd)(struct a12_state* S, void*))
{
int errmask = POLLERR | POLLNVAL | POLLHUP;
struct pollfd fds[3] =
{
{.fd = usrfd, .events = POLLIN | errmask},
{.fd = fdin, .events = POLLIN | errmask},
{.fd = -fdout, .events = POLLOUT | errmask}
};

uint8_t inbuf[9000];
uint8_t* outbuf = NULL;
uint64_t ts = 0;

fcntl(fdin, F_SETFD, FD_CLOEXEC);
fcntl(fdout, F_SETFD, FD_CLOEXEC);

size_t outbuf_sz = a12_flush(S, &outbuf, A12_FLUSH_ALL);

if (outbuf_sz)
fds[2].fd = fdout;

/* regular simple processing loop, wait for DIRECTORY-LIST command */
while (a12_ok(S) && -1 != poll(fds, 3, -1)){
if ((fds[0].revents | fds[1].revents | fds[2].revents) & errmask)
break;

if (fds[0].revents & POLLIN){
on_userfd(S, tag);
}

if ((fds[2].revents & POLLOUT) && outbuf_sz){
ssize_t nw = write(fdout, outbuf, outbuf_sz);
if (nw > 0){
outbuf += nw;
outbuf_sz -= nw;
}
}

if (fds[1].revents & POLLIN){
ssize_t nr = recv(fdin, inbuf, 9000, 0);
if (-1 == nr && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR){
a12int_trace(A12_TRACE_DIRECTORY, "shutdown:reason=rw_error");
break;
}
else if (0 == nr){
a12int_trace(A12_TRACE_DIRECTORY, "shutdown:reason=closed");
break;
}
a12_unpack(S, inbuf, nr, tag, on_event);

/* check if there has been a change to the directory state after each unpack */
uint64_t new_ts;
if (on_directory){
struct appl_meta* dir = a12int_get_directory(S, &new_ts);
if (new_ts != ts){
ts = new_ts;
if (!on_directory(S, dir, tag))
return;
}
}
}

if (!outbuf_sz){
outbuf_sz = a12_flush(S, &outbuf, A12_FLUSH_ALL);
if (!outbuf_sz && g_shutdown){
break;
}
}

fds[0].revents = fds[1].revents = fds[2].revents = 0;
fds[2].fd = outbuf_sz ? fdout : -1;
}
}

/* this will just keep / cache the built .tars in memory, the startup times
* will still be long and there is no detection when / if to rebuild or when
* the state has changed - a better server would use sqlite and some basic
* signalling. */
void anet_directory_srv_rescan(struct anet_dirsrv_opts* opts)
{
opts->dir_count = scan_appdir(dup(opts->basedir), &opts->dir);
}

void anet_directory_srv(
struct a12_state* S, struct anet_dirsrv_opts opts, int fdin, int fdout)
{
struct directory_meta cbt = {
.dir = &opts.dir,
.S = S
};

if (!opts.dir_count){
a12int_trace(A12_TRACE_DIRECTORY, "shutdown:reason=no_valid_appls");
return;
}

a12int_set_directory(S, &opts.dir);
a12_set_bhandler(S, srv_bevent, &cbt);
anet_directory_ioloop(S, &cbt, fdin, fdout, -1, on_srv_event, NULL, NULL);
}

static void on_cl_event(
struct arcan_shmif_cont* cont, int chid, struct arcan_event* ev, void* tag)
{
Expand Down Expand Up @@ -583,61 +335,6 @@ static bool handover_exec(struct a12_state* S, const char* name,
return false;
}

static struct appl_meta* find_identifier(struct appl_meta* base, unsigned id)
{
while (base){
if (base->identifier == id)
return base;
base = base->next;
}
return NULL;
}

static struct a12_bhandler_res srv_bevent(
struct a12_state* S, struct a12_bhandler_meta M, void* tag)
{
struct a12_bhandler_res res = {
.fd = -1,
.flag = A12_BHANDLER_DONTWANT
};

struct directory_meta* cbt = tag;
struct appl_meta* meta = find_identifier(cbt->dir, M.identifier);
if (!meta)
return res;

/* this is not robust or complete - the previous a12_access_state for the ID
* should really only be swapped when we have a complete transfer - one option
* is to first store under a temporary id, then on completion access and copy */
switch (M.state){
case A12_BHANDLER_COMPLETED:
a12int_trace(
A12_TRACE_DIRECTORY, "kind=status:completed:identifier=%"PRIu16, M.identifier);
break;
case A12_BHANDLER_CANCELLED:
/* 1. truncate the existing state store for the slot */
break;
case A12_BHANDLER_INITIALIZE:
/* 1. check that the identifier is valid. */
/* 2. reserve the state slot - add suffix if it is debug */
/* 3. setup the result structure. */
if (M.type == A12_BTYPE_STATE){
res.fd = a12_access_state(S, meta->applname, "w+", M.known_size);
ftruncate(res.fd, 0);
}
else if (M.type == A12_BTYPE_CRASHDUMP){
char name[sizeof(meta->applname) + sizeof(".dump")];
snprintf(name, sizeof(name), "%s.dump", meta->applname);
res.fd = a12_access_state(S, name, "w+", M.known_size);
}
break;
}

if (-1 != res.fd)
res.flag = A12_BHANDLER_NEWFD;
return res;
}

static void mark_xfer_complete(
struct a12_state* S, struct a12_bhandler_meta M, struct directory_meta* cbt)
{
Expand Down

0 comments on commit 9085fd6

Please sign in to comment.