返回
创建于
状态公开

在 Node.js 中使用 MQ(如 RabbitMQ 或 Kafka)来限制并发,可以通过以下方式进行。

1. 使用 RabbitMQ 限制并发

RabbitMQ 可以通过 basic.qos 方法来限制每个消费者的并发数量。这里的关键是设置 prefetch_count,这样每个消费者会一次只处理一个消息。

示例:Node.js 使用 amqplib 和 RabbitMQ 限制并发

首先,确保安装了 amqplib

bash
1npm install amqplib

然后,可以使用以下代码来限制消费者的并发数:

javascript
1const amqp = require('amqplib');
2
3async function start() {
4  const connection = await amqp.connect('amqp://localhost');
5  const channel = await connection.createChannel();
6  
7  const queue = 'task_queue';
8
9  // 声明队列
10  await channel.assertQueue(queue, { durable: true });
11
12  // 设置消费者最大并发数,每次最多处理一个消息
13  channel.prefetch(1);  // 每个消费者只会获取1个消息进行处理
14  
15  console.log('Waiting for messages. To exit press CTRL+C');
16  
17  // 消费者回调函数
18  channel.consume(queue, async (msg) => {
19    if (msg) {
20      console.log(`Received: ${msg.content.toString()}`);
21      
22      // 模拟消息处理
23      setTimeout(() => {
24        console.log(`Processed: ${msg.content.toString()}`);
25        
26        // 确认消息已处理
27        channel.ack(msg);
28      }, 1000);  // 假设每个任务处理1秒钟
29    }
30  });
31}
32
33start().catch(console.error);

在这个示例中:

  • channel.prefetch(1) 限制了每个消费者只能同时处理一个消息。
  • 通过 setTimeout 模拟消息处理的延时,确保每个消息的处理是串行的。

2. 使用 Kafka 限制并发

在 Kafka 中,可以通过控制消费者的数量和每个消费者消费的分区数来实现并发控制。如果你使用 kafkajs 库来连接 Kafka,控制并发通常是通过管理消费者实例和并行处理来完成的。

示例:Node.js 使用 kafkajs 和 Kafka 限制并发

首先,安装 kafkajs

bash
1npm install kafkajs

然后,编写代码来限制消费者的并发:

javascript
1const { Kafka } = require('kafkajs');
2
3async function start() {
4  const kafka = new Kafka({
5    clientId: 'my-app',
6    brokers: ['localhost:9092']
7  });
8
9  const consumer = kafka.consumer({ groupId: 'test-group' });
10
11  await consumer.connect();
12  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });
13
14  // 设置并发数为 2,意味着同时会有 2 个消息被消费
15  const concurrencyLimit = 2;
16  let processingCount = 0;
17
18  await consumer.run({
19    eachMessage: async ({ topic, partition, message }) => {
20      if (processingCount >= concurrencyLimit) {
21        console.log('Waiting for a slot to process the message...');
22        return;
23      }
24
25      processingCount++;
26      console.log(`Processing message: ${message.value.toString()}`);
27
28      // 模拟消息处理
29      setTimeout(() => {
30        console.log(`Processed message: ${message.value.toString()}`);
31        processingCount--;
32      }, 1000);  // 模拟1秒的处理时间
33    }
34  });
35}
36
37start().catch(console.error);

在这个示例中:

  • 使用 kafkajs 库连接到 Kafka。
  • 控制并发的方式是通过检查 processingCount 来判断是否可以处理新的消息。当正在处理的消息数达到 concurrencyLimit 时,新的消息将被等待处理。
  • setTimeout 用来模拟消息处理时间。

总结

  • RabbitMQ:使用 prefetch(1) 来限制每个消费者的并发数量。
  • Kafka:通过控制消息处理的逻辑(如 processingCount)来限制并发数。

这些方法可以帮助你在 Node.js 中实现基于消息队列的并发控制,避免系统过载。


如果你想控制 总的消费数量,即在多个消费者之间共享限制的并发数(比如总的并发数不超过 4),那就需要在消费者之间共享一个计数器来动态地控制并发。

RabbitMQ 本身并没有直接提供全局控制总并发数量的功能,它只支持每个消费者独立控制 prefetch_count。不过,结合一些额外的逻辑,你可以在消费者之间实现一个共享的并发控制。

实现方案:在多个消费者之间共享并发限制

可以通过以下方式实现:

  1. 使用 Redis 或内存共享变量来跟踪当前正在处理的任务数量(即并发数)。
  2. 每个消费者在处理消息之前,检查当前并发数是否达到限制(例如 4)。如果达到限制,它就等待。
  3. 当一个消费者处理完一个消息,它会通知其他消费者并减少并发计数。

以下是一个用 Node.js + RabbitMQ + Redis 来实现的示例:

1. 安装依赖

首先,确保安装了必要的依赖:

bash
1npm install amqplib redis

2. 示例代码

javascript
1const amqp = require('amqplib');
2const redis = require('redis');
3
4const CONCURRENCY_LIMIT = 4;  // 总的并发数限制
5const REDIS_KEY = 'concurrent_jobs';  // Redis 中的并发计数器键
6
7// 创建 Redis 客户端
8const redisClient = redis.createClient();
9
10async function startConsumer(consumerId) {
11  const connection = await amqp.connect('amqp://localhost');
12  const channel = await connection.createChannel();
13  const queue = 'task_queue';
14
15  await channel.assertQueue(queue, { durable: true });
16
17  // 消费者开始工作
18  console.log(`Consumer ${consumerId} started.`);
19
20  // 每次只获取 1 个消息进行处理
21  channel.prefetch(1);
22
23  // 消费者回调
24  channel.consume(queue, async (msg) => {
25    // 尝试获取锁,查看是否可以消费
26    let currentConcurrency = await getCurrentConcurrency();
27
28    if (currentConcurrency >= CONCURRENCY_LIMIT) {
29      console.log(`Consumer ${consumerId} waiting...`);
30      // 如果并发数达到限制,等待一段时间再检查
31      setTimeout(() => {
32        channel.nack(msg, false, true);  // 将消息重新放回队列
33      }, 1000);
34      return;
35    }
36
37    // 增加并发计数器
38    await increaseConcurrency();
39
40    console.log(`Consumer ${consumerId} processing: ${msg.content.toString()}`);
41    
42    // 模拟任务处理
43    setTimeout(async () => {
44      console.log(`Consumer ${consumerId} finished processing: ${msg.content.toString()}`);
45      
46      // 任务完成,减少并发计数器
47      await decreaseConcurrency();
48      
49      // 确认消息已处理
50      channel.ack(msg);
51    }, 1000);  // 模拟 1 秒的处理时间
52  });
53}
54
55// 获取当前的并发数量
56async function getCurrentConcurrency() {
57  return new Promise((resolve, reject) => {
58    redisClient.get(REDIS_KEY, (err, reply) => {
59      if (err) return reject(err);
60      resolve(parseInt(reply) || 0);
61    });
62  });
63}
64
65// 增加并发计数
66async function increaseConcurrency() {
67  return new Promise((resolve, reject) => {
68    redisClient.incr(REDIS_KEY, (err) => {
69      if (err) return reject(err);
70      resolve();
71    });
72  });
73}
74
75// 减少并发计数
76async function decreaseConcurrency() {
77  return new Promise((resolve, reject) => {
78    redisClient.decr(REDIS_KEY, (err) => {
79      if (err) return reject(err);
80      resolve();
81    });
82  });
83}
84
85// 启动多个消费者
86async function start() {
87  for (let i = 1; i <= 5; i++) {
88    // 启动 5 个消费者,每个消费者的 ID 是 i
89    startConsumer(i);
90  }
91}
92
93start().catch(console.error);

3. 代码说明

  • Redis 用作并发计数器:我们使用 Redis 来存储当前正在处理的任务数。通过 incrdecr 来增加或减少正在进行中的任务数。
  • 消费者逻辑
    • 每个消费者在处理之前会检查当前并发数(通过 getCurrentConcurrency())。
    • 如果并发数达到限制(如 CONCURRENCY_LIMIT = 4),它会等待一段时间后重新尝试(使用 nack 将消息重新放回队列)。
    • 当一个消费者处理完任务后,它会减少并发计数(通过 decreaseConcurrency()),并确认消息。
  • 并发控制:通过 Redis 中的计数器(concurrent_jobs),实现了多个消费者之间的并发共享限制。每个消费者在处理消息时都会检查是否达到并发限制,如果是则等待。

4. 运行

确保 RabbitMQ 和 Redis 已启动,然后运行该代码:

bash
1node your_script.js

在这个方案中,你可以控制系统总的并发消费数(例如 4),并且通过多个消费者进行消费,确保总的并发数不会超过限制。