"""HTTP service that looks up parts.cat.com product data, cached in MySQL.

``GET /product/{code}`` normalises the part number, returns the cached record
from the ``catdb`` table when one exists, and otherwise drives a Chromium
session (Playwright) against parts.cat.com, stores what it finds and returns
it.
"""

import asyncio
import html
import json
import re
from contextlib import closing

import pymysql
import uvicorn
from fastapi import FastAPI
from playwright.async_api import async_playwright
from pymysql.converters import escape_string  # noqa: F401  (unused now that queries are parameterized; kept for importers)

app = FastAPI()

HOST = '0.0.0.0'
PORT = 8000

# Dealer key -> (store id, store identifier) used in the parts.cat.com URLs.
_CORP_STORES = {
    'lsh': (20153, 'LEI_SHING_HONG'),
    'eci': (20182, 'ECI_METRO'),
}
_DEFAULT_CORP = 'lsh'

# Module-level defaults.  They are no longer mutated per request (that was a
# race between concurrent requests); ``searchInfo`` falls back to them only
# when it is called without an explicit ``corp``.
store = 20153
storeIdentifier = 'LEI_SHING_HONG'
auth = 'lsh'

# Accepted part numbers: "<1-2 digits><letter>[-]<4 digits>" or
# "<1-3 digits>[-]<4 digits>".  The alternation is wrapped in a non-capturing
# group so that BOTH alternatives are anchored at both ends; without it the
# first alternative had no end anchor and accepted arbitrary trailing text.
# Group numbers 1-4 are unchanged.
code_rule = r"^(?:([0-9]{1,2}[A-Z])\-*([0-9]{4})|([0-9]{1,3})\-*([0-9]{4}))$"

# NOTE(review): credentials are hard-coded here; consider loading them from
# the environment.
MYSQL = {
    'host': '127.0.0.1',
    'database': 'workdata',
    'user': 'root',
    'password': 'rxxt',
    'charset': 'utf8mb4',
}

# Body of the first <pre> element in a rendered page.
_PRE_BODY = re.compile(r'<pre[^>]*>(.*?)</pre>', re.DOTALL | re.IGNORECASE)


@app.get('/hi')
def _():
    """Return the service version and the configured bind address."""
    return {
        'version': 1.2,
        'host': HOST,
        'port': PORT,
    }


def _extract_json(s: str):
    """Decode the JSON payload carried by a page's serialized content.

    Candidates are tried in order: the text itself, the body of the first
    ``<pre>`` element, and the span from the first ``{`` to the last ``}``.
    The last two are HTML-unescaped first, since serialized markup escapes
    ``&``, ``<`` and ``>``.

    NOTE(review): the previous pattern ``^.+(.+).*$`` could only ever capture
    the final character of the input, so it never yielded the payload.  It
    presumably lost its markup delimiters at some point; the ``<pre>`` wrapper
    assumed here should be confirmed against a real response.

    Returns the decoded value, or ``None`` when no candidate parses.
    """
    text = s.strip()
    candidates = [text]

    pre = _PRE_BODY.search(text)
    if pre:
        candidates.append(html.unescape(pre.group(1)).strip())

    first, last = text.find('{'), text.rfind('}')
    if 0 <= first < last:
        candidates.append(html.unescape(text[first:last + 1]))

    error = None
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            error = e
    print('json 格式错', error)
    return None


def getUniqueId(s: str):
    """Return the non-empty ``uniqueId`` field of the JSON in *s*, else ``None``."""
    data = _extract_json(s)
    if isinstance(data, dict):
        return data.get('uniqueId') or None
    return None


def getDetail(s: str):
    """Return the JSON value decoded from page content *s*, or ``None``."""
    return _extract_json(s)


def _resolve_corp(corp):
    """Return ``(corp_key, store_id, store_identifier)`` for *corp*.

    The key is lower-cased; anything other than a known dealer key falls back
    to the default dealer.
    """
    key = (corp or '').lower()
    if key not in _CORP_STORES:
        key = _DEFAULT_CORP
    store_id, store_ident = _CORP_STORES[key]
    return key, store_id, store_ident


def _fetch_cached_row(corp_key, code):
    """Return the ``catdb`` row for (*corp_key*, *code*) as a dict, or ``None``."""
    with closing(pymysql.connect(**MYSQL)) as conn:
        with closing(conn.cursor(pymysql.cursors.DictCursor)) as cur:
            cur.execute(
                "select * from catdb where code=%s and corp=%s",
                (code, corp_key),
            )
            return cur.fetchone()


def _store_cached(corp_key, code, info):
    """Insert *info* (serialized as JSON) into ``catdb`` for (*corp_key*, *code*)."""
    payload = json.dumps(info, ensure_ascii=False)
    with closing(pymysql.connect(**MYSQL)) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute(
                "insert into catdb (corp,code,data) values(%s,%s,%s)",
                (corp_key, code, payload),
            )
        conn.commit()


@app.get('/product/{code}')
async def getProductInfo(code: str, corp: str = 'lsh'):
    """Look up one part number for a dealer.

    Args:
        code: Part number; surrounding whitespace and case are ignored and the
            value is normalised to ``XXX-NNNN`` form.
        corp: Dealer key, ``'lsh'`` or ``'eci'`` (anything else means ``'lsh'``).

    Returns:
        ``{'result': 0, 'code': <normalised>, 'info': <data>}`` on success, or
        ``{'result': 101, 'code': ..., 'info': {}}`` when the code is malformed
        or nothing was found.
    """
    corp_key, _store_id, _store_ident = _resolve_corp(corp)

    code = code.strip().upper()
    result = {'result': 101, 'code': code, 'info': {}}

    matched = re.match(code_rule, code)
    if not matched:
        return result
    if matched.group(1):
        code = f'{matched.group(1)}-{matched.group(2)}'
    else:
        # Purely numeric prefix: zero-pad to three digits.
        code = f'{int(matched.group(3)):03d}-{matched.group(4)}'
    result['code'] = code

    row = _fetch_cached_row(corp_key, code)
    if row:
        result['result'] = 0
        result['info'] = json.loads(row['data'])
        return result

    # No DB connection is held while the (potentially slow) browser session
    # runs; a fresh one is opened for the insert.
    info = await searchInfo(code, corp_key)
    if info:
        result['result'] = 0
        result['info'] = info
        _store_cached(corp_key, code, info)
    return result


async def _attach_prices(context, store_id, code, unique_id, detail):
    """Add ``detail['priceInfo']`` from the dealer price endpoint (best effort)."""
    parts = [{'partNumber': code, "uniqueId": f"{unique_id}", 'quantity': 1}]
    parts += [
        {'partNumber': item['partNumber'], 'quantity': 1}
        for item in (detail.get('merchandisingAssociations') or [])
    ]
    url = f'/wcs/resources/store/{store_id}/getDealerPriceAndAvailability?langId=-7'
    try:
        resp = await context.request.post(url, data={'partsList': parts}, timeout=0)
        if resp and resp.ok:
            price = await resp.json()
            detail['priceInfo'] = price.get('priceAndAvailability', [])
    except Exception as e:
        # Price data is optional: report the failure and keep the detail.
        print('引用接口错误!', e)


async def searchInfo(code, corp=None):
    """Fetch product detail for *code* from parts.cat.com through Chromium.

    Args:
        code: Normalised part number.
        corp: Dealer key selecting the store and the ``auth/auth-<corp>.json``
            storage state.  ``None`` uses the module-level ``auth`` / ``store``
            / ``storeIdentifier`` defaults.

    Returns:
        The decoded product detail (with ``priceInfo`` added when the price
        request succeeds), or ``None`` when the product id or detail could not
        be obtained.
    """
    if corp is None:
        corp_key, store_id, store_ident = auth, store, storeIdentifier
    else:
        corp_key, store_id, store_ident = _resolve_corp(corp)

    result = None
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=False)
        try:
            context = await browser.new_context(
                base_url="https://parts.cat.com",
                storage_state=f"auth/auth-{corp_key}.json",
            )
            try:
                page = await context.new_page()

                url = f"/wcs/resources/store/{store_id}/seo/urltoken/byProduct?token={code}"
                await page.goto(url, timeout=0)
                unique_id = getUniqueId((await page.content()).strip())

                if unique_id:
                    url = f'/api/product/detail?productId={unique_id}&storeIdentifier={store_ident}&locale=zh_CN&partNumber={code}&storeId={store_id}&langId=-7'
                    await page.goto(url, timeout=0)
                    result = getDetail((await page.content()).strip())
                    if isinstance(result, dict) and result:
                        await _attach_prices(context, store_id, code, unique_id, result)

                # Kept from the original: brief pause before tearing down.
                await asyncio.sleep(1)
            finally:
                await context.close()
        finally:
            await browser.close()
    return result


async def main():
    """Run the API under uvicorn on ``HOST``:``PORT``."""
    config = uvicorn.Config("apix:app", host=HOST, port=PORT, log_level="info")
    server = uvicorn.Server(config)
    await server.serve()


if __name__ == "__main__":
    asyncio.run(main())