catdata-api-v2/async_api.py
2025-08-01 20:36:20 +08:00

138 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import json
import asyncio
import uvicorn
from contextlib import asynccontextmanager
from fastapi import FastAPI
import aiomysql
from pymysql.converters import escape_string
from playwright.async_api import async_playwright
# Address and port the uvicorn server binds to.
HOST = '0.0.0.0'
PORT = 8000

# Accepted part-number shapes (matched against the upper-cased input):
#   groups 1/2: one or two digits + a letter, optional dashes, four digits
#   groups 3/4: one to three digits, optional dashes, four digits
# The alternatives are wrapped in a non-capturing group so that BOTH are
# anchored at start and end; without it `^A|B$` parses as `(^A)|(B$)` and the
# first form would accept arbitrary trailing text. Group numbers are unchanged.
CODE_RULE = r"^(?:([0-9]{1,2}[A-Z])\-*([0-9]{4})|([0-9]{1,3})\-*([0-9]{4}))$"

# Default dealer selection. getProductInfo overwrites these three module-level
# values according to the `corp` query parameter of each request.
store = 20153
storeIdentifier = 'LEI_SHING_HONG'
auth = 'lsh'
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the MySQL connection pool on startup and close it on shutdown.

    The pool is stored on ``app.state.db_pool`` so request handlers can
    acquire connections from it. Cleanup runs in a ``finally`` so the pool is
    closed even when an exception or cancellation is delivered at the
    ``yield`` point.
    """
    # NOTE(review): connection settings, including the password, are
    # hard-coded here; consider loading them from configuration instead.
    pool = await aiomysql.create_pool(
        host='localhost',
        port=3306,
        user='root',
        password='rxxt',
        db='workdata',
        autocommit=True,
        minsize=2,
        maxsize=12,
    )
    app.state.db_pool = pool
    try:
        yield
    finally:
        pool.close()
        await pool.wait_closed()


app = FastAPI(lifespan=lifespan)
@app.get('/hi')
def _():
    """Report the API version together with the configured host and port."""
    info = {'version': 2.0}
    info['host'] = HOST
    info['port'] = PORT
    return info
# Page shape expected from the browser when it renders a JSON response:
# `...<pre ...>JSON</pre>...</body></html>`. DOTALL lets the JSON payload (or
# the surrounding markup) span several lines.
_PRE_JSON_RE = re.compile(r'^.+<pre.*>(.+)</pre>.*</body></html>$', re.DOTALL)


def _extractPreJson(s: str):
    """Return the JSON value embedded in the page's ``<pre>`` element.

    Returns ``None`` when the page does not have the expected shape or the
    embedded text is not valid JSON (the parse error is printed).
    """
    from html import unescape  # local import: keeps this block self-contained

    m = _PRE_JSON_RE.match(s)
    if m is None:
        return None
    try:
        # The text comes from serialized HTML, where `&`, `<` and `>` are
        # entity-escaped; undo that before handing it to the JSON parser.
        return json.loads(unescape(m.group(1)))
    except ValueError as e:
        print('json 格式错', e)
        return None


def getUniqueId(s: str):
    """Return the truthy ``uniqueId`` field of the JSON in page *s*, else ``None``.

    ``None`` is also returned when the page cannot be parsed or the JSON value
    is not an object.
    """
    r = _extractPreJson(s)
    if isinstance(r, dict) and r.get('uniqueId'):
        return r['uniqueId']
    return None


def getDetail(s: str):
    """Return the parsed JSON value embedded in page *s*, or ``None`` on failure."""
    return _extractPreJson(s)
# Dealer key -> (storeId, storeIdentifier) used when building parts.cat.com URLs.
_CORP_STORES = {
    'lsh': (20153, 'LEI_SHING_HONG'),
    'eci': (20182, 'ECI_METRO'),
}


def _resolveCorp(corp: str):
    """Return ``(corp, storeId, storeIdentifier)``; unknown keys fall back to 'lsh'."""
    corp = corp.lower()
    if corp not in _CORP_STORES:
        corp = 'lsh'
    return (corp, *_CORP_STORES[corp])


def _buildPartsList(code, unique_id, detail):
    """Build the ``partsList`` payload: the part itself plus its related parts.

    Related parts come from ``merchandisingAssociations`` when that list is
    non-empty, otherwise from ``components``.
    """
    parts = [{'partNumber': code, "uniqueId": str(unique_id), 'quantity': 1}]
    related = detail.get('merchandisingAssociations') or detail.get('components') or []
    parts += [{'partNumber': item['partNumber'], 'quantity': 1} for item in related]
    return parts


@app.get('/product/{code}')
async def getProductInfo(code: str, corp: str = 'lsh'):
    """Look up a part number, serving from the ``catdb`` table when cached.

    Args:
        code: Part number as typed by the caller; it is stripped, upper-cased
            and validated against ``CODE_RULE``.
        corp: Dealer key, ``'lsh'`` or ``'eci'`` (case-insensitive); any other
            value is treated as ``'lsh'``.

    Returns:
        ``{'result': ..., 'code': ..., 'info': ...}``. ``result`` is ``0`` when
        data was found (cached or freshly scraped) and ``101`` otherwise, in
        which case ``info`` is an empty dict.
    """
    global store, storeIdentifier, auth

    # Resolve the dealer into locals. Module-level state is shared by all
    # in-flight requests, so it must not be relied on across `await` points.
    corp, store_id, store_ident = _resolveCorp(corp)
    # Still mirrored to the module globals for backward compatibility with
    # code that reads them (e.g. searchInfo called without `corp`).
    auth, store, storeIdentifier = corp, store_id, store_ident

    code = code.strip().upper()
    result = {'result': 101, 'code': code, 'info': {}}
    p = re.match(CODE_RULE, code)
    if not p:
        return result

    # Normalise to "<prefix>-<4 digits>"; an all-digit prefix is zero-padded to 3.
    if p.group(1):
        code = f'{p.group(1)}-{p.group(2)}'
    else:
        code = f'{int(p.group(3)):03d}-{p.group(4)}'
    result['code'] = code

    pool = app.state.db_pool

    # Cache lookup. The connection is released before scraping so a slow
    # browser session does not pin a pooled connection.
    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cursor:
            await cursor.execute(
                "SELECT * FROM catdb WHERE code=%s AND corp=%s",
                (code, corp),
            )
            row = await cursor.fetchone()
    if row:
        result['result'] = 0
        result['info'] = json.loads(row['data'])
        return result

    info = await searchInfo(code, corp)
    if info:
        result['result'] = 0
        result['info'] = info
        # Values are passed as parameters so the driver does the quoting.
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(
                    "INSERT INTO catdb (corp,code,data) VALUES (%s,%s,%s)",
                    (corp, code, json.dumps(info, ensure_ascii=False)),
                )
    return result


async def searchInfo(code, corp=None):
    """Scrape product detail (and price info, when available) from parts.cat.com.

    Args:
        code: Normalised part number.
        corp: Dealer key (``'lsh'`` / ``'eci'``). When omitted, the module-level
            ``auth`` / ``store`` / ``storeIdentifier`` values are used.

    Returns:
        The parsed detail JSON, with a ``priceInfo`` entry added when the price
        request succeeds, or ``None`` when the part could not be resolved.
    """
    if corp is None:
        corp_key, store_id, store_ident = auth, store, storeIdentifier
    else:
        corp_key, store_id, store_ident = _resolveCorp(corp)

    result = None
    async with async_playwright() as playwright:
        # Launches a headed (non-headless) Chromium, as configured originally.
        browser = await playwright.chromium.launch(headless=False)
        try:
            context = await browser.new_context(
                base_url="https://parts.cat.com",
                storage_state=f"auth/auth-{corp_key}.json",
            )
            try:
                page = await context.new_page()

                # Step 1: resolve the part number to the site's uniqueId.
                url = f"/wcs/resources/store/{store_id}/seo/urltoken/byProduct?token={code}"
                await page.goto(url, timeout=0)
                unique_id = getUniqueId((await page.content()).strip())

                if unique_id:
                    # Step 2: product detail. The detail URL does not include
                    # price information.
                    url = f'/api/product/detail?productId={unique_id}&storeIdentifier={store_ident}&locale=zh_CN&partNumber={code}&storeId={store_id}&langId=-7'
                    await page.goto(url, timeout=0)
                    result = getDetail((await page.content()).strip())

                    if isinstance(result, dict) and result:
                        # Step 3: the price endpoint is a POST whose input is
                        # partsList (an array of part numbers).
                        parts = _buildPartsList(code, unique_id, result)
                        url = f'/wcs/resources/store/{store_id}/getDealerPriceAndAvailability?langId=-7'
                        try:
                            resp = await context.request.post(url, data={'partsList': parts}, timeout=0)
                            if resp and resp.ok:
                                result['priceInfo'] = await resp.json()
                        except Exception as e:
                            # Best effort: the detail is still returned without prices.
                            print('引用接口错误!', e)

                # NOTE(review): the reason for this one-second pause before
                # closing is not evident from the code; kept as in the original.
                await asyncio.sleep(1)
            finally:
                await context.close()
        finally:
            await browser.close()
    return result
if __name__ == "__main__":
    # Run the API with uvicorn on the configured host and port.
    uvicorn.run(
        app,
        host=HOST,
        port=PORT,
        log_level="info",
    )