GCP Task Queue #71

mrdulin opened this issue Mar 14, 2019 · 0 comments

Preface

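This post presents a minimal architecture diagram, workflow and code, and sticks to the essentials. What matters most is understanding the architecture, not the code details, so please don't ask questions such as why the environment variables in the sample code are hard-coded. A production environment has many more things to handle and many more scenarios, and different scenarios raise different problems, so the architecture has to be adapted to the specific business case. Only one scenario is covered here, as a starting point for discussion. Reading this post requires familiarity with the stack used to implement the task queue: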

  • App Engine
  • Cloud Functions
  • Cloud Pub/Sub
  • Node.js

as well as some asynchronous programming concepts, such as Promise and async/await.

Problem

When a large number of tasks run concurrently, some of them fail because of the API's rate limit. How should the failed tasks be handled?

Solution

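Use a task queue: put the concurrent tasks that failed into a task queue, and at a fixed interval pull tasks from the queue and retry them, so that all the concurrent tasks eventually succeed. (There are many possible solutions and the right one depends on the business requirements; this post demonstrates just one.)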
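Before wiring up any GCP services, the idea can be checked with a small single-process simulation. In the sketch below, which is illustrative only, one "tick" stands for one scheduler run and `attempt()` stands for the rate-limited API call; `runWithRetryQueue` and `limitedPerTick` are hypothetical helpers, not part of the project.

```javascript
// Illustrative simulation only: no GCP services are involved.
// attempt(task, tick) returns true on success, false when the call is rate limited.
function runWithRetryQueue(tasks, attempt, { perTick = 1, maxTicks = 100 } = {}) {
  const retryQueue = [];

  // Tick 0: fire every task at once; failures go to the retry queue.
  for (const task of tasks) {
    if (!attempt(task, 0)) retryQueue.push(task);
  }

  // Each later tick pulls at most `perTick` tasks from the queue and retries them.
  let tick = 0;
  while (retryQueue.length > 0 && tick < maxTicks) {
    tick += 1;
    const batch = retryQueue.splice(0, perTick);
    for (const task of batch) {
      if (!attempt(task, tick)) retryQueue.push(task); // still limited: back into the queue
    }
  }
  return { ticks: tick, pending: retryQueue.length };
}

// Hypothetical limiter: accepts at most `maxPerTick` calls per tick.
function limitedPerTick(maxPerTick) {
  const used = new Map();
  return (task, tick) => {
    const count = used.get(tick) || 0;
    if (count >= maxPerTick) return false;
    used.set(tick, count + 1);
    return true;
  };
}

// 10 concurrent tasks against a limit of 2 calls per tick, retrying 1 task per tick.
const tasks = Array.from({ length: 10 }, (_, i) => `user-${i}`);
const result = runWithRetryQueue(tasks, limitedPerTick(2));
console.log(result); // { ticks: 8, pending: 0 }
```

Two tasks succeed immediately and the other eight drain one per tick; retrying more tasks per tick shortens the drain only as long as each batch stays under the limit.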

Architecture diagram

[Figure: architecture diagram]

Implementation

1. Create and deploy the Rate Limit App

Use the express-rate-limit middleware to add rate limiting to the API, configured as follows:

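const express = require('express');
const rateLimit = require('express-rate-limit');
const app = express();
app.use(express.json()); // populate req.body for JSON request bodies
// each client IP may make at most 2 requests per 30-second window
const createUserLimiter = rateLimit({
    windowMs: 30 * 1000,
    max: 2,
    message: 'Too many users created from this IP, please try again after 30 seconds'
});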

The create user API is as follows:

app.post('/create-user', createUserLimiter, (req, res) => {
    const user = req.body;
    memoryDB.users.push(user);
    res.end('create user success.');
});

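Each IP address may call the /create-user API only twice within a 30-second window. Beyond that, the createUserLimiter middleware rejects the request with HTTP status 429 (Too Many Requests) and the error message 'Too many users created from this IP, please try again after 30 seconds'.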
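To make the windowMs/max semantics concrete, here is a minimal fixed-window limiter. It is a sketch that only approximates express-rate-limit's default in-memory behaviour (counting hits per client key and resetting them every windowMs); the class name and the epoch-aligned windows are simplifying assumptions, not the library's internals.

```javascript
// Minimal fixed-window limiter, assumed to approximate express-rate-limit's
// default in-memory store: at most `max` hits per key in each `windowMs` window.
class FixedWindowLimiter {
  constructor({ windowMs, max }) {
    this.windowMs = windowMs;
    this.max = max;
    this.hits = new Map(); // key -> { windowStart, count }
  }

  // Returns the HTTP status the request would get: 200 if allowed, 429 if limited.
  hit(key, nowMs = Date.now()) {
    // Simplification: windows are aligned to multiples of windowMs.
    const windowStart = Math.floor(nowMs / this.windowMs) * this.windowMs;
    const entry = this.hits.get(key);
    if (!entry || entry.windowStart !== windowStart) {
      this.hits.set(key, { windowStart, count: 1 }); // new window: counter restarts
      return 200;
    }
    entry.count += 1;
    return entry.count <= this.max ? 200 : 429;
  }
}

// Same settings as createUserLimiter: 2 requests per 30 seconds per IP.
const limiter = new FixedWindowLimiter({ windowMs: 30 * 1000, max: 2 });
const statuses = [0, 1000, 2000, 29999, 30000].map((t) => limiter.hit('10.0.0.1', t));
console.log(statuses); // [ 200, 200, 429, 429, 200 ]
```

Because every call from the Cloud Function arrives from the same client address, a burst of concurrent requests shares one counter, which is why most of them end up with 429.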

For simplicity, an in-memory object is used to mock the database:

const memoryDB = { users: [] };

The /users endpoint returns all users currently in the in-memory database, along with their count:

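app.get('/users', (req, res) => {
    res.json({
        users: memoryDB.users,
        count: memoryDB.users.length
    });
});

// App Engine tells the app which port to listen on through the PORT environment variable
app.listen(process.env.PORT || 8080);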

Create app.yaml to deploy the service to GAE (Google App Engine):

service: third-party-service
runtime: nodejs8

With the gcloud command-line tool configured, deploy by running:

gcloud app deploy

After a successful deployment, check the GCP console:

[Screenshot: the deployed third-party-service in the GCP console]

Visit https://third-party-service-dot-<YOUR_PROJECT_ID>.appspot.com/users and you should see:

[Screenshot: the /users response]

There are no users in the in-memory database yet.

2. Create and deploy the createUser Cloud Function

The Cloud Function code is as follows:

const request = require('request-promise');

const pubsub = require('../pubsub');
const { config, MESSAGE_PROCESS_TOPIC } = require('../config');

const UserService = {
  CREATE_USER_RATE_LIMIT_ERROR: 'Too many users created from this IP, please try again after 30 seconds',

  createUser(user) {
    const url = `https://third-party-service-dot-${config.PROJECT_ID}.appspot.com/create-user`;
    const options = { method: 'POST', url, body: user, json: true };
    return request(options).catch((error) => {
      return Promise.reject(error.message);
    });
  }
};

const createUser = (event, callback) => {
  const pubsubMessage = event.data;
  const message = JSON.parse(Buffer.from(pubsubMessage.data, 'base64').toString());
  console.log('message: ', message);

  UserService.createUser(message)
    .then(() => {
      console.log('create user successfully.');
    })
    .catch((error) => {
      console.log('create user failed.');
      if (error.indexOf(UserService.CREATE_USER_RATE_LIMIT_ERROR) !== -1) {
        console.log('Add create user task to task queue.');
        return pubsub.publish(MESSAGE_PROCESS_TOPIC, message).catch((error) => {
          // TODO: store user to Datastore
          return Promise.reject(error);
        });
      }
    })
    .then(() => callback())
    .catch(callback);
};

module.exports = createUser;

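We create a UserService with a createUser method that sends the create user request to the Rate Limit App. The Cloud Function is a background function triggered by Cloud Pub/Sub, and its runtime is nodejs6 (the function signatures differ between nodejs6 and nodejs8; see the official documentation). The message that triggered the function, that is, the message published to the topic through Cloud Pub/Sub, is available from the event parameter. Calling callback tells Cloud Functions that execution has finished so its resources can be released.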
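In the nodejs6 runtime the Pub/Sub message arrives as event.data, and its data field is a base64-encoded string. The helpers below are a hypothetical sketch of that encoding round trip, assuming (as in this post) that the payload is JSON; they simulate the event object rather than calling any GCP API.

```javascript
// Hypothetical helpers mirroring the decoding done in the createUser function.
// Pub/Sub carries `data` as base64; the payload is assumed to be JSON.
function encodeData(payload) {
  return Buffer.from(JSON.stringify(payload)).toString('base64');
}

function decodeData(base64Data) {
  return JSON.parse(Buffer.from(base64Data, 'base64').toString('utf8'));
}

// Simulated nodejs6-style background event: the Pub/Sub message sits in event.data.
const event = { data: { data: encodeData({ id: '1', name: 'Ada' }), attributes: {} } };
const user = decodeData(event.data.data);
console.log(user.name); // Ada
```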

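The Cloud Function calls UserService.createUser to create the user. On success, it writes a "create user successfully." entry to Stackdriver Logging; on failure, it checks whether the error is CREATE_USER_RATE_LIMIT_ERROR and, if so, publishes the create user task back to the task queue named by MESSAGE_PROCESS_TOPIC. Any other error is only logged, and the function still reports success.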
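UserService.createUser turns every failure into error.message, and the function then searches that string for the limiter's message text. A somewhat sturdier check, sketched here as an assumption rather than the author's code, also looks at the HTTP status: request-promise's StatusCodeError exposes a statusCode property, and express-rate-limit responds with 429 by default. shouldRequeue is a hypothetical helper that still accepts plain strings, so it would fit the existing catch handler.

```javascript
const RATE_LIMIT_MESSAGE =
  'Too many users created from this IP, please try again after 30 seconds';

// Decide whether a failed create-user call should go back to the task queue.
// Assumption: `error` is either an object that may carry an HTTP `statusCode`
// or a plain string, as produced by UserService.createUser.
function shouldRequeue(error) {
  if (error && typeof error === 'object' && error.statusCode === 429) {
    return true; // 429 Too Many Requests: retry later
  }
  const text = typeof error === 'string' ? error : String(error && error.message);
  return text.includes(RATE_LIMIT_MESSAGE);
}

console.log(shouldRequeue({ statusCode: 429 })); // true
console.log(shouldRequeue(`429 - "${RATE_LIMIT_MESSAGE}"`)); // true
console.log(shouldRequeue({ statusCode: 500, message: 'boom' })); // false
```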

Before deploying the Cloud Function, create the Pub/Sub topic that triggers it. You can create it manually in the GCP console or from an application. The created topics look like this:

[Screenshot: the Pub/Sub topics in the GCP console]

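We use two topics, create-user and message-process, together with their subscriptions. create-user is the message queue that triggers the background function createUser above; message-process serves as our task queue. Failed create user tasks are added to this task queue, and on a schedule we pull create user tasks from it and republish them to the create-user message queue.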

Create a deployment script, deploy-createUser.sh, to deploy the createUser Cloud Function:

functionName=createUser
topicName=create-user

gcloud beta functions deploy ${functionName} --trigger-resource ${topicName} --trigger-event google.pubsub.topic.publish

The deployed createUser Cloud Function in the GCP console:

[Screenshot: the createUser Cloud Function in the GCP console]

3. Create and deploy the App Engine cron job service and cron job handler

First, create the service that handles the cron job.

server.js:

const express = require('express');
const async = require('async');

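const pkg = require('./package.json'); // `package` is a reserved word in strict mode
const UserService = require('./userService');

const app = express();
const PORT = process.env.PORT || 8080;
const userService = new UserService();
const SCHEDULE = 60; // cron.yaml fires every 60 seconds
// Runs per cron tick: 60 / CREATE_USER_SCHEDULER (30) = 2; falls back to 2 if the variable is unset
const n = Math.floor(SCHEDULE / Number(process.env.CREATE_USER_SCHEDULER)) || 2;
console.log('n: ', n);

app.get('/', (req, res) => {
  res.send(`version:${pkg.version}`);
});

app.get('/tasks/create-user', (req, res) => {
  // Run createUserTimeSeries n times, one after another, then answer the cron request
  async.timesSeries(
    n,
    (_, next) => userService.createUserTimeSeries(next),
    () => {
      res.sendStatus(200);
    }
  );
});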

app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}...`);
});

/tasks/create-user is the endpoint that the cron job calls once every minute, so this is where the logic of the scheduled task is registered and implemented. A brief word on why async.timesSeries is used: the finest schedule a cron job in App Engine's cron.yaml supports is "every 1 mins", so any finer schedule, down to seconds, has to be implemented in code. With async.timesSeries, the schedule becomes every 30 seconds (n = 2).
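The sub-minute trick comes down to two numbers: how many runs fit into one cron period, and how long to wait between runs. The sketch below restates it with plain async/await instead of the async library; runsPerCronTick and runInSeries are hypothetical names, and the interval is taken in seconds to match CREATE_USER_SCHEDULER.

```javascript
// Sketch only: plain async/await version of the async.timesSeries + sleep idea.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// How many runs fit into one cron period, e.g. 60 s / 30 s = 2.
// Like server.js, fall back to `fallback` runs when the interval is missing or invalid.
function runsPerCronTick(cronPeriodSeconds, intervalSeconds, fallback = 2) {
  const interval = Number(intervalSeconds);
  if (!Number.isFinite(interval) || interval <= 0) return fallback;
  return Math.max(1, Math.floor(cronPeriodSeconds / interval));
}

// Run task(i) n times one after another, waiting intervalMs after each run.
async function runInSeries(n, intervalMs, task) {
  const results = [];
  for (let i = 0; i < n; i++) {
    results.push(await task(i));
    await sleep(intervalMs);
  }
  return results;
}

console.log(runsPerCronTick(60, '30')); // 2
console.log(runsPerCronTick(60, undefined)); // 2 (fallback)
runInSeries(2, 10, (i) => `run ${i}`).then(console.log); // [ 'run 0', 'run 1' ]
```

Because the wait also follows the last run, one handler call lasts slightly more than n × interval; with n = 2 and 30 seconds it can run a little past the next cron tick, so consecutive invocations may briefly overlap.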

Now for the most important part, userService.js:

const pubsub = require('./pubsub');
const { config } = require('./config');

function UserService() {
  const maxMessages = 1;

  function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  async function createUser() {
    console.log('create user task start.');

    return pubsub
      .pull(config.PUBSUB.MESSAGE_PROCESS_SUBSCRIPTION, maxMessages)
      .then((messages) => {
        const message = messages[0];
        if (message) {
          return message;
        }
        return Promise.reject('No create user message found.');
      })
      .then((message) => {
        return pubsub.publish(config.PUBSUB.CREATE_USER_TOPIC, message.data, message.attributes).then(() => message);
      })
      .then((message) => {
        const projectId = config.PROJECT_ID;
        if (!projectId) {
          return Promise.reject('projectId is required');
        }
        const request = {
          subscription: `projects/${projectId}/subscriptions/${config.PUBSUB.MESSAGE_PROCESS_SUBSCRIPTION}`,
          ackIds: [message.ackId]
        };

        return pubsub.subscriberClient
          .acknowledge(request)
          .then(() => {
            console.log('create user message acked.');
          })
          .catch((error) => {
            console.error('subscriberClient acknowledge message error.');
            return Promise.reject(error);
          });
      })
      .then(() => {
        console.log('create user task done.');
      })
      .catch(console.log);
  }

  async function createUserTimeSeries(callback) {
    await createUser();
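    // Assumes config.CREATE_USER_SCHEDULER mirrors the env var in seconds (30); sleep() takes ms
    await sleep(Number(config.CREATE_USER_SCHEDULER) * 1000);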
    callback();
  }

  return {
    createUser,
    createUserTimeSeries
  };
}

module.exports = UserService;

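The createUser method pulls a create user task from the message-process task queue and publishes it to the create-user message queue, whose messages are consumed by the createUser Cloud Function. The pulled message is acknowledged only after it has been republished.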

Combined with async.timesSeries, the createUserTimeSeries method pulls a create user task from the message-process task queue every 30 seconds.
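The order of operations in createUser matters: the task is republished to create-user first and acknowledged on message-process only afterwards. If the process dies in between, Pub/Sub redelivers the unacknowledged message, so the task may be processed twice but is not lost (which is why the create-user consumer should ideally tolerate duplicates). The in-memory sketch below illustrates only that ordering; InMemoryQueue and moveOne are hypothetical stand-ins, not the @google-cloud/pubsub API.

```javascript
// Hypothetical in-memory stand-in for a topic + subscription; NOT the Pub/Sub client API.
class InMemoryQueue {
  constructor(items = []) {
    this.messages = items.map((data, i) => ({ ackId: `ack-${i}`, data }));
    this.acked = new Set();
  }
  // Unacknowledged messages stay visible, which mimics redelivery.
  pull(maxMessages) {
    return this.messages.filter((m) => !this.acked.has(m.ackId)).slice(0, maxMessages);
  }
  async publish(data) {
    this.messages.push({ ackId: `ack-${this.messages.length}`, data });
  }
  ack(ackId) {
    this.acked.add(ackId);
  }
}

// Move one task from the task queue to the target topic: publish first, ack second.
// If publish fails (or the process dies before ack), the message stays unacked and is retried.
async function moveOne(taskQueue, target) {
  const [message] = taskQueue.pull(1);
  if (!message) return false;
  await target.publish(message.data);
  taskQueue.ack(message.ackId);
  return true;
}

const messageProcess = new InMemoryQueue([{ name: 'Ada' }, { name: 'Lin' }]);
const createUserTopic = new InMemoryQueue();
moveOne(messageProcess, createUserTopic).then(() => {
  console.log(messageProcess.pull(10).length, createUserTopic.pull(10).length); // 1 1
});
```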

The deployment file of the cron job handler service, app.yaml:

runtime: nodejs8
service: delay-trigger
env_variables:
  CREATE_USER_SCHEDULER: '30'

The deployment file of the cron job, cron.yaml:

cron:
  - description: 'create user scheduler'
    url: /tasks/create-user
    schedule: every 1 mins
    target: delay-trigger

After a successful deployment, check the GCP console:

[Screenshots: the deployed delay-trigger service and its cron job in the GCP console]

Check how the cron job runs in Stackdriver logs:

[Screenshot: Stackdriver logs of the cron job]

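The /tasks/create-user endpoint is triggered once a minute, and a create user task is pulled from the message-process task queue every 30 seconds. Everything works as expected.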

4. Create the client

Next, we need a client that simulates concurrent create user requests.

createUser.js:

const faker = require('faker');

const pubsub = require('./pubsub');

async function main() {
  const count = 10;
  const CREATE_USER_TOPIC = 'create-user';
  const publishes = [];

  for (let i = 0; i < count; i++) {
    const user = { id: faker.random.uuid(), name: faker.name.findName(), email: faker.internet.email() };
    // Don't await inside the loop: all messages are published concurrently
    publishes.push(pubsub.publish(CREATE_USER_TOPIC, user));
  }
  await Promise.all(publishes);
}

main().catch(console.error);

With a concurrency of 10, run:

☁  client [master] ⚡  node createUser.js 
Message was published with ID: 421302379400827
Message was published with ID: 421302379400828
Message was published with ID: 421302379400829
Message was published with ID: 421302379400830
Message was published with ID: 421302379400831
Message was published with ID: 421302379400832
Message was published with ID: 421302379400833
Message was published with ID: 421302379400834
Message was published with ID: 421302379400835
Message was published with ID: 421302379400836

Visit https://third-party-service-dot-<YOUR_PROJECT_ID>.appspot.com/users to see the users in the in-memory database:

[Screenshot: the /users response after the client run]

Only two create user tasks succeeded; the others hit the rate limit error. Check the logs of the createUser Cloud Function:

[Screenshot: createUser Cloud Function logs]

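Some users failed to be created, and their tasks were put back into the task queue. The whole system is now running; wait a few minutes, then visit https://third-party-service-dot-<YOUR_PROJECT_ID>.appspot.com/users again to check the users in the memory DB: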

[Screenshot: the /users response after the retries]

Despite the rate limit, all 10 users were eventually created.
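The "few minutes" can be estimated: 2 of the 10 requests got through, so 8 tasks were re-queued, and the cron handler pulls one message (maxMessages = 1) every 30 seconds. The function below is a back-of-the-envelope sketch under the assumption that every retried task succeeds; it ignores cold starts and cron alignment, and estimateDrainSeconds is a hypothetical name.

```javascript
// Rough estimate, assuming every retried task succeeds and the scheduler
// pulls `perPull` messages every `intervalSeconds`.
function estimateDrainSeconds(failedTasks, perPull, intervalSeconds) {
  const pulls = Math.ceil(failedTasks / perPull);
  return pulls * intervalSeconds;
}

// 10 users sent, 2 accepted by the limiter, 8 re-queued; 1 message per pull, every 30 s.
console.log(estimateDrainSeconds(10 - 2, 1, 30)); // 240
```

That is 8 × 30 s = 240 s, about four minutes, consistent with waiting a few minutes before checking /users again.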

Source code

https://github.com/mrdulin/nodejs-gcp/tree/master/src/cloud-functions/delay-trigger

