You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
constrequest=require('request-promise');constpubsub=require('../pubsub');const{ config,MESSAGE_PROCESS_TOPIC}=require('../config');constUserService={CREATE_USER_RATE_LIMIT_ERROR: 'Too many users created from this IP, please try again after 30 seconds',createUser(user){consturl=`https://third-party-service-dot-${config.PROJECT_ID}.appspot.com/create-user`;constoptions={method: 'POST', url,body: user,json: true};returnrequest(options).catch((error)=>{returnPromise.reject(error.message);});}};constcreateUser=(event,callback)=>{constpubsubMessage=event.data;constmessage=JSON.parse(Buffer.from(pubsubMessage.data,'base64').toString());console.log('message: ',message);UserService.createUser(message).then(()=>{console.log('create user successfully.');}).catch((error)=>{console.log('create user failed.');if(error.indexOf(UserService.CREATE_USER_RATE_LIMIT_ERROR)!==-1){console.log('Add create user task to task queue.');returnpubsub.publish(MESSAGE_PROCESS_TOPIC,message).catch((error)=>{// TODO: store user to DatastorereturnPromise.reject(error);});}}).then(()=>callback()).catch(callback);};module.exports=createUser;
constexpress=require('express');constasync=require('async');constpackage=require('./package.json');constUserService=require('./userService');constapp=express();constPORT=process.env.PORT||8080;constuserService=newUserService();constSCHEDULE=60;constn=SCHEDULE/process.env.CREATE_USER_SCHEDULER||2;console.log('n: ',n);app.get('/',(req,res)=>{res.send(`version:${package.version}`);});app.get('/tasks/create-user',async(req,res)=>{async.timesSeries(n,(_,next)=>userService.createUserTimeSeries(next),()=>{res.sendStatus(200);});});app.listen(PORT,()=>{console.log(`Server listening on port ${PORT}...`);});
☁ client [master] ⚡ node createUser.js
Message was published with ID: 421302379400827
Message was published with ID: 421302379400828
Message was published with ID: 421302379400829
Message was published with ID: 421302379400830
Message was published with ID: 421302379400831
Message was published with ID: 421302379400832
Message was published with ID: 421302379400833
Message was published with ID: 421302379400834
Message was published with ID: 421302379400835
Message was published with ID: 421302379400836
GCP任务队列
前言
这篇文章介绍了最简化的架构图,流程以及代码,只说重点,最重要的是理解架构设计,而不是代码细节,比如不要问我示例代码的环境变量为什么是hard code这种问题。生产环境还有很多东西需要处理,也有很多场景,不同的场景也会衍生出不同的问题,所以架构设计要根据具体的业务场景去调整。这里只介绍一种场景,算是抛砖引玉。阅读此文章需要熟悉实现任务队列用到的技术栈:
以及异步编程的一些概念,例如
promise
,async/await
等。问题
当有大量并发任务时,由于接口的rate limit限制,导致任务失败,如何处理失败的任务?
解决方案
使用任务队列,将并发任务中失败的任务加入任务队列,每隔一段时间从任务队列中拉取任务进行重试操作,从而保证所有的并发任务最终都是成功的。(解决方案很多,根据业务需求来,这里只是演示一种方案)
架构图
实施
一、创建和部署Rate Limit App
使用
express-rate-limit
中间件来给接口增加Rate Limit功能,配置如下:create user
的API
如下:每个IP地址在30秒的窗口时间内只能访问
/create-user
API
两次,超过两次后,createUserLimiter
中间件抛出异常,并提示'Too many users created from this IP, please try again after 30 seconds'
的错误信息。为了简便起见,这里使用内存对象模拟数据库:
接口
/users
用来查看当前内存对象数据库中的所有user和总数:创建
app.yaml
,用来部署到我们创建的服务到GAE
(google app engine
)上,如下:配置好
gcloud
命令行工具,开始部署,执行:部署成功后,查看
GCP
console
:访问
https://third-party-service-dot-<YOUR_PROJECT_ID>.appspot.com/users
,可以看到:当前内存数据库中没有user。
二、创建和部署
createUser
cloud function
cloud function
代码如下:创建一个
UserService
以及一个createUser
方法,用来向Rate Limit App发送create user
请求。cloud function
是后台函数,通过Cloud Pub/Sub
来触发,运行时(runtime)是nodejs6
(nodejs6
和nodejs8
的函数签名不一样,详见官方文档)。可以从event
参数获取触发cloud function
的消息,也就是通过Cloud Pub/Sub
发布到Topic
中的消息。调用callback
表示Cloud function
执行完成,回收资源。在
Cloud Function
中调用UserService.createUser
发起创建用户的请求,成功时,打印create user successfully
日志到stackdriver logs
;失败时,判断error
是否是CREATE_USER_RATE_LIMIT_ERROR
,如果是,将此次create user
的task
重新加入到名为MESSAGE_PROCESS_TOPIC
的任务队列中。部署
Cloud Function
之前,需要先创建触发该Cloud Function
的Pub/Sub
Topic
,可以手动去GCP
console
创建,也可以通过应用程序创建。创建好的Topic
如下:我们要用到
create-user
和message-process
这两个Topic
及其Subscription
,create-user
是用来触发上面的后台函数createUser
的消息队列,messsage-process
作为我们的任务队列(Task Queue
),我们将失败的create user
任务加入到该Task Queue
中,定时去该Task Queue
中拉取create user
任务,然后重新将该任务发布到create-user
消息队列中。创建一个部署脚本
deploy-createUser.sh
用来部署createUser
Cloud Function
:在
GCP
console
查看部署好的createUser
Cloud Function
:三、创建和部署
app engine
cron job service和cron job handler首先,创建cron job处理服务
server.js
:/tasks/create-user
就是每隔1分钟执行一次的定时任务触发的端点(endpoint
),换句话说,就是每1分钟调用一次/tasks/create-user
,我们就可以在这里注册和实现我们的定时任务具体的功能逻辑了。关于这里为什么用async.timesSeries
,简单说下,app engine
在cron.yaml
中定义的cron job
,schedule
最小是every 1 mins
,要想实现精确到second的schedule
,或者说,更细粒度的schedule
,需要在代码中实现。使用async.timesSeries
就可以实现schedule
是every 30 seconds
(n = 2
)。下面来看最重要的部分
userService.js
:createUser
方法实现了从message-process
任务队列中拉取create user
任务,并将该任务发布到create-user
消息队列中,createUser
Cloud Function
会消费create-user
消息队列中的消息。createUserTimeSeries
方法配合async.timesSeries
方法,实现每隔30秒去message-process
任务队列中拉取create user
任务的功能。cron job
handler服务部署文件app.yaml
:cron job
服务部署文件cron.yaml
:部署成功后,在
GCP
console
查看:在
stackdriver logs
查看cron job
的运行情况:每1分钟触发一次
/tasks/create-user
端点,每30秒去message-process
任务队列中拉取create user
任务,运行正常。四、创建客户端
下面我们需要一个客户端模拟并发
create user
。createUser.js
:并发数为10 ,执行:
访问
https://third-party-service-dot-<YOUR_PROJECT_ID>.appspot.com/users
查看内存对象数据库中的user:只有两个
create user
的任务是成功的,其他的都遇到了Rate Limit错误,查看createUser
Cloud Function
的日志:可以看到部分用户创建失败,将创建用户失败的任务重新放回任务队列。现在整个系统已经在运行,等待几分钟,再次访问
https://third-party-service-dot-<YOUR_PROJECT_ID>.appspot.com/users
查看memory
DB
中的user
:尽管有Rate limit的限制,但最终成功创建了10个
user
。源码地址
https://github.com/mrdulin/nodejs-gcp/tree/master/src/cloud-functions/delay-trigger
The text was updated successfully, but these errors were encountered: