From 5b44d24b5895a4ef4c9f2aeaaa96920524c730c6 Mon Sep 17 00:00:00 2001 From: Changhua Date: Thu, 23 Mar 2023 20:23:48 +0800 Subject: [PATCH] Impl enableRecvMsg and disableRecvMsg --- .../src/main/java/com/iamteer/Client.java | 132 +++++++++++++++--- .../src/main/java/com/iamteer/Main.java | 10 ++ 2 files changed, 125 insertions(+), 17 deletions(-) diff --git a/java/wcferry/src/main/java/com/iamteer/Client.java b/java/wcferry/src/main/java/com/iamteer/Client.java index a3a31f7..ea84628 100644 --- a/java/wcferry/src/main/java/com/iamteer/Client.java +++ b/java/wcferry/src/main/java/com/iamteer/Client.java @@ -1,12 +1,6 @@ package com.iamteer; -import com.iamteer.Wcf.DbNames; -import com.iamteer.Wcf.DbTable; -import com.iamteer.Wcf.Functions; -import com.iamteer.Wcf.Request; -import com.iamteer.Wcf.Response; -import com.iamteer.Wcf.RpcContact; - +import com.iamteer.Wcf.*; import io.sisu.nng.Socket; import io.sisu.nng.pair.Pair1Socket; import org.slf4j.Logger; @@ -17,20 +11,33 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; public class Client { private static final Logger logger = LoggerFactory.getLogger(Client.class); private final int BUFFER_SIZE = 16 * 1024 * 1024; // 16M - private Socket socket = null; + private Socket cmdSocket = null; + private Socket msgSocket = null; + private String cmdUrl = "tcp://127.0.0.1:10086"; + private boolean isReceivingMsg = false; + private BlockingQueue msgQ; public Client(String hostPort) { + cmdUrl = hostPort; connectRPC(hostPort); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + logger.info("关闭..."); + diableRecvMsg(); + } + }); } private void connectRPC(String url) { try { - socket = new Pair1Socket(); - socket.dial(url); + cmdSocket = new Pair1Socket(); + cmdSocket.dial(url); logger.info("请点击登录微信"); while (!isLogin()) { // 直到登录成功 waitMs(1000); @@ -44,9 +51,9 @@ public class Client { private Response sendCmd(Request req) { try { ByteBuffer bb = ByteBuffer.wrap(req.toByteArray()); - socket.send(bb); + cmdSocket.send(bb); ByteBuffer ret = ByteBuffer.allocate(BUFFER_SIZE); - long size = socket.receive(ret, true); + long size = cmdSocket.receive(ret, true); return Response.parseFrom(Arrays.copyOfRange(ret.array(), 0, (int) size)); } catch (Exception e) { logger.error("命令调用失败: ", e); @@ -117,16 +124,16 @@ public class Client { } /** - * @Description 发送文本消息 * @param msg: 消息内容(如果是 @ 消息则需要有跟 @ 的人数量相同的 @) * @param receiver: 消息接收人,私聊为 wxid(wxid_xxxxxxxxxxxxxx),群聊为 * roomid(xxxxxxxxxx@chatroom) * @param aters: 群聊时要 @ 的人(私聊时为空字符串),多个用逗号分隔。@所有人 用 * notify@all(必须是群主或者管理员才有权限) * @return int + * @Description 发送文本消息 * @author Changhua - * @example sendText("Hello @某人1 @某人2", "xxxxxxxx@chatroom", - * "wxid_xxxxxxxxxxxxx1,wxid_xxxxxxxxxxxxx2"); + * @example sendText(" Hello @ 某人1 @ 某人2 ", " xxxxxxxx @ chatroom ", + * "wxid_xxxxxxxxxxxxx1,wxid_xxxxxxxxxxxxx2"); **/ public int sendText(String msg, String receiver, String aters) { Wcf.TextMsg textMsg = Wcf.TextMsg.newBuilder().setMsg(msg).setReceiver(receiver).setAters(aters).build(); @@ -169,7 +176,7 @@ public class Client { public int sendXml(String receiver, String xml, String path, int type) { Wcf.XmlMsg xmlMsg = Wcf.XmlMsg.newBuilder().setContent(xml).setReceiver(receiver).setPath(path).setType(type) - .build(); + .build(); Request req = new Request.Builder().setFuncValue(Functions.FUNC_SEND_XML_VALUE).setXml(xmlMsg).build(); logger.debug("sendXml: {}", bytesToHex(req.toByteArray())); Response rsp = sendCmd(req); @@ -194,6 +201,86 @@ public class Client { return ret; } + public boolean getIsReceivingMsg() { + return isReceivingMsg; + } + + public WxMsg getMsg() { + try { + return msgQ.take(); + } catch (Exception e) { + // TODO: handle exception + return null; + } + } + + private void listenMsg(String url) { + try { + msgSocket = new Pair1Socket(); + msgSocket.dial(url); + msgSocket.setReceiveTimeout(2000); // 2 秒超时 + } catch (Exception e) { + logger.error("创建消息 RPC 失败: {}", e); + return; + } + ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE); + while (isReceivingMsg) { + try { + long size = msgSocket.receive(bb, true); + WxMsg wxMsg = Response.parseFrom(Arrays.copyOfRange(bb.array(), 0, (int) size)).getWxmsg(); + msgQ.put(wxMsg); + } catch (Exception e) { + // 多半是超时,忽略吧 + } + } + try { + msgSocket.close(); + } catch (Exception e) { + logger.error("关闭连接失败: {}", e); + } + } + + public void enableRecvMsg(int qSize) { + if (isReceivingMsg) { + return; + } + + Request req = new Request.Builder().setFuncValue(Functions.FUNC_ENABLE_RECV_TXT_VALUE).build(); + Response rsp = sendCmd(req); + if (rsp == null) { + logger.error("启动消息接收失败"); + isReceivingMsg = false; + return; + } + + isReceivingMsg = true; + msgQ = new ArrayBlockingQueue(qSize); + String msgUrl = cmdUrl.replace("10086", "10087"); + Thread thread = new Thread(new Runnable() { + public void run() { + listenMsg(msgUrl); + } + }); + thread.start(); + } + + public int diableRecvMsg() { + if (!isReceivingMsg) { + return 1; + } + int ret = -1; + Request req = new Request.Builder().setFuncValue(Functions.FUNC_DISABLE_RECV_TXT_VALUE).build(); + Response rsp = sendCmd(req); + if (rsp != null) { + ret = rsp.getStatus(); + if (ret == 0) { + isReceivingMsg = false; + } + + } + return ret; + } + public void waitMs(int ms) { try { Thread.sleep(ms); @@ -215,10 +302,15 @@ public class Client { } logger.info("{}, {}, {}, {}, {}, {}, {}", c.getWxid(), c.getName(), c.getCode(), c.getCountry(), - c.getProvince(), c.getCity(), gender); + c.getProvince(), c.getCity(), gender); } } + public void printWxMsg(WxMsg msg) { + logger.info("{}[{}]:{}:{}:{}\n{}", msg.getSender(), msg.getRoomid(), msg.getId(), msg.getType(), + msg.getXml().replace("\n", "").replace("\t", ""), msg.getContent()); + } + public String bytesToHex(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) { @@ -226,4 +318,10 @@ public class Client { } return sb.toString(); } + + public void keepRunning() { + while (true) { + waitMs(1000); + } + } } diff --git a/java/wcferry/src/main/java/com/iamteer/Main.java b/java/wcferry/src/main/java/com/iamteer/Main.java index 0c3d58f..9f2aedf 100644 --- a/java/wcferry/src/main/java/com/iamteer/Main.java +++ b/java/wcferry/src/main/java/com/iamteer/Main.java @@ -44,5 +44,15 @@ public class Main { // 发送表情消息,gif 必须要存在 client.sendEmotion("C:\\Projs\\WeChatFerry\\emo.gif", "filehelper"); + + // 接收消息,并调用 printWxMsg 处理 + client.enableRecvMsg(100); + Thread thread = new Thread(new Runnable() { + public void run(){while(client.getIsReceivingMsg()){client.printWxMsg(client.getMsg());}} + }); + thread.start(); + // client.diableRecvMsg(); // 需要停止时调用 + + client.keepRunning(); } }