import json
import time

import httpx
import jsonpath_ng.ext as jp

from app.utils.template import render, render_headers
from app.utils.security import mask_sensitive

# Maximum number of characters of a response body kept in a check result.
MAX_BODY = 500


def safe_body(resp: httpx.Response) -> str:
    """Return a truncated, masked preview of the response body.

    If the body parses as JSON it is passed through ``mask_sensitive`` and
    re-serialized; otherwise (or if masking/serialization fails) the raw
    response text is used. Either way the result is cut to ``MAX_BODY``
    characters.
    """
    try:
        data = resp.json()
        return json.dumps(mask_sensitive(data))[:MAX_BODY]
    except Exception:
        # Best-effort fallback: non-JSON body, or masking/serializing failed.
        return resp.text[:MAX_BODY]


def extract_cookies(resp: httpx.Response) -> dict:
    """Return the cookies exposed by ``resp.cookies`` as a plain dict."""
    return dict(resp.cookies.items())


def build_cookies(context: dict) -> dict:
    """Pick the cookies to send with a request from the shared context.

    Priority:
      1. ``context["cookies"]`` when it is a dict (a full saved cookie set).
      2. Otherwise every context entry whose key contains "token"
         (case-insensitive).
    """
    saved = context.get("cookies")
    if isinstance(saved, dict):
        return saved
    return {k: v for k, v in context.items() if "token" in k.lower()}


def _check_expectations(exp, resp):
    """Evaluate the ``expect`` section of a check against a response.

    Returns an ``(ok, reason)`` pair; ``reason`` is ``None`` on success and a
    short description of the first failed expectation otherwise.
    """
    if "status_code" in exp and resp.status_code != exp["status_code"]:
        return False, f"status_code != {exp['status_code']}"

    if "json" in exp:
        try:
            data = resp.json()
        except Exception:
            return False, "response is not json"

        # Only the JSONPath expressions matter here; the keys are just labels.
        for expr in exp["json"].values():
            if not jp.parse(expr).find(data):
                return False, f"jsonpath mismatch: {expr}"

    return True, None


def _save_values(save_spec, resp, context):
    """Store values taken from the response into ``context`` (in place).

    Each ``save_spec`` entry maps a context key to one of:
      * ``"__cookies__"``   - save all response cookies as a dict,
      * ``"cookie:<name>"`` - save one cookie, if the response has it,
      * anything else       - a JSONPath expression; the first match is saved.
    """
    try:
        data = resp.json()
    except Exception:
        # Non-JSON body: JSONPath lookups simply find nothing.
        data = {}

    for key, expr in save_spec.items():
        if expr == "__cookies__":
            # Save all cookies.
            context[key] = extract_cookies(resp)
        elif isinstance(expr, str) and expr.startswith("cookie:"):
            # Save one specific cookie.
            cookie_name = expr.split(":", 1)[1]
            if cookie_name in resp.cookies:
                context[key] = resp.cookies[cookie_name]
        else:
            # Save from the JSON body.
            matches = jp.parse(expr).find(data)
            if matches:
                context[key] = matches[0].value


async def run(check, context):
    """Execute one HTTP check and return a result dict.

    ``check`` describes the request (``method``, ``url``, ``base_url``,
    ``params``, ``headers``, ``body``, ``timeout``) plus optional ``expect``
    and ``save`` sections. ``context`` supplies template values and cookies,
    and is updated in place by the ``save`` section when the check passes.

    Any exception raised while sending the request or evaluating
    ``expect``/``save`` is caught and reported as a result with ``ok=False``
    and an ``error`` message instead of being propagated.
    """
    start = time.monotonic()
    method = check.get("method", "GET").upper()

    # --- BASE URL ---
    # A base URL from the context takes precedence over the check's own one;
    # it is only applied to paths that start with "/".
    base_url = context.get("base_url") or check.get("base_url")
    url = check.get("url", "")
    if base_url and url.startswith("/"):
        url = base_url.rstrip("/") + url
    url = render(url, context)

    # --- PARAMS ---
    params = check.get("params")
    if params:
        params = {k: render(v, context) for k, v in params.items()}

    try:
        async with httpx.AsyncClient(
            timeout=check.get("timeout", 5),
            cookies=build_cookies(context),
        ) as client:
            r = await client.request(
                method=method,
                url=url,
                json=check.get("body"),
                headers=render_headers(check.get("headers", {}), context),
                params=params,
            )
            latency = int((time.monotonic() - start) * 1000)

        ok = True
        reason = None

        # ---------- EXPECT ----------
        if "expect" in check:
            ok, reason = _check_expectations(check["expect"], r)

        # ---------- SAVE ----------
        # Values are only saved when every expectation passed.
        if ok and "save" in check:
            _save_values(check["save"], r, context)

        return {
            "name": check["name"],
            "type": "http",
            "method": method,
            "url": url,
            "params": params,
            "ok": ok,
            "status": r.status_code,
            "latency_ms": latency,
            "reason": reason,
            "response": {
                "body": safe_body(r)
            },
        }

    except Exception as e:
        return {
            "name": check["name"],
            "type": "http",
            "method": method,
            "url": url,
            "params": params,
            "ok": False,
            "error": str(e),
            "latency_ms": int((time.monotonic() - start) * 1000),
        }