#!/usr/bin/env python3
"""Simple scraper following the instructions in 抓取流程.md.

Usage:
    python scraper.py --part-file example_part_100.json --test

Features:
- Parse the part JSON and extract `version_id`s
- For each version_id, optionally fetch the detail JSON and extract resource URLs ending in .pdf or .m3u8
- Download resources, optionally sending an X-Nd-Auth header
"""
import argparse
import html
import json
import os
import re
import unicodedata
from pathlib import Path
from typing import List, Tuple, Optional

# Import configuration from config.py
try:
    from config import (
        DEFAULT_PART_FILE,
        DEFAULT_OUTPUT_DIR,
        DETAIL_BASE_URL,
        DEFAULT_MATERIAL_NAME,
        DEFAULT_CHAPTER_NAME,
        DEFAULT_COURSE_TITLE,
        DEFAULT_FILENAME,
        TAG_DIMENSION_ID,
        MAX_FILENAME_LENGTH,
        MAX_DIRNAME_LENGTH,
        CHUNK_SIZE,
        REQUEST_TIMEOUT,
        DETAIL_REQUEST_TIMEOUT,
        DEFAULT_TOKEN,
        PDF_OR_M3U8_PATTERN,
    )
except ImportError:
    # Fall back to default values if config.py is not available
    DEFAULT_PART_FILE = "example_part_100.json"
    DEFAULT_OUTPUT_DIR = "downloaded"
    DETAIL_BASE_URL = "https://s-file-2.ykt.cbern.com.cn/zxx/ndrv2/national_lesson/resources/details"
    DEFAULT_MATERIAL_NAME = "未知教材"  # "unknown textbook"
    DEFAULT_CHAPTER_NAME = "未知章节"  # "unknown chapter"
    DEFAULT_COURSE_TITLE = "未知课程"  # "unknown course"
    DEFAULT_FILENAME = "unnamed"
    TAG_DIMENSION_ID = "zxxcc"
    MAX_FILENAME_LENGTH = 100
    MAX_DIRNAME_LENGTH = 50
    CHUNK_SIZE = 8192
    REQUEST_TIMEOUT = 30
    DETAIL_REQUEST_TIMEOUT = 20
    DEFAULT_TOKEN = None
    PDF_OR_M3U8_PATTERN = r"https?://\S+\.(?:pdf|m3u8)(?:\?\S*)?"

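# A config.py override is just a module defining the same names, e.g. (hypothetical values):
#   DEFAULT_OUTPUT_DIR = "downloads"
#   DEFAULT_TOKEN = "<your X-Nd-Auth value>"
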
try:
    import requests
    _HAS_REQUESTS = True
except ImportError:
    # requests is unavailable; fall back to the standard library
    import urllib.request
    import urllib.error
    _HAS_REQUESTS = False


# Compiled from the configurable pattern so a config.py override takes effect.
PDF_OR_M3U8_RE = re.compile(PDF_OR_M3U8_PATTERN, re.IGNORECASE)
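# e.g. it matches "https://host/a/b.pdf?token=x" and "https://host/v/index.m3u8"
# (hypothetical URLs); an optional query string is included in the match.

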
def load_json(path: str):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def extract_version_ids_from_part(part_json) -> List[str]:
    ids = []
    if isinstance(part_json, list):
        for item in part_json:
            if not isinstance(item, dict):
                continue
            vid = item.get("version_id")
            if vid:
                ids.append(vid)
    elif isinstance(part_json, dict):
        # support single-object files
        vid = part_json.get("version_id")
        if vid:
            ids.append(vid)
    return ids

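# Illustrative shape of the part file consumed above (values hypothetical):
#   [{"version_id": "abc123", ...}, ...]  -- or a single {"version_id": ...} object

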
def _sanitize_filename(name: str, max_length=MAX_FILENAME_LENGTH) -> str:
    """Sanitize a filename: handle Chinese characters and enforce a length limit."""
    if not name:
        return DEFAULT_FILENAME

    # 1. Unicode normalization and HTML entity decoding
    name = unicodedata.normalize("NFKC", html.unescape(name))

    # 2. Strip control characters and characters illegal in filenames
    name = re.sub(r'[\\/*?:"<>|\r\n\t]', '_', name)

    # 3. Collapse runs of underscores and spaces
    name = re.sub(r'[_ ]+', '_', name.strip())

    # 4. Enforce the length limit
    if len(name) > max_length:
        name = name[:max_length - 3] + '...'

    return name if name else DEFAULT_FILENAME

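# Example (illustrative): _sanitize_filename('课件: 第1章 <预览>.pdf')
# returns '课件_第1章_预览_.pdf'

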
def find_resource_urls_in_json(data) -> List[str]:
    # fallback broad scan (old behavior) kept for compatibility
    text = json.dumps(data, ensure_ascii=False)
    return list(set(PDF_OR_M3U8_RE.findall(text)))


def extract_urls_with_labels(detail: dict) -> List[Tuple[str, Optional[str]]]:
    """Traverse the known structure of the detail JSON and return a list of (url, label).

    For each resource under relations.national_course_resource, if a ti_storages URL
    is found, attempt to use the resource's alias_name or resource_type_code_name as
    the label. Returns a list of (url, label) where label may be None when not available.
    """
    results: List[Tuple[str, Optional[str]]] = []
    # relations may be at detail['relations'] or nested; defensive access
    rels = detail.get("relations") or {}
    ncr = rels.get("national_course_resource") if isinstance(rels, dict) else None

    if isinstance(ncr, list):
        for resource in ncr:
            # Prefer alias_name; fall back to resource_type_code_name
            label = resource.get("alias_name") or resource.get("resource_type_code_name")
            title = resource.get("title") or (resource.get("global_title") or {}).get("zh-CN", "")

            # ti_items is a list of items that may contain ti_storages
            ti_items = resource.get("ti_items") or []
            if not isinstance(ti_items, list):
                continue

            for ti in ti_items:
                if not isinstance(ti, dict):
                    continue
                storages = ti.get("ti_storages") or []
                if not isinstance(storages, list):
                    continue

                for s in storages:
                    if not isinstance(s, str):
                        continue

                    # Keep only PDF and M3U8 URLs
                    if PDF_OR_M3U8_RE.search(s):
                        # For video resources, try to pick up resolution info
                        resolution = None
                        if s.split("?", 1)[0].lower().endswith('.m3u8') and 'custom_properties' in resource:
                            custom_props = resource['custom_properties']
                            if isinstance(custom_props, dict):
                                resolution = custom_props.get('resolution')

                        # Build a more descriptive label
                        detailed_label = label
                        if title and title != label:
                            detailed_label = f"{label}_{title}" if label else title
                        if resolution:
                            detailed_label = f"{detailed_label}_{resolution}" if detailed_label else resolution

                        results.append((s, detailed_label))

    # If we found nothing, fall back to a broad scan with no labels
    if not results:
        for u in find_resource_urls_in_json(detail):
            results.append((u, None))

    # Dedupe while preserving the first label seen for a logical resource.
    # Many resources are served from mirrors like r1-..., r2-..., r3-... where the
    # path+query is identical. Canonicalize on path+query as the key and keep the
    # first full URL we encounter for that key.
    from urllib.parse import urlparse, urlunparse

    seen_keys = set()
    ordered: List[Tuple[str, Optional[str]]] = []
    for u, l in results:
        try:
            p = urlparse(u)
            key = urlunparse(("", "", p.path, p.params, p.query, p.fragment))
        except Exception:
            key = u
        if key not in seen_keys:
            seen_keys.add(key)
            ordered.append((u, l))

    return ordered

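# Illustrative fragment of the detail JSON traversed above (values hypothetical):
#   {"relations": {"national_course_resource": [{
#       "alias_name": "教学视频",
#       "custom_properties": {"resolution": "1280x720"},
#       "ti_items": [{"ti_storages": ["https://r1-example.cn/path/video.m3u8"]}]}]}}

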
def download_url(url: str, dest: Path, headers: dict = None):
    """Stream a resource to dest, preferring requests when it is available."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    headers = headers or {}
    if _HAS_REQUESTS:
        with requests.get(url, headers=headers, stream=True, timeout=REQUEST_TIMEOUT) as r:
            r.raise_for_status()
            with open(dest, "wb") as f:
                for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                    if chunk:
                        f.write(chunk)
    else:
        req = urllib.request.Request(url, headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as r, open(dest, "wb") as f:
                while True:
                    chunk = r.read(CHUNK_SIZE)
                    if not chunk:
                        break
                    f.write(chunk)
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"HTTP error: {e.code} {e.reason}")
        except urllib.error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}")

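# Minimal usage sketch (hypothetical URL and token):
#   download_url("https://r1-example.cn/doc/a.pdf", Path("downloaded/a.pdf"),
#                headers={"X-Nd-Auth": "<token>"})

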
def fetch_version_detail(version_id: str) -> dict:
    """Fetch the per-version detail JSON from DETAIL_BASE_URL."""
    url = f"{DETAIL_BASE_URL}/{version_id}.json"
    if _HAS_REQUESTS:
        r = requests.get(url, timeout=DETAIL_REQUEST_TIMEOUT)
        r.raise_for_status()
        return r.json()
    else:
        req = urllib.request.Request(url)
        try:
            with urllib.request.urlopen(req, timeout=DETAIL_REQUEST_TIMEOUT) as r:
                raw = r.read().decode("utf-8")
                return json.loads(raw)
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"HTTP error: {e.code} {e.reason}")
        except urllib.error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}")

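# For a hypothetical version_id "abc123" the fetched URL would be
#   https://s-file-2.ykt.cbern.com.cn/zxx/ndrv2/national_lesson/resources/details/abc123.json

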
def get_teachingmaterial_info(detail: dict) -> Tuple[str, str, str]:
    """Extract textbook, chapter, and course-title info from the detail JSON."""
    # Defaults
    material_name = DEFAULT_MATERIAL_NAME
    chapter_name = DEFAULT_CHAPTER_NAME
    course_title = detail.get("title") or (detail.get("global_title") or {}).get("zh-CN", DEFAULT_COURSE_TITLE)

    # Try to read textbook info from custom_properties
    custom_props = detail.get("custom_properties", {})
    if isinstance(custom_props, dict):
        teachingmaterial_info = custom_props.get("teachingmaterial_info")
        if isinstance(teachingmaterial_info, dict):
            material_name = teachingmaterial_info.get("title", material_name)

    # Try to read chapter info from tag_list
    tag_list = detail.get("tag_list", [])
    if isinstance(tag_list, list):
        for tag in tag_list:
            if isinstance(tag, dict) and tag.get("tag_dimension_id") == TAG_DIMENSION_ID:
                chapter_name = tag.get("tag_name", chapter_name)
                break

    return material_name, chapter_name, course_title


def create_hierarchical_structure(out_root: Path, detail: dict, version_id: str) -> Path:
    """Create a hierarchical directory structure, using global_title as the course name."""
    material_name, chapter_name, course_title = get_teachingmaterial_info(detail)

    # Sanitize the directory names
    safe_material = _sanitize_filename(material_name, MAX_DIRNAME_LENGTH)
    safe_chapter = _sanitize_filename(chapter_name, MAX_DIRNAME_LENGTH)
    safe_title = _sanitize_filename(course_title, MAX_DIRNAME_LENGTH)

    # Directory layout: textbook/chapter/course-title_<last 6 chars of version_id>
    dest_dir = out_root / safe_material / safe_chapter / f"{safe_title}_{version_id[-6:]}"
    dest_dir.mkdir(parents=True, exist_ok=True)

    return dest_dir

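# Resulting layout (names hypothetical):
#   downloaded/<教材>/<章节>/<课程名称>_<last 6 of version_id>/<files + manifest.json>

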
def main():
    p = argparse.ArgumentParser(description="Scraper for national_lesson resources")
    p.add_argument("--part-file", default=DEFAULT_PART_FILE)
    p.add_argument("--part-url", default=None, help="Remote URL for part_100.json")
    p.add_argument("--out", default=DEFAULT_OUTPUT_DIR)
    p.add_argument("--token", default=DEFAULT_TOKEN, help="Full X-Nd-Auth header value (without the leading 'X-Nd-Auth:')")
    p.add_argument("--list-only", action="store_true", help="Only list found .pdf/.m3u8 URLs without downloading")
    p.add_argument("--limit", type=int, default=0, help="Limit the number of version_ids to process (0 = all)")
    p.add_argument("--only-pdf", action="store_true", help="Download only .pdf files (skip .m3u8)")
    p.add_argument("--test", action="store_true", help="Only parse the local part file and print version_ids")
    p.add_argument("--flat-structure", action="store_true", help="Use a flat directory structure instead of the hierarchical one")
    args = p.parse_args()

    part_json = None
    if args.part_url:
        print(f"Fetching part JSON from {args.part_url} ...")
        try:
            if _HAS_REQUESTS:
                hdrs = {}
                if args.token:
                    hdrs["X-Nd-Auth"] = args.token
                r = requests.get(args.part_url, headers=hdrs, timeout=DETAIL_REQUEST_TIMEOUT)
                r.raise_for_status()
                part_json = r.json()
            else:
                req = urllib.request.Request(args.part_url)
                if args.token:
                    req.add_header("X-Nd-Auth", args.token)
                with urllib.request.urlopen(req, timeout=DETAIL_REQUEST_TIMEOUT) as r:
                    raw = r.read().decode("utf-8")
                    part_json = json.loads(raw)
        except Exception as e:
            print(f"Failed to fetch part JSON: {e}")
            return
    else:
        part_file = Path(args.part_file)
        if not part_file.exists():
            print(f"Part file not found: {part_file}")
            return
        part_json = load_json(str(part_file))
    vids = extract_version_ids_from_part(part_json)
    print("Extracted version_ids:")
    for v in vids:
        print(v)

    if args.test:
        return

    headers = {}
    if args.token:
        headers["X-Nd-Auth"] = args.token

    out_root = Path(args.out)
    if args.limit and args.limit > 0:
        vids = vids[: args.limit]

    for vid in vids:
        try:
            print(f"Fetching detail for {vid}...")
            detail = fetch_version_detail(vid)
        except Exception as e:
            print(f"Failed to fetch detail for {vid}: {e}")
            continue

        url_label_pairs = extract_urls_with_labels(detail)
        if not url_label_pairs:
            print(f"No pdf/m3u8 urls found for {vid}")
            continue

        # optionally keep only PDFs (ignore any query string when testing the extension)
        if args.only_pdf:
            url_label_pairs = [(u, l) for (u, l) in url_label_pairs if u.split("?", 1)[0].lower().endswith(".pdf")]

        print(f"Found {len(url_label_pairs)} resource(s) for {vid}:")
        for u, l in url_label_pairs:
            kind = "pdf" if u.split("?", 1)[0].lower().endswith(".pdf") else "m3u8"
            label_info = f" label={l!r}" if l else ""
            print(f" [{kind}]{label_info} {u}")

        if args.list_only:
            # save the discovered urls to a manifest for later use
            dest_dir = out_root / vid
            dest_dir.mkdir(parents=True, exist_ok=True)
            manifest = {
                "version_id": vid,
                "resources": [{"url": u, "label": l} for (u, l) in url_label_pairs],
            }
            with open(dest_dir / "manifest.json", "w", encoding="utf-8") as mf:
                json.dump(manifest, mf, ensure_ascii=False, indent=2)
            continue

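        # Illustrative manifest.json written above (values hypothetical):
        #   {"version_id": "abc123",
        #    "resources": [{"url": "https://r1-example.cn/x.pdf", "label": "课件"}]}
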
        # prepare destination and manifest; use the hierarchical structure by default
        if args.flat_structure:
            dest_dir = out_root / vid
        else:
            dest_dir = create_hierarchical_structure(out_root, detail, vid)

        dest_dir.mkdir(parents=True, exist_ok=True)
        manifest_entries = []
        existing_names = set()

        for u, label in url_label_pairs:
            base = u.split("/")[-1].split("?")[0]
            ext = base.rsplit('.', 1)[-1] if '.' in base else ''

            # Smarter file naming
            if label:
                # Sanitize the label so it is usable as a filename
                safe_label = _sanitize_filename(label)
                name_base = safe_label

                # For video resources, append resolution info when not already present
                if u.split("?", 1)[0].lower().endswith('.m3u8') and not re.search(r'\d+x\d+', safe_label):
                    # Try to extract resolution info from the URL itself
                    resolution_match = re.search(r'(\d+x\d+)', u)
                    if resolution_match:
                        name_base = f"{safe_label}_{resolution_match.group(1)}"
            else:
                # Without a label, use the URL's basename
                name_base = base.rsplit('.', 1)[0] if '.' in base else base

            # Make sure the filename carries the right extension
            name = f"{name_base}.{ext}" if ext and not name_base.endswith(f'.{ext}') else name_base

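            # e.g. (hypothetical) u = "https://r1-example.cn/v/lesson.m3u8?k=1" with label "视频_1280x720"
            # yields base = "lesson.m3u8", ext = "m3u8", name = "视频_1280x720.m3u8"
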
            # Avoid filename collisions
            i = 1
            original_name = name
            while name in existing_names or (dest_dir / name).exists():
                if '.' in original_name:
                    name_parts = original_name.rsplit('.', 1)
                    name = f"{name_parts[0]}_{i}.{name_parts[1]}"
                else:
                    name = f"{original_name}_{i}"
                i += 1

            existing_names.add(name)
            dest = dest_dir / name
            entry = {"url": u, "filename": name, "label": label, "status": "pending"}

            try:
                print(f"Downloading {u} -> {dest} ...")
                download_url(u, dest, headers=headers)
                entry["status"] = "ok"
                print(f"✓ Successfully downloaded: {name}")
            except Exception as e:
                print(f"✗ Failed to download {u}: {e}")
                entry["status"] = f"error: {e}"

            manifest_entries.append(entry)

        # write the per-version manifest
        with open(dest_dir / "manifest.json", "w", encoding="utf-8") as mf:
            json.dump({"version_id": vid, "resources": manifest_entries}, mf, ensure_ascii=False, indent=2)

        # append to the global manifest
        all_manifest_path = out_root / "manifest_all.json"
        all_manifest = []
        if all_manifest_path.exists():
            try:
                with open(all_manifest_path, "r", encoding="utf-8") as af:
                    all_manifest = json.load(af)
            except Exception:
                all_manifest = []
        all_manifest.append({"version_id": vid, "resources": manifest_entries})
        with open(all_manifest_path, "w", encoding="utf-8") as af:
            json.dump(all_manifest, af, ensure_ascii=False, indent=2)


if __name__ == "__main__":
    main()