// rabbitmq-live-client.js (CommonJS) const amqp = require('amqplib'); const { EventEmitter } = require('events'); const CFG = { protocol: process.env.RABBIT_PROTOCOL || 'amqp', // 'amqp' | 'amqps' // host: process.env.RABBIT_HOST || '192.168.1.144', host: process.env.RABBIT_HOST || 'crawlclient.api.yolozs.com', port: Number(process.env.RABBIT_PORT || 5672), user: process.env.RABBIT_USER || 'tkdata', pass: process.env.RABBIT_PASS || '6rARaRj8Z7UG3ahLzh', vhost: process.env.RABBIT_VHOST || '/', heartbeat: Number(process.env.RABBIT_HEARTBEAT || 60), // <-- 关键:心跳 frameMax: Number(process.env.RABBIT_FRAME_MAX || 0), // 0=默认;可调大以减少分片 }; let conn = null; let pubCh = null; // 发布 Confirm Channel let conCh = null; // 消费 Channel const emitter = new EventEmitter(); const consumers = new Map(); // queueName -> { onMessage, options, consumerTag } let reconnecting = false; let closing = false; let backoff = 1000; // ms const MAX_BACKOFF = 15000; let reconnectTimer = null; // —— 工具:序列化消息 function toBuffer(payload) { if (Buffer.isBuffer(payload)) return payload; if (typeof payload === 'string') return Buffer.from(payload); return Buffer.from(JSON.stringify(payload)); } // —— 内部:建立连接(含心跳、keepalive、事件) async function createConnection() { const connection = await amqp.connect({ protocol: CFG.protocol, hostname: CFG.host, port: CFG.port, username: CFG.user, password: CFG.pass, vhost: CFG.vhost, heartbeat: CFG.heartbeat, frameMax: CFG.frameMax > 0 ? CFG.frameMax : undefined, // 也可用 URL 形式:`amqp://u:p@host:5672/vhost?heartbeat=60` }); // 打开 TCP keepalive,降低 NAT/空闲超时断开的概率 try { const stream = connection.stream || connection.socket; if (stream?.setKeepAlive) stream.setKeepAlive(true, 15_000); // 15s } catch (_) { } connection.on('error', (e) => { // 心跳超时常见,避免重复噪音 const msg = e?.message || String(e); if (msg && /heartbeat/i.test(msg)) { console.error('[AMQP] connection error (heartbeat):', msg); } else { console.error('[AMQP] connection error:', msg); } emitter.emit('error', e); }); connection.on('close', () => { if (closing) return; // 正在关闭时不重连 console.error('[AMQP] connection closed'); conn = null; pubCh = null; conCh = null; scheduleReconnect(); }); // Broker 侧内存/磁盘压力会 block 连接 connection.on('blocked', (reason) => { console.warn('[AMQP] connection blocked by broker:', reason); emitter.emit('blocked', reason); }); connection.on('unblocked', () => { console.log('[AMQP] connection unblocked'); emitter.emit('unblocked'); }); console.log(`[AMQP] connected to ${CFG.host} (hb=${CFG.heartbeat}s)`); return connection; } // —— 内部:确保连接和通道存在 async function ensureChannels() { if (!conn) conn = await createConnection(); if (!pubCh) { pubCh = await conn.createConfirmChannel(); pubCh.on('error', e => console.error('[AMQP] pub channel error:', e?.message || e)); pubCh.on('close', () => { pubCh = null; if (!closing) scheduleReconnect(); }); } if (!conCh) { conCh = await conn.createChannel(); conCh.on('error', e => console.error('[AMQP] con channel error:', e?.message || e)); conCh.on('close', () => { conCh = null; if (!closing) scheduleReconnect(); }); } } // —— 内部:安排重连(指数退避 + 抖动,且只触发一个循环) function scheduleReconnect() { if (reconnecting || closing) return; reconnecting = true; if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } const attempt = async () => { if (closing) return; try { await ensureChannels(); await resumeConsumers(); // 恢复所有消费 reconnecting = false; backoff = 1000; emitter.emit('reconnected'); console.log('[AMQP] reconnected and consumers resumed'); } catch (e) { const base = Math.min(backoff, MAX_BACKOFF); // 加抖动,避免雪崩:在 75%~125% 之间浮动 const jitter = base * (0.75 + Math.random() * 0.5); console.warn(`[AMQP] reconnect failed: ${e?.message || e}; retry in ${Math.round(jitter)}ms`); backoff = Math.min(backoff * 1.6, MAX_BACKOFF); reconnectTimer = setTimeout(attempt, jitter); } }; reconnectTimer = setTimeout(attempt, backoff); } // —— 内部:恢复所有消费者 async function resumeConsumers() { if (!conCh) return; for (const [queue, c] of consumers.entries()) { try { if (c.consumerTag) await conCh.cancel(c.consumerTag); } catch (_) { } const tag = await startOneConsumer(queue, c.onMessage, c.options, true); c.consumerTag = tag.consumerTag; } } // —— 内部:启动一个消费者 async function startOneConsumer(queueName, onMessage, options = {}, isResume = false) { await ensureChannels(); const { prefetch = 1, durable = true, assertQueue = true, requeueOnError = false, // 可选:exclusive, arguments, dead-letter 等 } = options; if (assertQueue) { await conCh.assertQueue(queueName, { durable }); } await conCh.prefetch(prefetch); const consumeResult = await conCh.consume( queueName, async (msg) => { if (!msg) return; const raw = msg.content; const text = raw?.toString?.() ?? ''; let json; try { json = JSON.parse(text); } catch (_) { /* 忽略解析失败 */ } const payload = { raw, text, json, fields: msg.fields, properties: msg.properties, ack: () => { try { conCh.ack(msg); } catch (_) { } }, nack: (requeue = requeueOnError) => { try { conCh.nack(msg, false, requeue); } catch (_) { } }, }; try { emitter.emit('message', queueName, payload); await onMessage(payload); // 业务回调 payload.ack(); } catch (err) { emitter.emit('handlerError', queueName, err, payload); payload.nack(requeueOnError); } }, { noAck: false } ); if (!isResume) { consumers.set(queueName, { onMessage, options, consumerTag: consumeResult.consumerTag }); } console.log(`[*] consuming "${queueName}" (prefetch=${prefetch})`); return consumeResult; } // —— 对外 API:启动消费 async function startConsumer(queueName, onMessage, options = {}) { if (!queueName) throw new Error('queueName 必填'); if (typeof onMessage !== 'function') throw new Error('onMessage 回调必填'); const res = await startOneConsumer(queueName, onMessage, options, false); return { emitter, async stop() { const c = consumers.get(queueName); if (c?.consumerTag && conCh) { await conCh.cancel(c.consumerTag).catch(() => { }); } consumers.delete(queueName); return res; } }; } // —— 对外 API:发送到队列(支持 confirm) async function publishToQueue(queueName, payload, options = {}) { if (!queueName) throw new Error('queueName 必填'); await ensureChannels(); const { durable = true, persistent = true, assertQueue = true, confirm = true, headers = {}, mandatory = false, // 可选:不可路由返回 } = options; if (assertQueue) { await pubCh.assertQueue(queueName, { durable }); } const ok = pubCh.sendToQueue(queueName, toBuffer(payload), { persistent, headers, mandatory }); if (!ok) await new Promise(r => pubCh.once('drain', r)); if (confirm) await pubCh.waitForConfirms(); } // —— 对外 API:发送到交换机(支持 confirm) async function publishToExchange(exchange, routingKey, payload, options = {}) { if (!exchange) throw new Error('exchange 必填'); await ensureChannels(); const { type = 'direct', durable = true, assertExchange = true, confirm = true, persistent = true, headers = {}, mandatory = false, } = options; if (assertExchange) { await pubCh.assertExchange(exchange, type, { durable }); } const ok = pubCh.publish(exchange, routingKey || '', toBuffer(payload), { persistent, headers, mandatory }); if (!ok) await new Promise(r => pubCh.once('drain', r)); if (confirm) await pubCh.waitForConfirms(); } // —— 对外 API:主动重连(给 Electron 恢复/网络变化时调用) async function reconnectNow() { if (closing) return; if (reconnecting) return; try { if (pubCh) await pubCh.close().catch(() => { }); if (conCh) await conCh.close().catch(() => { }); if (conn) await conn.close().catch(() => { }); } catch (_) { } pubCh = null; conCh = null; conn = null; reconnecting = false; scheduleReconnect(); } // —— 关闭 async function close() { closing = true; if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } try { // 取消所有消费者 for (const [q, c] of consumers.entries()) { if (c?.consumerTag && conCh) { await conCh.cancel(c.consumerTag).catch(() => { }); } } } catch (_) { } consumers.clear(); try { if (pubCh) await pubCh.close(); } catch (_) { } try { if (conCh) await conCh.close(); } catch (_) { } try { if (conn) await conn.close(); } catch (_) { } pubCh = null; conCh = null; conn = null; } // —— 进程信号(可选) process.once('SIGINT', async () => { try { await close(); } finally { process.exit(0); } }); process.once('SIGTERM', async () => { try { await close(); } finally { process.exit(0); } }); module.exports = { startConsumer, publishToQueue, publishToExchange, reconnectNow, close, emitter, // 可订阅 'message' / 'handlerError' / 'reconnected' / 'error' / 'blocked' / 'unblocked' };