import asyncio import httpx import base64 import os import json import re from datetime import datetime, timedelta, timezone gpu_sem = asyncio.Semaphore(int(os.getenv("MAX_CONCURRENT_INFERENCE", 1))) async def call_qwen_vlm(content, is_image=True): async with gpu_sem: # 获取北京时间 (UTC+8) tz_beijing = timezone(timedelta(hours=8)) now_beijing = datetime.now(tz_beijing).strftime("%Y-%m-%d %H:%M:%S") system_prompt = ( "你是一个财务账单解析助手。请提取以下字段并返回 JSON:\n" "1. amount (浮点数)\n" "2. category (餐饮, 交通, 购物, 娱乐, 医疗, 运动 , 住宿, 人情, 其它)\n" f"3. transaction_time (格式:YYYY-MM-DD HH:MM:SS。当前北京时间为:{now_beijing})\n\n" "时间提取准则:\n" "- 优先寻找账单截图或文字中明确提到的交易时间。\n" "- 若信息不全(如只有月日),请结合当前北京时间年份进行补全。\n" "- 若完全没有时间信息,请直接使用上述提供的北京时间作为默认值。\n" "注意:\n" "- 严禁输出任何思维过程,只返回纯 JSON 字典,确保数据完整性。\n" "- 忽略任何货币符号以及正负号,仅提取数字部分作为 amount。\n" ) base_url = os.getenv('OLLAMA_BASE_URL', 'http://vlm-service:11434').rstrip('/') payload = { "model": os.getenv("VLM_MODEL"), "prompt": system_prompt, "stream": False, "format": "json", "options": {"temperature": 0.1} } if is_image: payload["images"] = [base64.b64encode(content).decode('utf-8')] else: payload["prompt"] += f"\n用户输入: {content}" try: async with httpx.AsyncClient(timeout=300.0, trust_env=False) as client: resp = await client.post(f"{base_url}/api/generate", json=payload) if resp.status_code == 200: res_json = resp.json() # 兼容 Qwen3-VL 可能将结果放在 thinking 字段的情况 raw_res = res_json.get("response", "") or res_json.get("thinking", "") # 提取并验证 JSON match = re.search(r'(\{.*\})', raw_res, re.DOTALL) clean_json = match.group(1) if match else "{}" # 检查必要字段是否存在,不存在则补齐 data = json.loads(clean_json) data.setdefault("amount", 0.0) data.setdefault("category", "其它") data.setdefault("transaction_time", now_beijing) return json.dumps(data) return "{}" except Exception as e: print(f"VLM Error: {e}") return "{}"