147 lines
4.0 KiB
Python
147 lines
4.0 KiB
Python
import time
|
|
import httpx
|
|
import json
|
|
import jsonpath_ng.ext as jp
|
|
|
|
from app.utils.template import render, render_headers
|
|
from app.utils.security import mask_sensitive
|
|
|
|
# Upper bound, in characters, applied when safe_body() truncates a response body.
MAX_BODY = 500
|
|
|
|
|
|
def safe_body(resp: httpx.Response) -> str:
    """Return a truncated, loggable rendering of the response body.

    The body is parsed as JSON, passed through ``mask_sensitive`` and
    serialized again. If any of those steps fails, the raw response text is
    used instead. In both cases the result is cut to ``MAX_BODY`` characters.

    Args:
        resp: The HTTP response whose body should be rendered.

    Returns:
        At most ``MAX_BODY`` characters of the masked JSON or the raw text.
    """
    try:
        masked = mask_sensitive(resp.json())
        serialized = json.dumps(masked)
    except Exception:
        # Deliberate best-effort: a body that is not JSON (or cannot be
        # masked/serialized) is still reported, just as plain text.
        return resp.text[:MAX_BODY]
    return serialized[:MAX_BODY]
|
|
|
|
|
|
def extract_cookies(resp: httpx.Response) -> dict:
    """Return the cookies of *resp* as a plain ``{name: value}`` dict."""
    return dict(resp.cookies.items())
|
|
|
|
|
|
def build_cookies(context: dict) -> dict:
    """Pick the cookies to send from the shared check context.

    Priority:
        1. ``context["cookies"]`` when it is a dict (a full saved cookie set);
           it is returned as-is.
        2. Otherwise, every context entry whose key contains ``"token"``
           (case-insensitive).

    Args:
        context: Shared state passed between checks.

    Returns:
        A mapping of cookie name to value; empty when nothing matches.
    """
    saved = context.get("cookies")
    if isinstance(saved, dict):
        return saved

    # Non-string keys cannot contain "token"; skip them instead of failing
    # on ``.lower()``.
    return {
        key: value
        for key, value in context.items()
        if isinstance(key, str) and "token" in key.lower()
    }
|
|
|
|
|
|
def _check_expectations(r, exp):
    """Compare response *r* with the ``expect`` spec and return ``(ok, reason)``."""
    if "status_code" in exp and r.status_code != exp["status_code"]:
        return False, f"status_code != {exp['status_code']}"

    if "json" in exp:
        try:
            data = r.json()
        except Exception:
            return False, "response is not json"

        # Every JSONPath expression must match at least once; stop at the
        # first one that does not.
        for expr in exp["json"].values():
            if not jp.parse(expr).find(data):
                return False, f"jsonpath mismatch: {expr}"

    return True, None


def _save_values(r, save_spec, context):
    """Store cookies / JSON values selected by *save_spec* from *r* into *context*."""
    try:
        data = r.json()
    except Exception:
        # A non-JSON body simply yields no JSONPath matches below.
        data = {}

    for key, expr in save_spec.items():
        if expr == "__cookies__":
            # Save the whole cookie set.
            context[key] = extract_cookies(r)
        elif isinstance(expr, str) and expr.startswith("cookie:"):
            # Save one specific cookie, if the response set it.
            cookie_name = expr.split(":", 1)[1]
            if cookie_name in r.cookies:
                context[key] = r.cookies[cookie_name]
        else:
            # Save the first JSONPath match from the body.
            matches = jp.parse(expr).find(data)
            if matches:
                context[key] = matches[0].value


async def run(check, context):
    """Execute one HTTP check and return its result as a dict.

    Keys read from *check*: ``name``, ``method`` (default ``"GET"``), ``url``,
    ``base_url``, ``params``, ``timeout`` (default 5), ``body`` (sent as JSON),
    ``headers``, ``expect`` (``status_code`` and/or ``json`` JSONPath
    expressions) and ``save``.

    Args:
        check: Definition of the check to run.
        context: Shared state. It supplies ``base_url``, template values and
            cookies, and is updated in place by the ``save`` entries when the
            check passes.

    Returns:
        On a completed request: ``name``, ``type``, ``method``, ``url``,
        ``params``, ``ok``, ``status``, ``latency_ms``, ``reason`` and
        ``response``. If an exception is raised while requesting or evaluating
        the response: ``name``, ``type``, ``method``, ``url``, ``params``,
        ``ok`` (False), ``error`` and ``latency_ms``.
    """
    start = time.monotonic()

    method = check.get("method", "GET").upper()

    # --- BASE URL ---
    # The context value wins over the check value; it is only applied to
    # URLs that start with "/".
    base_url = context.get("base_url") or check.get("base_url")
    url = check.get("url", "")
    if base_url and url.startswith("/"):
        url = base_url.rstrip("/") + url
    url = render(url, context)

    # --- PARAMS ---
    params = check.get("params")
    if params:
        params = {k: render(v, context) for k, v in params.items()}

    try:
        async with httpx.AsyncClient(
            timeout=check.get("timeout", 5),
            cookies=build_cookies(context),
        ) as client:
            r = await client.request(
                method=method,
                url=url,
                json=check.get("body"),
                headers=render_headers(check.get("headers", {}), context),
                params=params,
            )
            latency = int((time.monotonic() - start) * 1000)

        ok, reason = True, None

        # ---------- EXPECT ----------
        if "expect" in check:
            ok, reason = _check_expectations(r, check["expect"])

        # ---------- SAVE ----------
        # Values are only saved from a response that met its expectations.
        if ok and "save" in check:
            _save_values(r, check["save"], context)

        return {
            "name": check["name"],
            "type": "http",
            "method": method,
            "url": url,
            "params": params,
            "ok": ok,
            "status": r.status_code,
            "latency_ms": latency,
            "reason": reason,
            "response": {
                "body": safe_body(r),
            },
        }

    except Exception as e:
        return {
            "name": check["name"],
            "type": "http",
            "method": method,
            "url": url,
            "params": params,
            "ok": False,
            # An exception may carry an empty message; fall back to its class
            # name so the failure is still identifiable.
            "error": str(e) or type(e).__name__,
            "latency_ms": int((time.monotonic() - start) * 1000),
        }
|