const WebSocket = require('ws');
const http = require('http');
const jwt = require('jsonwebtoken');
const db = require('./database');

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

// Secret used to sign and verify JWTs. It can be overridden through the
// environment; the literal stays as the fallback so tokens issued with it
// keep verifying when the variable is not set.
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key-123';
// NOTE(review): SERVER_TOKEN is not referenced anywhere in this file.
const SERVER_TOKEN = 'server-auth-token-456';

// ---------------------------------------------------------------------------
// Server setup
// ---------------------------------------------------------------------------

const server = http.createServer();
const wss = new WebSocket.Server({ server });

// Online users: userId -> the socket most recently authenticated as that user.
const onlineUsers = new Map();

// Sockets that completed authentication. Broadcasts are limited to these so
// that connections which have not authenticated yet receive nothing.
const authenticatedSockets = new WeakSet();

// NOTE(review): `db` is assumed to expose a node-sqlite3 style callback API
// (`run` / `all`, with `this.lastID` and `this.changes` on `run` callbacks);
// confirm against ./database.

/**
 * Serialises `payload` as JSON and sends it, but only when `socket` exists
 * and is OPEN. Most sends happen inside asynchronous database callbacks, by
 * which time the peer may already have disconnected.
 *
 * @param {WebSocket|undefined} socket - Target connection (may be undefined).
 * @param {object} payload - JSON-serialisable message.
 * @returns {boolean} true when the frame was handed to the socket.
 */
function sendJson(socket, payload) {
  if (!socket || socket.readyState !== WebSocket.OPEN) {
    return false;
  }
  socket.send(JSON.stringify(payload));
  return true;
}

/**
 * Sends an already serialised frame to every authenticated, OPEN client.
 *
 * @param {string} frame - Serialised message.
 */
function broadcastToAuthenticated(frame) {
  wss.clients.forEach((client) => {
    if (authenticatedSockets.has(client) && client.readyState === WebSocket.OPEN) {
      client.send(frame);
    }
  });
}

/**
 * Builds a completion callback for fire-and-forget database statements that
 * logs a failure instead of leaving it unobserved.
 *
 * @param {string} label - Prefix for the log line.
 * @returns {function(Error=): void}
 */
function logDbError(label) {
  return (err) => {
    if (err) {
      console.error(label, err);
    }
  };
}

/**
 * Generates a JWT carrying `userId`, valid for 24 hours.
 *
 * @param {*} userId - Identifier stored in the token's `userId` claim.
 * @returns {string} Signed token.
 */
function generateToken(userId) {
  return jwt.sign({ userId }, JWT_SECRET, { expiresIn: '24h' });
}

/**
 * Verifies a JWT against JWT_SECRET.
 *
 * @param {string} token - Token supplied by the client.
 * @returns {object|null} Decoded payload, or null when verification throws.
 */
function verifyToken(token) {
  try {
    return jwt.verify(token, JWT_SECRET);
  } catch (error) {
    return null;
  }
}

/**
 * Loads the non-withdrawn messages addressed to `userId`, ordered by
 * timestamp, and passes them to `callback`. On a database error the error is
 * logged and `callback` receives an empty array.
 *
 * NOTE(review): the query has no "delivered" filter, so it returns every
 * stored message for the receiver on each login — confirm this is intended.
 *
 * @param {*} userId - Receiver whose messages are loaded.
 * @param {function(Array): void} callback - Receives the rows.
 */
function getOfflineMessages(userId, callback) {
  db.all(
    `SELECT * FROM messages WHERE receiver_id = ? AND is_withdrawn = 0 ORDER BY timestamp`,
    [userId],
    (err, rows) => {
      if (err) {
        console.error('Offline message lookup error:', err);
        return callback([]);
      }
      callback(rows);
    }
  );
}

// ---------------------------------------------------------------------------
// Connection handling
// ---------------------------------------------------------------------------

wss.on('connection', (ws) => {
  let userId = null;
  let authenticated = false;
  // The first frame is the only authentication attempt a connection gets.
  let authAttempted = false;
  // Set when the error handler already removed this socket from onlineUsers,
  // so a following close event still announces the user as offline.
  let releasedOnError = false;

  /** Sends an error frame carrying a timestamp to this connection. */
  const sendTimedError = (message) => {
    sendJson(ws, { type: 'error', message, timestamp: new Date().toISOString() });
  };

  /** Sends an error frame and closes the connection (authentication failures). */
  const rejectAndClose = (message) => {
    sendJson(ws, { type: 'error', message });
    ws.close();
  };

  /**
   * Removes this socket from onlineUsers, but only when the entry still
   * points at this socket. A user who reconnected owns a newer entry that an
   * old connection going away must not remove.
   *
   * @returns {boolean} true when this socket's entry was removed.
   */
  const releaseSlot = () => {
    if (!authenticated || onlineUsers.get(userId) !== ws) {
      return false;
    }
    onlineUsers.delete(userId);
    return true;
  };

  /** Handles the first frame, which must be an `auth` message. */
  const handleAuth = (raw) => {
    try {
      const data = JSON.parse(raw);

      if (data.type !== 'auth') {
        rejectAndClose('First message must be authentication');
        return;
      }

      const { userId: receivedId, token } = data;

      // The token must verify and its userId claim must match the claimed id.
      // A missing id is rejected explicitly: otherwise a token without a
      // userId claim would "match" a message without a userId.
      const decoded = verifyToken(token);
      if (!decoded || receivedId == null || decoded.userId !== receivedId) {
        rejectAndClose('Authentication failed');
        return;
      }

      // Register the connection.
      userId = receivedId;
      authenticated = true;
      onlineUsers.set(userId, ws);
      authenticatedSockets.add(ws);

      // Record the user's token and activity time.
      db.run(
        `INSERT OR REPLACE INTO users (id, token, last_active) VALUES (?, ?, datetime('now'))`,
        [userId, token],
        logDbError('User status update error:')
      );

      // Announce that the user is online.
      broadcastOnlineStatus(userId, true);

      // Deliver stored messages once the lookup completes.
      getOfflineMessages(userId, (messages) => {
        if (messages.length > 0) {
          sendJson(ws, { type: 'offline_messages', messages });
        }
      });

      // Acknowledge the authentication.
      sendJson(ws, {
        type: 'auth_success',
        message: 'Authentication successful',
        timestamp: new Date().toISOString()
      });
    } catch (e) {
      rejectAndClose('Invalid authentication data');
    }
  };

  /** Stores a private message, forwards it if the receiver is online, and sends a receipt. */
  const handlePrivateMessage = (data, timestamp) => {
    const { receiverId, content, messageId } = data;
    if (receiverId == null) {
      sendTimedError('Invalid message format');
      return;
    }

    db.run(
      `INSERT INTO messages (sender_id, receiver_id, content, timestamp) VALUES (?, ?, ?, ?)`,
      [userId, receiverId, content, timestamp],
      function onStored(err) {
        if (err) {
          console.error('Database error:', err);
          sendTimedError('Failed to store message');
          return;
        }

        const msgId = this.lastID;
        // The receipt echoes the client's own messageId; the forwarded copy
        // carries the database row id.
        const receipt = { type: 'delivery_receipt', messageId, timestamp };

        const delivered = sendJson(onlineUsers.get(receiverId), {
          type: 'private_message',
          senderId: userId,
          content,
          timestamp,
          messageId: msgId
        });

        // Receiver offline: the receipt says so.
        sendJson(ws, delivered ? receipt : { ...receipt, status: 'offline' });
      }
    );
  };

  /**
   * Stores a group message, fans it out to online members, and sends a receipt.
   *
   * NOTE(review): nothing here checks that the sender belongs to `groupId`.
   */
  const handleGroupMessage = (data, timestamp) => {
    const { groupId, content, messageId } = data;
    if (groupId == null) {
      sendTimedError('Invalid message format');
      return;
    }

    db.run(
      `INSERT INTO messages (sender_id, group_id, content, timestamp) VALUES (?, ?, ?, ?)`,
      [userId, groupId, content, timestamp],
      function onStored(err) {
        if (err) {
          console.error('Database error:', err);
          sendTimedError('Failed to store message');
          return;
        }

        const msgId = this.lastID;

        db.all(
          `SELECT user_id FROM group_members WHERE group_id = ?`,
          [groupId],
          (lookupErr, rows) => {
            if (lookupErr) {
              console.error('Group member lookup error:', lookupErr);
              sendTimedError('Failed to deliver group message');
              return;
            }

            // Serialise once; every member receives the same frame.
            const frame = JSON.stringify({
              type: 'group_message',
              senderId: userId,
              groupId,
              content,
              timestamp,
              messageId: msgId
            });

            for (const row of rows) {
              const memberWs = onlineUsers.get(row.user_id);
              if (memberWs && memberWs.readyState === WebSocket.OPEN) {
                memberWs.send(frame);
              }
            }

            sendJson(ws, { type: 'delivery_receipt', messageId, timestamp });
          }
        );
      }
    );
  };

  /** Marks one of the sender's own messages as withdrawn and announces it. */
  const handleWithdrawMessage = (data) => {
    const { messageId } = data;
    if (messageId == null) {
      sendTimedError('Invalid message format');
      return;
    }

    db.run(
      `UPDATE messages SET is_withdrawn = 1 WHERE id = ? AND sender_id = ?`,
      [messageId, userId],
      function onWithdrawn(err) {
        if (err) {
          console.error('Withdraw error:', err);
          sendTimedError('Failed to withdraw message');
          return;
        }
        // Announce only when a row owned by this sender was actually updated.
        if (this.changes > 0) {
          broadcastMessageWithdrawn(messageId);
        }
      }
    );
  };

  /** Parses and dispatches a frame from an authenticated connection. */
  const handleChatMessage = (raw) => {
    try {
      const data = JSON.parse(raw);
      const timestamp = new Date().toISOString();

      switch (data.type) {
        case 'private_message':
          handlePrivateMessage(data, timestamp);
          break;
        case 'group_message':
          handleGroupMessage(data, timestamp);
          break;
        case 'withdraw_message':
          handleWithdrawMessage(data);
          break;
        default:
          // Other types (including a repeated `auth`) are ignored.
          break;
      }
    } catch (e) {
      console.error('Message processing error:', e);
      sendTimedError('Invalid message format');
    }
  };

  // Single entry point for incoming frames: the first frame is the
  // authentication attempt; anything from an unauthenticated connection after
  // that closes it.
  ws.on('message', (message) => {
    if (authenticated) {
      handleChatMessage(message);
      return;
    }
    if (authAttempted) {
      ws.close();
      return;
    }
    authAttempted = true;
    handleAuth(message);
  });

  // Connection closed.
  ws.on('close', () => {
    if (!authenticated) {
      return;
    }
    const ownedSlot = releaseSlot() || releasedOnError;
    db.run(
      `UPDATE users SET last_active = datetime('now') WHERE id = ?`,
      [userId],
      logDbError('Last-active update error:')
    );
    // Only the connection that held the user's entry announces "offline".
    if (ownedSlot) {
      broadcastOnlineStatus(userId, false);
    }
  });

  // Connection error.
  ws.on('error', (error) => {
    console.error(`Connection error for ${userId}:`, error);
    if (releaseSlot()) {
      releasedOnError = true;
    }
    ws.close();
  });
});

// ---------------------------------------------------------------------------
// Broadcast helpers
// ---------------------------------------------------------------------------

/**
 * Announces a user's online/offline state to every authenticated client.
 *
 * @param {*} userId - User whose state changed.
 * @param {boolean} isOnline - New state.
 */
function broadcastOnlineStatus(userId, isOnline) {
  broadcastToAuthenticated(
    JSON.stringify({
      type: 'online_status',
      userId,
      isOnline,
      timestamp: new Date().toISOString()
    })
  );
}

/**
 * Announces that a message was withdrawn to every authenticated client.
 *
 * NOTE(review): this reaches all authenticated users, not only the
 * participants of the conversation the message belongs to.
 *
 * @param {*} messageId - Id of the withdrawn message.
 */
function broadcastMessageWithdrawn(messageId) {
  broadcastToAuthenticated(
    JSON.stringify({
      type: 'message_withdrawn',
      messageId,
      timestamp: new Date().toISOString()
    })
  );
}

// ---------------------------------------------------------------------------
// Start the server
// ---------------------------------------------------------------------------

const PORT = process.env.PORT || 8080;
server.listen(PORT, () => {
  console.log(`Enhanced chat server running on port ${PORT}`);
});