#!/usr/bin/env python3 """Simple scraper following the instructions in 抓取流程.md. Usage: python scraper.py --part-file example_part_100.json --test Features: - Parse part JSON and extract `version_id`s - For each version_id, optionally fetch the details JSON and extract resource URLs ending with .pdf or .m3u8 - Download resources with optional X-Nd-Auth header """ import argparse import json import os import re from pathlib import Path from typing import List, Tuple, Optional import html import unicodedata # Import configuration from config.py try: from config import ( DEFAULT_PART_FILE, DEFAULT_OUTPUT_DIR, DETAIL_BASE_URL, DEFAULT_MATERIAL_NAME, DEFAULT_CHAPTER_NAME, DEFAULT_COURSE_TITLE, DEFAULT_FILENAME, TAG_DIMENSION_ID, MAX_FILENAME_LENGTH, MAX_DIRNAME_LENGTH, CHUNK_SIZE, REQUEST_TIMEOUT, DETAIL_REQUEST_TIMEOUT, DEFAULT_TOKEN, PDF_OR_M3U8_PATTERN ) except ImportError: # Fallback to default values if config.py is not available DEFAULT_PART_FILE = "example_part_100.json" DEFAULT_OUTPUT_DIR = "downloaded" DETAIL_BASE_URL = "https://s-file-2.ykt.cbern.com.cn/zxx/ndrv2/national_lesson/resources/details" DEFAULT_MATERIAL_NAME = "未知教材" DEFAULT_CHAPTER_NAME = "未知章节" DEFAULT_COURSE_TITLE = "未知课程" DEFAULT_FILENAME = "unnamed" TAG_DIMENSION_ID = "zxxcc" MAX_FILENAME_LENGTH = 100 MAX_DIRNAME_LENGTH = 50 CHUNK_SIZE = 8192 REQUEST_TIMEOUT = 30 DETAIL_REQUEST_TIMEOUT = 20 DEFAULT_TOKEN = None PDF_OR_M3U8_PATTERN = r"https?://\S+\.(?:pdf|m3u8)(?:\?\S*)?" try: import requests _HAS_REQUESTS = True except Exception: import urllib.request import urllib.error _HAS_REQUESTS = False PDF_OR_M3U8_RE = re.compile(r"https?://\S+\.(?:pdf|m3u8)(?:\?\S*)?", re.IGNORECASE) def load_json(path: str): with open(path, "r", encoding="utf-8") as f: return json.load(f) def extract_version_ids_from_part(part_json) -> List[str]: ids = [] if isinstance(part_json, list): for item in part_json: vid = item.get("version_id") if vid: ids.append(vid) elif isinstance(part_json, dict): # support single-object files vid = part_json.get("version_id") if vid: ids.append(vid) return ids def _sanitize_filename(name: str, max_length=MAX_FILENAME_LENGTH) -> str: """增强的文件名清理函数,支持中文和长度限制""" if not name: return DEFAULT_FILENAME # 1. Unicode标准化和HTML实体解码 name = unicodedata.normalize("NFKC", html.unescape(name)) # 2. 移除控制字符和非法文件名字符 name = re.sub(r'[\\/*?:"<>|\r\n\t]', '_', name) # 3. 替换连续的下划线和空格 name = re.sub(r'[_ ]+', '_', name.strip()) # 4. 长度限制 if len(name) > max_length: name = name[:max_length-3] + '...' return name if name else DEFAULT_FILENAME def find_resource_urls_in_json(data) -> List[str]: # fallback broad-scan (old behavior) kept for compatibility text = json.dumps(data, ensure_ascii=False) return list(set(PDF_OR_M3U8_RE.findall(text))) def extract_urls_with_labels(detail: dict) -> List[Tuple[str, Optional[str]]]: """Traverse known structure in detail JSON and return list of (url, label). For each resource under relations.national_course_resource, if a ti_storages URL is found, attempt to use the parent's resource_type_code_name as label. Returns list of (url, label) where label may be None when not available. """ results: List[Tuple[str, Optional[str]]] = [] # relations may be at detail['relations'] or nested; defensive access rels = detail.get("relations") or {} ncr = rels.get("national_course_resource") if isinstance(rels, dict) else None if isinstance(ncr, list): for resource in ncr: # 优先使用 alias_name,如果没有则使用 resource_type_code_name label = resource.get("alias_name") or resource.get("resource_type_code_name") title = resource.get("title") or resource.get("global_title", {}).get("zh-CN", "") # ti_items is a list of items that may contain ti_storages ti_items = resource.get("ti_items") or [] if not isinstance(ti_items, list): continue for ti in ti_items: storages = ti.get("ti_storages") or [] if not isinstance(storages, list): continue for s in storages: if not isinstance(s, str): continue # 检查是否是PDF或M3U8文件 if PDF_OR_M3U8_RE.search(s): # 对于视频资源,尝试获取分辨率信息 resolution = None if s.endswith('.m3u8') and 'custom_properties' in resource: custom_props = resource['custom_properties'] if isinstance(custom_props, dict): resolution = custom_props.get('resolution') # 构建更详细的标签 detailed_label = label if title and title != label: detailed_label = f"{label}_{title}" if label else title if resolution: detailed_label = f"{detailed_label}_{resolution}" if detailed_label else resolution results.append((s, detailed_label)) # If we found nothing, fall back to broad scan with no labels if not results: for u in find_resource_urls_in_json(detail): results.append((u, None)) # dedupe while preserving first label seen for a logical resource. # Many resources are served from mirrors like r1-..., r2-..., r3-... where the # path+query is identical. Canonicalize by using the pathname+query as key and # keep the first full URL we encounter for that key. from urllib.parse import urlparse, urlunparse seen_keys = {} ordered: List[Tuple[str, Optional[str]]] = [] for u, l in results: try: p = urlparse(u) key = urlunparse(("", "", p.path, p.params, p.query, p.fragment)) except Exception: key = u if key not in seen_keys: seen_keys[key] = (u, l) ordered.append((u, l)) return ordered def download_url(url: str, dest: Path, headers: dict = None): dest.parent.mkdir(parents=True, exist_ok=True) headers = headers or {} if _HAS_REQUESTS: with requests.get(url, headers=headers, stream=True, timeout=REQUEST_TIMEOUT) as r: r.raise_for_status() with open(dest, "wb") as f: for chunk in r.iter_content(chunk_size=CHUNK_SIZE): if chunk: f.write(chunk) else: req = urllib.request.Request(url, headers=headers) try: with urllib.request.urlopen(req, timeout=REQUEST_TIMEOUT) as r, open(dest, "wb") as f: while True: chunk = r.read(CHUNK_SIZE) if not chunk: break f.write(chunk) except urllib.error.HTTPError as e: raise RuntimeError(f"HTTP error: {e.code} {e.reason}") except urllib.error.URLError as e: raise RuntimeError(f"URL error: {e.reason}") def fetch_version_detail(version_id: str) -> dict: url = f"{DETAIL_BASE_URL}/{version_id}.json" if _HAS_REQUESTS: r = requests.get(url, timeout=DETAIL_REQUEST_TIMEOUT) r.raise_for_status() return r.json() else: req = urllib.request.Request(url) try: with urllib.request.urlopen(req, timeout=DETAIL_REQUEST_TIMEOUT) as r: raw = r.read().decode("utf-8") return json.loads(raw) except urllib.error.HTTPError as e: raise RuntimeError(f"HTTP error: {e.code} {e.reason}") except urllib.error.URLError as e: raise RuntimeError(f"URL error: {e.reason}") def get_teachingmaterial_info(detail: dict) -> Tuple[str, str, str]: """从详细JSON中提取教材、章节和课程标题信息""" # 默认值 material_name = DEFAULT_MATERIAL_NAME chapter_name = DEFAULT_CHAPTER_NAME course_title = detail.get("title") or detail.get("global_title", {}).get("zh-CN", DEFAULT_COURSE_TITLE) # 尝试从custom_properties获取教材信息 custom_props = detail.get("custom_properties", {}) if isinstance(custom_props, dict): teachingmaterial_info = custom_props.get("teachingmaterial_info") if isinstance(teachingmaterial_info, dict): material_name = teachingmaterial_info.get("title", material_name) # 尝试从tag_list获取章节信息 tag_list = detail.get("tag_list", []) if isinstance(tag_list, list): for tag in tag_list: if isinstance(tag, dict) and tag.get("tag_dimension_id") == TAG_DIMENSION_ID: chapter_name = tag.get("tag_name", chapter_name) break return material_name, chapter_name, course_title def create_hierarchical_structure(out_root: Path, detail: dict, version_id: str) -> Path: """创建层次化的目录结构,使用global_title作为课程名称""" material_name, chapter_name, course_title = get_teachingmaterial_info(detail) # 清理目录名 safe_material = _sanitize_filename(material_name, MAX_DIRNAME_LENGTH) safe_chapter = _sanitize_filename(chapter_name, MAX_DIRNAME_LENGTH) safe_title = _sanitize_filename(course_title, MAX_DIRNAME_LENGTH) # 创建目录结构:教材/章节/课程名称_version_id后6位 dest_dir = out_root / safe_material / safe_chapter / f"{safe_title}_{version_id[-6:]}" dest_dir.mkdir(parents=True, exist_ok=True) return dest_dir def main(): p = argparse.ArgumentParser(description="Scraper for national_lesson resources") p.add_argument("--part-file", default=DEFAULT_PART_FILE) p.add_argument("--part-url", default=None, help="Remote URL for part_100.json") p.add_argument("--out", default=DEFAULT_OUTPUT_DIR) p.add_argument("--token", default=DEFAULT_TOKEN, help="Full X-Nd-Auth header value (without 'X-Nd-Auth:')") p.add_argument("--list-only", action="store_true", help="Only list found .pdf/.m3u8 URLs without downloading") p.add_argument("--limit", type=int, default=0, help="Limit number of version_ids to process (0 = all)") p.add_argument("--only-pdf", action="store_true", help="Download only .pdf files (skip .m3u8)") p.add_argument("--test", action="store_true", help="Only parse local part file and print version_ids") p.add_argument("--flat-structure", action="store_true", help="Use flat directory structure instead of hierarchical") args = p.parse_args() part_json = None if args.part_url: print(f"Fetching part JSON from {args.part_url} ...") try: if _HAS_REQUESTS: hdrs = {} if args.token: hdrs["X-Nd-Auth"] = args.token r = requests.get(args.part_url, headers=hdrs, timeout=DETAIL_REQUEST_TIMEOUT) r.raise_for_status() part_json = r.json() else: req = urllib.request.Request(args.part_url) if args.token: req.add_header("X-Nd-Auth", args.token) with urllib.request.urlopen(req, timeout=DETAIL_REQUEST_TIMEOUT) as r: raw = r.read().decode("utf-8") part_json = json.loads(raw) except Exception as e: print(f"Failed to fetch part JSON: {e}") return else: part_file = Path(args.part_file) if not part_file.exists(): print(f"Part file not found: {part_file}") return part_json = load_json(str(part_file)) vids = extract_version_ids_from_part(part_json) print("Extracted version_ids:") for v in vids: print(v) if args.test: return headers = {} if args.token: headers["X-Nd-Auth"] = args.token out_root = Path(args.out) if args.limit and args.limit > 0: vids = vids[: args.limit] for vid in vids: try: print(f"Fetching detail for {vid}...") detail = fetch_version_detail(vid) except Exception as e: print(f"Failed to fetch detail for {vid}: {e}") continue url_label_pairs = extract_urls_with_labels(detail) if not url_label_pairs: print(f"No pdf/m3u8 urls found for {vid}") continue # optionally filter only pdfs if args.only_pdf: url_label_pairs = [(u, l) for (u, l) in url_label_pairs if u.lower().endswith(".pdf")] print(f"Found {len(url_label_pairs)} resource(s) for {vid}:") for u, l in url_label_pairs: kind = "pdf" if u.lower().endswith(".pdf") else "m3u8" label_info = f" label={l!r}" if l else "" print(f" [{kind}]{label_info} {u}") if args.list_only: # save the discovered urls to a manifest for later dest_dir = out_root / vid dest_dir.mkdir(parents=True, exist_ok=True) manifest = { "version_id": vid, "resources": [{"url": u, "label": l} for (u, l) in url_label_pairs], } with open(dest_dir / "manifest.json", "w", encoding="utf-8") as mf: json.dump(manifest, mf, ensure_ascii=False, indent=2) continue # prepare destination and manifest - use hierarchical structure by default if args.flat_structure: dest_dir = out_root / vid else: dest_dir = create_hierarchical_structure(out_root, detail, vid) dest_dir.mkdir(parents=True, exist_ok=True) manifest_entries = [] existing_names = set() for u, label in url_label_pairs: base = u.split("/")[-1].split("?")[0] ext = base.rsplit('.', 1)[-1] if '.' in base else '' # 使用更智能的文件命名 if label: # 清理标签并确保有合适的文件名 safe_label = _sanitize_filename(label) name_base = safe_label # 对于视频资源,添加分辨率信息(如果还没有包含) if u.lower().endswith('.m3u8') and 'resolution' not in safe_label.lower(): # 尝试从URL中提取分辨率信息 resolution_match = re.search(r'(\d+x\d+)', u) if resolution_match: name_base = f"{safe_label}_{resolution_match.group(1)}" else: # 如果没有标签,使用URL的基本名称 name_base = base.rsplit('.', 1)[0] if '.' in base else base # 确保文件名有合适的扩展名 name = f"{name_base}.{ext}" if ext and not name_base.endswith(f'.{ext}') else name_base # 避免文件名冲突 i = 1 original_name = name while name in existing_names or (dest_dir / name).exists(): if '.' in original_name: name_parts = original_name.rsplit('.', 1) name = f"{name_parts[0]}_{i}.{name_parts[1]}" else: name = f"{original_name}_{i}" i += 1 existing_names.add(name) dest = dest_dir / name entry = {"url": u, "filename": name, "label": label, "status": "pending"} try: print(f"Downloading {u} -> {dest} ...") download_url(u, dest, headers=headers) entry["status"] = "ok" print(f"✓ Successfully downloaded: {name}") except Exception as e: print(f"✗ Failed to download {u}: {e}") entry["status"] = f"error: {e}" manifest_entries.append(entry) # write per-version manifest with open(dest_dir / "manifest.json", "w", encoding="utf-8") as mf: json.dump({"version_id": vid, "resources": manifest_entries}, mf, ensure_ascii=False, indent=2) # append to global manifest all_manifest_path = out_root / "manifest_all.json" all_manifest = [] if all_manifest_path.exists(): try: with open(all_manifest_path, "r", encoding="utf-8") as af: all_manifest = json.load(af) except Exception: all_manifest = [] all_manifest.append({"version_id": vid, "resources": manifest_entries}) with open(all_manifest_path, "w", encoding="utf-8") as af: json.dump(all_manifest, af, ensure_ascii=False, indent=2) if __name__ == "__main__": main()