Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

实现GAE和GKE k8s cron job的秒级调度单位 #73

Open
mrdulin opened this issue Apr 8, 2019 · 0 comments
Open

实现GAE和GKE k8s cron job的秒级调度单位 #73

mrdulin opened this issue Apr 8, 2019 · 0 comments
Labels

Comments

@mrdulin
Copy link
Owner

mrdulin commented Apr 8, 2019

实现GAE和GKE k8s cron job的秒级调度单位

问题

如何实现Google App Engine和Google Kubernetes Engine定时作业(cron job)的秒级调度(schedule)单位?

不同于node-schedule拥有秒级调度单位,可以将定时作业精确在某一秒执行。GAEGKE的定时作业只能精确到分钟(min)。

GAE定时作业文档中,schedule格式可以知道,时间单位的有效值包括:

  • minutesmins
  • hours

GKE定时作业文档中,CronJob 使用所需的 schedule 字段,该字段接受 Unix 标准 crontab 格式的时间。所有 CronJob 时间均采用世界协调时间 (UTC):

  • 第一个值表示分钟(0 到 59 之间)
  • 第二个值表示小时(0 到 23 之间)
  • 第三个值表示一个月中的某天(1 到 31 之间)
  • 第四个值表示月份(1 到 12 之间)
  • 第五个值表示星期(0 到 6 之间)

以及k8s Cron Job文档中的调度(schedule)都是Unix标准crontab格式的时间,GKE使用kubernetes,简称k8s,来管理compute engine集群(kubernetes集群)。

解决方案

GAE cron job

我们可以先尝试设置GAE定时作业的scheduleevery 1 seconds,看看是什么效果。

server.js:

const express = require('express');
const pkg = require('./package.json');
const app = express();

function validateCronRequest(req, res, next) {
  console.log('X-Appengine-Cron', req.get('X-Appengine-Cron'), typeof req.get('X-Appengine-Cron'));
  if (req.get('X-Appengine-Cron') !== 'true') {
    return res.status(403);
  }
  next();
}

app.get('/version', (req, res) => {
  res.send(`version: ${pkg.version}`);
});

app.get('/cronjob/sync', validateCronRequest, (req, res) => {
  res.sendStatus(200);
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}...`);
});

app.yaml

runtime: nodejs8
service: cronjob-sec-unit

cron.yaml:

cron:
  - description: 'sync job'
    url: /cronjob/sync
    schedule: every 1 seconds
    target: cronjob-sec-unit

首先部署GAE service:

gcloud app deploy

部署成功后,去GCP console - App Engine查看

image

紧接着部署GAE定时作业:

gcloud app deploy ./cron.yaml

报错

ERROR: (gcloud.app.deploy) An error occurred while parsing file: [/Users/ldu020/workspace/nodejs-gcp/src/app-engine/cron-jobs-second-unit-schedule/cron.yaml]
Unable to assign value 'every 1 seconds' to attribute 'schedule':
schedule 'every 1 seconds' failed to parse: line 1:8 mismatched input u'second' expecting set None
in "/Users/ldu020/workspace/nodejs-gcp/src/app-engine/cron-jobs-second-unit-schedule/cron.yaml", line 4, column 15

错误说“不能将every 1 seconds赋值给schedule,解析every 1 seconds失败”。可见,GAE定时作业schedule并不支持秒级单位。

修改cron.yaml,将schedule设置为every 1 mins

cron:
  - description: 'sync job'
    url: /cronjob/sync
    schedule: every 1 mins
    target: cronjob-sec-unit

再次部署,部署成功后查看GCP控制台:

image

查看该定时作业在Stackdriver Logging中的logs,验证该定时作业运行是否正常。

image

通过日志的时间戳可以看到,该定时作业每分钟执行一次。

查看App Engine部署的service的logs:

image

该service运行正常,路由/cronjob/sync每分钟被定时作业调用一次。

接下来实现秒级调度单位的定时作业,本质上是将定时作业的最小调度单位——1分钟,再次划分成一系列秒级时间序列。对Node.js异步流程控制库async熟悉的同学可以马上想到用timeSeries函数。我们这里将定时作业设置为10秒执行一次,修改后的代码如下:

server.js:

const express = require('express');
const async = require('async');

const pkg = require('./package.json');
const app = express();

const OUTER_SCHEDULE = 60; // 1 minute
const INNER_SCHEDULE = 10; // 10 seconds

function validateCronRequest(req, res, next) {
  // console.log('X-Appengine-Cron', req.get('X-Appengine-Cron'), typeof req.get('X-Appengine-Cron'));
  if (process.env.NODE_ENV === 'production') {
    if (req.get('X-Appengine-Cron') !== 'true') {
      return res.status(403);
    }
  }

  next();
}

app.get('/version', (req, res) => {
  res.send(`version: ${pkg.version}`);
});

app.get('/cronjob/sync', validateCronRequest, async (req, res) => {
  // Update database every 1 minute
  // await updateDB();

  // Update database every 10 seconds
  try {
    await updateDBTimeSeries();
  } catch (error) {
    console.error(error);
  }
  res.sendStatus(200);
});

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function updateDB() {
  return new Promise((resolve) => {
    process.nextTick(() => {
      console.log('Update database success.');
      resolve();
    });
  });
}

async function updateDBEveryTenSeconds(item, n, next) {
  // console.log('item: ', item);
  await updateDB();
  if (item < n - 1) {
    await sleep(INNER_SCHEDULE * 1000);
  }
  next();
}

async function updateDBTimeSeries() {
  const n = OUTER_SCHEDULE / INNER_SCHEDULE;
  return new Promise((resolve, reject) => {
    async.timesSeries(
      n,
      (item, next) => updateDBEveryTenSeconds(item, n, next),
      (error, results) => {
        if (error) {
          return reject(error);
        }
        console.log('A round update database operations done.');
        resolve(results);
      }
    );
  });
}

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}...`);
});

部署后,查看App Engine service的日志:

image

可以看到,我们的调度任务每10秒执行一次,通过async.timeSeries方法将GAE定时作业的schedule(1分钟),划分出了6段。

GKE cron job

同理,我们也可以在GKE的定时作业上实现秒级调度单位。

src/index.js:

const async = require('async');

const OUTER_SCHEDULE = 60;
const INNER_SCHEDULE = 10;

async function main() {
  try {
    await updateDBTimeSeries();
    console.log('A round update database operations done.');
  } catch (error) {
    console.error(error);
  }
}

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function updateDB() {
  return new Promise((resolve) => {
    process.nextTick(() => {
      console.log('Update database success.');
      resolve();
    });
  });
}

async function updateDBEveryTenSeconds(item, n, next) {
  await updateDB();
  if (item < n - 1) {
    await sleep(INNER_SCHEDULE * 1000);
  }
  next();
}

async function updateDBTimeSeries() {
  const n = OUTER_SCHEDULE / INNER_SCHEDULE;
  return new Promise((resolve, reject) => {
    async.timesSeries(
      n,
      (item, next) => updateDBEveryTenSeconds(item, n, next),
      (error, results) => {
        if (error) {
          return reject(error);
        }
        resolve(results);
      }
    );
  });
}

main();

示例代码和GAE例子的代码差不多,去掉了express启动的HTTP server的代码。

k8s/cronjobs/update-db.yaml:

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: update-db
spec:
  schedule: '*/1 * * * *'
  startingDeadlineSeconds: 10
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 1
  failedJobsHistoryLimit: 1
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            run: update-db
        spec:
          restartPolicy: Never
          containers:
            - name: cron-job-second-unit-schedule
              image: asia.gcr.io/shadowsocks-218808/cron-job-second-unit-schedule:1.1
              env:
                - name: NODE_ENV
                  value: 'production'
                - name: K8S_POD_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
              ports:
                - containerPort: 8080
                  protocol: TCP

设置schedule*/1 * * * *,表示每分钟执行一次作业,具体执行什么作业?执行容器CMD指令指定的启动脚本,即npm start,即node ./src/index.js

Dockerfile:

FROM node:8.9-alpine

WORKDIR /app
COPY ./ /app/

RUN apk update \
  && apk add curl --no-cache \
  && npm i -g npm@latest \
  && npm i --production

CMD [ "npm", "start" ]

package.json:

"scripts": {
    "start": "node ./src/index.js"
},

下面关于打包docker image,还有GKE的使用和k8s cron job的部署简单说下。

打包docker image:

docker build -t cron-job-second-unit-schedule:1.1 .

设置docker tag

docker tag cron-job-second-unit-schedule:1.1 asia.gcr.io/shadowsocks-218808/cron-job-second-unit-schedule:1.1

pushdocker imageGCR registry:

docker push asia.gcr.io/shadowsocks-218808/cron-job-second-unit-schedule:1.1
The push refers to repository [asia.gcr.io/shadowsocks-218808/cron-job-second-unit-schedule]
2a9069409556: Pushed
4a18dd75bba0: Pushed
fbe45ea1c2a6: Layer already exists
f846841ed47f: Layer already exists
0198944a9875: Layer already exists
9dfa40a0da3b: Layer already exists
1.1: digest: sha256:81e4b2b5ddda93781bf368372d8697cad5619ca7ffa763cdd1eb0a49893622d1 size: 1577

使用gcloud命令行工具创建kubernetes cluster:

☁  cron-job-second-unit-schedule [master] ⚡  gcloud container clusters create nodejs-gcp --num-nodes=3

Creating cluster nodejs-gcp in us-west1-a... Cluster is being health-checked (master is healthy)...done.
Created [https://container.googleapis.com/v1/projects/shadowsocks-218808/zones/us-west1-a/clusters/nodejs-gcp].
To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/us-west1-a/nodejs-gcp?project=shadowsocks-218808
kubeconfig entry generated for nodejs-gcp.
NAME        LOCATION    MASTER_VERSION  MASTER_IP      MACHINE_TYPE   NODE_VERSION   NUM_NODES  STATUS
nodejs-gcp  us-west1-a  1.11.7-gke.12   35.199.144.38  n1-standard-1  1.11.7-gke.12  3          RUNNING

kubectl命令设置默认集群,将kubectlcontext设置为新创建的这个cluster

☁  cron-job-second-unit-schedule [master] ⚡  gcloud container clusters get-credentials nodejs-gcp
Fetching cluster endpoint and auth data.
kubeconfig entry generated for nodejs-gcp.

部署k8s cron job:

☁  cron-job-second-unit-schedule [master] ⚡  kaf ./k8s/cronjobs/update-db.yaml
cronjob.batch "update-db" created

部署完成后,去GCP控制台查看:

image

查看update-db定时作业的container logs:

image

image

可以看到该定时作业每10秒执行一次。

源码地址

GAE: https://github.com/mrdulin/nodejs-gcp/tree/master/src/app-engine/cron-jobs-second-unit-schedule
GKE: https://github.com/mrdulin/nodejs-gcp/tree/master/src/kubernetes-engine/cron-job-second-unit-schedule

顺便推荐个工具:https://crontab.guru/ - 输入*/1 * * * *这样的crontab风格的schedule,会输出文字表述的时间。


Flag Counter

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant