链上监听

监听代币

以 USDT 为例

代币地址:https://etherscan.io/address/0xdac17f958d2ee523a2206206994597c13d831ec7

过去转账事件

监听事件的方法:https://web3js.readthedocs.io/en/v1.8.1/web3-eth-contract.html#contract-events

1
2
3
4
5
6
7
8
9
10
11
12
13
  const abi = [...]
const add = "0xdAC17F958D2ee523a2206206994597C13D831ec7"
const usdt = new web3.eth.Contract(abi, add);
// 查询USDT合约最新一百个区块的Transfer事件
console.log("查询USDT合约最新一百个区块的Transfer事件")
await usdt.getPastEvents('Transfer',
{
fromBlock: blocknumber - 100,
toBlock: 'latest'
},
(err, events) => {
console.log(events);
});

实时转账事件

订阅事件:https://web3js.readthedocs.io/en/v1.8.1/web3-eth-subscribe.html#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
const Web3 = require('web3');
const web3 = new Web3('wss://eth-goerli.g.alchemy.com/v2/0wp9jOR9zJMkKs6HJ_m8ksTZs-ivy7q3');

// 为了告诉web3.eth.subscribe我们应该跟踪哪些事件,我们可以添加以下过滤器:
let options = {
// 可以指定某一代币合约,若不指定,则全局监听
address: '0x326C977E6efc84E512bB9C30f76E30c160eD06FB',
topics: [
web3.utils.sha3('Transfer(address,address,uint256)')
]
};

const abi = [
{
"constant": true,
"inputs": [],
"name": "symbol",
"outputs": [
{
"name": "",
"type": "string"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "decimals",
"outputs": [
{
"name": "",
"type": "uint8"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
}
];

// 通过使用web3.eth.subscribeweb3.js中的函数;
// 我们可以订阅这些代币合约发出的事件,让我们能够在每次新的代币转移发生时对其进行跟踪。
// 通过传递我们刚刚设置的过滤器来启动订阅:
let subscription = web3.eth.subscribe('logs', options);

// 从智能合约中收集更多信息
async function collectData(contract) {
const [decimals, symbol] = await Promise.all([
contract.methods.decimals().call(),
contract.methods.symbol().call()
]);
return { decimals, symbol };
}

// 每次发现新的 ERC-20 交易时调用该函数
subscription.on('data', event => {
if (event.topics.length == 3) {
// 使用 ERC-20 ABI 对其进行解码
let transaction = web3.eth.abi.decodeLog([{
type: 'address',
name: 'from',
indexed: true
}, {
type: 'address',
name: 'to',
indexed: true
}, {
type: 'uint256',
name: 'value',
indexed: false
}],
event.data,
[event.topics[1], event.topics[2], event.topics[3]]);

const contract = new web3.eth.Contract(abi, event.address)

collectData(contract).then(contractData => {
const unit = Object.keys(web3.utils.unitMap).find(key => web3.utils.unitMap[key] === web3.utils.toBN(10).pow(web3.utils.toBN(contractData.decimals)).toString());

console.log(`Transfer of ${web3.utils.fromWei(transaction.value, unit)} ${contractData.symbol} from ${transaction.from} to ${transaction.to}`)

// if (transaction.from == '0x495f947276749ce646f68ac8c248420045cb7b5e') { console.log('Specified address sent an ERC-20 token!') };
// if (transaction.to == '0x495f947276749ce646f68ac8c248420045cb7b5e') { console.log('Specified address received an ERC-20 token!') };
// if (transaction.from == '0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D' && event.address == '0x6b175474e89094c44da98b954eedeac495271d0f') { console.log('Specified address transferred specified token!') }; // event.address contains the contract address
// if (event.address == '0x6b175474e89094c44da98b954eedeac495271d0f') { console.log('Specified ERC-20 transfer!') };

})
}
});

// 查看订阅是否成功启动或是否发生任何错误
subscription.on('error', err => { throw err });
subscription.on('connected', nr => console.log('Subscription on ERC-20 started with ID %s', nr));

image-20221201183345989

所有持币人的余额

web3js

第一种方法

查询到过去区块的event,拿到to地址之后,查询balanceOf来获取余额

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 存储从事件中获取到的to地址
const eventsData = []
// 存储所有的地址和相应的余额
const result = []
// 获取地址
await usdt.getPastEvents('Transfer',
{
fromBlock: blocknumber - 1,
toBlock: 'latest'
},
(err, events) => {
const l = events.length
console.log(".....", l);
for (let i = 0; i < l; i++) {
const toAdd = events[i].returnValues.to
console.log(".....", toAdd);
eventsData.push(toAdd)
}
console.log(eventsData)
});
// 获取余额
const len2 = eventsData.length
for (let i = 0; i < len2; i++) {
await usdt.methods.balances(eventsData[i]).call().then(function (amount) {
console.log('balances:', amount);
const data = {
"address": eventsData[i],
"amount": amount
}
result.push(data)

});
// 写入json文件
let str = JSON.stringify(result, "", "\t")
fs.writeFile('data.json', str, function (err) {
if (err) { res.status(500).send('Server is error...') }
})
}

但是这种方法有一些问题

  1. 一个地址多次接收到转账,account 地址重复
  2. 查询 balanceOf 会受到网络速度以及provider rate limit的限制,比较低效

第二种方法

拿到所有的 usdt 的address和amount,拉取从合约部署开始到现在的所有 Transfer event,并自己计算出所涉及到地址的余额

首先是最近一万个区块,可以在本机上完成

由于 alchemy 等节点的限制,每次获取的事件不能过多,于是写了一个循环来分批获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//  一个 k-v 对象
const accountdata = {}
let eventdata = []

for (let i = 0; i < 10; i++) {
blocknumber -= 1000
await usdt.getPastEvents('Transfer',
{
fromBlock: blocknumber - 1000,
toBlock: blocknumber
},
(err, events) => {
// 拼接数组
eventdata = eventdata.concat(events)

});

const l2 = eventdata.length
console.log(".....", l2);
}

// 计算
const l = eventdata.length
for (let i = 0; i < l; i++) {
const from = eventdata[i].returnValues.from
const to = eventdata[i].returnValues.to
const amount = parseInt(eventdata[i].returnValues.value)
// 如果是第一次出现的地址,进行初始化操作
if (accountdata[from] == undefined) {
accountdata[from] = 0
// console.log("...1")
}
if (accountdata[to] == undefined) {
accountdata[to] = 0
// console.log("...2")

}
accountdata[from] -= amount
accountdata[to] += amount
}
// console.log(accountdata)
//写入json文件
let str = JSON.stringify(accountdata, "", "\t")
fs.writeFile('data.json', str, function (err) {
if (err) { res.status(500).send('Server is error...') }
})

从初始区块开始的拉取和计算,需要借助服务器来进行

但在本机,拉取仍然可以克服 js 的 memory 限制

做法是每拉取一万个区块的数据,就对 json 文件进行一次写入,并清除 js 中数组存储的数据,防止 memory 溢出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const accountdata = {}
let eventdata = []

for (let i = 0; i < 1000; i++) {
blocknumber -= 1000
await usdt.getPastEvents('Transfer',
{
fromBlock: blocknumber - 1000,
toBlock: blocknumber
},
(err, events) => {
eventdata = eventdata.concat(events)
if (i % 10 == 0) {
let str = JSON.stringify(eventdata, "", "\t")
fs.writeFile('test7/' + i + 'data.json', str, function (err) {
if (err) { res.status(500).send('Server is error...') }
})
eventdata = []
}
});

const l2 = eventdata.length
console.log(".....", l2);
}

监听账户

所有交易活动

利用 web3.eth.getBlock 来扫块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 交易检查器
module.exports = web3 => {
// 在这里填入x
const account = '0x4281eCF07378Ee595C564a59048801330f3084eE'.toLowerCase();

return async function checkLastBlock() {
let block = await web3.eth.getBlock('latest');
console.log(`[*] Searching block ${block.number}...`);
if (block && block.transactions) {
for (let txHash of block.transactions) {
let tx = await web3.eth.getTransaction(txHash);
// console.log(tx.from)
if (account == tx.from.toLowerCase()) {
let lastBlockNumber = block.number
console.log(`[+] Transaction found on block ${lastBlockNumber}`);
console.log({
// txdata: tx,
toAddress: tx.to,
// ether 金额
ETHvalue: web3.utils.fromWei(tx.value, 'ether'),
timestamp: new Date()
});
// 解析出input中的内容
let inputdata = await web3.eth.abi.decodeParameters(
// ERC20 transfer method args
[
{ internalType: 'address', name: 'to', type: 'address' },
{ internalType: 'uint256', name: 'value', type: 'uint256' },
],
`0x${tx.input.substring(10)}`
);
console.log(inputdata)
}
}
}
}
}

底部的间隔功能每7秒检查一次当前区块。我选择此数字是因为以太坊的平均出块时间为15秒,我们不想错过任何区块。该程序的问题在于它不依赖统计异常值。例如如果一个区块在7秒内被挖掘,则可能会完全丢失该区块。而且如果我们尝试通过减少轮询间隔来缓解这种情况,则会发现我们需要一个非常快速的Internet连接来处理所有异步网络I/O。

1
2
3
4
5
6
7
8
9
10
11
12
13
const Web3 = require('web3');

const BuildTransactionChecker = require('./transactionChecker');
const CreateClient = require('./ethClient');

const web3 = CreateClient(Web3);
const checkBlock = BuildTransactionChecker(web3)

// 每7秒检查一次当前区块
// 以太坊平均出块时间为15秒
setInterval(() => {
checkBlock();
}, 7000)

image-20221201124441267

余额

结合合约和js的库:Ethereum Balance Checker

获取地址余额

参数

  • provider: Web3 | Ethers.Provider- 用于合约调用的提供者。
  • address: string- 查询余额的地址
  • tokens: string[]- 代币合约地址数组。仅支持 ERC20 代币。
  • options?: Options- 合约选项,选项见上。

退货

1
2
3
4
5
6
7
8
Promise<{
// Ether balance
"0x0": "100",
// Token balances
"0x123...": "500",
"0x456...": "100000",
...
}>

例子

1
2
3
4
5
6
7
8
9
import Web3 from 'web3';
import { getAddressBalances } from 'eth-balance-checker/lib/web3';

const web3 = new Web3(...);
const address = '0x123...';
const tokens = ['0x0', '0x456...'];
getAddressBalances(web3, address, tokens).then(balances => {
console.log(balances); // { "0x0": "100", "0x456...": "200" }
});

待处理交易

要在以太坊网络编写或者更新任何内容,需要有人创建,签署和发送交易。交易是外部世界与以太坊网络通信的方式。当发送到以太坊网络时,交易会停留在称为“mempool”的队列中,交易等待旷工被处理——- 处于这种等待交易称为待处理交易。发送交易所需要的少量费用称为gas;交易被旷工包含在一个区块中,并且根据它们包含的给旷工的gas 价格来确定优先级 。

查看这里, 将得到关于内存池和待处理交易的更多信息。

通过检查待处理的交易,可以执行以下操作:

  • 估计gas:理论上我们可以查看待处理的交易来预测下一个区块的最优gas价格。
  • 用于交易分析:我们可以分析去中心化交易所中的待处理交易,以便预测市场趋势。
  • 交易抢跑:在 DeFi 中,你可以预览即将到来的与价格(预言机)相关的交易,并可能对 MKR、COMP 和其他协议的保险库发出清算。

应用此方法我们可以完成一个简单的套利机器人

ether.js

订阅事件:ether.js Event

我们将使用WebSockets处理这些待处理的交易流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var ethers = require("ethers");
var url = "wss";

var init = function () {
var customWsProvider = new ethers.providers.WebSocketProvider(url);
console.log(customWsProvider.listeners.toString())
customWsProvider.on("pending", (tx) => {
customWsProvider.getTransaction(tx).then(function (transaction) {
console.log(transaction);
});
});

customWsProvider._websocket.on("error", async () => {
console.log(`Unable to connect to ${ep.subdomain} retrying in 3s...`);
setTimeout(init, 3000);
});
customWsProvider._websocket.on("close", async (code) => {
console.log(
`Connection lost with code ${code}! Attempting reconnect in 3s...`
);
customWsProvider._websocket.terminate();
setTimeout(init, 3000);
});
};

init();

web3.js

订阅事件:https://web3js.readthedocs.io/en/v1.8.1/web3-eth-subscribe.html#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// const { default: Web3 } = require('web3')

const Web3 = require('web3')

class TransactionChecker {
Web3;
web3ws;
account;
subscription;

constructor(projectId, account) {
this.web3ws = new Web3(new Web3.providers.WebsocketProvider('wss://eth-goerli.g.alchemy.com/v2/' + projectId,
{
clientConfig: {
maxReceivedFrameSize: 100000000,
maxReceivedMessageSize: 100000000,
}
}
));
this.web3 = new Web3(new Web3.providers.HttpProvider('https://eth-goerli.g.alchemy.com/v2/' + projectId));
this.account = account.toLowerCase();
}

subscribe(topic) {
this.subscription = this.web3ws.eth.subscribe(topic, (err, res) => {
if (err) console.error(err);
});
}

watchTransactions() {
console.log('Watching all pending transactions...');
this.subscription.on('data', async (txHash) => {
// setTimeout(async () => {
try {
let tx = await this.web3.eth.getTransaction(txHash);
if (tx != null) {
// console.log(tx.from);
// 指定账户
if (this.account == tx.from.toLowerCase()) {
console.log({
addressFrom: tx.from,
addressTo: tx.to,
value: this.web3.utils.fromWei(tx.value, 'ether'),
timestamp: new Date()
})
}
}
} catch (err) {
console.error(err);
}
// }, 60000)
})
}
}

let txChecker = new TransactionChecker('KEY', '0x4281eCF07378Ee595C564a59048801330f3084eE');
txChecker.subscribe('pendingTransactions');
txChecker.watchTransactions();

参考资料


链上监听
http://sissice.github.io/2022/12/02/monitor/
作者
Sissice
发布于
2022年12月2日
许可协议