加载笔记内容...
加载笔记内容...
在 Node.js 中使用 MQ(如 RabbitMQ 或 Kafka)来限制并发,可以通过以下方式进行。
RabbitMQ 可以通过 basic.qos
方法来限制每个消费者的并发数量。这里的关键是设置 prefetch_count
,这样每个消费者会一次只处理一个消息。
amqplib
和 RabbitMQ 限制并发首先,确保安装了 amqplib
:
1npm install amqplib
然后,可以使用以下代码来限制消费者的并发数:
1const amqp = require('amqplib');
2
3async function start() {
4 const connection = await amqp.connect('amqp://localhost');
5 const channel = await connection.createChannel();
6
7 const queue = 'task_queue';
8
9 // 声明队列
10 await channel.assertQueue(queue, { durable: true });
11
12 // 设置消费者最大并发数,每次最多处理一个消息
13 channel.prefetch(1); // 每个消费者只会获取1个消息进行处理
14
15 console.log('Waiting for messages. To exit press CTRL+C');
16
17 // 消费者回调函数
18 channel.consume(queue, async (msg) => {
19 if (msg) {
20 console.log(`Received: ${msg.content.toString()}`);
21
22 // 模拟消息处理
23 setTimeout(() => {
24 console.log(`Processed: ${msg.content.toString()}`);
25
26 // 确认消息已处理
27 channel.ack(msg);
28 }, 1000); // 假设每个任务处理1秒钟
29 }
30 });
31}
32
33start().catch(console.error);
在这个示例中:
channel.prefetch(1)
限制了每个消费者只能同时处理一个消息。setTimeout
模拟消息处理的延时,确保每个消息的处理是串行的。在 Kafka 中,可以通过控制消费者的数量和每个消费者消费的分区数来实现并发控制。如果你使用 kafkajs
库来连接 Kafka,控制并发通常是通过管理消费者实例和并行处理来完成的。
kafkajs
和 Kafka 限制并发首先,安装 kafkajs
:
1npm install kafkajs
然后,编写代码来限制消费者的并发:
1const { Kafka } = require('kafkajs');
2
3async function start() {
4 const kafka = new Kafka({
5 clientId: 'my-app',
6 brokers: ['localhost:9092']
7 });
8
9 const consumer = kafka.consumer({ groupId: 'test-group' });
10
11 await consumer.connect();
12 await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
13
14 // 设置并发数为 2,意味着同时会有 2 个消息被消费
15 const concurrencyLimit = 2;
16 let processingCount = 0;
17
18 await consumer.run({
19 eachMessage: async ({ topic, partition, message }) => {
20 if (processingCount >= concurrencyLimit) {
21 console.log('Waiting for a slot to process the message...');
22 return;
23 }
24
25 processingCount++;
26 console.log(`Processing message: ${message.value.toString()}`);
27
28 // 模拟消息处理
29 setTimeout(() => {
30 console.log(`Processed message: ${message.value.toString()}`);
31 processingCount--;
32 }, 1000); // 模拟1秒的处理时间
33 }
34 });
35}
36
37start().catch(console.error);
在这个示例中:
kafkajs
库连接到 Kafka。processingCount
来判断是否可以处理新的消息。当正在处理的消息数达到 concurrencyLimit
时,新的消息将被等待处理。setTimeout
用来模拟消息处理时间。prefetch(1)
来限制每个消费者的并发数量。processingCount
)来限制并发数。这些方法可以帮助你在 Node.js 中实现基于消息队列的并发控制,避免系统过载。
如果你想控制 总的消费数量,即在多个消费者之间共享限制的并发数(比如总的并发数不超过 4),那就需要在消费者之间共享一个计数器来动态地控制并发。
RabbitMQ 本身并没有直接提供全局控制总并发数量的功能,它只支持每个消费者独立控制 prefetch_count
。不过,结合一些额外的逻辑,你可以在消费者之间实现一个共享的并发控制。
可以通过以下方式实现:
以下是一个用 Node.js + RabbitMQ + Redis 来实现的示例:
首先,确保安装了必要的依赖:
1npm install amqplib redis
1const amqp = require('amqplib');
2const redis = require('redis');
3
4const CONCURRENCY_LIMIT = 4; // 总的并发数限制
5const REDIS_KEY = 'concurrent_jobs'; // Redis 中的并发计数器键
6
7// 创建 Redis 客户端
8const redisClient = redis.createClient();
9
10async function startConsumer(consumerId) {
11 const connection = await amqp.connect('amqp://localhost');
12 const channel = await connection.createChannel();
13 const queue = 'task_queue';
14
15 await channel.assertQueue(queue, { durable: true });
16
17 // 消费者开始工作
18 console.log(`Consumer ${consumerId} started.`);
19
20 // 每次只获取 1 个消息进行处理
21 channel.prefetch(1);
22
23 // 消费者回调
24 channel.consume(queue, async (msg) => {
25 // 尝试获取锁,查看是否可以消费
26 let currentConcurrency = await getCurrentConcurrency();
27
28 if (currentConcurrency >= CONCURRENCY_LIMIT) {
29 console.log(`Consumer ${consumerId} waiting...`);
30 // 如果并发数达到限制,等待一段时间再检查
31 setTimeout(() => {
32 channel.nack(msg, false, true); // 将消息重新放回队列
33 }, 1000);
34 return;
35 }
36
37 // 增加并发计数器
38 await increaseConcurrency();
39
40 console.log(`Consumer ${consumerId} processing: ${msg.content.toString()}`);
41
42 // 模拟任务处理
43 setTimeout(async () => {
44 console.log(`Consumer ${consumerId} finished processing: ${msg.content.toString()}`);
45
46 // 任务完成,减少并发计数器
47 await decreaseConcurrency();
48
49 // 确认消息已处理
50 channel.ack(msg);
51 }, 1000); // 模拟 1 秒的处理时间
52 });
53}
54
55// 获取当前的并发数量
56async function getCurrentConcurrency() {
57 return new Promise((resolve, reject) => {
58 redisClient.get(REDIS_KEY, (err, reply) => {
59 if (err) return reject(err);
60 resolve(parseInt(reply) || 0);
61 });
62 });
63}
64
65// 增加并发计数
66async function increaseConcurrency() {
67 return new Promise((resolve, reject) => {
68 redisClient.incr(REDIS_KEY, (err) => {
69 if (err) return reject(err);
70 resolve();
71 });
72 });
73}
74
75// 减少并发计数
76async function decreaseConcurrency() {
77 return new Promise((resolve, reject) => {
78 redisClient.decr(REDIS_KEY, (err) => {
79 if (err) return reject(err);
80 resolve();
81 });
82 });
83}
84
85// 启动多个消费者
86async function start() {
87 for (let i = 1; i <= 5; i++) {
88 // 启动 5 个消费者,每个消费者的 ID 是 i
89 startConsumer(i);
90 }
91}
92
93start().catch(console.error);
incr
和 decr
来增加或减少正在进行中的任务数。getCurrentConcurrency()
)。CONCURRENCY_LIMIT = 4
),它会等待一段时间后重新尝试(使用 nack
将消息重新放回队列)。decreaseConcurrency()
),并确认消息。concurrent_jobs
),实现了多个消费者之间的并发共享限制。每个消费者在处理消息时都会检查是否达到并发限制,如果是则等待。确保 RabbitMQ 和 Redis 已启动,然后运行该代码:
1node your_script.js
在这个方案中,你可以控制系统总的并发消费数(例如 4),并且通过多个消费者进行消费,确保总的并发数不会超过限制。