【注:由于 farcaster 的代码库非常活跃,若本文内容和你看到的代码之间有出入,请以代码为准。】
近期对 web3 当红项目 farcaster 代码进行了粗略的研究,重点关注了其 decentralized node 实现 hub。并且基于学到的知识以 quick and dirty 的方式实现了一个简单的原型,本文则是这段时间内学习成果的汇总。
之所以研究它,原因无它:正在构思一个 decentralized node 实现,而 farcaster 的技术栈与我们团队高度吻合。
关于 Farcaster Hub 的一点背景
如果作为 web3 从业者还不清楚 farcaster 为何物,那么建议充分利用搜索引擎或直接问近期崛起的 AI 服务。要是用一句话介绍的话:farcaster 就是 web3 版的 twitter。
它的整体架构如下图:
[来源:https://docs.farcaster.xyz/assets/architecture.T7tCPEnC.png]
由上图可知,hub 组成了其去中心化的网络,它也正是本文的重点。至于其他内容,在此略过不谈。
Hub 的架构如下:
[来源:https://docs.farcaster.xyz/assets/hub.PU02ORHT.png]
作为一个去中心化节点,hub 的职责:
- 保存 offchain 数据
- 在这里就是用户发的那些 cast、comment、mention 等等,简单来说就是消息以及相关的数据。
- 处理节点间通信请求
- 作为一个建立在 p2p 协议之上的应用,这一点不难理解。
- 同步数据
- 如 blockchain 节点一样,每个节点都保存一份完整的数据副本。但由于(来自于同个用户的)请求可能会被发送到整个网络的不同节点,那么数据同步和解决冲突自然必不可少。
- 验证其他节点的有效性
- 即网络中的节点是否有篡改数据的行为。
- api server
- 支持 https 和 grpc
最后请注意:hub node 并非一个 blockchain 节点,它只是一个 offchain 的去中心化节点。
支撑 Farcaster Hub 的关键技术
worker
hub 是一个多线程 node 应用,充分利用了 worker 技术来划分节点的不同职责:p2p node、db、merkel 同步 ……
同时还利用 ReadWriteLock 来保证线程安全。
不过,farcaster 使用的 node 自带的 worker 库,代码不免有些繁琐。在此,我推荐使用 threads.js ,它可极大简化代码编写,其 github readme 中的例子充分体现了这一点。其典型特性:
- 主线程触发子线程的执行如普通函数调用一般简单。
- 子线程使用
parentPort
与父线程通信。 - 封装了父子线程之间的事件:
termination
、message
…… - 支持线程池
- ……
熟悉 node worker 编程的小伙伴可能会说还有: piscina 。阔是经过人肉测试之后,个人更喜欢 threads.js。
RocksDB
farcaster 采用 rocksdb 来作为其节点数据库,这也是很多 blockchain node 的选择,因为其读写速度飞快。
阔是,rocksdb 在 node 生态实在更新太慢,各位可自行去搜搜它的最近一次发布是什么时候。更要命的是,其版本混乱,甚至于 rocksdb npm 的 github repo 都是在其老祖宗 leveldb github repo 之下 😓。
farcaster 后来 fork 了这个库,可是并未添加什么实质性更新,仅仅做了一些关于平台编译的微调,列位看官可点击此处自行查看。
甚至于 eth 的 node 实现 lodestar 也用的是 leveldb,而不是 rocksdb。
估计 farcaster 团队也被此弄烦了,再加上内存和性能的问题,在近期代码中,rocksdb 相关代码已经采用组合技术实现:node rust bridge + rust rocksdb cargo。在完成迁移之后,还在 twitter 上欣喜的说:内存少了,性能高了!
关于 node 和 rust addon 的集成,由于时间和精力关系,我没有深入研究。目前初步看来至少有两种流行方案:
gRPC
hub 采用 grpc 进行两方面的通信:
- client 和 hub
- p2p peers
并通过 grpc-web
来提供 http 接口。
hub 使用 ts-proto 来完成 protobuf 的代码生成,直接使用 grpc
和 grpc-web
来用于 server 端实现,至于 http server 则是用 fastify 来完成。
跟直接使用 worker 库一样,这种方式导致代码繁琐,建议采用 connectrpc 快速实现。
libp2p
libp2p 是去中心化节点实现的基础,不少 blockchain 实现也是基于它来完成,比如 eth 的 node 实现 lodestar。
但它本身是一组大的协议栈,若对于其组成部分缺乏基本了解,则无法用好,建议先阅读协议再看配置。
hub 使用 gossipsub 实现节点间的消息传播,并利用 connectionGater
来实现对于节点连接的控制。
但可能是代码实现时间较早的缘故,根据现有协议规范,若自行实现时,可以考虑若干增强:
- 将 stream multiplexer 由 mplex 切换到目前推荐的 yamux
- 使用 Pubsubpeerdiscovery 来自动发现 peer
- 并且可以实现一个 cron job 来移除无效的 peer
基本代码架子如下:
createLibp2p({
peerId: await createFromPrivKey(
await unmarshalPrivateKey(Buffer.from(peerPrivateKey, 'hex'))
),
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0'],
},
connectionGater: {
denyDialPeer: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyDialMultiaddr: async (multiaddr: Multiaddr) => {
return await notAllowed(multiaddr.getPeerId()!.toString());
},
denyInboundConnection: () => {
return false;
},
denyOutboundConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundEncryptedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundEncryptedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundUpgradedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundUpgradedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundRelayReservation: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyOutboundRelayedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
denyInboundRelayedConnection: async (peerId: PeerId) => {
return await notAllowed(peerId.toString());
},
},
connectionManager: {
autoDialInterval: 1000,
},
transports: [tcp()],
connectionEncryption: [noise()],
streamMuxers: [yamux()],
peerDiscovery: [
mdns({
interval: 1000,
}),
pubsubPeerDiscovery(),
],
services: {
pubsub: gossipsub(),
identify: identify(),
},
})
.then(async node => {
libp2p = node;
node.addEventListener('peer:discovery', async evt => {
console.log('Discovered %s', evt.detail.id.toString());
});
node.addEventListener('peer:connect', async evt => {
console.log('Connected to %s', evt.detail.toString());
checkPeersPermissionPeriodically();
});
node.services.pubsub.addEventListener('message', message => {
if (message.detail.topic === messageTopic) {
parentPort?.postMessage({
topic: message.detail.topic,
message: message.detail.data.toString(),
});
}
});
node.services.pubsub.addEventListener('gossipsub:message', message => {});
node.services.pubsub.subscribe(messageTopic);
await node.start();
console.log('listening on addresses:');
node.getMultiaddrs().forEach(addr => {
console.log(addr.toString());
});
const stop = async () => {
await node.stop();
console.log('libp2p has stopped');
process.exit(0);
};
process.on('SIGTERM', stop);
process.on('SIGINT', stop);
})
.catch((e: any) => console.error);
}
CRDT
CRDT 全称:Conflict Free Replicated Data Types,广泛用于主主复制、文档协同编辑等场景。其特点是:最终一致性,此术语各位应该不会太陌生。
hub 采用 CRDT 来定义其 storage,采用的是 2p-set,看起来吓人其实就是:
- add 和 remove 各自记录
- merge 负责处理冲突
在研究代码的过程中,发现一个非常不错的 CRDT 系列文章,第一篇的链接:https://www.bartoszsypytkowski.com/the-state-of-a-state-based-crdts/ ,有兴趣的可以去仔细看看。同时,也可以研究一下大名鼎鼎的yjs。
Merkel 同步
hub 使用该项技术来实现节点之间的数据同步,即在保存本地消息数据时同步生成 merkel trie,在同步周期到了之后,节点交换各自的 merkel tries,以看是否有交换消息的必要。
此处让我不解的是为何 farcaster 要自行去实现一个 merkel trie,或许在当初 node 生态中尚未如现在一样有众多的 merkel tree 的实现。
如果不考虑精细的控制和性能,一种快糙猛的 merkel 同步实现方式如下:
- 确定数据分块策略,注意:一定要维持一个确定的顺序。一个 block 包含多个待同步消息,在 rocksdb 这类 kv 数据库中,使用指向实际数据的 key 即可。
- 每个 block 对应的数据表示: (merkel proof, leaf set)
- 定义数据同步消息格式和协议
- 在同步周期到时,节点随机选择一个节点获取其 merkel trie 数据,即 block 集合。
- 对于每个数据,比较 merkel proof,若相同,则略过。
- 若不同,则 diff 出缺失的 leaf,由于 leaf 本身就是指向消息的 key,因此直接由 key 即可获得对应缺失的消息。
- 同步更新完消息时,也同步更新 block 数据。
至于 merkel proof 的生成,采用成熟的 npm 即可,如 merkeltree.js。此外,openzepplin 也提供了一个实现,而且提供了安全性增强,各位可根据自己的喜好自行选择。
Farcaster 的工程实践
如众多开源项目一样,farcaster 采用的是 monorepo 实践,基于 turbo。并且广泛的采用了基于 rust 的 js/ts 工具链:
- tsup,打包
- swc,编译
- biome,格式化和 lint
关于 tooling,请参见我们近期的研究。
结语
如果想了解 decentralized app 的编写套路,farcaster 无疑是一个非常好的参考,本文列出了其中涉及的关键技术,相信此文将会激发各位的思路以及自行探索的兴趣。