如何实现 RPC 通信 —— jsonrpc-rx 带你飞

进程间通信是在进行跨进程开发任务一个绕不过的坎:

  • 内嵌 iframe 页面通信
  • Web Worker 服务调用
  • Figma 插件 UI 与沙箱进程通信
  • Chrome 扩展页面间通信
  • Electron 主进程与渲染进程
  • Vscode extension 进程和 webview 进程通信
  • ……

这些场景,各有各的实现方案,但无疑都是基于事件的通信(event-based communication)。基于事件通信是非常经典的通信模式,但是在实际的使用中,有诸多的不便。

进程间通信的痛点

痛点一: 实现 RPC 比较麻烦

远程过程调用Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一个地址空间(通常为一个开放网络的一台计算机)的子程序,而程序员就像调用本地程序一样,无需额外地为这个交互作用编程(无需关注细节)。RPC是一种服务端-客户端(Client/Server)模式,经典实现是一个通过发送请求-接受回应进行信息交互的系统。

以 Web Worker 和 window 之间通信为例:

(代码示例一)

// worker.ts:运行在 web worker 的上下文
const handlers = {
    add: (a: number, b: number) => a + b,
    subtract: (a: number, b: number) => a - b,
};

self.onmessage = (event) => {
const { method, args, id } = event.data;
const result = handlersmethod;
self.postMessage({
result,
id,
});
});

// index.ts: 运行在 window 的上下文
const worker = new Worker(new URL('./worker', import.meta.url));

const invoke = (method: 'add' | 'subtract', args: any[]) => {
    return new Promise((resolve) => {
        const uuid = Math.random().toString(36).substring(2);

        const handler = (event: MessageEvent) => {
            const { result, id } = event.data;
            if (id && id === uuid) {
                worker.removeEventListener('message', handler);
                resolve(result);
            }
        };
        worker.onmessage = handler;

        worker.postMessage(
            {
                method,
                args,
                id: uuid,
            },
            '*'
        );
    });
};

const res0 = await invoke('add', [1, 2]); // output: 3
const res1 = await invoke('subtract', [2, 1]); // output: 1

从本例旨在实现在 index.ts 中调用 worker.ts 中的 handlers 中的方法。实现难度不大,但是实现是比较麻烦的,而且对于 typescript,缺乏类型提示,体验不佳。

痛点二: 服务端-客户端难以解耦

服务端:提供服务(能力)的一端,如:上面例子中的 worker.ts

客户端:消费(使用)服务的一端,如:上面例子中的 index.ts

从架构的角度看,相对于客户端,服务端是架构中的上层。服务端应该主导服务的定义,对所有的服务端提供一致的调用方案

但是在实际项目中,服务端往往需要“使用一些客户端的能力”。但是这样的话,会导致服务端和客户端之间的依赖关系混乱、架构上下分层不明确。如:在代码示例一,如果反过来, worker.ts 需要调用 index.ts 中的能力的时候,那么就需要在 index.ts 中维护一个 handlers,这样的弊端显而易见,worker.tsindex.ts 之间互为对方的服务端和客户端,这会导致循环混乱、分层无效。

jsonrpc-rx

针对上述的两个痛点,jsonrpc-rx 给出了解决方案。

jsonrpc-rx: 一个基于 JSON-RPC 2.0 和 响应式编程 用于 RPC 通讯的工具库。

针对痛点一

同样的例子,使用 jsonrpc-rx 实现:

(代码示例二)

// worker.ts
import { JsonrpcServer, expose } from '@jsonrpc-rx/server';

const msgSender = (msg) => self.postMessage(msg); // the message sender
const msgReceiver = (h) => (self.onmessage = (e) => h(e.data)); // the message receiver
const jsonrpcServer = new JsonrpcServer(msgSender, msgReceiver); // create jsonrpcServer

const handlers = {
add: (a: number, b: number) => a + b,
subtract: (a: number, b: number) => a - b,
};

// expose handlers
expose(jsonrpcServer, handlers);

// expose handlers type
export type HandlersType = typeof handlers;

// index.ts
import { JsonrpcClient, wrap } from '@jsonrpc-rx/client';
import { HandlersType } from './worker';

const worker = new Worker(new URL('./worker', import.meta.url));

const msgSender = (msg) => worker.postMessage(msg); // the message sender
const msgReceiver = (h) => (worker.onmessage = (e) => h(e.data)); // the message receiver
const jsonrpcClient = new JsonrpcClient(msgSender, msgReceiver); // create jsonrpcClient

const reomte = wrap(jsonrpcClient);

const res0 = await reomte.add(1, 2); // output: 3
const res1 = await reomte.subtract(2, 1); // output: 1

仅仅从这个例子看的话,好像 jsonrpc-rx 就是对于代码示例一的封装。但是如果仔细一点的话,会注意到,在 iframe.ts 将 handlers 的类型导出了,并在 index.ts 中使用了,这样的话,我们可以获得调用的类型提示:

代码示例二中,将代码示例一中的 postMessage 和 onmessage 封装为 msgSender 和 msgReceiver,将这两者传递给 jsonrpc-rx 即可实现方便地调用。

msgSender: 消息发送者,用于本端向目标端发送消息

msgReceiver: 消息接收者,用于本端接收目标端的消息

我们可以延伸一下,对于不同的情景,只要提供 msgSender 和 msgReceiver,即:如果两个进程之间能建立双工的实时通信,就能使用 jsonrpc-rx 的能力。所以 jsonrpc-rx 具有通用性,适用于在文章开头描述的所有场景!

针对痛点二

针对痛点二,jsonrpc 有两种解决的方案:

  • 支持 function 类型的参数
  • 支持发布订阅的模式

接着上面的代码示例二

(代码示例三)

// worker.ts
import { JsonrpcServer, expose, Publisher } from '@jsonrpc-rx/server';

// … 此处省略初始化 JsonrpcServer 的代码

const handlers = {
// 支持 function 类型的参数 ———— alert 应该为一个消息弹窗回调
useAlert: async (alert: (message?: any) => void) => {
setTimeout(() => alert(‘hello jsonrpc-rx !’), 5000);
},
// 发布一个可订阅的主题
timer: asSubject(({ next }: Publisher) => {
let a = 0;
const interval = setInterval(() => next(++a), 1000);
return () => clearInterval(interval);
}),
};

expose(jsonrpcServer, handlers);

export type HandlersType = typeof handlers;

// index.ts
import { JsonrpcClient, wrap } from '@jsonrpc-rx/client';
import { HandlersType } from './iframe';

// ... 此处省略初始化 JsonrpcClient 的代码

const reomte = wrap(jsonrpcClient);

// function 类型的参数
const alert = alert.bind(window);
reomte.useAlert(alert);

// 订阅主题
reomte.timer({
    next: (value) => console.log(value), // 1---2---3--...
});

本示例的代码地址:https://github.com/jsonrpc-rx/jsonrpc-rx-samples/tree/main/packages/webworker-plus

无论是支持 function 类型的参数,还是支持发布订阅的模式。我都可以统一的理解为,worker.ts 可以调用 index.ts 中定义的方法,也即:服务端调用客户端的能力。

与直接在客户端中声明能力不同的是,通过 jsonrpc-rx,服务端需要客户端提供什么样的能力,由服务端决定,如上面的例子中,useAlert 方法要求 window 提供一个弹窗方法,而不是 window 中声明一个弹窗方法,useAlert 方法去调用它。

这解决了:服务端应该主导服务的定义,对所有的客户端提供一致的调用方案。这个和依赖倒置原则有着异曲同工之妙!

小结

目前,jsonrpc-rx 的能力总结下:

  • 支持响应式

    在实际的通信过程中,响应式编程范式非常的实用。在 client 端,jsonrpc-rx 的响应式还可以和 rxjs 结合,使用 rxjs 的能力,可见于示例

  • 支持 Function 类型参数

    参数可为 Function 类型是由拦截器实现的,通过一个示例了解下如何使用 Function 类型参数。

  • 类型提示友好

    在客户端调用远程方法的时候,jsonrpc-rx 提供了友好的类型提示。

  • 支持自定义拦截器

    类似于 Axios ,jsonrpc-rx 支持自定义拦截器来处理“发送”和”接收“的消息。可以用于处理错误、记录日志

展望

面向未来 jsonrpc-rx 会逐步实现以下能力:

  • [ ] 提供更多内置的拦截器实现,如:自动重试、日志记录等,让用户更方便地使用拦截器功能。
  • [ ] 支持更多的传输格式,目前看主要是基于 JSON 的,会支持二进制提高效率。
  • [ ] 提供连接管理功能,如自动重连、负载均衡等,使得客户端和服务端的连接更可靠。
  • [ ] 支持服务发现,可以自动发现和连接后端服务。
  • [ ] 提供更多便捷的工具函数,如自动生成客户端和服务端代码,减少冗余代码的编写。
  • [ ] 提供其他语言的实现版本,如: java、go 等,使 jsonrpc-rx 有更广阔的使用场景。

参考资料

https://juejin.cn/post/7073156371008454663


这是一个从 https://juejin.cn/post/7368784048574627874 下的原始话题分离的讨论话题