3.1.0稳定版
This commit is contained in:
@@ -5,7 +5,8 @@ const { EventEmitter } = require('events');
|
||||
const CFG = {
|
||||
protocol: process.env.RABBIT_PROTOCOL || 'amqp', // 'amqp' | 'amqps'
|
||||
// host: process.env.RABBIT_HOST || '192.168.1.144',
|
||||
host: process.env.RABBIT_HOST || 'crawlclient.api.yolozs.com',
|
||||
// host: process.env.RABBIT_HOST || 'crawlclient.api.yolozs.com',
|
||||
host: process.env.RABBIT_HOST || '47.79.98.113',
|
||||
port: Number(process.env.RABBIT_PORT || 5672),
|
||||
user: process.env.RABBIT_USER || 'tkdata',
|
||||
pass: process.env.RABBIT_PASS || '6rARaRj8Z7UG3ahLzh',
|
||||
@@ -58,31 +59,31 @@ async function createConnection() {
|
||||
// 心跳超时常见,避免重复噪音
|
||||
const msg = e?.message || String(e);
|
||||
if (msg && /heartbeat/i.test(msg)) {
|
||||
console.error('[AMQP] connection error (heartbeat):', msg);
|
||||
console.error('[AMQP] 连接错误 (心跳):', msg);
|
||||
} else {
|
||||
console.error('[AMQP] connection error:', msg);
|
||||
console.error('[AMQP] 连接错误:', msg);
|
||||
}
|
||||
emitter.emit('error', e);
|
||||
});
|
||||
|
||||
connection.on('close', () => {
|
||||
if (closing) return; // 正在关闭时不重连
|
||||
console.error('[AMQP] connection closed');
|
||||
console.error('[AMQP] 连接已关闭');
|
||||
conn = null; pubCh = null; conCh = null;
|
||||
scheduleReconnect();
|
||||
});
|
||||
|
||||
// Broker 侧内存/磁盘压力会 block 连接
|
||||
connection.on('blocked', (reason) => {
|
||||
console.warn('[AMQP] connection blocked by broker:', reason);
|
||||
console.warn('[AMQP] 连接被代理阻塞::', reason);
|
||||
emitter.emit('blocked', reason);
|
||||
});
|
||||
connection.on('unblocked', () => {
|
||||
console.log('[AMQP] connection unblocked');
|
||||
console.log('[AMQP] 链接解锁');
|
||||
emitter.emit('unblocked');
|
||||
});
|
||||
|
||||
console.log(`[AMQP] connected to ${CFG.host} (hb=${CFG.heartbeat}s)`);
|
||||
console.log(`[AMQP] 已连接到 ${CFG.host} (hb=${CFG.heartbeat}s)`);
|
||||
return connection;
|
||||
}
|
||||
|
||||
@@ -92,13 +93,13 @@ async function ensureChannels() {
|
||||
|
||||
if (!pubCh) {
|
||||
pubCh = await conn.createConfirmChannel();
|
||||
pubCh.on('error', e => console.error('[AMQP] pub channel error:', e?.message || e));
|
||||
pubCh.on('error', e => console.error('[AMQP] 通道错误:', e?.message || e));
|
||||
pubCh.on('close', () => { pubCh = null; if (!closing) scheduleReconnect(); });
|
||||
}
|
||||
|
||||
if (!conCh) {
|
||||
conCh = await conn.createChannel();
|
||||
conCh.on('error', e => console.error('[AMQP] con channel error:', e?.message || e));
|
||||
conCh.on('error', e => console.error('[AMQP] 通道错误:', e?.message || e));
|
||||
conCh.on('close', () => { conCh = null; if (!closing) scheduleReconnect(); });
|
||||
}
|
||||
}
|
||||
@@ -117,12 +118,12 @@ function scheduleReconnect() {
|
||||
reconnecting = false;
|
||||
backoff = 1000;
|
||||
emitter.emit('reconnected');
|
||||
console.log('[AMQP] reconnected and consumers resumed');
|
||||
console.log('[AMQP] 重新连接并恢复了消费者');
|
||||
} catch (e) {
|
||||
const base = Math.min(backoff, MAX_BACKOFF);
|
||||
// 加抖动,避免雪崩:在 75%~125% 之间浮动
|
||||
const jitter = base * (0.75 + Math.random() * 0.5);
|
||||
console.warn(`[AMQP] reconnect failed: ${e?.message || e}; retry in ${Math.round(jitter)}ms`);
|
||||
console.warn(`[AMQP] 重连失败: ${e?.message || e}; retry in ${Math.round(jitter)}ms`);
|
||||
backoff = Math.min(backoff * 1.6, MAX_BACKOFF);
|
||||
reconnectTimer = setTimeout(attempt, jitter);
|
||||
}
|
||||
@@ -193,7 +194,7 @@ async function startOneConsumer(queueName, onMessage, options = {}, isResume = f
|
||||
consumers.set(queueName, { onMessage, options, consumerTag: consumeResult.consumerTag });
|
||||
}
|
||||
|
||||
console.log(`[*] consuming "${queueName}" (prefetch=${prefetch})`);
|
||||
console.log(`[*] 消费 "${queueName}" (预取=${prefetch})`);
|
||||
return consumeResult;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user