Skip to content

Commit

Permalink
feat: Use modern deno streams (#410)
Browse files Browse the repository at this point in the history
* modern streams!!

Signed-off-by: Jersey <wgyt735yt@gmail.com>

* forgot the hashtag

Signed-off-by: Jersey <wgyt735yt@gmail.com>

* fix the issue

Signed-off-by: Jersey <wgyt735yt@gmail.com>

* try to prevent negatives here

Signed-off-by: Jersey <wgyt735yt@gmail.com>

---------

Signed-off-by: Jersey <wgyt735yt@gmail.com>
  • Loading branch information
williamhorning committed Mar 23, 2024
1 parent 749d804 commit eff419f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
1 change: 0 additions & 1 deletion deps.ts
Expand Up @@ -19,4 +19,3 @@ export {
export { crypto as stdCrypto } from "jsr:@std/crypto@^0.220.1/crypto";
export { decodeBase64, encodeBase64 } from "jsr:@std/encoding@^0.220.1/base64";
export { encodeHex } from "jsr:@std/encoding@^0.220.1/hex";
export { BufReader, writeAll } from "jsr:@std/io@^0.220.1";
31 changes: 19 additions & 12 deletions src/protocol/protocol.ts
@@ -1,4 +1,3 @@
import { BufReader, writeAll } from "../../deps.ts";
import {
MongoDriverError,
MongoErrorInfo,
Expand All @@ -9,7 +8,6 @@ import { handshake } from "./handshake.ts";
import { parseHeader } from "./header.ts";
import { deserializeMessage, Message, serializeMessage } from "./message.ts";

type Socket = Deno.Reader & Deno.Writer;
interface CommandTask {
requestId: number;
db: string;
Expand All @@ -19,7 +17,7 @@ interface CommandTask {
let nextRequestId = 0;

export class WireProtocol {
#socket: Socket;
#conn: Deno.Conn;
#isPendingResponse = false;
#isPendingRequest = false;
#pendingResponses: Map<number, {
Expand All @@ -28,12 +26,10 @@ export class WireProtocol {
// deno-lint-ignore no-explicit-any
reject: (reason?: any) => void;
}> = new Map();
#reader: BufReader;
#commandQueue: CommandTask[] = [];

constructor(socket: Socket) {
this.#socket = socket;
this.#reader = new BufReader(this.#socket);
constructor(socket: Deno.Conn) {
this.#conn = socket;
}

async connect() {
Expand Down Expand Up @@ -98,7 +94,9 @@ export class WireProtocol {
],
});

await writeAll(this.#socket, buffer);
const w = this.#conn.writable.getWriter();
await w.write(buffer);
w.releaseLock();
}
this.#isPendingRequest = false;
}
Expand All @@ -107,14 +105,14 @@ export class WireProtocol {
if (this.#isPendingResponse) return;
this.#isPendingResponse = true;
while (this.#pendingResponses.size > 0) {
const headerBuffer = await this.#reader.readFull(new Uint8Array(16));
const headerBuffer = await this.read_socket(16);
if (!headerBuffer) {
throw new MongoDriverError("Invalid response header");
}
const header = parseHeader(headerBuffer);
const bodyBuffer = await this.#reader.readFull(
new Uint8Array(header.messageLength - 16),
);
let bodyBytes = header.messageLength - 16;
if (bodyBytes < 0) bodyBytes = 0;
const bodyBuffer = await this.read_socket(header.messageLength - 16);
if (!bodyBuffer) {
throw new MongoDriverError("Invalid response body");
}
Expand All @@ -125,4 +123,13 @@ export class WireProtocol {
}
this.#isPendingResponse = false;
}

private async read_socket(
b: number,
): Promise<Uint8Array | undefined> {
const reader = this.#conn.readable.getReader({ mode: "byob" });
const { value } = await reader.read(new Uint8Array(b));
reader.releaseLock();
return value;
}
}

0 comments on commit eff419f

Please sign in to comment.