Skip to content

Commit

Permalink
(a12) arcan-net(dircl) to lwa synch to messaging
Browse files Browse the repository at this point in the history
This adds the option to join and send/receive from an appl- tied message
group, along with net_open("@stdin") to actually connect the .lua APIs
accordingly. There are some minor quirks and testing to do with
multipart chunking along with appl-update push reconnect that seems to
trigger a spin-connect on the wrong key.
  • Loading branch information
letoram committed Nov 1, 2023
1 parent 4af8b02 commit 2e7aed3
Show file tree
Hide file tree
Showing 18 changed files with 495 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -45,6 +45,7 @@
* directory mode sourcing dynamic sinks (sink-inbound only)
* directory mode source-sink pairing 'reachable source' and 'tunnel' transfer modes
* changed packaging format for appls
* net\_open("@stdin") can now be used to access a per-directory-appl messaging group

## Terminal
* SGR reset fix, add CNL / CPL
Expand Down
9 changes: 6 additions & 3 deletions doc/net_discover.lua
Expand Up @@ -64,9 +64,12 @@
-- the local keystore, while basename/subname is a breakdown of components in a
-- fully qualified domain name. Multiple events may be used to provide a
-- complete reverse entry: basename.basename.subname (com.example.local) and is
-- terminated with an zero-length base- or subname. A12pub is a bit special in
-- the sense that it provides the public key for a previously unknown a12
-- connection based on the TRUST_PERMIT_UNKNOWN trust model argument.
-- terminated with an zero-length base- or subname.
--
-- If the *namespace* is a12pub, there is also a base64 coded form of the
-- public key presented as 'kpub'. This is useful when requesting to open that
-- as a source specifically where there might be ambiguity (directory) or a
-- change of petname (directory and broadcast discover).
--
-- The (source, sink and directory) states describe the capabilities of the
-- discovered node. If none of them are set, the capabilities could not be
Expand Down
45 changes: 45 additions & 0 deletions doc/net_open.lua
Expand Up @@ -6,8 +6,40 @@
-- speaking the a12 protocol. If *host* starts with an @ sign and matches a
-- known name in the keystore, the connection information and authentication
-- credentials will be picked from there.
--
-- The connection behaves just as if it had been initiated through
-- ref:launch_target or or ref:target_alloc.
--
-- If *host* starts with the reserved identifier @stdin it will try and
-- connect to an attached monitor and communicate through it with an external
-- directory server and the associated appl- group of others running the same
-- appl through the same directory.
--
-- The restricted short (alnum _) identifier used server side is determined
-- first by the arcan-net --ident argument (or when opening the directory
-- through ref:net_discover) and, on collision, generated by the directory
-- server. The one actually used will be provided in a 'message' event as
-- 'a12:join=XXXXXX' where the Xs are substituted with the actual identity.
--
-- Any higher level 'nickname' system is expected to be implemented as a nested
-- application specific protocol within the appl- message group.
--
-- Should the connection be severed, an a12:disconnected will be issued, and
-- when if/it can be resumed, a12:reconnected.
--
-- Message events received in this format must follow the argument packing in
-- builtin/string.lua:strings.unpack_shmif_argstr(src) (key=value with ':' as
-- separator, \t being substituted to ':'). They have quite short restrictions
-- (78b) and are not intended for large datastream serialisation since they can
-- have large amplification and trigger throttling in the processing chain. For
-- large transfers we have ref:open_nonblock or using the directory to create a
-- direct channel to a specific user. The server end will enforce a prefix of
-- from=name and reserve the use of an 'a12' key anywhere. It will reject
-- messages which attempts to use that.
--
-- @note: with @stdin:user expect deliveries to have multipart, meaning they
-- need to be concatenated until a terminating multipart=false arrives.
--
-- @group: network
-- @cfunction: net_open
-- @related: net_discover, launch_target
Expand All @@ -32,6 +64,19 @@ function main()
end)
#endif

#idef MAIN2
local vid = net_open("@stdin:me",
function(source, status)
if status.kind == "message" then

end
end
)
if not valid_vid(vid) then
print("not connected to a directory server")
end
#endif

#ifdef ERROR1
#endif
end
1 change: 1 addition & 0 deletions src/a12/net/a12_helper_srv.c
Expand Up @@ -28,6 +28,7 @@ struct shmifsrv_thread_data {
struct a12_state* S;
struct arcan_shmif_cont fake;
struct a12helper_opts opts;

float font_sz;
int kill_fd;
uint8_t chid;
Expand Down
112 changes: 107 additions & 5 deletions src/a12/net/dir_cl.c
Expand Up @@ -184,6 +184,11 @@ static void on_cl_event(

/* main use would be the appl- runner forwarding messages that direction */
a12int_trace(A12_TRACE_DIRECTORY, "event=%s", arcan_shmif_eventstr(ev, NULL, 0));
if (ev->category == EVENT_EXTERNAL &&
ev->ext.kind == EVENT_EXTERNAL_MESSAGE){
arcan_shmif_enqueue(&I->shmif, ev);
return;
}

/* we do have an ongoing transfer (--push-appl) that we wait for an OK or cancel
* before marking that we're ready to shutdown */
Expand Down Expand Up @@ -327,10 +332,12 @@ static pid_t exec_cpath(struct a12_state* S,
close(pstdin[0]);
close(pstdin[1]);
close(pstdout[0]);
close(STDERR_FILENO);
/*
* close(STDERR_FILENO);
close(STDOUT_FILENO);
open("/dev/null", O_WRONLY);
open("/dev/null", O_WRONLY);
*/
execvp(ctx->bin, argv);
exit(EXIT_FAILURE);
}
Expand All @@ -346,6 +353,34 @@ static pid_t exec_cpath(struct a12_state* S,
return pid;
}

static void runner_shmif(struct ioloop_shared* I)
{
arcan_event ev;

while (arcan_shmif_poll(&I->shmif, &ev) > 0){
if (ev.category != EVENT_TARGET){
continue;
}

if (ev.tgt.kind != TARGET_COMMAND_MESSAGE)
continue;

/* we need to flip the 'direction' as the other end expect us to behave like a
* shmif client, i.e. TARGET is from server to client, EXTERNAL is from client
*/
struct arcan_event out = {
.category = EVENT_EXTERNAL,
.ext.message.multipart = ev.tgt.ioevs[0].iv
};
_Static_assert(sizeof(out.ext.message.data) ==
sizeof(ev.tgt.message), "_event.h integrity");

memcpy(out.ext.message.data, ev.tgt.message, sizeof(out.ext.message.data));

a12_channel_enqueue(I->S, &out);
}
}

static void swap_appldir(const char* name, int basedir)
{
/* it is probably worth keeping track of the old here */
Expand Down Expand Up @@ -394,6 +429,59 @@ static void process_thread(struct ioloop_shared* I, bool ok)
* defined override state. */
fprintf(A->pf_stdin, "continue\n");
}

/* client wants to join the applgroup through a net_open call and has set up a
* connection point for us to access - this direction may seem a bit weird, but
* since we are on equal privilege and arcan-core does not have a way of
* hooking up a shmif_cont structure, only feeding it, this added the least
* amount of complexity across the chain. */
else if (strncmp(buf, "join ", 5) == 0){
if (I->shmif.addr){
arcan_shmif_drop(&I->shmif);
}

buf[strlen(buf)-1] = '\0';
char cbuf[strlen(buf) + strlen(I->cbt->clopt->basedir_path) + 1];
snprintf(cbuf, sizeof(cbuf), "%s/%s", I->cbt->clopt->basedir_path, &buf[5]);

/* strip \n and connect */
int dfd;
char* key = arcan_shmif_connect(cbuf, NULL, &dfd);
a12int_trace(A12_TRACE_DIRECTORY,
"appl_monitor:connect=%s:ok=%s", &buf[5], key ? "true":"false");
if (!key){
return;
}

I->shmif = arcan_shmif_acquire(NULL, key, SEGID_MEDIA, 0);
I->shmif.epipe = dfd;
I->on_shmif = runner_shmif;

if (!I->shmif.addr){
a12int_trace(A12_TRACE_DIRECTORY, "appl_monitor:connect_fail");
return;
}

/* join the message group for the running appl */
arcan_event ev = {
.category = EVENT_EXTERNAL,
.ext.kind = ARCAN_EVENT(IDENT)
};

size_t lim = sizeof(ev.ext.message.data)/sizeof(ev.ext.message.data[1]);
if (cbt->clopt->ident[0]){
snprintf(
(char*)ev.ext.message.data, lim, "%d:%s",
I->cbt->clopt->applid, cbt->clopt->ident
);
}
else
snprintf(
(char*)ev.ext.message.data, lim, "%d", I->cbt->clopt->applid);
a12_channel_enqueue(I->S, &ev);
arcan_shmif_resize(&I->shmif, 64, 64);
runner_shmif(I);
}
}

return;
Expand Down Expand Up @@ -475,10 +563,24 @@ static void process_thread(struct ioloop_shared* I, bool ok)
a12_enqueue_bstream(I->S, state_fd,
A12_BTYPE_STATE, cbt->clopt->applid, false, state_sz, empty_ext);
}
else if (!exec_res && !cbt->clopt->block_log){
fprintf(stderr, "sending crash report (%zu) bytes\n", state_sz);
a12_enqueue_bstream(I->S, state_fd,
A12_BTYPE_CRASHDUMP, cbt->clopt->applid, false, state_sz, empty_ext);
else if (!exec_res){
if (cbt->clopt->stderr_log){
FILE* fpek = fdopen(state_fd, "r");
while (!feof(fpek)){
char buf[4096];
size_t nr = fread(buf, 1, 4096, fpek);
if (nr)
fwrite(stderr, nr, 1, fpek);
}
fseek(fpek, 0, SEEK_SET);
fclose(fpek);
}

if (!cbt->clopt->block_log){
fprintf(stderr, "sending crash report (%zu) bytes\n", state_sz);
a12_enqueue_bstream(I->S, state_fd,
A12_BTYPE_CRASHDUMP, cbt->clopt->applid, false, state_sz, empty_ext);
}
}

out:
Expand Down

0 comments on commit 2e7aed3

Please sign in to comment.