Store schema in shared cache / Fix logic regarding fetching one schema at a time #22245

Open · wants to merge 4 commits into base: main
14 changes: 10 additions & 4 deletions api/src/cache.ts
@@ -130,6 +130,7 @@ export async function setSchemaCache(schema: SchemaOverview): Promise<void> {
const { localSchemaCache, sharedSchemaCache } = getCache();
const schemaHash = getSimpleHash(JSON.stringify(schema));

await sharedSchemaCache.set('schema', schema);
await sharedSchemaCache.set('hash', schemaHash);

await localSchemaCache.set('schema', schema);
@@ -139,13 +140,18 @@ export async function setSchemaCache(schema: SchemaOverview): Promise<void> {
export async function getSchemaCache(): Promise<SchemaOverview | undefined> {
const { localSchemaCache, sharedSchemaCache } = getCache();

const sharedSchemaHash = await sharedSchemaCache.get('hash');
const [localSchemaHash, sharedSchemaHash] = await Promise.all([
localSchemaCache.get('hash'),
sharedSchemaCache.get('hash'),
]);

if (!sharedSchemaHash) return;

const localSchemaHash = await localSchemaCache.get('hash');
if (!localSchemaHash || localSchemaHash !== sharedSchemaHash) return;
if (localSchemaHash && sharedSchemaHash && localSchemaHash === sharedSchemaHash) {
return await localSchemaCache.get('schema');
}

return await localSchemaCache.get('schema');
return await sharedSchemaCache.get('schema');
}

export async function setCacheValue(
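For readers skimming the diff, the read path in `getSchemaCache` now works roughly as follows. This is a minimal TypeScript sketch of the lookup order only; `SimpleCache`, `localCache`, `sharedCache`, and `readSchema` are illustrative names, not the actual Directus types or API.

```ts
// Sketch only: illustrates the lookup order, not the real Directus implementation.
interface SimpleCache {
	get(key: string): Promise<unknown>;
}

async function readSchema(localCache: SimpleCache, sharedCache: SimpleCache): Promise<unknown | undefined> {
	// Fetch both hashes in parallel instead of one after the other.
	const [localHash, sharedHash] = await Promise.all([localCache.get('hash'), sharedCache.get('hash')]);

	// No hash in the shared cache means no schema has been cached anywhere yet.
	if (!sharedHash) return undefined;

	// Matching hashes: the local in-memory copy is current, so skip the shared-cache round trip.
	if (localHash && localHash === sharedHash) return localCache.get('schema');

	// Local copy is stale or missing: fall back to the schema stored in the shared cache.
	return sharedCache.get('schema');
}
```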
97 changes: 58 additions & 39 deletions api/src/utils/get-schema.ts
@@ -33,17 +33,18 @@ export async function getSchema(
): Promise<SchemaOverview> {
const MAX_ATTEMPTS = 3;

const env = useEnv();

if (attempt >= MAX_ATTEMPTS) {
throw new Error(`Failed to get Schema information: hit infinite loop`);
throw new Error(`Failed to get Schema information after ${MAX_ATTEMPTS} attempts`);
}

if (options?.bypassCache || env['CACHE_SCHEMA'] === false) {
const database = options?.database || getDatabase();
const schemaInspector = createInspector(database);
const env = useEnv();
const lock = useLock();
const bus = useBus();
const lockKey = 'schemaCache--preparing';
const messageKey = 'schemaCache--done';

return await getDatabaseSchema(database, schemaInspector);
if (options?.bypassCache || env['CACHE_SCHEMA'] === false) {
return await fetchAndCacheSchema();
}

const cached = await getSchemaCache();
@@ -52,58 +53,76 @@
return cached;
}

const lock = useLock();
const bus = useBus();
return await handleSchemaCaching();

const lockKey = 'schemaCache--preparing';
const messageKey = 'schemaCache--done';
const processId = await lock.increment(lockKey);
async function fetchAndCacheSchema(): Promise<SchemaOverview> {
const database = options?.database || getDatabase();
const schemaInspector = createInspector(database);
const schema = await getDatabaseSchema(database, schemaInspector);
await setSchemaCache(schema);

if (processId >= (env['CACHE_SCHEMA_MAX_ITERATIONS'] as number)) {
await lock.delete(lockKey);
bus.publish(messageKey, { ready: true });

return schema;
}

const currentProcessShouldHandleOperation = processId === 1;
async function handleSchemaCaching(): Promise<SchemaOverview> {
const processId = await lock.increment(lockKey);

if (currentProcessShouldHandleOperation === false) {
logger.trace('Schema cache is prepared in another process, waiting for result.');
if (processId < 1) {
await lock.set(lockKey, 1);
}

return new Promise((resolve, reject) => {
const TIMEOUT = 10000;
if (processId >= (env['CACHE_SCHEMA_MAX_ITERATIONS'] as number)) {
await lock.delete(lockKey);
throw new Error('Maximum iterations reached for schema caching.');
}

const timeout: NodeJS.Timeout = setTimeout(() => {
logger.trace('Did not receive schema callback message in time. Pulling schema...');
callback().catch(reject);
}, TIMEOUT);
if (processId > 1) {
// Wait for schema cache

try {
const schema = await waitForSchemaCache();
// Got schema from someone else
await lock.increment(lockKey, -1);
return schema;
} catch (error) {
logger.warn(error);
// Since we couldn't get schema from others, let's fetch it ourselves
}
}

return await fetchAndCacheSchema();
}

async function waitForSchemaCache(): Promise<SchemaOverview> {
const TIMEOUT = 5000;
return new Promise((resolve, reject) => {
const timeout = setTimeout(
() => {
// Timeout reached. Pulling schema directly.
callback();
},
// Stagger subsequent calls on the same container
Math.random() * TIMEOUT + 2000,
);

bus.subscribe(messageKey, callback);

async function callback() {
try {
if (timeout) clearTimeout(timeout);
async function callback(): Promise<void> {
clearTimeout(timeout);
bus.unsubscribe(messageKey, callback);

try {
const schema = await getSchema(options, attempt + 1);
resolve(schema);
} catch (error) {
reject(error);
} finally {
bus.unsubscribe(messageKey, callback);
}
}
});
}

try {
const database = options?.database || getDatabase();
const schemaInspector = createInspector(database);

const schema = await getDatabaseSchema(database, schemaInspector);
await setSchemaCache(schema);
return schema;
} finally {
await lock.delete(lockKey);
bus.publish(messageKey, { ready: true });
}
}

async function getDatabaseSchema(database: Knex, schemaInspector: SchemaInspector): Promise<SchemaOverview> {
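The coordination logic in `handleSchemaCaching`/`waitForSchemaCache` boils down to a counter-based lock plus a message bus: the first process to increment the counter fetches and caches the schema, everyone else waits for a "done" message with a randomized timeout fallback. The sketch below shows that pattern only, under assumed simplified interfaces; `Lock`, `Bus`, and `coordinatedFetch` are illustrative, whereas the real code uses `useLock()`/`useBus()` and recurses into `getSchema`.

```ts
// Sketch only: simplified lock/bus coordination, not the actual Directus implementation.
interface Lock {
	increment(key: string, amount?: number): Promise<number>;
	delete(key: string): Promise<void>;
}

interface Bus {
	publish(key: string, payload: unknown): void;
	subscribe(key: string, handler: () => void): void;
	unsubscribe(key: string, handler: () => void): void;
}

async function coordinatedFetch<T>(
	lock: Lock,
	bus: Bus,
	fetchAndCache: () => Promise<T>,
	readCache: () => Promise<T | undefined>,
): Promise<T> {
	// Every process increments the counter; the first one (ticket === 1) performs the fetch.
	const ticket = await lock.increment('schemaCache--preparing');

	if (ticket > 1) {
		// Another process is already fetching: wait for its "done" message, with a randomized
		// timeout so waiters that give up do not all re-fetch at the same instant.
		const fromOther = await new Promise<T | undefined>((resolve) => {
			const timer = setTimeout(() => {
				bus.unsubscribe('schemaCache--done', onDone);
				resolve(undefined);
			}, Math.random() * 5000 + 2000);

			bus.subscribe('schemaCache--done', onDone);

			async function onDone(): Promise<void> {
				clearTimeout(timer);
				bus.unsubscribe('schemaCache--done', onDone);
				resolve(await readCache());
			}
		});

		if (fromOther !== undefined) {
			// Got the schema from the process that fetched it; release our ticket.
			await lock.increment('schemaCache--preparing', -1);
			return fromOther;
		}
	}

	// First process, or waiting timed out: fetch, cache, and signal everyone else.
	const result = await fetchAndCache();
	await lock.delete('schemaCache--preparing');
	bus.publish('schemaCache--done', { ready: true });
	return result;
}
```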