Impl enableRecvMsg and disableRecvMsg

This commit is contained in:
Changhua 2023-03-23 20:23:48 +08:00
parent b0164285e7
commit 5b44d24b58
2 changed files with 125 additions and 17 deletions

View File

@ -1,12 +1,6 @@
package com.iamteer;
import com.iamteer.Wcf.DbNames;
import com.iamteer.Wcf.DbTable;
import com.iamteer.Wcf.Functions;
import com.iamteer.Wcf.Request;
import com.iamteer.Wcf.Response;
import com.iamteer.Wcf.RpcContact;
import com.iamteer.Wcf.*;
import io.sisu.nng.Socket;
import io.sisu.nng.pair.Pair1Socket;
import org.slf4j.Logger;
@ -17,20 +11,33 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Client {
private static final Logger logger = LoggerFactory.getLogger(Client.class);
private final int BUFFER_SIZE = 16 * 1024 * 1024; // 16M
private Socket socket = null;
private Socket cmdSocket = null;
private Socket msgSocket = null;
private String cmdUrl = "tcp://127.0.0.1:10086";
private boolean isReceivingMsg = false;
private BlockingQueue<WxMsg> msgQ;
public Client(String hostPort) {
cmdUrl = hostPort;
connectRPC(hostPort);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
logger.info("关闭...");
diableRecvMsg();
}
});
}
private void connectRPC(String url) {
try {
socket = new Pair1Socket();
socket.dial(url);
cmdSocket = new Pair1Socket();
cmdSocket.dial(url);
logger.info("请点击登录微信");
while (!isLogin()) { // 直到登录成功
waitMs(1000);
@ -44,9 +51,9 @@ public class Client {
private Response sendCmd(Request req) {
try {
ByteBuffer bb = ByteBuffer.wrap(req.toByteArray());
socket.send(bb);
cmdSocket.send(bb);
ByteBuffer ret = ByteBuffer.allocate(BUFFER_SIZE);
long size = socket.receive(ret, true);
long size = cmdSocket.receive(ret, true);
return Response.parseFrom(Arrays.copyOfRange(ret.array(), 0, (int) size));
} catch (Exception e) {
logger.error("命令调用失败: ", e);
@ -117,16 +124,16 @@ public class Client {
}
/**
* @Description 发送文本消息
* @param msg: 消息内容如果是 @ 消息则需要有跟 @ 的人数量相同的 @
* @param receiver: 消息接收人私聊为 wxidwxid_xxxxxxxxxxxxxx群聊为
* roomidxxxxxxxxxx@chatroom
* @param aters: 群聊时要 @ 的人私聊时为空字符串多个用逗号分隔@所有人
* notify@all必须是群主或者管理员才有权限
* @return int
* @Description 发送文本消息
* @author Changhua
* @example sendText("Hello @某人1 @某人2", "xxxxxxxx@chatroom",
* "wxid_xxxxxxxxxxxxx1,wxid_xxxxxxxxxxxxx2");
* @example sendText(" Hello @ 某人1 @ 某人2 ", " xxxxxxxx @ chatroom ",
* "wxid_xxxxxxxxxxxxx1,wxid_xxxxxxxxxxxxx2");
**/
public int sendText(String msg, String receiver, String aters) {
Wcf.TextMsg textMsg = Wcf.TextMsg.newBuilder().setMsg(msg).setReceiver(receiver).setAters(aters).build();
@ -169,7 +176,7 @@ public class Client {
public int sendXml(String receiver, String xml, String path, int type) {
Wcf.XmlMsg xmlMsg = Wcf.XmlMsg.newBuilder().setContent(xml).setReceiver(receiver).setPath(path).setType(type)
.build();
.build();
Request req = new Request.Builder().setFuncValue(Functions.FUNC_SEND_XML_VALUE).setXml(xmlMsg).build();
logger.debug("sendXml: {}", bytesToHex(req.toByteArray()));
Response rsp = sendCmd(req);
@ -194,6 +201,86 @@ public class Client {
return ret;
}
public boolean getIsReceivingMsg() {
return isReceivingMsg;
}
public WxMsg getMsg() {
try {
return msgQ.take();
} catch (Exception e) {
// TODO: handle exception
return null;
}
}
private void listenMsg(String url) {
try {
msgSocket = new Pair1Socket();
msgSocket.dial(url);
msgSocket.setReceiveTimeout(2000); // 2 秒超时
} catch (Exception e) {
logger.error("创建消息 RPC 失败: {}", e);
return;
}
ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
while (isReceivingMsg) {
try {
long size = msgSocket.receive(bb, true);
WxMsg wxMsg = Response.parseFrom(Arrays.copyOfRange(bb.array(), 0, (int) size)).getWxmsg();
msgQ.put(wxMsg);
} catch (Exception e) {
// 多半是超时忽略吧
}
}
try {
msgSocket.close();
} catch (Exception e) {
logger.error("关闭连接失败: {}", e);
}
}
public void enableRecvMsg(int qSize) {
if (isReceivingMsg) {
return;
}
Request req = new Request.Builder().setFuncValue(Functions.FUNC_ENABLE_RECV_TXT_VALUE).build();
Response rsp = sendCmd(req);
if (rsp == null) {
logger.error("启动消息接收失败");
isReceivingMsg = false;
return;
}
isReceivingMsg = true;
msgQ = new ArrayBlockingQueue<WxMsg>(qSize);
String msgUrl = cmdUrl.replace("10086", "10087");
Thread thread = new Thread(new Runnable() {
public void run() {
listenMsg(msgUrl);
}
});
thread.start();
}
public int diableRecvMsg() {
if (!isReceivingMsg) {
return 1;
}
int ret = -1;
Request req = new Request.Builder().setFuncValue(Functions.FUNC_DISABLE_RECV_TXT_VALUE).build();
Response rsp = sendCmd(req);
if (rsp != null) {
ret = rsp.getStatus();
if (ret == 0) {
isReceivingMsg = false;
}
}
return ret;
}
public void waitMs(int ms) {
try {
Thread.sleep(ms);
@ -215,10 +302,15 @@ public class Client {
}
logger.info("{}, {}, {}, {}, {}, {}, {}", c.getWxid(), c.getName(), c.getCode(), c.getCountry(),
c.getProvince(), c.getCity(), gender);
c.getProvince(), c.getCity(), gender);
}
}
public void printWxMsg(WxMsg msg) {
logger.info("{}[{}]:{}:{}:{}\n{}", msg.getSender(), msg.getRoomid(), msg.getId(), msg.getType(),
msg.getXml().replace("\n", "").replace("\t", ""), msg.getContent());
}
public String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
@ -226,4 +318,10 @@ public class Client {
}
return sb.toString();
}
public void keepRunning() {
while (true) {
waitMs(1000);
}
}
}

View File

@ -44,5 +44,15 @@ public class Main {
// 发送表情消息gif 必须要存在
client.sendEmotion("C:\\Projs\\WeChatFerry\\emo.gif", "filehelper");
// 接收消息并调用 printWxMsg 处理
client.enableRecvMsg(100);
Thread thread = new Thread(new Runnable() {
public void run(){while(client.getIsReceivingMsg()){client.printWxMsg(client.getMsg());}}
});
thread.start();
// client.diableRecvMsg(); // 需要停止时调用
client.keepRunning();
}
}