facebook-tool/database/models/product.py

209 lines
6.6 KiB
Python

# database/models/product.py
import json
import time
import sqlite3
from database.db import get_connection
class Product:
@staticmethod
def all():
"""Lấy toàn bộ danh sách sản phẩm"""
conn = get_connection()
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute("SELECT * FROM products")
rows = cur.fetchall()
conn.close()
return [dict(row) for row in rows]
@staticmethod
def get_paginated(offset, limit, filters=None, sort_by="id", sort_order="ASC"):
"""Lấy danh sách sản phẩm có phân trang, lọc, sắp xếp"""
conn = get_connection()
conn.row_factory = sqlite3.Row
cur = conn.cursor()
sql = "SELECT * FROM products WHERE 1=1"
params = []
# Filters
if filters:
if "name" in filters:
sql += " AND name LIKE ?"
params.append(f"%{filters['name']}%")
if "price" in filters:
sql += " AND price = ?"
params.append(filters["price"])
if "created_at" in filters:
sql += " AND DATE(datetime(created_at, 'unixepoch')) = ?"
params.append(filters["created_at"]) # YYYY-MM-DD
if "category" in filters:
sql += " AND category = ?"
params.append(filters["category"])
if "condition" in filters:
sql += " AND condition = ?"
params.append(filters["condition"])
if "brand" in filters:
sql += " AND brand LIKE ?"
params.append(f"%{filters['brand']}%")
if "tags" in filters:
sql += " AND tags LIKE ?"
params.append(f"%{filters['tags']}%")
if "sku" in filters:
sql += " AND sku = ?"
params.append(filters["sku"])
if "location" in filters:
sql += " AND location LIKE ?"
params.append(f"%{filters['location']}%")
# Count total
count_sql = f"SELECT COUNT(*) as total FROM ({sql})"
cur.execute(count_sql, params)
total_count = cur.fetchone()["total"]
# Sorting
allowed_columns = ["id", "name", "price", "created_at", "category", "condition", "brand"]
if sort_by not in allowed_columns:
sort_by = "id"
sort_order = "DESC" if sort_order.upper() == "DESC" else "ASC"
sql += f" ORDER BY {sort_by} {sort_order}"
# Pagination
sql += " LIMIT ? OFFSET ?"
params.extend([limit, offset])
cur.execute(sql, params)
rows = cur.fetchall()
conn.close()
return [dict(row) for row in rows], total_count
@staticmethod
def create(
name,
price,
images=None,
url=None,
status="draft",
category=None,
condition=None,
brand=None,
description=None,
tags=None,
sku=None,
location=None
):
"""Tạo mới sản phẩm"""
images_json = json.dumps(images or [])
tags_json = json.dumps(tags) if tags else None
created_at = int(time.time())
conn = get_connection()
cur = conn.cursor()
cur.execute("""
INSERT INTO products
(name, price, images, url, status, category, condition, brand, description, tags, sku, location, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
name, price, images_json, url, status, category,
condition, brand, description, tags_json, sku, location, created_at
))
conn.commit()
conn.close()
@staticmethod
def update(
product_id,
name,
price,
images=None,
url=None,
status="draft",
category=None,
condition=None,
brand=None,
description=None,
tags=None,
sku=None,
location=None
):
"""Cập nhật sản phẩm"""
images_json = json.dumps(images or [])
tags_json = json.dumps(tags) if tags else None
conn = get_connection()
cur = conn.cursor()
cur.execute("""
UPDATE products
SET name=?, price=?, images=?, url=?, status=?, category=?,
condition=?, brand=?, description=?, tags=?, sku=?, location=?
WHERE id=?
""", (
name, price, images_json, url, status, category,
condition, brand, description, tags_json, sku, location, product_id
))
conn.commit()
conn.close()
@staticmethod
def delete(product_id):
"""Xoá sản phẩm theo ID"""
conn = get_connection()
cur = conn.cursor()
cur.execute("DELETE FROM products WHERE id=?", (product_id,))
conn.commit()
conn.close()
@staticmethod
def insert_from_import(data):
"""Thêm sản phẩm khi import từ CSV hoặc API (tái sử dụng hàm create).
Nếu trùng SKU thì trả về bản ghi hiện có mà không lưu mới.
"""
# Parse images từ chuỗi nếu cần
images = data.get("images")
if isinstance(images, str):
try:
# thử parse dạng JSON
images = json.loads(images)
except Exception:
# fallback: tách theo dấu phẩy
images = [img.strip() for img in images.split(",") if img.strip()]
sku = data.get("sku")
if sku:
existing = Product.get_by_sku(sku)
if existing:
return existing # trả về bản ghi đã có
# Nếu chưa tồn tại, tạo mới
return Product.create(
name=data.get("name"),
price=float(data.get("price", 0)),
images=images,
url=data.get("url"),
status=data.get("status", "draft"),
category=data.get("category"),
condition=data.get("condition"),
brand=data.get("brand"),
description=data.get("description"),
tags=data.get("tags"),
sku=sku,
location=data.get("location"),
)
@staticmethod
def get_by_sku(sku):
"""Trả về dict của sản phẩm theo SKU, hoặc None nếu không tồn tại"""
conn = get_connection() # <-- sửa ở đây
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM products WHERE sku = ?", (sku,))
row = cursor.fetchone()
conn.close()
if row:
return dict(row)
return None