Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GraphQL Subscription多用户订阅与通知(一) #74

Open
mrdulin opened this issue Apr 15, 2019 · 0 comments
Open

GraphQL Subscription多用户订阅与通知(一) #74

mrdulin opened this issue Apr 15, 2019 · 0 comments

Comments

@mrdulin
Copy link
Owner

mrdulin commented Apr 15, 2019

GraphQL Subscription多用户订阅与通知(一)

前言

这不是一篇面向初学者的文章,阅读此文章以及源码需要熟练掌握以下技术栈,编程范式,软件开发架构及工具:

  • GraphQL
  • Apollo Server (Version 2)
  • Node.js, Express.js, npm, npx
  • TypeScript, JavaScript
  • 基于JWT的身份认证
  • WebSocket
  • OOP, FP, MVC
  • Altair GraphQL Client

问题

对于一个多用户应用,如何使用GraphQL Subscription实现指定用户/客户端接收订阅通知?

为了更直观的说明问题,画图

image

现在有5个client instances, 1个server instance, 4个client instances与server建立了websocket连接。这5个client instances分别对应5个用户,用户有个userType业务字段。

需求是client 5执行一个GraphQL Mutation操作,这里是addTemplate,当创建template成功以后,怎么通过WebSocket这个全双工通信协议,推送新创建的templateclient 2client 3,而不推送给client 1,client 4

解决方案

这篇文章使用apollo-server提供的PubSub实现,基于Node.js EventEmitter

关键代码:

addTemplate mutation:

addTemplate: (
  __,
  { templateInput },
  { templateConnector, userConnector, requestingUser }: IAppContext
): Omit<ICommonResponse, 'payload'> | undefined => {
  if (userConnector.isAuthrized(requestingUser)) {
    const commonResponse: ICommonResponse = templateConnector.add(templateInput);
    if (commonResponse.payload) {
      const payload = {
        data: commonResponse.payload,
        context: {
          requestingUser
        }
      };
      templateConnector.publish(payload);
    }

    return _.omit(commonResponse, 'payload');
  }
}

创建template成功后,构造payload数据,包含新创建的template以及发起此mutation的用户requestingUser,然后通过publish方法发送此payload

templateConnector.publish方法:

public publish(payload: any) {
  this.pubsub.publish(TriggerNameType.TEMPLATE_ADDED, payload);
}

接着看下Subscription resolver的实现:

Subscription: {
  templateAdded: {
    resolve: (
      payload: ISubscriptionPayload<ITemplate, Pick<IAppContext, 'requestingUser'>>,
      args: any,
      subscriptionContext: ISubscriptionContext,
      info: any
    ): ITemplate => {
      return payload.data;
    },
    subscribe: withFilter(templateIterator, templateFilter)
  }
}
function templateIterator() {
  return pubsub.asyncIterator([TriggerNameType.TEMPLATE_ADDED]);
}

async function templateFilter(
  payload?: ISubscriptionPayload<ITemplate, Pick<IAppContext, 'requestingUser'>>,
  args?: any,
  subscriptionContext?: ISubscriptionContext,
  info?: any
): Promise<boolean> {
  const NOTIFY: boolean = true;
  const DONT_NOTIFY: boolean = false;
  if (!payload || !subscriptionContext) {
    return DONT_NOTIFY;
  }

  const { userConnector, locationConnector } = subscriptionContext;
  const { data: template, context } = payload;

  if (!subscriptionContext.subscribeUser || !context.requestingUser) {
    return DONT_NOTIFY;
  }

  let results: IUser[];
  try {
    results = await Promise.all([
      userConnector.findByEmail(subscriptionContext.subscribeUser.email),
      userConnector.findByEmail(context.requestingUser.email)
    ]);
  } catch (error) {
    console.error(error);
    return DONT_NOTIFY;
  }

  const [subscribeUser, requestingUser] = results;

  // user himself/herself
  if (subscribeUser.id === requestingUser.id) {
    return DONT_NOTIFY;
  }

  const notificationIds = template.shareLocationIds;
  let subscribeLocationIds: string[] = [];
  switch (subscribeUser.userType) {
    case UserType.ZOLO:
    case UserType.ZELO:
      if (subscribeUser.locationId) {
        subscribeLocationIds = [subscribeUser.locationId];
      }
      break;
    case UserType.ZEWI:
    case UserType.ZOWI:
      if (subscribeUser.orgId) {
        subscribeLocationIds = locationConnector.findLocationIdsByOrgId(subscribeUser.orgId);
      }
      break;
  }

  const shouldNotify: boolean = intersection(notificationIds, subscribeLocationIds).length > 0;

  return shouldNotify;
}

templateIterator方法返回异步迭代器asyncIteratorTriggerNameType.TEMPLATE_ADDED表示GraphQL Subscription订阅的频道,或者说主题。

templateFilter方法是本示例最关键的代码。该方法返回值是boolean类型,返回true表示推送消息给客户端,返回false则不推送给客户端。

看到这里,你也许会有疑问,怎么区分客户端呢?也就是怎么把"创建template"这个操作创建出来的new template推送给指定的客户端?

首先,这个方法,通过payload(就是上面templateConnector.publish方法发送的payload)可以拿到执行addTemplate mutation操作的用户信息(requestingUser),也就是client 5

其次,还可以拿到与服务器建立了WebSocket连接的用户信息,怎么拿?代码如下:

const server = new ApolloServer({
  typeDefs,
  resolvers,
  context: contextFunction,
  subscriptions: {
    onConnect: (
      connectionParams: IWebSocketConnectionParams,
      webSocket: WebSocket,
      connectionContext: ConnectionContext
    ) => {
      console.log('websocket connect');
      console.log('connectionParams: ', connectionParams);
      if (connectionParams.token) {
        const token: string = validateToken(connectionParams.token);
        const userConnector = new UserConnector<IMemoryDB>(memoryDB);
        let user: IUser | undefined;
        try {
          const userType: UserType = UserType[token];
          user = userConnector.findUserByUserType(userType);
        } catch (error) {
          throw error;
        }

        const context: ISubscriptionContext = {
          subscribeUser: user,
          userConnector,
          locationConnector: new LocationConnector<IMemoryDB>(memoryDB)
        };

        return context;
      }

      throw new Error('Missing auth token!');
    },
    onDisconnect: (webSocket: WebSocket, connectionContext: ConnectionContext) => {
      console.log('websocket disconnect');
    }
  }
});

通过connectionParams.token拿到客户端传递过来的token,就是JWTJWT经过verify以后,我们可以拿到包含在JWT中的用户基本信息,比如邮箱,然后就可以通过邮箱地址去数据库中查询用户的信息。本示例简化了JWT的流程,使用authorization: Bearer <userType>的方式模拟用户的JWT

我们使用Altair GraphQL Client工具来模拟这5个客户端,这个工具比GraphQL提供的浏览器中运行的Playground要强大,简单说下这款工具的特性:

  • 支持设置request header, 比如设置authorization: Bearer <JWT>,方便调试有身份认证的接口。
  • 支持GraphQL Subscription
  • Prettify
  • 接口调用历史记录
  • GraphQL查询转换为curl命令

等等。

对于client 5,是发起创建template这个GraphQL mutation的用户,设置其request header,用于authentication:

设置好后,点击"save"保存,后端authentication通过以后,打印的日志如下:

点击Docs就可以看到GraphQL的mutation schema了。

对于订阅用户client 1, client 2, client 3 client 4,说订阅用户可能不太准确,实际上,每个客户端即可以订阅(建立WebSocket连接),也可以发起GraphQL查询。

给这上述4个客户端设置SubscriptionURLconnectionParams,作为我们的订阅用户(客户端)。

image

设置完成以后,写好Subscription:

subscription {
  templateAdded {
    id
    name
    shareLocationIds
  }
}

点击Send Request,与服务端WebSocket服务建立连接,建立成功如下图:

image

client 1建立WebSocket连接,服务端日志如下:

websocket connect
connectionParams:  { token: 'Bearer ZOWI' }
Found user:  { id: 'b70b6a00-b424-481b-a54c-523b3a21fe8d',
  name: 'Elwin Schmitt',
  email: 'Tyrique_Lind12@hotmail.com',
  orgId: '1350c033-88e3-4181-968d-14c13633c4d8',
  locationId: null,
  userType: 'ZOWI' }

现在在templateFilter方法中,通过payloadsubscriptionContext我们分别拿到了requestingUser(client 5),和每个订阅用户(client 1 , client 2, client 3, client 4)。

这个方法会被执行4次,我们在templateFilter方法中加入一行代码:

 console.count('templateFilter');

client 5执行addTemplate mutation时,服务端日志如下:

templateFilter: 1
templateFilter: 2
templateFilter: 3
templateFilter: 4

执行时,subscriptionContext参数的subscribeUser分别是client 1 , client 2, client 3, client 4

这就足够我们区分当前上下文中,是哪个客户端了。

template实体上,有个字段叫做shareLocationIds, 业务含义是该template可以共享给哪些location, 这里可以用它来和client 1 , client 2, client 3, client 4对应的location交集client 2对应的locationlocation 3client 3对应的locationlocation 3location 4。因此当shareLocationIds[location 3]时。

  1. shareLocationsIds[location 3] = true,应该推送消息给client 2

  2. shareLocationsIds[location 3, location 4] = true,应该推送消息给client 3

我们来看下各个客户端的情况,首先是发起创建template的ZOWI用户(client 5),并且将template的分享给包含location 3的用户,ZOLO用户的locationId字段是location 3,ZEWI用户没有locationId,但是有orgIdorglocation是一对多的关系,可以通过orgIdlocation表中找到org下所有的location

ZOWI用户(client 1),由于判断出是发起创建template的ZOWI用户他自己,所以templateFilter函数返回false,不给自己推送这个新创建的template消息

ZOLO用户(client 2),locationIdlocation 3,所以收到服务端websocket推送过来的新创建的template消息

ZEWI用户(client 3),通过orgIdlocation表中找到该org下包含location 3location 4,和shareLocationIds有交集,因此也收到服务端websocket推送过来的新创建的template消息

ZELO用户(client 4),locationIdlocation 1,与shareLocationIds没有交集,因此不推送消息。

templateFilter方法中打印出debug日志:

notificationIds: ["3"], subscribeLocationIds: ["3","4"]
user: ZEWI, should notify: true
notificationIds: ["3"], subscribeLocationIds: ["1"]
user: ZELO, should notify: false
notificationIds: ["3"], subscribeLocationIds: ["3"]
user: ZOLO, should notify: true

可以看到,ZOLOZEWI用户(客户端client 2client 3)收到了服务端WebSocket服务推送的消息,这里是新创建的template。而其他客户端client 1, client 4client 5则没有收到推送的消息,满足了开始的需求。

源码

https://github.com/mrdulin/apollo-graphql-tutorial/tree/master/src/subscriptions

参考


Flag Counter

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant