Skip to content

Commit

Permalink
tests: rewrite topic tests to use completely separate topic/sub pairs…
Browse files Browse the repository at this point in the history
… to avoid flakes (#1397)
  • Loading branch information
feywind committed Sep 23, 2021
1 parent a7d9c15 commit be607ea
Showing 1 changed file with 85 additions and 95 deletions.
180 changes: 85 additions & 95 deletions samples/system-test/topics.test.ts
Expand Up @@ -12,48 +12,65 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import {Message, PubSub, Subscription} from '@google-cloud/pubsub';
import {Message, PubSub, Topic, Subscription} from '@google-cloud/pubsub';
import {assert} from 'chai';
import {describe, it, before, after} from 'mocha';
import {describe, it, after} from 'mocha';
import {execSync, commandFor} from './common';
import * as uuid from 'uuid';

interface TSPair {
tname: string;
t: Topic;
s: Subscription;
}

describe('topics', () => {
const projectId = process.env.GCLOUD_PROJECT;
const pubsub = new PubSub({projectId});
const runId = uuid.v4();
console.log(`Topics runId: ${runId}`);
const topicNameOne = `top1-${runId}`;
const topicNameTwo = `top2-${runId}`;
const topicNameThree = `top3-${runId}`;
const subscriptionNameOne = `sub1-${runId}`;
const subscriptionNameTwo = `sub2-${runId}`;
const subscriptionNameThree = `sub3-${runId}`;
const subscriptionNameFour = `sub4-${runId}`;
const subscriptionNameFive = `sub5-${runId}`;
const fullTopicNameOne = `projects/${projectId}/topics/${topicNameOne}`;
const fullTopicNameThree = `projects/${projectId}/topics/${topicNameThree}`;
const expectedMessage = {data: 'Hello, world!'};

before(async () => {
// topicNameOne is created during the createTopic test.
await pubsub.createTopic(topicNameTwo);
await pubsub.createTopic(topicNameThree);
});
let topicCounter = 0;
function topicName(): string {
return `topic-test-${runId}-topic-${++topicCounter}`;
}

let subCounter = 0;
function subName(): string {
return `topic-test-${runId}-sub-${++subCounter}`;
}

function fullTopicName(name: string): string {
return `projects/${projectId}/topics/${name}`;
}

async function createPair(): Promise<TSPair> {
const tname = topicName(),
sname = subName();
const [topic] = await pubsub.topic(tname).get({autoCreate: true});
const [sub] = await topic.subscription(sname).get({autoCreate: true});

return {t: topic, tname, s: sub};
}

async function cleanSubs() {
const [subscriptions] = await pubsub.getSubscriptions();
await Promise.all(
subscriptions.filter(x => x.name.endsWith(runId)).map(x => x.delete())
subscriptions.filter(x => x.name.indexOf(runId) >= 0).map(x => x.delete())
);
}

after(async () => {
await cleanSubs();
async function cleanTopics() {
const [topics] = await pubsub.getTopics();
await Promise.all(
topics.filter(x => x.name.endsWith(runId)).map(x => x.delete())
topics.filter(x => x.name.indexOf(runId) >= 0).map(x => x.delete())
);
}

after(async () => {
await cleanSubs();
await cleanTopics();
});

// Helper function to pull one message.
Expand All @@ -73,77 +90,60 @@ describe('topics', () => {
};

it('should create a topic', async () => {
const output = execSync(`${commandFor('createTopic')} ${topicNameOne}`);
assert.include(output, `Topic ${topicNameOne} created.`);
const name = topicName();
const output = execSync(`${commandFor('createTopic')} ${name}`);
assert.include(output, `Topic ${name} created.`);
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicNameOne);
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should list topics', async () => {
const pair = await createPair();
const output = execSync(`${commandFor('listAllTopics')}`);
assert.include(output, 'Topics:');
assert.include(output, fullTopicNameThree);
assert.include(output, pair.t.name);
});

it('should publish a simple message', async () => {
const [subscription] = await pubsub
.topic(topicNameThree)
.subscription(subscriptionNameOne)
.get({autoCreate: true});
const pair = await createPair();
execSync(
`${commandFor('publishMessage')} ${topicNameThree} "${
expectedMessage.data
}"`
`${commandFor('publishMessage')} ${pair.tname} "${expectedMessage.data}"`
);
const receivedMessage = await _pullOneMessage(subscription);
const receivedMessage = await _pullOneMessage(pair.s);
assert.strictEqual(receivedMessage.data.toString(), expectedMessage.data);
});

it('should publish with flow control', async () => {
const [subscription] = await pubsub
.topic(topicNameThree)
.subscription(subscriptionNameOne)
.get({autoCreate: true});
const pair = await createPair();
const output = execSync(
`${commandFor('publishWithFlowControl')} ${topicNameThree}`
`${commandFor('publishWithFlowControl')} ${pair.tname}`
);
const receivedMessage = await _pullOneMessage(subscription);
const receivedMessage = await _pullOneMessage(pair.s);
assert.strictEqual(receivedMessage.data.toString(), 'test!');
assert.include(output, 'Published 1000 with flow control settings');

// Junk any remaining published messages.
await subscription.seek(new Date());
});

it('should publish a JSON message', async () => {
const [subscription] = await pubsub
.topic(topicNameThree)
.subscription(subscriptionNameOne)
.get({autoCreate: true});
const pair = await createPair();
execSync(
`${commandFor('publishMessage')} ${topicNameThree} "${
expectedMessage.data
}"`
`${commandFor('publishMessage')} ${pair.tname} "${expectedMessage.data}"`
);
const receivedMessage = await _pullOneMessage(subscription);
const receivedMessage = await _pullOneMessage(pair.s);
assert.deepStrictEqual(
receivedMessage.data.toString(),
expectedMessage.data
);
});

it('should publish a message with custom attributes', async () => {
const [subscription] = await pubsub
.topic(topicNameThree)
.subscription(subscriptionNameOne)
.get({autoCreate: true});
const pair = await createPair();
execSync(
`${commandFor('publishMessageWithCustomAttributes')} ${topicNameThree} "${
`${commandFor('publishMessageWithCustomAttributes')} ${pair.tname} "${
expectedMessage.data
}"`
);
const receivedMessage = await _pullOneMessage(subscription);
const receivedMessage = await _pullOneMessage(pair.s);
assert.strictEqual(receivedMessage.data.toString(), expectedMessage.data);
assert.deepStrictEqual(receivedMessage.attributes, {
origin: 'nodejs-sample',
Expand All @@ -152,36 +152,29 @@ describe('topics', () => {
});

it('should publish ordered messages', async () => {
const [subscription] = await pubsub
.topic(topicNameTwo)
.subscription(subscriptionNameTwo)
.get({autoCreate: true});

const pair = await createPair();
execSync(
`${commandFor('publishOrderedMessage')} ${topicNameTwo} "${
`${commandFor('publishOrderedMessage')} ${pair.tname} "${
expectedMessage.data
}" my-key`
);
const message = await _pullOneMessage(subscription);
const message = await _pullOneMessage(pair.s);
assert.strictEqual(message.orderingKey, 'my-key');
assert.strictEqual(message.data.toString(), expectedMessage.data);
});

it('should publish with specific batch settings', async () => {
const maxMessages = 10;
const waitTime = 1000;
const [subscription] = await pubsub
.topic(topicNameThree)
.subscription(subscriptionNameThree)
.get({autoCreate: true});
const pair = await createPair();
const startTime = Date.now();
execSync(
`${commandFor('publishBatchedMessages')} ${topicNameThree} "${
`${commandFor('publishBatchedMessages')} ${pair.tname} "${
expectedMessage.data
}" ${maxMessages} ${waitTime}`
);

const {data, publishTime} = await _pullOneMessage(subscription);
const {data, publishTime} = await _pullOneMessage(pair.s);
const actualWait = publishTime.getTime() - startTime;
const acceptableLatency = 300;

Expand All @@ -194,38 +187,33 @@ describe('topics', () => {
});

it('should resume publish', async () => {
const [subscription] = await pubsub
.topic(topicNameTwo)
.subscription(subscriptionNameFive)
.get({autoCreate: true});

const pair = await createPair();
execSync(
`${commandFor('resumePublish')} ${topicNameTwo} "${
`${commandFor('resumePublish')} ${pair.tname} "${
expectedMessage.data
}" my-key`
);
const message = await _pullOneMessage(subscription);
const message = await _pullOneMessage(pair.s);
assert.strictEqual(message.orderingKey, 'my-key');
assert.strictEqual(message.data.toString(), expectedMessage.data);
});

it('should publish with retry settings', async () => {
const [subscription] = await pubsub
.topic(topicNameThree)
.subscription(subscriptionNameFour)
.get({autoCreate: true});
const pair = await createPair();
execSync(
`${commandFor(
'publishWithRetrySettings'
)} ${projectId} ${topicNameThree} "${expectedMessage.data}"`
`${commandFor('publishWithRetrySettings')} ${projectId} ${pair.tname} "${
expectedMessage.data
}"`
);
const receivedMessage = await _pullOneMessage(subscription);
const receivedMessage = await _pullOneMessage(pair.s);
assert.strictEqual(receivedMessage.data.toString(), expectedMessage.data);
});

it('should set the IAM policy for a topic', async () => {
execSync(`${commandFor('setTopicPolicy')} ${topicNameThree}`);
const results = await pubsub.topic(topicNameThree).iam.getPolicy();
const pair = await createPair();

execSync(`${commandFor('setTopicPolicy')} ${pair.tname}`);
const results = await pair.t.iam.getPolicy();
const [policy] = results;
assert.deepStrictEqual(policy.bindings, [
{
Expand All @@ -242,27 +230,29 @@ describe('topics', () => {
});

it('should get the IAM policy for a topic', async () => {
const [policy] = await pubsub.topic(topicNameThree).iam.getPolicy();
const output = execSync(
`${commandFor('getTopicPolicy')} ${topicNameThree}`
);
const pair = await createPair();
const [policy] = await pair.t.iam.getPolicy();
const output = execSync(`${commandFor('getTopicPolicy')} ${pair.tname}`);
assert.include(
output,
`Policy for topic: ${JSON.stringify(policy.bindings)}.`
);
});

it('should test permissions for a topic', async () => {
const pair = await createPair();
const output = execSync(
`${commandFor('testTopicPermissions')} ${topicNameThree}`
`${commandFor('testTopicPermissions')} ${pair.tname}`
);
assert.match(output, /Tested permissions for topic/);
});

it('should delete a topic', async () => {
const output = execSync(`${commandFor('deleteTopic')} ${topicNameThree}`);
assert.include(output, `Topic ${topicNameThree} deleted.`);
const [topics] = await pubsub.getTopics();
assert(topics.every(s => s.name !== fullTopicNameThree));
const name = topicName();
await pubsub.topic(name).get({autoCreate: true});
const output = execSync(`${commandFor('deleteTopic')} ${name}`);
assert.include(output, `Topic ${name} deleted.`);
const [exists] = await pubsub.topic(name).exists();
assert.strictEqual(exists, false);
});
});

0 comments on commit be607ea

Please sign in to comment.