#!/usr/bin/env python3
"""Simple scraper following the instructions in 抓取流程.md.

Usage:
    python scraper.py --part-file example_part_100.json --test

Features:
- Parse part JSON and extract `version_id`s
- For each version_id, optionally fetch the details JSON and extract resource URLs ending with .pdf or .m3u8
- Download resources with optional X-Nd-Auth header
"""
import argparse
import json
import os
import re
from pathlib import Path
from typing import List, Tuple, Optional
import html
import unicodedata
# Import configuration from config.py
try:
    from config import (
        DEFAULT_PART_FILE,
        DEFAULT_OUTPUT_DIR,
        DETAIL_BASE_URL,
        DEFAULT_MATERIAL_NAME,
        DEFAULT_CHAPTER_NAME,
        DEFAULT_COURSE_TITLE,
        DEFAULT_FILENAME,
        TAG_DIMENSION_ID,
        MAX_FILENAME_LENGTH,
        MAX_DIRNAME_LENGTH,
        CHUNK_SIZE,
        REQUEST_TIMEOUT,
        DETAIL_REQUEST_TIMEOUT,
        DEFAULT_TOKEN,
        PDF_OR_M3U8_PATTERN
    )
except ImportError:
    # Fallback to default values if config.py is not available
    DEFAULT_PART_FILE = "example_part_100.json"
    DEFAULT_OUTPUT_DIR = "downloaded"
    DETAIL_BASE_URL = "https://s-file-2.ykt.cbern.com.cn/zxx/ndrv2/national_lesson/resources/details"
    DEFAULT_MATERIAL_NAME = "未知教材"
    DEFAULT_CHAPTER_NAME = "未知章节"
    DEFAULT_COURSE_TITLE = "未知课程"
    DEFAULT_FILENAME = "unnamed"
    TAG_DIMENSION_ID = "zxxcc"
    MAX_FILENAME_LENGTH = 100
    MAX_DIRNAME_LENGTH = 50
    CHUNK_SIZE = 8192
    REQUEST_TIMEOUT = 30
    DETAIL_REQUEST_TIMEOUT = 20
    DEFAULT_TOKEN = None
    PDF_OR_M3U8_PATTERN = r"https?://\S+\.(?:pdf|m3u8)(?:\?\S*)?"

try:
    import requests
    _HAS_REQUESTS = True
except Exception:
    import urllib.request
    import urllib.error
    _HAS_REQUESTS = False

# Compile the URL pattern once; use the constant above so a pattern supplied by
# config.py (when present) is actually honoured.
PDF_OR_M3U8_RE = re.compile(PDF_OR_M3U8_PATTERN, re.IGNORECASE)

def load_json(path: str):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def extract_version_ids_from_part(part_json) -> List[str]:
    ids = []
    if isinstance(part_json, list):
        for item in part_json:
            vid = item.get("version_id")
            if vid:
                ids.append(vid)
    elif isinstance(part_json, dict):
        # support single-object files
        vid = part_json.get("version_id")
        if vid:
            ids.append(vid)
    return ids

def _sanitize_filename(name: str, max_length=MAX_FILENAME_LENGTH) -> str:
    """Enhanced filename sanitizer: handles Chinese names and enforces a length limit."""
    if not name:
        return DEFAULT_FILENAME
    # 1. Unicode normalization and HTML entity decoding
    name = unicodedata.normalize("NFKC", html.unescape(name))
    # 2. Remove control characters and characters illegal in filenames
    name = re.sub(r'[\\/*?:"<>|\r\n\t]', '_', name)
    # 3. Collapse runs of underscores and spaces
    name = re.sub(r'[_ ]+', '_', name.strip())
    # 4. Enforce the length limit
    if len(name) > max_length:
        name = name[:max_length-3] + '...'
    return name if name else DEFAULT_FILENAME
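
# Illustrative example (not executed; follows the sanitization steps above):
#   _sanitize_filename('课堂实录: 第1课/视频')  ->  '课堂实录_第1课_视频'
# ':' and '/' become '_', then runs of '_' and spaces are collapsed.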

def find_resource_urls_in_json(data) -> List[str]:
    # fallback broad-scan (old behavior) kept for compatibility
    text = json.dumps(data, ensure_ascii=False)
    return list(set(PDF_OR_M3U8_RE.findall(text)))
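
# The traversal in extract_urls_with_labels() assumes (judging from the field
# accesses in this file; treat this as an illustrative sketch, not a schema) a
# detail JSON shaped roughly like:
#
#   {
#     "relations": {
#       "national_course_resource": [
#         {
#           "alias_name": "...", "resource_type_code_name": "...",
#           "title": "...",
#           "custom_properties": {"resolution": "..."},
#           "ti_items": [{"ti_storages": ["https://.../xxx.m3u8", "..."]}]
#         }
#       ]
#     }
#   }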

def extract_urls_with_labels(detail: dict) -> List[Tuple[str, Optional[str]]]:
    """Traverse known structure in detail JSON and return list of (url, label).

    For each resource under relations.national_course_resource, if a ti_storages URL
    is found, attempt to use the parent's resource_type_code_name as label.
    Returns list of (url, label) where label may be None when not available.
    """
    results: List[Tuple[str, Optional[str]]] = []
    # relations may be at detail['relations'] or nested; defensive access
    rels = detail.get("relations") or {}
    ncr = rels.get("national_course_resource") if isinstance(rels, dict) else None
    if isinstance(ncr, list):
        for resource in ncr:
            # Prefer alias_name; fall back to resource_type_code_name when missing
            label = resource.get("alias_name") or resource.get("resource_type_code_name")
            title = resource.get("title") or resource.get("global_title", {}).get("zh-CN", "")
            # ti_items is a list of items that may contain ti_storages
            ti_items = resource.get("ti_items") or []
            if not isinstance(ti_items, list):
                continue
            for ti in ti_items:
                storages = ti.get("ti_storages") or []
                if not isinstance(storages, list):
                    continue
                for s in storages:
                    if not isinstance(s, str):
                        continue
                    # Is this a PDF or M3U8 file?
                    if PDF_OR_M3U8_RE.search(s):
                        # For video resources, try to pick up resolution information
                        resolution = None
                        if s.endswith('.m3u8') and 'custom_properties' in resource:
                            custom_props = resource['custom_properties']
                            if isinstance(custom_props, dict):
                                resolution = custom_props.get('resolution')
                        # Build a more detailed label
                        detailed_label = label
                        if title and title != label:
                            detailed_label = f"{label}_{title}" if label else title
                        if resolution:
                            detailed_label = f"{detailed_label}_{resolution}" if detailed_label else resolution
                        results.append((s, detailed_label))
    # If we found nothing, fall back to broad scan with no labels
    if not results:
        for u in find_resource_urls_in_json(detail):
            results.append((u, None))
    # Dedupe while preserving the first label seen for a logical resource.
    # Many resources are served from mirrors like r1-..., r2-..., r3-... where the
    # path+query is identical. Canonicalize by using the path+query as key and
    # keep the first full URL we encounter for that key.
    from urllib.parse import urlparse, urlunparse
    seen_keys = {}
    ordered: List[Tuple[str, Optional[str]]] = []
    for u, l in results:
        try:
            p = urlparse(u)
            key = urlunparse(("", "", p.path, p.params, p.query, p.fragment))
        except Exception:
            key = u
        if key not in seen_keys:
            seen_keys[key] = (u, l)
            ordered.append((u, l))
    return ordered

def download_url(url: str, dest: Path, headers: dict = None):
    dest.parent.mkdir(parents=True, exist_ok=True)
    headers = headers or {}
    if _HAS_REQUESTS:
        with requests.get(url, headers=headers, stream=True, timeout=REQUEST_TIMEOUT) as r:
            r.raise_for_status()
            with open(dest, "wb") as f:
                for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                    if chunk:
                        f.write(chunk)
    else:
        req = urllib.request.Request(url, headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as r, open(dest, "wb") as f:
                while True:
                    chunk = r.read(CHUNK_SIZE)
                    if not chunk:
                        break
                    f.write(chunk)
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"HTTP error: {e.code} {e.reason}")
        except urllib.error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}")

def fetch_version_detail(version_id: str) -> dict:
    url = f"{DETAIL_BASE_URL}/{version_id}.json"
    if _HAS_REQUESTS:
        r = requests.get(url, timeout=DETAIL_REQUEST_TIMEOUT)
        r.raise_for_status()
        return r.json()
    else:
        req = urllib.request.Request(url)
        try:
            with urllib.request.urlopen(req, timeout=DETAIL_REQUEST_TIMEOUT) as r:
                raw = r.read().decode("utf-8")
                return json.loads(raw)
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"HTTP error: {e.code} {e.reason}")
        except urllib.error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}")

def get_teachingmaterial_info(detail: dict) -> Tuple[str, str, str]:
    """Extract textbook, chapter and course-title information from the detail JSON."""
    # Defaults
    material_name = DEFAULT_MATERIAL_NAME
    chapter_name = DEFAULT_CHAPTER_NAME
    course_title = detail.get("title") or detail.get("global_title", {}).get("zh-CN", DEFAULT_COURSE_TITLE)
    # Try to get the textbook information from custom_properties
    custom_props = detail.get("custom_properties", {})
    if isinstance(custom_props, dict):
        teachingmaterial_info = custom_props.get("teachingmaterial_info")
        if isinstance(teachingmaterial_info, dict):
            material_name = teachingmaterial_info.get("title", material_name)
    # Try to get the chapter information from tag_list
    tag_list = detail.get("tag_list", [])
    if isinstance(tag_list, list):
        for tag in tag_list:
            if isinstance(tag, dict) and tag.get("tag_dimension_id") == TAG_DIMENSION_ID:
                chapter_name = tag.get("tag_name", chapter_name)
                break
    return material_name, chapter_name, course_title

def create_hierarchical_structure(out_root: Path, detail: dict, version_id: str) -> Path:
    """Create the hierarchical directory structure, using global_title as the course name."""
    material_name, chapter_name, course_title = get_teachingmaterial_info(detail)
    # Sanitize directory names
    safe_material = _sanitize_filename(material_name, MAX_DIRNAME_LENGTH)
    safe_chapter = _sanitize_filename(chapter_name, MAX_DIRNAME_LENGTH)
    safe_title = _sanitize_filename(course_title, MAX_DIRNAME_LENGTH)
    # Directory layout: textbook/chapter/course-title_<last 6 chars of version_id>
    dest_dir = out_root / safe_material / safe_chapter / f"{safe_title}_{version_id[-6:]}"
    dest_dir.mkdir(parents=True, exist_ok=True)
    return dest_dir
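
# With the hierarchical layout above, a destination directory looks roughly like
# (the version_id suffix "a1b2c3" here is purely hypothetical):
#
#   downloaded/<textbook>/<chapter>/<course title>_a1b2c3/
#
# and falls back to 未知教材/未知章节/未知课程 when the metadata is missing.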

def main():
    p = argparse.ArgumentParser(description="Scraper for national_lesson resources")
    p.add_argument("--part-file", default=DEFAULT_PART_FILE)
    p.add_argument("--part-url", default=None, help="Remote URL for part_100.json")
    p.add_argument("--out", default=DEFAULT_OUTPUT_DIR)
    p.add_argument("--token", default=DEFAULT_TOKEN, help="Full X-Nd-Auth header value (without 'X-Nd-Auth:')")
    p.add_argument("--list-only", action="store_true", help="Only list found .pdf/.m3u8 URLs without downloading")
    p.add_argument("--limit", type=int, default=0, help="Limit number of version_ids to process (0 = all)")
    p.add_argument("--only-pdf", action="store_true", help="Download only .pdf files (skip .m3u8)")
    p.add_argument("--test", action="store_true", help="Only parse local part file and print version_ids")
    p.add_argument("--flat-structure", action="store_true", help="Use flat directory structure instead of hierarchical")
    args = p.parse_args()
    part_json = None
    if args.part_url:
        print(f"Fetching part JSON from {args.part_url} ...")
        try:
            if _HAS_REQUESTS:
                hdrs = {}
                if args.token:
                    hdrs["X-Nd-Auth"] = args.token
                r = requests.get(args.part_url, headers=hdrs, timeout=DETAIL_REQUEST_TIMEOUT)
                r.raise_for_status()
                part_json = r.json()
            else:
                req = urllib.request.Request(args.part_url)
                if args.token:
                    req.add_header("X-Nd-Auth", args.token)
                with urllib.request.urlopen(req, timeout=DETAIL_REQUEST_TIMEOUT) as r:
                    raw = r.read().decode("utf-8")
                    part_json = json.loads(raw)
        except Exception as e:
            print(f"Failed to fetch part JSON: {e}")
            return
    else:
        part_file = Path(args.part_file)
        if not part_file.exists():
            print(f"Part file not found: {part_file}")
            return
        part_json = load_json(str(part_file))
    vids = extract_version_ids_from_part(part_json)
    print("Extracted version_ids:")
    for v in vids:
        print(v)
    if args.test:
        return
    headers = {}
    if args.token:
        headers["X-Nd-Auth"] = args.token
    out_root = Path(args.out)
    if args.limit and args.limit > 0:
        vids = vids[: args.limit]
    for vid in vids:
        try:
            print(f"Fetching detail for {vid}...")
            detail = fetch_version_detail(vid)
        except Exception as e:
            print(f"Failed to fetch detail for {vid}: {e}")
            continue
        url_label_pairs = extract_urls_with_labels(detail)
        if not url_label_pairs:
            print(f"No pdf/m3u8 urls found for {vid}")
            continue
        # optionally filter only pdfs
        if args.only_pdf:
            url_label_pairs = [(u, l) for (u, l) in url_label_pairs if u.lower().endswith(".pdf")]
        print(f"Found {len(url_label_pairs)} resource(s) for {vid}:")
        for u, l in url_label_pairs:
            kind = "pdf" if u.lower().endswith(".pdf") else "m3u8"
            label_info = f" label={l!r}" if l else ""
            print(f" [{kind}]{label_info} {u}")
        if args.list_only:
            # save the discovered urls to a manifest for later
            dest_dir = out_root / vid
            dest_dir.mkdir(parents=True, exist_ok=True)
            manifest = {
                "version_id": vid,
                "resources": [{"url": u, "label": l} for (u, l) in url_label_pairs],
            }
            with open(dest_dir / "manifest.json", "w", encoding="utf-8") as mf:
                json.dump(manifest, mf, ensure_ascii=False, indent=2)
            continue
        # prepare destination and manifest - use hierarchical structure by default
        if args.flat_structure:
            dest_dir = out_root / vid
        else:
            dest_dir = create_hierarchical_structure(out_root, detail, vid)
        dest_dir.mkdir(parents=True, exist_ok=True)
        manifest_entries = []
        existing_names = set()
        for u, label in url_label_pairs:
            base = u.split("/")[-1].split("?")[0]
            ext = base.rsplit('.', 1)[-1] if '.' in base else ''
            # Smarter file naming
            if label:
                # Sanitize the label to get a usable base name
                safe_label = _sanitize_filename(label)
                name_base = safe_label
                # For video resources, append resolution info (if not already included)
                if u.lower().endswith('.m3u8') and 'resolution' not in safe_label.lower():
                    # Try to extract the resolution from the URL
                    resolution_match = re.search(r'(\d+x\d+)', u)
                    if resolution_match:
                        name_base = f"{safe_label}_{resolution_match.group(1)}"
            else:
                # No label: fall back to the URL's base name
                name_base = base.rsplit('.', 1)[0] if '.' in base else base
            # Make sure the filename carries a suitable extension
            name = f"{name_base}.{ext}" if ext and not name_base.endswith(f'.{ext}') else name_base
            # Avoid filename collisions
            i = 1
            original_name = name
            while name in existing_names or (dest_dir / name).exists():
                if '.' in original_name:
                    name_parts = original_name.rsplit('.', 1)
                    name = f"{name_parts[0]}_{i}.{name_parts[1]}"
                else:
                    name = f"{original_name}_{i}"
                i += 1
            existing_names.add(name)
            dest = dest_dir / name
            entry = {"url": u, "filename": name, "label": label, "status": "pending"}
            try:
                print(f"Downloading {u} -> {dest} ...")
                download_url(u, dest, headers=headers)
                entry["status"] = "ok"
                print(f"✓ Successfully downloaded: {name}")
            except Exception as e:
                print(f"✗ Failed to download {u}: {e}")
                entry["status"] = f"error: {e}"
            manifest_entries.append(entry)
        # write per-version manifest
        with open(dest_dir / "manifest.json", "w", encoding="utf-8") as mf:
            json.dump({"version_id": vid, "resources": manifest_entries}, mf, ensure_ascii=False, indent=2)
        # append to global manifest
        all_manifest_path = out_root / "manifest_all.json"
        all_manifest = []
        if all_manifest_path.exists():
            try:
                with open(all_manifest_path, "r", encoding="utf-8") as af:
                    all_manifest = json.load(af)
            except Exception:
                all_manifest = []
        all_manifest.append({"version_id": vid, "resources": manifest_entries})
        with open(all_manifest_path, "w", encoding="utf-8") as af:
            json.dump(all_manifest, af, ensure_ascii=False, indent=2)


if __name__ == "__main__":
    main()