Skip to content

Commit

Permalink
(a12) change appl package format
Browse files Browse the repository at this point in the history
This moves away from exec:ing to tar into a placeholder custom. The need
for this is to have a .manifest file for an appl that turns into a
global header for the appl with basic information about sandboxing needs
(primarily of -net and -terminal should be permitted or not).

Then there's a single line header per file, the raw bytes and repeat
until eof. Checksumming and compression is not included as the protocol
does that already. What is missing in that regard is signature chains
and caching of compression to reduce server spinup times, but that's
for the optimisation stage.

This should also restore functionality for afsrv_net browse/load that
was broken during the net/dir_cl refactor. The biggest discrepancy
between the two right now is that no NETSTATE events are emitted yet
and no tunnel request controls.
  • Loading branch information
letoram committed Oct 26, 2023
1 parent 792626a commit fa6025f
Show file tree
Hide file tree
Showing 10 changed files with 475 additions and 125 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -43,6 +43,8 @@
* directory mode notification of appl updates and sources coming / leaving
* directory mode registration of dynamic sinks
* directory mode sourcing dynamic sinks (sink-inbound only)
* directory mode source-sink pairing 'reachable source' and 'tunnel' transfer modes
* changed packaging format for appls

## Terminal
* SGR reset fix, add CNL / CPL
Expand Down
2 changes: 1 addition & 1 deletion src/a12/a12.c
Expand Up @@ -2256,7 +2256,7 @@ static void process_blob(struct a12_state* S)
return;
}
else
a12int_trace(A12_TRACE_BTRANSFER, "kind=zstd_state:%"PRIu64, decode);
a12int_trace(A12_TRACE_BTRANSFER, "kind=zstd_state:size=%"PRIu64, decode);

free_buf = true;
}
Expand Down
41 changes: 35 additions & 6 deletions src/a12/net/README.md
Expand Up @@ -71,10 +71,39 @@ in the path. On normal/default setups this is supported by appl output being
directed to a separate 'appltemp' namespace by default, letting applbase be
read-only.

If the public key a client is trusted (in keystore), it can also be used as a
state store for storing/restoring the appl execution state. Other (planned)
modes of operation for this feature is to act as a pub/sub for sources/sinks/
other directories to find eachother.
If the public key a client uses is trusted (in keystore), it can also be used
as a state store for storing/restoring the appl execution state. It can also
be used to dynamically update the hosted appl, letting connected clients switch
automatically:

arcan-net --push-appl myappl @myserver

It is also possible to share access to arcan clients dynamically:

A12_IDENT=me ARCAN_CONNPATH=a12://myserver@ /usr/bin/afsrv_terminal

Would expose a single instance of 'afsrv\_terminal' as 'me', letting someone
else access it:

arcan-net --keep-alive @myserver /me

The above would connect to the directory, wait for the 'me' source to be
announced as available and request to source it.

This retains end to end encryption of the data stream, and can be used to learn
about public keys from this trusted third party in order to reconnect with
their owners them through some other means later.

The default form of this creates a direct sink to source outbound connection.
In cases where that network traffic is not possible, the directory server can
act as a tunnel:

arcan-net --tunnel

This is permitted by default by a directory server but can be toggled off with
--block-tunnel:

arcan-net --directory --block-tunnel -l 6680

# Cache

Expand Down Expand Up @@ -249,8 +278,8 @@ Milestone 3 - big stretch (0.6.x)
- [x] enumerate / fetch / execute appl
- [x] register as source
- [x] register as sink
- [ ] notify on new source/sink
- [ ] relay a12 traffing between source/sink
- [x] notify on new source/sink
- [x] relay a12 traffing between source/sink
- [ ] NAT-punch between source/sink
- [x] state store/restore
- [x] Add to afsrv\_net (x)
Expand Down
122 changes: 74 additions & 48 deletions src/a12/net/dir_cl.c
Expand Up @@ -180,8 +180,31 @@ static void on_source(struct a12_state* S, struct a12_dynreq req, void* tag)
static void on_cl_event(
struct arcan_shmif_cont* cont, int chid, struct arcan_event* ev, void* tag)
{
struct ioloop_shared* I = tag;

/* main use would be the appl- runner forwarding messages that direction */
a12int_trace(A12_TRACE_DIRECTORY, "event=%s", arcan_shmif_eventstr(ev, NULL, 0));

/* we do have an ongoing transfer (--push-appl) that we wait for an OK or cancel
* before marking that we're ready to shutdown */
if (I->cbt->in_transfer &&
ev->category == EVENT_EXTERNAL &&
ev->ext.kind == EVENT_EXTERNAL_STREAMSTATUS){
if (ev->ext.streamstat.identifier == I->cbt->transfer_id){
a12int_trace(A12_TRACE_DIRECTORY,
"streamstatus:progress=%f:id=%f",
ev->ext.streamstat.completion,
(int)ev->ext.streamstat.identifier
);
if (ev->ext.streamstat.completion < 0 ||
ev->ext.streamstat.completion >= 0.999){
I->shutdown = true;
}
}
else
a12int_trace(A12_TRACE_DIRECTORY,
"streamstatus:unknown=%d", (int)ev->ext.streamstat.identifier);
}
}

static int cleancb(
Expand Down Expand Up @@ -211,31 +234,6 @@ static bool clean_appldir(const char* name, int basedir)
return 0 == nftw(name, cleancb, 32, FTW_DEPTH | FTW_PHYS);
}

static bool ensure_appldir(const char* name, int basedir)
{
/* this should also ensure that we have a correct statedir
* and try to retrieve it if possible */
if (-1 != basedir){
fchdir(basedir);
}

char buf[strlen(name) + sizeof(".new")];
if (atomic_load(&active_appls.n_active) > 0){
snprintf(buf, sizeof(buf), "%s.new", name);
name = buf;
}

/* make sure we don't have a collision */
clean_appldir(name, basedir);

if (-1 == mkdir(name, S_IRWXU) || -1 == chdir(name)){
fprintf(stderr, "Couldn't create [basedir]/%s\n", name);
return false;
}

return true;
}

struct default_meta {
const char* bin;
char* key;
Expand Down Expand Up @@ -321,6 +319,8 @@ static pid_t exec_cpath(struct a12_state* S,
if (ctx->key)
setenv(ctx->key, ctx->val, 1);

fchdir(dir->clopt->basedir);

setsid();
setenv("XDG_RUNTIME_DIR", "./", 1);
dup2(pstdin[0], STDIN_FILENO);
Expand Down Expand Up @@ -358,7 +358,7 @@ static void swap_appldir(const char* name, int basedir)
}

nftw(name, cleancb, 32, FTW_DEPTH | FTW_PHYS);
rename(buf, name);
renameat(basedir, buf, basedir, name);
}

/*
Expand Down Expand Up @@ -621,6 +621,8 @@ void* appl_runner(void* tag)
cbt->state_in_complete = false;
}

swap_appldir(cbt->clopt->applname, cbt->clopt->basedir);

/* appl_runner hooks into ios->userfd and attaches the event handler
* which translates commands and eventually serializes / forwards state */
handover_exec(S, state_in);
Expand Down Expand Up @@ -650,33 +652,42 @@ static void mark_xfer_complete(struct ioloop_shared* I, struct a12_bhandler_meta

/* signs of foul-play - we recieved completion notices without init.. */
if (!cbt->appl_out){
fprintf(stderr, "xfer completed on blob without an active state");
fprintf(stderr, "xfer completed on blob without an active state\n");
I->shutdown = true;
return;
}

/* EOF the unpack action now */
if (pclose(cbt->appl_out) != EXIT_SUCCESS){
fprintf(stderr, "xfer download unpack failed");
/* extract into 'newname' first, then we swap it before launch */
char newname[strlen(cbt->clopt->applname) + sizeof(".new")];
snprintf(newname, sizeof(newname), "%s.new", cbt->clopt->applname);
const char* msg;
if (!extract_appl_pkg(cbt->appl_out, cbt->clopt->basedir, newname, &msg)){
fprintf(stderr, "unpack appl failed: %s\n", msg);
I->shutdown = true;
return;
}

cbt->appl_out = NULL;
cbt->appl_out_complete = false;

/* Setup so it is threadable, though unlikely that useful versus just having
* userfd and multiplex appl-a12 that way. It would be for the case of running
* multiple appls over the same channel so keep that door open. Right now we
* work on the idea that if a new appl is received we should just force the
* other end to reload. We do this by queueing the command, waking the watchdog
* and waiting for it to reply to us via userfd */
/* if there is already an appl running we wnat to swap it out, in order to do
* that we want to make sure there are no file-system races. this is done by
* sending a lock command, waiting for that to be acknowledged when in the
* handler rename. */
if (atomic_load(&active_appls.n_active) > 0){
fprintf(active_appls.active.pf_stdin, "lock\n");
kill(active_appls.active.pid, SIGUSR1);
fprintf(stderr, "signalling=%d\n", active_appls.active.pid);
return;
}

/* Setup so it is threadable, though unlikely that useful versus just having
* userfd and multiplex appl-a12 that way. It would be for the case of running
* multiple appls over the same channel so keep that door open. Right now we
* work on the idea that if a new appl is received we should just force the
* other end to reload. We do this by queueing the command, waking the watchdog
* and waiting for it to reply to us via userfd */

atomic_fetch_add(&active_appls.n_active, 1);
active_appls.active.ios = I;

Expand Down Expand Up @@ -750,21 +761,26 @@ struct a12_bhandler_res anet_directory_cl_bhandler(
*
* In the case of an appl we should verify that we wanted hot reloading,
* ensure_appldir into new and set atomic-swap on completion. */
if (!ensure_appldir(cbt->clopt->applname, cbt->clopt->basedir)){
fprintf(stderr, "Couldn't create temporary appl directory\n");
if (-1 == cbt->clopt->basedir){
snprintf(cbt->clopt->basedir_path, PATH_MAX, "%s", "/tmp/appltemp-XXXXXX");
if (!mkdtemp(cbt->clopt->basedir_path)){
fprintf(stderr, "Couldn't build a temporary storage base\n");
return res;
}
cbt->clopt->basedir = open(cbt->clopt->basedir_path, O_DIRECTORY);
}

char filename[] = "appltemp-XXXXXX";
int appl_fd = mkstemp(filename);
if (-1 == appl_fd){
fprintf(stderr, "Couldn't create temporary appl- unpack store\n");
return res;
}
unlink(filename);

cbt->appl_out = popen("tar xfm -", "w");
res.fd = fileno(cbt->appl_out);
cbt->appl_out = fdopen(appl_fd, "rw");
res.flag = A12_BHANDLER_NEWFD;

/* these should really just enforce basedir and never use relative, it is
* just some initial hack thing that survived. */
if (-1 != cbt->clopt->basedir)
fchdir(cbt->clopt->basedir);
else
chdir("..");
res.fd = appl_fd;
}
break;

Expand Down Expand Up @@ -841,6 +857,9 @@ static bool cl_got_dir(struct ioloop_shared* I, struct appl_meta* dir)
"push-no-match:name=%s", cbt->clopt->outapp.appl.name);
}

cbt->transfer_id = cbt->clopt->outapp.identifier;
cbt->in_transfer = true;

a12_enqueue_blob(I->S,
cbt->clopt->outapp.buf,
cbt->clopt->outapp.buf_sz,
Expand All @@ -849,7 +868,6 @@ static bool cl_got_dir(struct ioloop_shared* I, struct appl_meta* dir)
cbt->clopt->outapp.appl.name
);

I->shutdown = true;
return true;
}

Expand Down Expand Up @@ -940,4 +958,12 @@ void anet_directory_cl(
a12int_request_dirlist(S, !opts.die_on_list || opts.applname[0]);

anet_directory_ioloop(&ioloop);

/* if we went for setting up the basedir we clean it as well */
if (opts.basedir_path[0]){
rmdir(opts.basedir_path);
close(opts.basedir);
opts.basedir = -1;
opts.basedir_path[0] = '\0';
}
}
3 changes: 3 additions & 0 deletions src/a12/net/dir_srv.c
Expand Up @@ -501,6 +501,7 @@ static void handle_bchunk_completion(struct dircl* C, bool ok)
* rebuilding the index and notifying listeners though - identity action
* so volatile is no concern */
pthread_mutex_unlock(&active_clients.sync);
A12INT_DIRTRACE("dirsv:bchunk_state:appl_update=%d", cur->identifier);
anet_directory_shmifsrv_set(
(struct anet_dirsrv_opts*) active_clients.opts);
}
Expand Down Expand Up @@ -836,7 +837,9 @@ static void* dircl_process(void* P)
handle_bchunk_req(C, (char*) ev.ext.bchunk.extensions, ev.ext.bchunk.input);
}

/* bounce-back ack streamsatus */
else if (ev.ext.kind == EVENT_EXTERNAL_STREAMSTATUS){
shmifsrv_enqueue_event(C->C, &ev, -1);
if (C->pending_stream){
C->pending_stream = false;
handle_bchunk_completion(C, ev.ext.streamstat.completion >= 1.0);
Expand Down
59 changes: 39 additions & 20 deletions src/a12/net/dir_srv_worker.c
Expand Up @@ -695,6 +695,23 @@ static struct appl_meta* find_identifier(struct appl_meta* base, unsigned id)
return NULL;
}

static void pair_enqueue(
struct a12_state* S, struct arcan_shmif_cont *C, struct arcan_event ev)
{
struct evqueue_entry* rep = malloc(sizeof(struct evqueue_entry));

if (shmif_block_synch_request(C, ev, rep,
EVENT_EXTERNAL,
EVENT_EXTERNAL_STREAMSTATUS,
EVENT_EXTERNAL,
EVENT_EXTERNAL_STREAMSTATUS)){
run_evqueue(S, C, rep);
}

free_evqueue(rep);
a12_channel_enqueue(S, &ev);
}

static int request_parent_resource(
struct a12_state* S, struct arcan_shmif_cont *C, const char* id, bool out)
{
Expand Down Expand Up @@ -765,32 +782,34 @@ static struct a12_bhandler_res srv_bevent(
a12int_trace(
A12_TRACE_DIRECTORY, "kind=status:completed:identifier=%"PRIu16, M.identifier);
if (cbt->in_transfer && M.identifier == cbt->transfer_id){
struct arcan_event sack = (struct arcan_event){
.category = EVENT_EXTERNAL,
.ext.kind = EVENT_EXTERNAL_STREAMSTATUS,
.ext.streamstat = {
.completion = 1.0,
.identifier = M.identifier
}
};

/* with low enough latency and high enough server load this enqueue can be triggered
* while the event is still in flight, and we shut down before the parent gets to ack
* the update. */
cbt->in_transfer = false;
arcan_shmif_enqueue(cbt->C,
&(struct arcan_event){
.category = EVENT_EXTERNAL,
.ext.kind = EVENT_EXTERNAL_STREAMSTATUS,
.ext.streamstat = {
.completion = 1.0,
.identifier = M.identifier
}
}
);
pair_enqueue(cbt->S, cbt->C, sack);
}
break;
case A12_BHANDLER_CANCELLED:
if (cbt->in_transfer && M.identifier == cbt->transfer_id){
cbt->in_transfer = false;
arcan_shmif_enqueue(cbt->C,
&(struct arcan_event){
.category = EVENT_EXTERNAL,
.ext.kind = EVENT_EXTERNAL_STREAMSTATUS,
.ext.streamstat = {
.completion = -1,
.identifier = M.identifier
}
struct arcan_event sack = (struct arcan_event){
.category = EVENT_EXTERNAL,
.ext.kind = EVENT_EXTERNAL_STREAMSTATUS,
.ext.streamstat = {
.completion = -1,
.identifier = M.identifier
}
);
};
cbt->in_transfer = false;
pair_enqueue(cbt->S, cbt->C, sack);
}
else
a12int_trace(
Expand Down

0 comments on commit fa6025f

Please sign in to comment.