Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(electric): Make it possible to select the SQL dialect in /api/migrations and Satellite #1065

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions clients/typescript/src/_generated/protocol/satellite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ export interface SatInStartReplicationReq {
* observed additional data before disconnect
*/
observedTransactionData: Long[];
/**
* The SQL dialect used by the client
* Defaults to SQLite if not specified
*/
sqlDialect?: SatInStartReplicationReq_Dialect | undefined;
}

export enum SatInStartReplicationReq_Option {
Expand All @@ -130,6 +135,12 @@ export enum SatInStartReplicationReq_Option {
UNRECOGNIZED = -1,
}

export enum SatInStartReplicationReq_Dialect {
SQLITE = 0,
POSTGRES = 1,
UNRECOGNIZED = -1,
}

/** (Producer) The result of the start replication requests */
export interface SatInStartReplicationResp {
$type: "Electric.Satellite.SatInStartReplicationResp";
Expand Down Expand Up @@ -457,9 +468,15 @@ export interface SatOpMigrate_PgColumnType {
size: number[];
}

/** reserved 2; */
export interface SatOpMigrate_Column {
$type: "Electric.Satellite.SatOpMigrate.Column";
name: string;
/**
* deprecated
* leaving it here to avoid breaking TypeScript tests that have hard-coded,
* base64-encoded SatOpMigrate messages.
*/
sqliteType: string;
pgType: SatOpMigrate_PgColumnType | undefined;
}
Expand Down Expand Up @@ -1102,6 +1119,7 @@ function createBaseSatInStartReplicationReq(): SatInStartReplicationReq {
subscriptionIds: [],
schemaVersion: undefined,
observedTransactionData: [],
sqlDialect: undefined,
};
}

Expand All @@ -1128,6 +1146,9 @@ export const SatInStartReplicationReq = {
writer.uint64(v);
}
writer.ldelim();
if (message.sqlDialect !== undefined) {
writer.uint32(56).int32(message.sqlDialect);
}
return writer;
},

Expand Down Expand Up @@ -1193,6 +1214,13 @@ export const SatInStartReplicationReq = {
}

break;
case 7:
if (tag !== 56) {
break;
}

message.sqlDialect = reader.int32() as any;
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -1213,6 +1241,7 @@ export const SatInStartReplicationReq = {
message.subscriptionIds = object.subscriptionIds?.map((e) => e) || [];
message.schemaVersion = object.schemaVersion ?? undefined;
message.observedTransactionData = object.observedTransactionData?.map((e) => Long.fromValue(e)) || [];
message.sqlDialect = object.sqlDialect ?? undefined;
return message;
},
};
Expand Down
10 changes: 9 additions & 1 deletion clients/typescript/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ export interface ElectricConfig {
connectionBackOffOptions?: ConnectionBackOffOptions
}

export type ElectricConfigWithDialect = ElectricConfig & {
dialect?: 'SQLite' | 'Postgres'
}

export type HydratedConfig = {
auth: AuthConfig
replication: {
host: string
port: number
ssl: boolean
timeout: number
dialect: 'SQLite' | 'Postgres'
}
debug: boolean
connectionBackOffOptions: ConnectionBackOffOptions
Expand All @@ -68,7 +73,9 @@ export type InternalElectricConfig = {
connectionBackOffOptions?: ConnectionBackOffOptions
}

export const hydrateConfig = (config: ElectricConfig): HydratedConfig => {
export const hydrateConfig = (
config: ElectricConfigWithDialect
): HydratedConfig => {
const auth = config.auth ?? {}

const debug = config.debug ?? false
Expand All @@ -86,6 +93,7 @@ export const hydrateConfig = (config: ElectricConfig): HydratedConfig => {
port: port,
ssl: sslEnabled,
timeout: config.timeout ?? 3000,
dialect: config.dialect ?? 'SQLite',
}

const {
Expand Down
4 changes: 2 additions & 2 deletions clients/typescript/src/electric/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ElectricConfig, hydrateConfig } from '../config/index'
import { ElectricConfigWithDialect, hydrateConfig } from '../config/index'
import { DatabaseAdapter } from './adapter'
import { BundleMigrator, Migrator } from '../migrators/index'
import { EventNotifier, Notifier } from '../notifiers/index'
Expand Down Expand Up @@ -47,7 +47,7 @@ export const electrify = async <DB extends DbSchema<any>>(
dbDescription: DB,
adapter: DatabaseAdapter,
socketFactory: SocketFactory,
config: ElectricConfig = {},
config: ElectricConfigWithDialect = {},
opts?: Omit<ElectrifyOptions, 'adapter' | 'socketFactory'>
): Promise<ElectricClient<DB>> => {
setLogLevel(config.debug ? 'TRACE' : 'WARN')
Expand Down
25 changes: 9 additions & 16 deletions clients/typescript/src/migrators/triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ type ForeignKey = {
}

type ColumnName = string
type SQLiteType = string
type PgType = string
type ColumnType = {
sqliteType: SQLiteType
pgType: PgType
}
type ColumnType = string
type ColumnTypes = Record<ColumnName, ColumnType>

export type Table = {
Expand Down Expand Up @@ -71,11 +66,11 @@ export function generateOplogTriggers(
SELECT
CASE
${primary
.map(
(col) =>
`WHEN old."${col}" != new."${col}" THEN\n\t\tRAISE (ABORT, 'cannot change the value of column ${col} as it belongs to the primary key')`
)
.join('\n')}
.map(
(col) =>
`WHEN old."${col}" != new."${col}" THEN\n\t\tRAISE (ABORT, 'cannot change the value of column ${col} as it belongs to the primary key')`
)
.join('\n')}
END;
END;
`,
Expand Down Expand Up @@ -283,17 +278,15 @@ function joinColsForJSON(
// Perform transformations on some columns to ensure consistent
// serializability into JSON
const transformIfNeeded = (col: string, targetedCol: string) => {
const tpes = colTypes[col]
const sqliteType = tpes.sqliteType
const pgType = tpes.pgType
const pgType = colTypes[col]

// cast REALs, INT8s, BIGINTs to TEXT to work around SQLite's `json_object` bug
if (sqliteType === 'REAL' || pgType === 'INT8' || pgType === 'BIGINT') {
if (pgType === 'FLOAT4' || pgType === 'REAL' || pgType === 'INT8' || pgType === 'BIGINT') {
return `cast(${targetedCol} as TEXT)`
}

// transform blobs/bytestrings into hexadecimal strings for JSON encoding
if (sqliteType === 'BLOB' || pgType === 'BYTEA') {
if (pgType === 'BYTEA') {
return `CASE WHEN ${targetedCol} IS NOT NULL THEN hex(${targetedCol}) ELSE NULL END`
}
return targetedCol
Expand Down
15 changes: 12 additions & 3 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
Root,
RootClientImpl,
SatRpcRequest,
SatInStartReplicationReq_Dialect,
} from '../_generated/protocol/satellite'
import {
getObjFromString,
Expand Down Expand Up @@ -131,6 +132,7 @@ type EventEmitter = AsyncEventEmitter<Events>

export class SatelliteClient implements Client {
private opts: Required<SatelliteClientOpts>
private dialect: SatInStartReplicationReq_Dialect

private emitter: EventEmitter

Expand Down Expand Up @@ -194,6 +196,10 @@ export class SatelliteClient implements Client {
this.emitter = new AsyncEventEmitter<Events>()

this.opts = { ...satelliteClientDefaults, ...opts }
this.dialect =
opts.dialect === 'SQLite'
? SatInStartReplicationReq_Dialect.SQLITE
: SatInStartReplicationReq_Dialect.POSTGRES
this.socketFactory = socketFactory

this.inbound = this.resetInboundReplication()
Expand Down Expand Up @@ -365,7 +371,10 @@ export class SatelliteClient implements Client {
)
)
}
request = SatInStartReplicationReq.fromPartial({ schemaVersion })
request = SatInStartReplicationReq.fromPartial({
schemaVersion,
sqlDialect: this.dialect,
})
} else {
Log.info(
`starting replication with lsn: ${base64.fromBytes(
Expand All @@ -376,6 +385,7 @@ export class SatelliteClient implements Client {
lsn,
subscriptionIds,
observedTransactionData,
sqlDialect: this.dialect,
})
}

Expand Down Expand Up @@ -845,8 +855,7 @@ export class SatelliteClient implements Client {
'error',
new SatelliteError(
SatelliteErrorCode.UNEXPECTED_STATE,
`unexpected state ${
ReplicationStatus[this.inbound.isReplicating]
`unexpected state ${ReplicationStatus[this.inbound.isReplicating]
} handling 'relation' message`
)
)
Expand Down
1 change: 1 addition & 0 deletions clients/typescript/src/satellite/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface SatelliteClientOpts {
ssl: boolean
timeout: number
pushPeriod?: number
dialect: 'SQLite' | 'Postgres'
}

export const validateConfig = (config: any) => {
Expand Down
8 changes: 1 addition & 7 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1651,13 +1651,7 @@ export function generateTriggersForTable(tbl: MigrationTable): Statement[] {
}
}),
columnTypes: Object.fromEntries(
tbl.columns.map((col) => [
col.name,
{
sqliteType: col.sqliteType.toUpperCase(),
pgType: col.pgType!.name.toUpperCase(),
},
])
tbl.columns.map((col) => [col.name, col.pgType!.name.toUpperCase()])
),
}
const fullTableName = table.namespace + '.' + table.tableName
Expand Down
1 change: 1 addition & 0 deletions clients/typescript/src/satellite/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ export class GlobalRegistry extends BaseRegistry {
port: config.replication.port,
ssl: config.replication.ssl,
timeout: config.replication.timeout,
dialect: config.replication.dialect,
}

const client = new SatelliteClient(
Expand Down
1 change: 1 addition & 0 deletions clients/typescript/test/satellite/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ test.beforeEach((t) => {
timeout: 10000,
ssl: false,
pushPeriod: 100,
dialect: 'SQLite',
})
const clientId = '91eba0c8-28ba-4a86-a6e8-42731c2c6694'

Expand Down
12 changes: 6 additions & 6 deletions clients/typescript/test/satellite/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,11 @@ export const personTable: Table = {
primary: ['id'],
foreignKeys: [],
columnTypes: {
id: { sqliteType: 'REAL', pgType: PgBasicType.PG_REAL },
name: { sqliteType: 'TEXT', pgType: PgBasicType.PG_TEXT },
age: { sqliteType: 'INTEGER', pgType: PgBasicType.PG_INTEGER },
bmi: { sqliteType: 'REAL', pgType: PgBasicType.PG_REAL },
int8: { sqliteType: 'INTEGER', pgType: PgBasicType.PG_INT8 },
blob: { sqliteType: 'BLOB', pgType: PgBasicType.PG_BYTEA },
id: PgBasicType.PG_REAL,
name: PgBasicType.PG_TEXT,
age: PgBasicType.PG_INTEGER,
bmi: PgBasicType.PG_REAL,
int8: PgBasicType.PG_INT8,
blob: PgBasicType.PG_BYTEA,
},
}
1 change: 1 addition & 0 deletions components/electric/lib/electric/plug/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ defmodule Electric.Plug.Migrations do
defp get_dialect(%{query_params: %{"dialect" => dialect_name}}) do
case dialect_name do
"sqlite" -> {:ok, Electric.Postgres.Dialect.SQLite}
"postgresql" -> {:ok, Electric.Postgres.Dialect.Postgresql}
_ -> {:error, "unsupported dialect #{inspect(dialect_name)}"}
end
end
Expand Down
26 changes: 12 additions & 14 deletions components/electric/lib/electric/postgres/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ defmodule Electric.Postgres.Replication do
{:ok, [], []}

propagate_ast ->
{msg, relations} = build_replication_msg(propagate_ast, schema_version, dialect)
{msg, relations} = build_replication_msg(propagate_ast, stmt, schema_version, dialect)

{:ok, [msg], relations}
end
Expand All @@ -90,6 +90,9 @@ defmodule Electric.Postgres.Replication do
end
end

defp to_sql(_ast, stmt, Dialect.Postgresql), do: stmt
defp to_sql(ast, _stmt, dialect), do: Dialect.to_sql(ast, dialect)

def affected_tables(stmts, dialect \\ @default_dialect) when is_list(stmts) do
stmts
|> Enum.flat_map(&get_affected_table/1)
Expand All @@ -112,7 +115,7 @@ defmodule Electric.Postgres.Replication do
[]
end

defp build_replication_msg(ast, schema_version, dialect) do
defp build_replication_msg(ast, stmt, schema_version, dialect) do
affected_tables = affected_tables(ast, dialect)

relations = Enum.map(affected_tables, &{&1.schema, &1.name})
Expand All @@ -133,7 +136,7 @@ defmodule Electric.Postgres.Replication do
ast,
&%SatOpMigrate.Stmt{
type: stmt_type(&1),
sql: Dialect.to_sql(&1, dialect)
sql: to_sql(&1, stmt, dialect)
}
)

Expand Down Expand Up @@ -168,17 +171,17 @@ defmodule Electric.Postgres.Replication do
defp replication_msg_table(%Proto.Table{} = table, dialect) do
%SatOpMigrate.Table{
name: Dialect.table_name(table.name, dialect),
columns: Enum.map(table.columns, &replication_msg_table_col(&1, dialect)),
columns: Enum.map(table.columns, &replication_msg_table_col(&1)),
fks: Enum.flat_map(table.constraints, &replication_msg_table_fk(&1, dialect)),
pks: Enum.flat_map(table.constraints, &replication_msg_table_pk(&1, dialect))
pks: Enum.flat_map(table.constraints, &replication_msg_table_pk(&1))
}
end

defp replication_msg_table_col(%Proto.Column{} = column, dialect) do
defp replication_msg_table_col(%Proto.Column{} = column) do
%SatOpMigrate.Column{
name: column.name,
pg_type: replication_msg_table_col_type(column.type),
sqlite_type: Dialect.type_name(column.type, dialect)
sqlite_type: Dialect.type_name(column.type, Dialect.SQLite)
}
end

Expand All @@ -190,13 +193,8 @@ defmodule Electric.Postgres.Replication do
}
end

defp replication_msg_table_pk(%Proto.Constraint{constraint: {:primary, pk}}, _dialect) do
pk.keys
end

defp replication_msg_table_pk(_constraint, _dialect) do
[]
end
defp replication_msg_table_pk(%Proto.Constraint{constraint: {:primary, pk}}), do: pk.keys
defp replication_msg_table_pk(_constraint), do: []

defp replication_msg_table_fk(%Proto.Constraint{constraint: {:foreign, fk}}, dialect) do
[
Expand Down