Skip to content

Commit

Permalink
uer
Browse files Browse the repository at this point in the history
  • Loading branch information
laurent22 committed May 11, 2024
1 parent f9033c7 commit c99325c
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 19 deletions.
11 changes: 6 additions & 5 deletions packages/server/src/db.replication.test.ts
@@ -1,5 +1,5 @@
import { afterAllTests, beforeAllDb, beforeEachDb, createFolder, createUserAndSession, db, dbSlave, expectThrow, models, packageRootDir, updateFolder } from './utils/testing/testUtils';
import { connectDb, disconnectDb, isPostgres, reconnectDb, sqliteSyncSlave } from './db';
import { afterAllTests, beforeAllDb, beforeEachDb, createFolder, createUserAndSession, db, dbSlave, expectThrow, getDatabaseClientType, models, packageRootDir, updateFolder } from './utils/testing/testUtils';
import { connectDb, disconnectDb, reconnectDb, sqliteSyncSlave } from './db';
import { ChangeType, Event } from './services/database/types';
import { DatabaseConfig, DatabaseConfigClient } from './utils/types';
import { createDb } from './tools/dbTools';
Expand Down Expand Up @@ -29,7 +29,7 @@ const afterTest = async () => {
describe('db.replication', () => {

it('should reconnect a database', async () => {
if (isPostgres(db())) return;
if (getDatabaseClientType() === DatabaseConfigClient.PostgreSQL) return;

await beforeTest();

Expand Down Expand Up @@ -58,7 +58,7 @@ describe('db.replication', () => {
});

it('should manually sync an SQLite slave instance', async () => {
if (isPostgres(db())) return;
if (getDatabaseClientType() === DatabaseConfigClient.PostgreSQL) return;

const masterConfig: DatabaseConfig = {
client: DatabaseConfigClient.SQLite,
Expand Down Expand Up @@ -91,12 +91,13 @@ describe('db.replication', () => {
});

test('should track changes - using replication', async () => {
if (isPostgres(db())) return;
if (getDatabaseClientType() === DatabaseConfigClient.PostgreSQL) return;

await beforeTest({ DB_USE_SLAVE: '1' });

const { session, user } = await createUserAndSession(1, true);
const changeModel = models().change();
changeModel.usersWithReplication_ = [user.id];

const folder = {
id: '000000000000000000000000000000F1',
Expand Down
4 changes: 0 additions & 4 deletions packages/server/src/db.ts
Expand Up @@ -128,10 +128,6 @@ export async function waitForConnection(dbConfig: DatabaseConfig): Promise<Conne
}

export const clientType = (db: DbConnection): DatabaseConfigClient => {
// We have this extra check so that in tests we can quickly find out the database backend
// without having to use an initialized connection.
if (process.env.JOPLIN_TESTS_SERVER_DB === 'pg') return DatabaseConfigClient.PostgreSQL;
if (process.env.JOPLIN_TESTS_SERVER_DB === 'sqlite') return DatabaseConfigClient.SQLite;
return db.client.config.client;
};

Expand Down
3 changes: 3 additions & 0 deletions packages/server/src/env.ts
Expand Up @@ -22,6 +22,7 @@ const defaultEnvValues: EnvVariables = {
ERROR_STACK_TRACES: false,
COOKIES_SECURE: false,
RUNNING_IN_DOCKER: false,
USERS_WITH_REPLICATION: '', // Temporary

// The admin panel is accessible only if this is an admin instance.
// Additionally, processing services (those defined in setupTaskService.ts)
Expand Down Expand Up @@ -150,6 +151,8 @@ export interface EnvVariables {
ERROR_STACK_TRACES: boolean;
COOKIES_SECURE: boolean;
RUNNING_IN_DOCKER: boolean;
USERS_WITH_REPLICATION: string;

MAX_TIME_DRIFT: number;
NTP_SERVER: string;
DELTA_INCLUDES_ITEMS: boolean;
Expand Down
11 changes: 9 additions & 2 deletions packages/server/src/models/BaseModel.ts
Expand Up @@ -69,12 +69,14 @@ export default abstract class BaseModel<T> {
private modelFactory_: NewModelFactoryHandler;
private config_: Config;
private savePoints_: SavePoint[] = [];
public usersWithReplication_: string[] = [];

public constructor(db: DbConnection, dbSlave: DbConnection, modelFactory: NewModelFactoryHandler, config: Config) {
this.db_ = db;
this.dbSlave_ = dbSlave;
this.modelFactory_ = modelFactory;
this.config_ = config;
this.usersWithReplication_ = config.USERS_WITH_REPLICATION ? config.USERS_WITH_REPLICATION.split(',') : [];

this.transactionHandler_ = new TransactionHandler(db);
}
Expand Down Expand Up @@ -115,8 +117,13 @@ export default abstract class BaseModel<T> {
return this.db_;
}

public get dbSlave(): DbConnection {
return this.dbSlave_;
public dbSlave(userId: Uuid = ''): DbConnection {
if (userId && this.usersWithReplication_.includes(userId)) {
logger.info(`Using slave database for user: ${userId}`);
return this.dbSlave_;
}

return this.db_;
}

protected get defaultFields(): string[] {
Expand Down
8 changes: 4 additions & 4 deletions packages/server/src/models/ChangeModel.ts
Expand Up @@ -199,8 +199,8 @@ export default class ChangeModel extends BaseModel<Change> {
if (!doCountQuery) {
finalParams.push(limit);

if (isPostgres(this.dbSlave)) {
query = this.dbSlave.raw(`
if (isPostgres(this.dbSlave(userId))) {
query = this.dbSlave(userId).raw(`
WITH cte1 AS MATERIALIZED (
${subQuery1}
)
Expand All @@ -214,7 +214,7 @@ export default class ChangeModel extends BaseModel<Change> {
LIMIT ?
`, finalParams);
} else {
query = this.dbSlave.raw(`
query = this.dbSlave(userId).raw(`
SELECT ${fieldsSql} FROM (${subQuery1}) as sub1
UNION ALL
SELECT ${fieldsSql} FROM (${subQuery2}) as sub2
Expand All @@ -223,7 +223,7 @@ export default class ChangeModel extends BaseModel<Change> {
`, finalParams);
}
} else {
query = this.dbSlave.raw(`
query = this.dbSlave(userId).raw(`
SELECT count(*) as total
FROM (
(${subQuery1})
Expand Down
4 changes: 2 additions & 2 deletions packages/server/src/models/ItemModel.ts
Expand Up @@ -102,7 +102,7 @@ export default class ItemModel extends BaseModel<Item> {
let driver = ItemModel.storageDrivers_.get(config);

if (!driver) {
driver = await loadStorageDriver(config, this.db, this.dbSlave);
driver = await loadStorageDriver(config, this.db, this.dbSlave());
ItemModel.storageDrivers_.set(config, driver);
}

Expand Down Expand Up @@ -331,7 +331,7 @@ export default class ItemModel extends BaseModel<Item> {
let fromDriver: StorageDriverBase = drivers[item.content_storage_id];

if (!fromDriver) {
fromDriver = await loadStorageDriver(item.content_storage_id, this.db, this.dbSlave);
fromDriver = await loadStorageDriver(item.content_storage_id, this.db, this.dbSlave());
drivers[item.content_storage_id] = fromDriver;
}

Expand Down
9 changes: 7 additions & 2 deletions packages/server/src/utils/testing/testUtils.ts
Expand Up @@ -2,7 +2,7 @@ import { DbConnection, connectDb, disconnectDb, truncateTables } from '../../db'
import { User, Session, Item, Uuid } from '../../services/database/types';
import { createDb, CreateDbOptions } from '../../tools/dbTools';
import modelFactory from '../../models/factory';
import { AppContext, Env } from '../types';
import { AppContext, DatabaseConfigClient, Env } from '../types';
import config, { initConfig } from '../../config';
import Logger from '@joplin/utils/Logger';
import FakeCookies from './koa/FakeCookies';
Expand Down Expand Up @@ -69,6 +69,11 @@ function initGlobalLogger() {
initLib(globalLogger);
}

export const getDatabaseClientType = () => {
if (process.env.JOPLIN_TESTS_SERVER_DB === 'pg') return DatabaseConfigClient.PostgreSQL;
return DatabaseConfigClient.SQLite;
};

let createdDbPath_: string = null;
let createdDbSlavePath_: string = null;
export async function beforeAllDb(unitName: string, createDbOptions: CreateDbOptions = null, extraEnv: Record<string, string> = null) {
Expand All @@ -89,7 +94,7 @@ export async function beforeAllDb(unitName: string, createDbOptions: CreateDbOpt
//
// JOPLIN_TESTS_SERVER_DB=pg yarn test

if (process.env.JOPLIN_TESTS_SERVER_DB === 'pg') {
if (getDatabaseClientType() === DatabaseConfigClient.PostgreSQL) {
await initConfig(Env.Dev, parseEnv({
DB_CLIENT: 'pg',

Expand Down

0 comments on commit c99325c

Please sign in to comment.