import datetime
import json
import shutil
import threading
import time
from pathlib import Path
from typing import Tuple

import xmltodict

from entity.comment import Comment
from entity.contact import Contact
from exporter.avatar_exporter import AvatarExporter
from exporter.emoji_exporter import EmojiExporter
from exporter.image_exporter import ImageExporter
from exporter.video_exporter import VideoExporter
from log import LOG
from entity.moment_msg import MomentMsg


def get_img_div_css(size: int) -> str:
    """Return the inline CSS for the container that holds a post's images.

    A ``size`` of 1 gets the narrow (10rem) container, anything else the wide
    (19rem) one. ``size`` is presumably the number of images in the post --
    no caller is visible in this file, so confirm against the template code.
    """
    if size == 1:
        return 'width:10rem; overflow:hidden'
    return 'width:19rem; overflow:hidden'


def get_img_css(size: int) -> str:
    """Return the inline CSS for a single preview image.

    Every variant ends with ``object-fit:cover`` (centre-crop the preview)
    and ``cursor:pointer`` (hand cursor on hover). ``size`` selects the tile
    edge: 1 -> 10rem without float, 2 or 4 -> 8rem, anything else -> 5rem.
    """
    img_style = "object-fit:cover;cursor:pointer;"
    if size == 1:
        return f'width:10rem;height:10rem;{img_style}'
    # 2 and 4 share the same tile size; every other count uses the small tile.
    edge = '8rem' if size in (2, 4) else '5rem'
    return (f'width:{edge};height:{edge};float:left;'
            f'margin-bottom:0.2rem;margin-right:0.2rem;{img_style}')


def _first_media(msg: MomentMsg):
    """Return the first media entry of ``msg``, or None when there is none."""
    content = msg.timelineObject.ContentObject
    if content and content.mediaList and content.mediaList.media:
        return content.mediaList.media[0]
    return None


def is_music_msg(msg: MomentMsg) -> bool:
    """Tell whether ``msg`` is a music share (first media entry has type '5')."""
    media = _first_media(msg)
    return media is not None and media.type == '5'


def get_music_info(msg: MomentMsg) -> Tuple[str, str, str]:
    """Return ``(title, musician, source url)`` of the first media entry.

    All three fields are empty strings when the message has no media; a
    missing individual field is also reported as an empty string so the
    result always matches the annotated return type.
    """
    media = _first_media(msg)
    if media is None:
        return "", "", ""
    src = (media.url.text or "") if media.url else ""
    return media.title or "", media.description or "", src


class HtmlExporter(threading.Thread):
    """Background thread that writes the selected moments to ``output/<dir_name>/index.html``."""

    def __init__(self, gui: 'Gui', dir_name: str, contacts_map: dict[str, Contact],
                 begin_date: datetime.date, end_date: datetime.date, convert_video: int):
        """Prepare a fresh output directory and the per-media exporters.

        Any existing ``output/<dir_name>`` is deleted and replaced by a copy
        of ``resource/template/``.

        Args:
            gui: Front-end object; ``run`` reads ``account_info``,
                ``image_decrypter`` and ``video_decrypter`` from it and
                reports progress through it.
            dir_name: Name of the export directory below ``output/``.
            contacts_map: Contacts to export, keyed by user name.
            begin_date: First day to export (inclusive).
            end_date: Last day to export (inclusive).
            convert_video: Passed through unchanged to the video decrypter.
        """
        # Initialise the Thread machinery first so the object is a valid
        # Thread even if the filesystem work below raises.
        super().__init__()
        self.dir_name = dir_name
        output_dir = Path(f"output/{self.dir_name}")
        if output_dir.exists():
            shutil.rmtree(output_dir)
        shutil.copytree("resource/template/", output_dir)

        self.gui = gui
        self.avatar_exporter = AvatarExporter(dir_name)
        self.image_exporter = ImageExporter(dir_name)
        self.video_exporter = VideoExporter(dir_name)
        self.html_head = None
        self.html_end = None
        self.file = None
        self.contacts_map = contacts_map
        self.begin_date = begin_date
        self.end_date = end_date
        self.convert_video = convert_video
        self.stop_flag = False

    def run(self) -> None:
        """Export every message in the date range, then finalise the page.

        NOTE(review): the copy of this file under review had its indentation
        collapsed; the nesting of the cover lookup and of the progress update
        below was inferred from the statements themselves -- confirm.
        """
        with open("resource/template.html", encoding='utf-8') as f:
            content = f.read()
        # The template is cut at the marker: head is written first, end last.
        self.html_head, self.html_end = content.split('/*内容分割线*/')

        self.file = open(f"output/{self.dir_name}/index.html", 'w', encoding='utf-8')
        try:
            account_info = self.gui.account_info
            wxid = account_info.get('wxid') if account_info else None
            if wxid:
                self.avatar_exporter.get_avatar_path(wxid)
                self.html_head = self.html_head.replace("{my_wxid}", f"{wxid}")
                from app.DataBase import micro_msg_db
                my_info = micro_msg_db.get_contact_by_username(wxid)
                self.html_head = self.html_head.replace("{my_name}", f"{my_info[4]}")

            # Imported unconditionally: it is needed below even when no
            # account info is available.
            from app.DataBase import sns_db
            cover_url = sns_db.get_cover_url()
            if cover_url:
                cover_path = self.image_exporter.save_image((cover_url, "", ""), 'image')
                self.html_head = self.html_head.replace("{cover_path}", cover_path)

            self.file.write(self.html_head)

            # Add one day so the whole of end_date is included.
            end_date = self.end_date + datetime.timedelta(days=1)
            begin_time = time.mktime(
                datetime.datetime(self.begin_date.year, self.begin_date.month,
                                  self.begin_date.day).timetuple())
            end_time = time.mktime(
                datetime.datetime(end_date.year, end_date.month, end_date.day).timetuple())

            self.gui.image_decrypter.decrypt_images(self, self.begin_date, end_date, self.dir_name)
            self.gui.video_decrypter.decrypt_videos(self, self.begin_date, end_date, self.dir_name,
                                                    self.convert_video)

            message_datas = sns_db.get_messages_in_time(begin_time, end_time)
            total = len(message_datas)
            for index, message_data in enumerate(message_datas):
                if self.stop_flag:
                    # Stop requested: skip the remaining rows instead of
                    # looping over them without doing anything.
                    break
                if message_data[0] in self.contacts_map:
                    comments: list[Comment] = [
                        Comment(c[0], c[1], c[2])
                        for c in sns_db.get_comment_by_feed_id(message_data[2])
                    ]
                    self.export_msg(message_data[1], comments, self.contacts_map)
                # Progress bar: the first 30% is video processing, the
                # remaining 70% covers everything else.
                self.gui.update_export_progressbar(30 + round(index / total * 70))

            self.gui.update_export_progressbar(100)
            self.finish_file()
        finally:
            # finish_file() closes the file on success; make sure the handle
            # is also released when something above raised.
            if not self.file.closed:
                self.file.close()
        self.gui.export_succeed()

    def stop(self) -> None:
        """Ask the export loop in ``run`` to stop before the next message."""
        self.stop_flag = True

    def export_msg(self, message: str, comments: list[Comment], contacts_map: dict[str, Contact]) -> None:
        """Append the HTML for one moment to the output file.

        Args:
            message: Raw XML of the moment.
            comments: Comments of the moment (not used by the visible code).
            contacts_map: Contacts keyed by user name, used to pick the
                display name.

        NOTE(review): in the copy of this file under review the markup inside
        the string literals below has been stripped -- only line breaks, the
        interpolated fields and the literal text survive. The literals are
        reproduced with exactly those characters so that nothing is invented;
        restore the real markup from version control before relying on this
        method. Several locals (avatar_path, title, musician, src, thumb_path,
        image_path, video_path) look unused only because the markup that
        presumably interpolated them is missing. The nesting of the branches
        was inferred from the collapsed source. Values such as the display
        name are interpolated without HTML escaping in the visible code.
        """
        LOG.info(message)
        # force_list: always parse <media> as a list, even with one entry.
        msg_dict = xmltodict.parse(message, force_list={'media'})
        msg_json = json.dumps(msg_dict)
        msg = MomentMsg.from_json(msg_json)

        # WeChat ID
        username = msg.timelineObject.username
        # Avatar path
        avatar_path = self.avatar_exporter.get_avatar_path(username)

        contact = contacts_map.get(username)
        # Remark if set, otherwise the nickname; fall back to the raw user
        # name when the author is not in contacts_map at all.
        if contact is None:
            remark = username
        else:
            remark = contact.remark if contact.remark else contact.nickName

        # Images of the moment
        images = self.image_exporter.get_images(msg)
        # Videos of the moment
        videos = self.video_exporter.get_videos(msg)
        # Style 3: link style
        content_style = msg.timelineObject.ContentObject.contentStyle

        html = '\n\n'
        html += '\n\n'
        html += '\n\n'
        html += f' \n'
        html += '\n\n'
        html += '\n\n'
        html += '\n\n'
        html += '\n\n'
        html += f'\n\n{remark}\n\n\n'
        if msg.timelineObject.contentDesc:
            # NOTE(review): the replacement text was lost with the markup
            # (presumably an HTML line break) -- as written this is a no-op.
            content_desc = msg.timelineObject.contentDesc.replace("\n", "\n")
            content_desc = EmojiExporter.replace_emoji(content_desc)
            html += f'\n\n{content_desc}\n\n\n'
        html += '\n\n'

        # Hyperlink
        if content_style == 3:
            html += f' \n'
            html += ' \n'
            html += ' \n'
        # Music
        elif is_music_msg(msg):
            title, musician, src = get_music_info(msg)
            html += f' \n'
            html += ' \n'
            html += ' \n'
        # Channels (finder) video
        elif msg.timelineObject.ContentObject.finderFeed:
            html += f'\n\n'
            # Channels thumbnail
            thumb_path = self.image_exporter.get_finder_images(msg)
            html += f""" \n"""
            html += '\n\n'
            # Channels description
            html += '\n\n'
            nickname = msg.timelineObject.ContentObject.finderFeed.nickname
            desc = msg.timelineObject.ContentObject.finderFeed.desc
            html += f'\n\n视频号 · {nickname} · {desc}\n\n\n'
            html += '\n\n'
        # Ordinary moment
        else:
            html += f'\n\n'
            for thumb_path, image_path in images:
                html += f""" \n"""
            html += '\n\n'
            html += '\n\n'
            for video_path in videos:
                html += f'\n\n'
            html += '\n\n'

        if msg.timelineObject.location and msg.timelineObject.location.poiName:
            html += f'\n\n{msg.timelineObject.location.poiName}\n\n\n'
        html += f'\n\n{msg.timelineObject.create_time}\n\n\n'
        html += '\n\n'
        html += '\n\n'
        html += '\n\n'
        self.file.write(html)

    def finish_file(self) -> None:
        """Write the template tail and close the output file."""
        self.file.write(self.html_end)
        self.file.close()