Files
tk-electron-ai/js/rabbitmq-service.js
2025-10-28 19:40:13 +08:00

311 lines
10 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// rabbitmq-service.js (CommonJS)
const amqp = require('amqplib');
const { EventEmitter } = require('events');
/**
 * Parse a numeric setting taken from an environment variable.
 *
 * @param {string|undefined} raw - raw environment value
 * @param {number} fallback - value used when `raw` is unset, empty, or not a
 *   finite non-negative number
 * @returns {number} the parsed number, or `fallback`
 */
function envNumber(raw, fallback) {
  if (raw === undefined || raw === '') return fallback;
  const parsed = Number(raw);
  // Number('abc') is NaN; never let that reach the connection options.
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback;
}

/**
 * Broker connection settings. Every field can be overridden through the
 * matching RABBIT_* environment variable.
 *
 * FIXME(security): the fallback user/pass below appear to be live credentials
 * committed to source control. They are kept so existing deployments keep
 * working, but should be moved to secret storage and rotated.
 */
const CFG = {
  protocol: process.env.RABBIT_PROTOCOL || 'amqp', // 'amqp' | 'amqps'
  // host: process.env.RABBIT_HOST || '192.168.1.144',
  host: process.env.RABBIT_HOST || 'crawlclient.api.yolozs.com',
  port: envNumber(process.env.RABBIT_PORT, 5672),
  user: process.env.RABBIT_USER || 'tkdata',
  pass: process.env.RABBIT_PASS || '6rARaRj8Z7UG3ahLzh',
  vhost: process.env.RABBIT_VHOST || '/',
  heartbeat: envNumber(process.env.RABBIT_HEARTBEAT, 60), // <-- important: heartbeat
  frameMax: envNumber(process.env.RABBIT_FRAME_MAX, 0), // 0 = default; can be raised to reduce frame splitting
};
// Module-level state shared by every function in this file.
let conn = null; // current connection, or null when there is none
let pubCh = null; // confirm channel used for publishing
let conCh = null; // channel used for consuming
const emitter = new EventEmitter(); // re-exported so callers can subscribe to events
const consumers = new Map(); // queueName -> { onMessage, options, consumerTag }
let reconnecting = false; // true while a reconnect loop is scheduled or running
let closing = false; // set by close(); suppresses reconnect attempts
let backoff = 1000; // ms
const MAX_BACKOFF = 15000; // upper bound for the reconnect delay, in ms
let reconnectTimer = null; // handle of the pending reconnect timeout, if any
// —— Utility: serialize a message payload
/**
 * Convert a payload into the Buffer that is used as the message body.
 *
 * - Buffer: returned as-is (same instance).
 * - string: encoded as UTF-8.
 * - typed array / DataView / ArrayBuffer: sent as raw bytes (a view over the
 *   same memory, no copy) instead of being JSON-encoded as `{"0":..,"1":..}`.
 * - anything else: JSON-encoded.
 *
 * @param {*} payload - value to serialize
 * @returns {Buffer} the message body
 * @throws {TypeError} when the payload has no JSON representation
 *   (`undefined`, a function, a symbol)
 */
function toBuffer(payload) {
  if (Buffer.isBuffer(payload)) return payload;
  if (typeof payload === 'string') return Buffer.from(payload);
  if (ArrayBuffer.isView(payload)) {
    // Respect the view's window into its backing ArrayBuffer.
    return Buffer.from(payload.buffer, payload.byteOffset, payload.byteLength);
  }
  if (payload instanceof ArrayBuffer) return Buffer.from(payload);

  const json = JSON.stringify(payload);
  if (json === undefined) {
    // JSON.stringify returns undefined for these; Buffer.from(undefined)
    // would otherwise fail with an unrelated-looking message.
    throw new TypeError(`payload of type ${typeof payload} cannot be serialized to JSON`);
  }
  return Buffer.from(json);
}
// —— Internal: open a connection with heartbeat, TCP keepalive and event wiring
/**
 * Open a new AMQP connection from CFG and attach the connection-level
 * event handlers.
 *
 * Handlers installed:
 * - 'error': logged, and forwarded to `emitter` only when someone listens
 *   (emitting 'error' on an EventEmitter without listeners throws).
 * - 'close': unless `closing` is set, clears the module-level
 *   connection/channel references and schedules a reconnect.
 * - 'blocked' / 'unblocked': logged and forwarded to `emitter`.
 *
 * @returns {Promise<object>} the connection returned by `amqp.connect`
 * @throws whatever `amqp.connect` rejects with
 */
async function createConnection() {
  const connection = await amqp.connect({
    protocol: CFG.protocol,
    hostname: CFG.host,
    port: CFG.port,
    username: CFG.user,
    password: CFG.pass,
    vhost: CFG.vhost,
    heartbeat: CFG.heartbeat,
    frameMax: CFG.frameMax > 0 ? CFG.frameMax : undefined,
    // A URL works as well: `amqp://u:p@host:5672/vhost?heartbeat=60`
  });

  // Enable TCP keepalive to lower the chance of NAT / idle-timeout drops.
  // The object returned by connect() wraps the real connection, whose socket
  // is exposed as `.connection.stream`; the older lookups are kept as fallbacks.
  try {
    const stream = connection.connection?.stream || connection.stream || connection.socket;
    if (typeof stream?.setKeepAlive === 'function') stream.setKeepAlive(true, 15_000); // 15s
  } catch (e) {
    // Best effort only: a missing keepalive must not fail the connection.
    console.warn('[AMQP] could not enable TCP keepalive:', e?.message || e);
  }

  connection.on('error', (e) => {
    // Heartbeat timeouts are common; label them to cut down on noise.
    const msg = e?.message || String(e);
    if (msg && /heartbeat/i.test(msg)) {
      console.error('[AMQP] connection error (heartbeat):', msg);
    } else {
      console.error('[AMQP] connection error:', msg);
    }
    // Without this guard an unsubscribed 'error' would throw from inside
    // this handler and take the process down.
    if (emitter.listenerCount('error') > 0) emitter.emit('error', e);
  });

  connection.on('close', () => {
    if (closing) return; // shutting down on purpose: do not reconnect
    console.error('[AMQP] connection closed');
    conn = null; pubCh = null; conCh = null;
    scheduleReconnect();
  });

  // The broker blocks a connection when it is under memory/disk pressure.
  connection.on('blocked', (reason) => {
    console.warn('[AMQP] connection blocked by broker:', reason);
    emitter.emit('blocked', reason);
  });
  connection.on('unblocked', () => {
    console.log('[AMQP] connection unblocked');
    emitter.emit('unblocked');
  });

  console.log(`[AMQP] connected to ${CFG.host} (hb=${CFG.heartbeat}s)`);
  return connection;
}
// —— Internal: make sure the connection and both channels exist
// Promise of the setup currently in progress; concurrent callers share it so
// that only one connection / channel pair is ever created at a time.
let ensureInFlight = null;

/**
 * Ensure `conn`, `pubCh` (confirm channel) and `conCh` (consume channel) are
 * available, creating whichever is missing.
 *
 * Calls made while a setup is already running wait for that same setup
 * instead of opening a second connection.
 *
 * Each channel's 'close' handler clears the module-level reference only if
 * it still points at that channel, then schedules a reconnect unless
 * `closing` is set.
 *
 * @returns {Promise<void>}
 * @throws whatever connection or channel creation rejects with
 */
async function ensureChannels() {
  if (conn && pubCh && conCh) return;

  if (!ensureInFlight) {
    ensureInFlight = (async () => {
      if (!conn) conn = await createConnection();
      // Keep a local reference: `conn` may be reset by a 'close' event while
      // the channel creation below is awaiting.
      const connection = conn;

      if (!pubCh) {
        const ch = await connection.createConfirmChannel();
        ch.on('error', e => console.error('[AMQP] pub channel error:', e?.message || e));
        ch.on('close', () => {
          // A late 'close' from a replaced channel must not drop its successor.
          if (pubCh === ch) pubCh = null;
          if (!closing) scheduleReconnect();
        });
        pubCh = ch;
      }

      if (!conCh) {
        const ch = await connection.createChannel();
        ch.on('error', e => console.error('[AMQP] con channel error:', e?.message || e));
        ch.on('close', () => {
          if (conCh === ch) conCh = null;
          if (!closing) scheduleReconnect();
        });
        conCh = ch;
      }
    })().finally(() => { ensureInFlight = null; });
  }

  await ensureInFlight;
}
// —— Internal: schedule a reconnect (exponential backoff + jitter, one loop at a time)
/**
 * Start the reconnect loop unless one is already running or the module is
 * closing.
 *
 * The first attempt runs after `backoff` ms. Each failed attempt schedules
 * the next one after a jittered delay and multiplies `backoff` by 1.6, capped
 * at MAX_BACKOFF. A successful attempt restores channels and consumers,
 * resets `backoff` to 1000 ms and emits 'reconnected'.
 *
 * An exception thrown by a 'reconnected' listener is logged and does not
 * count as a failed reconnect; otherwise it would trigger a retry that
 * re-subscribes every consumer again.
 */
function scheduleReconnect() {
  if (reconnecting || closing) return;
  reconnecting = true;
  if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }

  const attempt = async () => {
    if (closing) return;
    try {
      await ensureChannels();
      await resumeConsumers(); // restore every registered consumer
    } catch (e) {
      const base = Math.min(backoff, MAX_BACKOFF);
      // Jitter between 75% and 125% so many clients do not retry in lockstep.
      const jitter = base * (0.75 + Math.random() * 0.5);
      console.warn(`[AMQP] reconnect failed: ${e?.message || e}; retry in ${Math.round(jitter)}ms`);
      backoff = Math.min(backoff * 1.6, MAX_BACKOFF);
      reconnectTimer = setTimeout(attempt, jitter);
      return;
    }

    reconnecting = false;
    backoff = 1000;
    try {
      emitter.emit('reconnected');
    } catch (listenerErr) {
      console.error('[AMQP] "reconnected" listener threw:', listenerErr?.message || listenerErr);
    }
    console.log('[AMQP] reconnected and consumers resumed');
  };

  reconnectTimer = setTimeout(attempt, backoff);
}
// —— Internal: re-subscribe every registered consumer
/**
 * Walk the `consumers` registry and start each consumer again, storing the
 * new consumer tag on its registry entry. Does nothing when there is no
 * consume channel.
 *
 * Cancelling the previous tag is best effort: a failure there is ignored.
 * A failure while re-subscribing is not caught here and propagates.
 *
 * @returns {Promise<void>}
 */
async function resumeConsumers() {
  if (conCh) {
    for (const entry of consumers) {
      const queueName = entry[0];
      const registration = entry[1];

      const previousTag = registration.consumerTag;
      if (previousTag) {
        try {
          await conCh.cancel(previousTag);
        } catch (_) { /* best effort */ }
      }

      const { consumerTag } = await startOneConsumer(
        queueName,
        registration.onMessage,
        registration.options,
        true
      );
      registration.consumerTag = consumerTag;
    }
  }
}
// —— Internal: start a single consumer
/**
 * Subscribe to `queueName` on the consume channel and route every delivery
 * through `onMessage`.
 *
 * For each delivery a payload `{ raw, text, json, fields, properties, ack, nack }`
 * is built (`json` stays undefined when the body is not valid JSON). A
 * 'message' event is emitted, then `onMessage(payload)` is awaited. The
 * message is acked when the handler resolves; when it throws, 'handlerError'
 * is emitted and the message is nacked.
 *
 * `ack` / `nack` are bound to the channel that delivered the message and
 * only the first settlement takes effect. Delivery tags are only valid on
 * the channel that issued them, so settling on a channel created by a later
 * reconnect, or settling twice, would be a channel-level protocol error.
 *
 * @param {string} queueName - queue to consume from
 * @param {(payload: object) => any} onMessage - business callback
 * @param {object} [options]
 * @param {number} [options.prefetch=1] - value passed to channel.prefetch
 * @param {boolean} [options.durable=true] - durability used when asserting the queue
 * @param {boolean} [options.assertQueue=true] - assert the queue before consuming
 * @param {boolean} [options.requeueOnError=false] - requeue when the handler throws
 * @param {boolean} [isResume=false] - when true the `consumers` registry is not
 *   touched (the caller updates the tag itself)
 * @returns {Promise<object>} the result of channel.consume (contains `consumerTag`)
 */
async function startOneConsumer(queueName, onMessage, options = {}, isResume = false) {
  await ensureChannels();
  const {
    prefetch = 1,
    durable = true,
    assertQueue = true,
    requeueOnError = false,
    // optional: exclusive, arguments, dead-letter, ...
  } = options;

  // Pin the channel: the module-level `conCh` may be swapped by a reconnect
  // while this subscription's messages are still being handled.
  const channel = conCh;

  if (assertQueue) {
    await channel.assertQueue(queueName, { durable });
  }
  await channel.prefetch(prefetch);

  const consumeResult = await channel.consume(
    queueName,
    async (msg) => {
      if (!msg) return;
      const raw = msg.content;
      const text = raw?.toString?.() ?? '';
      let json;
      try { json = JSON.parse(text); } catch (_) { /* not JSON: leave undefined */ }

      let settled = false; // a delivery may be acked or nacked only once
      const payload = {
        raw, text, json,
        fields: msg.fields,
        properties: msg.properties,
        ack: () => {
          if (settled) return;
          settled = true;
          try { channel.ack(msg); } catch (_) { /* channel already closed */ }
        },
        nack: (requeue = requeueOnError) => {
          if (settled) return;
          settled = true;
          try { channel.nack(msg, false, requeue); } catch (_) { /* channel already closed */ }
        },
      };

      try {
        emitter.emit('message', queueName, payload);
        await onMessage(payload); // business callback
        payload.ack();
      } catch (err) {
        try {
          emitter.emit('handlerError', queueName, err, payload);
        } catch (listenerErr) {
          // A broken listener must not prevent the nack below.
          console.error('[AMQP] "handlerError" listener threw:', listenerErr?.message || listenerErr);
        }
        payload.nack(requeueOnError);
      }
    },
    { noAck: false }
  );

  if (!isResume) {
    consumers.set(queueName, { onMessage, options, consumerTag: consumeResult.consumerTag });
  }
  console.log(`[*] consuming "${queueName}" (prefetch=${prefetch})`);
  return consumeResult;
}
// —— Public API: start consuming a queue
/**
 * Validate the arguments, start a consumer for `queueName` and hand back a
 * handle for stopping it.
 *
 * @param {string} queueName - queue to consume from (required)
 * @param {Function} onMessage - callback invoked for each message (required)
 * @param {object} [options] - forwarded unchanged to startOneConsumer
 * @returns {Promise<{emitter: EventEmitter, stop: Function}>} `stop()` cancels
 *   the consumer if it is still registered and a consume channel exists,
 *   removes it from the registry, and resolves with the original consume result
 * @throws {Error} when `queueName` is empty or `onMessage` is not a function
 */
async function startConsumer(queueName, onMessage, options = {}) {
  if (!queueName) {
    throw new Error('queueName 必填');
  }
  if (typeof onMessage !== 'function') {
    throw new Error('onMessage 回调必填');
  }

  const subscription = await startOneConsumer(queueName, onMessage, options, false);

  const stop = async () => {
    const registered = consumers.get(queueName);
    const tag = registered?.consumerTag;
    if (tag && conCh) {
      // Cancellation failures are ignored.
      await conCh.cancel(tag).catch(() => { });
    }
    consumers.delete(queueName);
    return subscription;
  };

  return { emitter, stop };
}
// —— Public API: publish to a queue (with optional publisher confirm)
/**
 * Serialize `payload` and send it straight to `queueName`.
 *
 * @param {string} queueName - target queue (required)
 * @param {*} payload - message body; converted with toBuffer
 * @param {object} [options]
 * @param {boolean} [options.durable=true] - durability used when asserting the queue
 * @param {boolean} [options.persistent=true] - publish option `persistent`
 * @param {boolean} [options.assertQueue=true] - assert the queue before sending
 * @param {boolean} [options.confirm=true] - wait for the channel's confirms
 * @param {object} [options.headers={}] - message headers
 * @param {boolean} [options.mandatory=false] - publish option `mandatory`
 * @returns {Promise<void>}
 * @throws {Error} when `queueName` is empty, when no publish channel is
 *   available, or when the channel closes while waiting for 'drain'
 */
async function publishToQueue(queueName, payload, options = {}) {
  if (!queueName) throw new Error('queueName 必填');
  await ensureChannels();
  const {
    durable = true,
    persistent = true,
    assertQueue = true,
    confirm = true,
    headers = {},
    mandatory = false, // optional: have unroutable messages returned
  } = options;

  // Work on one channel for the whole call: `pubCh` can be reset by a
  // 'close' event between the awaits below.
  const channel = pubCh;
  if (!channel) throw new Error('[AMQP] publish channel is not available');

  if (assertQueue) {
    await channel.assertQueue(queueName, { durable });
  }

  const ok = channel.sendToQueue(queueName, toBuffer(payload), { persistent, headers, mandatory });
  if (!ok) {
    // Write buffer is full: wait for 'drain', but give up if the channel
    // closes first, otherwise this promise would never settle.
    await new Promise((resolve, reject) => {
      const onDrain = () => {
        channel.removeListener('close', onClose);
        resolve();
      };
      const onClose = () => {
        channel.removeListener('drain', onDrain);
        reject(new Error('[AMQP] channel closed while waiting for drain'));
      };
      channel.once('drain', onDrain);
      channel.once('close', onClose);
    });
  }

  if (confirm) await channel.waitForConfirms();
}
// —— Public API: publish to an exchange (with optional publisher confirm)
/**
 * Serialize `payload` and publish it to `exchange` with `routingKey`.
 *
 * @param {string} exchange - target exchange (required)
 * @param {string} [routingKey] - routing key; an empty string is used when falsy
 * @param {*} payload - message body; converted with toBuffer
 * @param {object} [options]
 * @param {string} [options.type='direct'] - exchange type used when asserting
 * @param {boolean} [options.durable=true] - durability used when asserting
 * @param {boolean} [options.assertExchange=true] - assert the exchange before publishing
 * @param {boolean} [options.confirm=true] - wait for the channel's confirms
 * @param {boolean} [options.persistent=true] - publish option `persistent`
 * @param {object} [options.headers={}] - message headers
 * @param {boolean} [options.mandatory=false] - publish option `mandatory`
 * @returns {Promise<void>}
 * @throws {Error} when `exchange` is empty, when no publish channel is
 *   available, or when the channel closes while waiting for 'drain'
 */
async function publishToExchange(exchange, routingKey, payload, options = {}) {
  if (!exchange) throw new Error('exchange 必填');
  await ensureChannels();
  const {
    type = 'direct',
    durable = true,
    assertExchange = true,
    confirm = true,
    persistent = true,
    headers = {},
    mandatory = false,
  } = options;

  // Work on one channel for the whole call: `pubCh` can be reset by a
  // 'close' event between the awaits below.
  const channel = pubCh;
  if (!channel) throw new Error('[AMQP] publish channel is not available');

  if (assertExchange) {
    await channel.assertExchange(exchange, type, { durable });
  }

  const ok = channel.publish(exchange, routingKey || '', toBuffer(payload), { persistent, headers, mandatory });
  if (!ok) {
    // Write buffer is full: wait for 'drain', but give up if the channel
    // closes first, otherwise this promise would never settle.
    await new Promise((resolve, reject) => {
      const onDrain = () => {
        channel.removeListener('close', onClose);
        resolve();
      };
      const onClose = () => {
        channel.removeListener('drain', onDrain);
        reject(new Error('[AMQP] channel closed while waiting for drain'));
      };
      channel.once('drain', onDrain);
      channel.once('close', onClose);
    });
  }

  if (confirm) await channel.waitForConfirms();
}
// —— Public API: force a reconnect (for Electron resume / network-change hooks)
/**
 * Close the current channels and connection, drop the module-level
 * references and start the reconnect loop.
 *
 * Does nothing while the module is closing or a reconnect is already in
 * progress. Rejections from the close calls are ignored; a synchronous throw
 * from one of them skips the remaining closes but the reset still happens.
 *
 * @returns {Promise<void>}
 */
async function reconnectNow() {
  if (closing || reconnecting) return;

  const ignore = () => { };
  // Read each handle only when its turn comes: the value may have changed
  // while the previous close was being awaited.
  const handles = [() => pubCh, () => conCh, () => conn];
  try {
    for (const current of handles) {
      const handle = current();
      if (handle) await handle.close().catch(ignore);
    }
  } catch (_) { }

  conn = null;
  conCh = null;
  pubCh = null;
  reconnecting = false;
  scheduleReconnect();
}
// —— Shutdown
/**
 * Shut the client down: mark it as closing, cancel any pending reconnect
 * timer, cancel every registered consumer, empty the registry, then close
 * the publish channel, the consume channel and the connection in that order.
 *
 * Every step is best effort; errors are ignored so that the remaining
 * resources are still released. `closing` is left set afterwards.
 *
 * @returns {Promise<void>}
 */
async function close() {
  closing = true;
  if (reconnectTimer) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }

  // Cancel all consumers.
  try {
    for (const registration of consumers.values()) {
      const tag = registration?.consumerTag;
      if (!tag || !conCh) continue;
      await conCh.cancel(tag).catch(() => { });
    }
  } catch (_) { }
  consumers.clear();

  // Each handle is read when its turn comes and closed in its own try block
  // so one failure does not stop the others.
  for (const current of [() => pubCh, () => conCh, () => conn]) {
    try {
      const handle = current();
      if (handle) await handle.close();
    } catch (_) { }
  }

  conn = null;
  conCh = null;
  pubCh = null;
}
// —— Process signals (optional): on SIGINT / SIGTERM close the client, then exit with code 0
for (const signal of ['SIGINT', 'SIGTERM']) {
  process.once(signal, async () => {
    try {
      await close();
    } finally {
      // Exit even if close() throws.
      process.exit(0);
    }
  });
}
// Public surface of this module.
module.exports = {
startConsumer,
publishToQueue,
publishToExchange,
reconnectNow,
close,
emitter, // subscribable events: 'message' / 'handlerError' / 'reconnected' / 'error' / 'blocked' / 'unblocked'
};