312 lines
10 KiB
JavaScript
312 lines
10 KiB
JavaScript
// rabbitmq-live-client.js (CommonJS)
|
||
const amqp = require('amqplib');
|
||
const { EventEmitter } = require('events');
|
||
|
||
const CFG = {
|
||
protocol: process.env.RABBIT_PROTOCOL || 'amqp', // 'amqp' | 'amqps'
|
||
// host: process.env.RABBIT_HOST || '192.168.1.144',
|
||
// host: process.env.RABBIT_HOST || 'crawlclient.api.yolozs.com',
|
||
host: process.env.RABBIT_HOST || '47.79.98.113',
|
||
port: Number(process.env.RABBIT_PORT || 5672),
|
||
user: process.env.RABBIT_USER || 'tkdata',
|
||
pass: process.env.RABBIT_PASS || '6rARaRj8Z7UG3ahLzh',
|
||
vhost: process.env.RABBIT_VHOST || '/',
|
||
heartbeat: Number(process.env.RABBIT_HEARTBEAT || 60), // <-- 关键:心跳
|
||
frameMax: Number(process.env.RABBIT_FRAME_MAX || 0), // 0=默认;可调大以减少分片
|
||
};
|
||
|
||
let conn = null;
|
||
let pubCh = null; // 发布 Confirm Channel
|
||
let conCh = null; // 消费 Channel
|
||
const emitter = new EventEmitter();
|
||
|
||
const consumers = new Map(); // queueName -> { onMessage, options, consumerTag }
|
||
|
||
let reconnecting = false;
|
||
let closing = false;
|
||
let backoff = 1000; // ms
|
||
const MAX_BACKOFF = 15000;
|
||
let reconnectTimer = null;
|
||
|
||
// —— 工具:序列化消息
|
||
function toBuffer(payload) {
|
||
if (Buffer.isBuffer(payload)) return payload;
|
||
if (typeof payload === 'string') return Buffer.from(payload);
|
||
return Buffer.from(JSON.stringify(payload));
|
||
}
|
||
|
||
// —— 内部:建立连接(含心跳、keepalive、事件)
|
||
async function createConnection() {
|
||
const connection = await amqp.connect({
|
||
protocol: CFG.protocol,
|
||
hostname: CFG.host,
|
||
port: CFG.port,
|
||
username: CFG.user,
|
||
password: CFG.pass,
|
||
vhost: CFG.vhost,
|
||
heartbeat: CFG.heartbeat,
|
||
frameMax: CFG.frameMax > 0 ? CFG.frameMax : undefined,
|
||
// 也可用 URL 形式:`amqp://u:p@host:5672/vhost?heartbeat=60`
|
||
});
|
||
|
||
// 打开 TCP keepalive,降低 NAT/空闲超时断开的概率
|
||
try {
|
||
const stream = connection.stream || connection.socket;
|
||
if (stream?.setKeepAlive) stream.setKeepAlive(true, 15_000); // 15s
|
||
} catch (_) { }
|
||
|
||
connection.on('error', (e) => {
|
||
// 心跳超时常见,避免重复噪音
|
||
const msg = e?.message || String(e);
|
||
if (msg && /heartbeat/i.test(msg)) {
|
||
console.error('[AMQP] 连接错误 (心跳):', msg);
|
||
} else {
|
||
console.error('[AMQP] 连接错误:', msg);
|
||
}
|
||
emitter.emit('error', e);
|
||
});
|
||
|
||
connection.on('close', () => {
|
||
if (closing) return; // 正在关闭时不重连
|
||
console.error('[AMQP] 连接已关闭');
|
||
conn = null; pubCh = null; conCh = null;
|
||
scheduleReconnect();
|
||
});
|
||
|
||
// Broker 侧内存/磁盘压力会 block 连接
|
||
connection.on('blocked', (reason) => {
|
||
console.warn('[AMQP] 连接被代理阻塞::', reason);
|
||
emitter.emit('blocked', reason);
|
||
});
|
||
connection.on('unblocked', () => {
|
||
console.log('[AMQP] 链接解锁');
|
||
emitter.emit('unblocked');
|
||
});
|
||
|
||
console.log(`[AMQP] 已连接到 ${CFG.host} (hb=${CFG.heartbeat}s)`);
|
||
return connection;
|
||
}
|
||
|
||
// —— 内部:确保连接和通道存在
|
||
async function ensureChannels() {
|
||
if (!conn) conn = await createConnection();
|
||
|
||
if (!pubCh) {
|
||
pubCh = await conn.createConfirmChannel();
|
||
pubCh.on('error', e => console.error('[AMQP] 通道错误:', e?.message || e));
|
||
pubCh.on('close', () => { pubCh = null; if (!closing) scheduleReconnect(); });
|
||
}
|
||
|
||
if (!conCh) {
|
||
conCh = await conn.createChannel();
|
||
conCh.on('error', e => console.error('[AMQP] 通道错误:', e?.message || e));
|
||
conCh.on('close', () => { conCh = null; if (!closing) scheduleReconnect(); });
|
||
}
|
||
}
|
||
|
||
// —— 内部:安排重连(指数退避 + 抖动,且只触发一个循环)
|
||
function scheduleReconnect() {
|
||
if (reconnecting || closing) return;
|
||
reconnecting = true;
|
||
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
|
||
|
||
const attempt = async () => {
|
||
if (closing) return;
|
||
try {
|
||
await ensureChannels();
|
||
await resumeConsumers(); // 恢复所有消费
|
||
reconnecting = false;
|
||
backoff = 1000;
|
||
emitter.emit('reconnected');
|
||
console.log('[AMQP] 重新连接并恢复了消费者');
|
||
} catch (e) {
|
||
const base = Math.min(backoff, MAX_BACKOFF);
|
||
// 加抖动,避免雪崩:在 75%~125% 之间浮动
|
||
const jitter = base * (0.75 + Math.random() * 0.5);
|
||
console.warn(`[AMQP] 重连失败: ${e?.message || e}; retry in ${Math.round(jitter)}ms`);
|
||
backoff = Math.min(backoff * 1.6, MAX_BACKOFF);
|
||
reconnectTimer = setTimeout(attempt, jitter);
|
||
}
|
||
};
|
||
|
||
reconnectTimer = setTimeout(attempt, backoff);
|
||
}
|
||
|
||
// —— 内部:恢复所有消费者
|
||
async function resumeConsumers() {
|
||
if (!conCh) return;
|
||
for (const [queue, c] of consumers.entries()) {
|
||
try {
|
||
if (c.consumerTag) await conCh.cancel(c.consumerTag);
|
||
} catch (_) { }
|
||
const tag = await startOneConsumer(queue, c.onMessage, c.options, true);
|
||
c.consumerTag = tag.consumerTag;
|
||
}
|
||
}
|
||
|
||
// —— 内部:启动一个消费者
|
||
async function startOneConsumer(queueName, onMessage, options = {}, isResume = false) {
|
||
await ensureChannels();
|
||
|
||
const {
|
||
prefetch = 1,
|
||
durable = true,
|
||
assertQueue = true,
|
||
requeueOnError = false,
|
||
// 可选:exclusive, arguments, dead-letter 等
|
||
} = options;
|
||
|
||
if (assertQueue) {
|
||
await conCh.assertQueue(queueName, { durable });
|
||
}
|
||
await conCh.prefetch(prefetch);
|
||
|
||
const consumeResult = await conCh.consume(
|
||
queueName,
|
||
async (msg) => {
|
||
if (!msg) return;
|
||
const raw = msg.content;
|
||
const text = raw?.toString?.() ?? '';
|
||
let json;
|
||
try { json = JSON.parse(text); } catch (_) { /* 忽略解析失败 */ }
|
||
|
||
const payload = {
|
||
raw, text, json,
|
||
fields: msg.fields,
|
||
properties: msg.properties,
|
||
ack: () => { try { conCh.ack(msg); } catch (_) { } },
|
||
nack: (requeue = requeueOnError) => { try { conCh.nack(msg, false, requeue); } catch (_) { } },
|
||
};
|
||
|
||
try {
|
||
emitter.emit('message', queueName, payload);
|
||
await onMessage(payload); // 业务回调
|
||
payload.ack();
|
||
} catch (err) {
|
||
emitter.emit('handlerError', queueName, err, payload);
|
||
payload.nack(requeueOnError);
|
||
}
|
||
},
|
||
{ noAck: false }
|
||
);
|
||
|
||
if (!isResume) {
|
||
consumers.set(queueName, { onMessage, options, consumerTag: consumeResult.consumerTag });
|
||
}
|
||
|
||
console.log(`[*] 消费 "${queueName}" (预取=${prefetch})`);
|
||
return consumeResult;
|
||
}
|
||
|
||
// —— 对外 API:启动消费
|
||
async function startConsumer(queueName, onMessage, options = {}) {
|
||
if (!queueName) throw new Error('queueName 必填');
|
||
if (typeof onMessage !== 'function') throw new Error('onMessage 回调必填');
|
||
const res = await startOneConsumer(queueName, onMessage, options, false);
|
||
return {
|
||
emitter,
|
||
async stop() {
|
||
const c = consumers.get(queueName);
|
||
if (c?.consumerTag && conCh) {
|
||
await conCh.cancel(c.consumerTag).catch(() => { });
|
||
}
|
||
consumers.delete(queueName);
|
||
return res;
|
||
}
|
||
};
|
||
}
|
||
|
||
// —— 对外 API:发送到队列(支持 confirm)
|
||
async function publishToQueue(queueName, payload, options = {}) {
|
||
if (!queueName) throw new Error('queueName 必填');
|
||
await ensureChannels();
|
||
|
||
const {
|
||
durable = true,
|
||
persistent = true,
|
||
assertQueue = true,
|
||
confirm = true,
|
||
headers = {},
|
||
mandatory = false, // 可选:不可路由返回
|
||
} = options;
|
||
|
||
if (assertQueue) {
|
||
await pubCh.assertQueue(queueName, { durable });
|
||
}
|
||
|
||
const ok = pubCh.sendToQueue(queueName, toBuffer(payload), { persistent, headers, mandatory });
|
||
if (!ok) await new Promise(r => pubCh.once('drain', r));
|
||
if (confirm) await pubCh.waitForConfirms();
|
||
}
|
||
|
||
// —— 对外 API:发送到交换机(支持 confirm)
|
||
async function publishToExchange(exchange, routingKey, payload, options = {}) {
|
||
if (!exchange) throw new Error('exchange 必填');
|
||
await ensureChannels();
|
||
|
||
const {
|
||
type = 'direct',
|
||
durable = true,
|
||
assertExchange = true,
|
||
confirm = true,
|
||
persistent = true,
|
||
headers = {},
|
||
mandatory = false,
|
||
} = options;
|
||
|
||
if (assertExchange) {
|
||
await pubCh.assertExchange(exchange, type, { durable });
|
||
}
|
||
|
||
const ok = pubCh.publish(exchange, routingKey || '', toBuffer(payload), { persistent, headers, mandatory });
|
||
if (!ok) await new Promise(r => pubCh.once('drain', r));
|
||
if (confirm) await pubCh.waitForConfirms();
|
||
}
|
||
|
||
// —— 对外 API:主动重连(给 Electron 恢复/网络变化时调用)
|
||
async function reconnectNow() {
|
||
if (closing) return;
|
||
if (reconnecting) return;
|
||
try {
|
||
if (pubCh) await pubCh.close().catch(() => { });
|
||
if (conCh) await conCh.close().catch(() => { });
|
||
if (conn) await conn.close().catch(() => { });
|
||
} catch (_) { }
|
||
pubCh = null; conCh = null; conn = null;
|
||
reconnecting = false;
|
||
scheduleReconnect();
|
||
}
|
||
|
||
// —— 关闭
|
||
async function close() {
|
||
closing = true;
|
||
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
|
||
try {
|
||
// 取消所有消费者
|
||
for (const [q, c] of consumers.entries()) {
|
||
if (c?.consumerTag && conCh) {
|
||
await conCh.cancel(c.consumerTag).catch(() => { });
|
||
}
|
||
}
|
||
} catch (_) { }
|
||
consumers.clear();
|
||
|
||
try { if (pubCh) await pubCh.close(); } catch (_) { }
|
||
try { if (conCh) await conCh.close(); } catch (_) { }
|
||
try { if (conn) await conn.close(); } catch (_) { }
|
||
pubCh = null; conCh = null; conn = null;
|
||
}
|
||
|
||
// —— 进程信号(可选)
|
||
process.once('SIGINT', async () => { try { await close(); } finally { process.exit(0); } });
|
||
process.once('SIGTERM', async () => { try { await close(); } finally { process.exit(0); } });
|
||
|
||
module.exports = {
|
||
startConsumer,
|
||
publishToQueue,
|
||
publishToExchange,
|
||
reconnectNow,
|
||
close,
|
||
emitter, // 可订阅 'message' / 'handlerError' / 'reconnected' / 'error' / 'blocked' / 'unblocked'
|
||
};
|