note/知识图谱/教科书-数学/all/data_validation.py
2025-11-19 10:16:05 +08:00

707 lines
25 KiB
Python

#!/usr/bin/env python3
"""
数据验证和测试脚本
用于验证导入的数据质量和完整性
"""
import json
import os
import re
import psycopg2
from psycopg2.extras import Json, RealDictCursor
from typing import Dict, List, Any, Tuple, Optional
import logging
from datetime import datetime
from collections import defaultdict
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_validation.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class DataValidator:
"""数据验证器"""
def __init__(self, db_config: Dict[str, Any]):
"""
初始化验证器
Args:
db_config: 数据库连接配置
"""
self.db_config = db_config
self.conn = None
self.cursor = None
self.validation_results = {
'errors': [],
'warnings': [],
'statistics': {},
'passed_checks': 0,
'failed_checks': 0
}
def connect(self):
"""连接数据库"""
try:
self.conn = psycopg2.connect(**self.db_config)
self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
logger.info("数据库连接成功")
except Exception as e:
logger.error(f"数据库连接失败: {e}")
raise
def disconnect(self):
"""断开数据库连接"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
logger.info("数据库连接已关闭")
def validate_json_files(self, directory: str) -> bool:
"""
验证JSON文件的格式和完整性
Args:
directory: JSON文件目录路径
Returns:
验证是否通过
"""
logger.info("开始验证JSON文件格式...")
if not os.path.exists(directory):
self.add_error(f"目录不存在: {directory}")
return False
files = [f for f in os.listdir(directory) if f.endswith('.json')]
total_files = len(files)
if total_files == 0:
self.add_error("目录中没有找到JSON文件")
return False
valid_files = 0
file_types = defaultdict(int)
for filename in files:
file_path = os.path.join(directory, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 验证文件类型和结构
if filename.startswith('knowledge-'):
success = self.validate_knowledge_json(data, filename)
file_types['knowledge'] += 1
elif filename.startswith('method-'):
success = self.validate_method_json(data, filename)
file_types['method'] += 1
elif filename.startswith('problems-'):
success = self.validate_problem_json(data, filename)
file_types['problem'] += 1
else:
self.add_warning(f"未知文件类型: {filename}")
success = True
if success:
valid_files += 1
except json.JSONDecodeError as e:
self.add_error(f"JSON格式错误 {filename}: {e}")
except Exception as e:
self.add_error(f"验证文件失败 {filename}: {e}")
self.validation_results['statistics']['json_validation'] = {
'total_files': total_files,
'valid_files': valid_files,
'file_types': dict(file_types)
}
success_rate = valid_files / total_files if total_files > 0 else 0
logger.info(f"JSON文件验证完成: {valid_files}/{total_files} 文件通过验证")
return success_rate >= 0.9 # 90%通过率认为验证通过
def validate_knowledge_json(self, data: Dict, filename: str) -> bool:
"""验证知识点JSON结构"""
required_keys = ['knowledge_list']
if not all(key in data for key in required_keys):
self.add_error(f"知识点JSON缺少必需字段: {filename}")
return False
knowledge_list = data.get('knowledge_list', [])
if not isinstance(knowledge_list, list):
self.add_error(f"knowledge_list不是数组: {filename}")
return False
valid_count = 0
for i, knowledge in enumerate(knowledge_list):
if self.validate_knowledge_item(knowledge, f"{filename}[{i}]"):
valid_count += 1
success_rate = valid_count / len(knowledge_list) if knowledge_list else 0
return success_rate >= 0.8
def validate_method_json(self, data: Dict, filename: str) -> bool:
"""验证方法JSON结构"""
required_keys = ['method_list']
if not all(key in data for key in required_keys):
self.add_error(f"方法JSON缺少必需字段: {filename}")
return False
method_list = data.get('method_list', [])
if not isinstance(method_list, list):
self.add_error(f"method_list不是数组: {filename}")
return False
valid_count = 0
for i, method in enumerate(method_list):
if self.validate_method_item(method, f"{filename}[{i}]"):
valid_count += 1
success_rate = valid_count / len(method_list) if method_list else 0
return success_rate >= 0.8
def validate_problem_json(self, data: Dict, filename: str) -> bool:
"""验证题目JSON结构"""
required_keys = ['problem_list']
if not all(key in data for key in required_keys):
self.add_error(f"题目JSON缺少必需字段: {filename}")
return False
problem_list = data.get('problem_list', [])
if not isinstance(problem_list, list):
self.add_error(f"problem_list不是数组: {filename}")
return False
valid_count = 0
for i, problem in enumerate(problem_list):
if self.validate_problem_item(problem, f"{filename}[{i}]"):
valid_count += 1
success_rate = valid_count / len(problem_list) if problem_list else 0
return success_rate >= 0.8
def validate_knowledge_item(self, knowledge: Dict, context: str) -> bool:
"""验证单个知识点项目"""
required_fields = ['编号', '名称', '层次', '类型']
missing_fields = [field for field in required_fields if field not in knowledge]
if missing_fields:
self.add_error(f"知识点缺少必需字段 {context}: {missing_fields}")
return False
# 验证编号格式
code = knowledge.get('编号')
if not re.match(r'^K\d+-\d+-\d+-\d+$', code):
self.add_error(f"知识点编号格式错误 {context}: {code}")
return False
# 验证层次
level = knowledge.get('层次')
if level not in ['二级', '三级']:
self.add_error(f"知识点层次错误 {context}: {level}")
return False
# 验证类型
type_field = knowledge.get('类型')
if type_field not in ['概念/定义', '定理/性质', '公式']:
self.add_error(f"知识点类型错误 {context}: {type_field}")
return False
return True
def validate_method_item(self, method: Dict, context: str) -> bool:
"""验证单个方法项目"""
required_fields = ['编号', '名称', '类型']
missing_fields = [field for field in required_fields if field not in method]
if missing_fields:
self.add_error(f"方法缺少必需字段 {context}: {missing_fields}")
return False
# 验证编号格式
code = method.get('编号')
if not re.match(r'^M\d+-\d+-\d+-\d+$', code):
self.add_error(f"方法编号格式错误 {context}: {code}")
return False
# 验证类型
type_field = method.get('类型')
if type_field not in ['解题方法', '计算技巧', '证明方法']:
self.add_error(f"方法类型错误 {context}: {type_field}")
return False
return True
def validate_problem_item(self, problem: Dict, context: str) -> bool:
"""验证单个题目项目"""
required_fields = ['题目基本信息']
if '题目基本信息' not in problem:
self.add_error(f"题目缺少基本信息 {context}")
return False
basic_info = problem.get('题目基本信息', {})
if '编号' not in basic_info:
self.add_error(f"题目缺少编号 {context}")
return False
# 验证编号格式
code = basic_info.get('编号')
if not re.match(r'^T\d+-\d+-\d+-[A-Z]\d+$', code):
self.add_error(f"题目编号格式错误 {context}: {code}")
return False
return True
def validate_database_consistency(self) -> bool:
"""验证数据库数据一致性"""
logger.info("开始验证数据库一致性...")
checks = [
self.check_duplicate_codes,
self.check_foreign_key_consistency,
self.check_data_completeness,
self.check_relationship_integrity,
self.check_code_format_consistency
]
passed_checks = 0
for check in checks:
try:
if check():
passed_checks += 1
except Exception as e:
self.add_error(f"数据库一致性检查失败: {e}")
self.validation_results['statistics']['database_consistency'] = {
'total_checks': len(checks),
'passed_checks': passed_checks
}
success_rate = passed_checks / len(checks)
logger.info(f"数据库一致性验证完成: {passed_checks}/{len(checks)} 检查通过")
return success_rate >= 0.8
def check_duplicate_codes(self) -> bool:
"""检查重复编号"""
logger.info("检查重复编号...")
queries = [
("SELECT code, COUNT(*) as count FROM knowledge_points GROUP BY code HAVING COUNT(*) > 1", "knowledge_points"),
("SELECT code, COUNT(*) as count FROM methods GROUP BY code HAVING COUNT(*) > 1", "methods"),
("SELECT code, COUNT(*) as count FROM problems GROUP BY code HAVING COUNT(*) > 1", "problems")
]
has_duplicates = False
for query, table in queries:
self.cursor.execute(query)
duplicates = self.cursor.fetchall()
if duplicates:
has_duplicates = True
for dup in duplicates:
self.add_error(f"{table}表中存在重复编号: {dup['code']} (数量: {dup['count']})")
return not has_duplicates
def check_foreign_key_consistency(self) -> bool:
"""检查外键一致性"""
logger.info("检查外键一致性...")
issues = []
# 检查知识点引用完整性
query = """
SELECT kp.code, kr.relation_type, kp_target.code as target_code
FROM knowledge_relations kr
JOIN knowledge_points kp ON kr.source_knowledge_id = kp.id
LEFT JOIN knowledge_points kp_target ON kr.target_knowledge_id = kp_target.id
WHERE kp_target.id IS NULL
"""
self.cursor.execute(query)
orphaned_relations = self.cursor.fetchall()
for relation in orphaned_relations:
self.add_error(f"知识点关系引用不完整: {relation['code']} -> {relation['target_code']} ({relation['relation_type']})")
issues.append(1)
# 检查题目知识点引用完整性
query = """
SELECT p.code, pkt.knowledge_id
FROM problem_knowledge_tags pkt
JOIN problems p ON pkt.problem_id = p.id
LEFT JOIN knowledge_points kp ON pkt.knowledge_id = kp.id
WHERE kp.id IS NULL
"""
self.cursor.execute(query)
orphaned_tags = self.cursor.fetchall()
for tag in orphaned_tags:
self.add_error(f"题目知识点标签引用不完整: 题目 {tag['code']} 引用未知知识点ID {tag['knowledge_id']}")
issues.append(1)
return len(issues) == 0
def check_data_completeness(self) -> bool:
"""检查数据完整性"""
logger.info("检查数据完整性...")
issues = []
# 检查必填字段完整性
checks = [
("SELECT COUNT(*) as count FROM knowledge_points WHERE name IS NULL OR name = ''", "知识点缺少名称"),
("SELECT COUNT(*) as count FROM methods WHERE name IS NULL OR name = ''", "方法缺少名称"),
("SELECT COUNT(*) as count FROM problems WHERE stem IS NULL OR stem = ''", "题目缺少题干")
]
for query, description in checks:
self.cursor.execute(query)
result = self.cursor.fetchone()
if result['count'] > 0:
self.add_warning(f"{description}: {result['count']} 条记录")
# 检查关系数据完整性
query = """
SELECT COUNT(*) as count FROM problems p
WHERE NOT EXISTS (SELECT 1 FROM problem_knowledge_tags pkt WHERE pkt.problem_id = p.id)
"""
self.cursor.execute(query)
result = self.cursor.fetchone()
if result['count'] > 0:
self.add_warning(f"{result['count']} 个题目没有知识点标签")
return len(issues) == 0
def check_relationship_integrity(self) -> bool:
"""检查关系完整性"""
logger.info("检查关系完整性...")
issues = []
# 检查循环依赖
query = """
WITH RECURSIVE cycle_check AS (
SELECT source_knowledge_id, target_knowledge_id, ARRAY[source_knowledge_id] as path
FROM knowledge_relations
WHERE relation_type = '前置知识'
UNION ALL
SELECT kr.source_knowledge_id, kr.target_knowledge_id, cc.path || kr.source_knowledge_id
FROM knowledge_relations kr
JOIN cycle_check cc ON kr.target_knowledge_id = cc.source_knowledge_id
WHERE kr.relation_type = '前置知识'
AND kr.source_knowledge_id != ALL(cc.path)
)
SELECT cc.source_knowledge_id, cc.target_knowledge_id
FROM cycle_check cc
JOIN knowledge_points kp1 ON cc.source_knowledge_id = kp1.id
JOIN knowledge_points kp2 ON cc.target_knowledge_id = kp2.id
WHERE kp1.code = kp2.code
"""
self.cursor.execute(query)
cycles = self.cursor.fetchall()
for cycle in cycles:
self.add_error(f"检测到知识点循环依赖: {cycle['source_knowledge_id']} -> {cycle['target_knowledge_id']}")
issues.append(1)
return len(issues) == 0
def check_code_format_consistency(self) -> bool:
"""检查编号格式一致性"""
logger.info("检查编号格式一致性...")
issues = []
format_checks = [
("SELECT id, code FROM knowledge_points WHERE code !~ '^K\\d+-\\d+-\\d+-\\d+$'", "知识点编号格式"),
("SELECT id, code FROM methods WHERE code !~ '^M\\d+-\\d+-\\d+-\\d+$'", "方法编号格式"),
("SELECT id, code FROM problems WHERE code !~ '^T\\d+-\\d+-\\d+-[A-Z]\\d+$'", "题目编号格式")
]
for query, description in format_checks:
self.cursor.execute(query)
invalid_records = self.cursor.fetchall()
for record in invalid_records:
self.add_error(f"{description}错误: ID {record['id']}, 编号 {record['code']}")
issues.append(1)
return len(issues) == 0
def generate_statistics(self) -> Dict:
"""生成数据统计信息"""
logger.info("生成数据统计信息...")
stats = {}
# 基础统计
basic_queries = {
'knowledge_points': "SELECT COUNT(*) as total FROM knowledge_points",
'methods': "SELECT COUNT(*) as total FROM methods",
'problems': "SELECT COUNT(*) as total FROM problems",
'knowledge_relations': "SELECT COUNT(*) as total FROM knowledge_relations",
'problem_knowledge_tags': "SELECT COUNT(*) as total FROM problem_knowledge_tags",
'problem_method_tags': "SELECT COUNT(*) as total FROM problem_method_tags"
}
for key, query in basic_queries.items():
self.cursor.execute(query)
result = self.cursor.fetchone()
stats[key] = result['total']
# 分类统计
classification_queries = {
'knowledge_by_importance': """
SELECT importance, COUNT(*) as count
FROM knowledge_points
GROUP BY importance
""",
'knowledge_by_level': """
SELECT level, COUNT(*) as count
FROM knowledge_points
GROUP BY level
""",
'methods_by_type': """
SELECT type, COUNT(*) as count
FROM methods
GROUP BY type
""",
'problems_by_type': """
SELECT problem_type, COUNT(*) as count
FROM problems
GROUP BY problem_type
"""
}
for key, query in classification_queries.items():
self.cursor.execute(query)
results = self.cursor.fetchall()
stats[key] = [dict(row) for row in results]
# 关系统计
relation_queries = {
'relation_types': """
SELECT relation_type, COUNT(*) as count
FROM knowledge_relations
GROUP BY relation_type
""",
'average_relations_per_knowledge': """
SELECT AVG(relation_count) as avg_relations
FROM (
SELECT COUNT(*) as relation_count
FROM knowledge_relations
GROUP BY source_knowledge_id
) as relation_counts
"""
}
for key, query in relation_queries.items():
self.cursor.execute(query)
result = self.cursor.fetchone()
stats[key] = dict(result) if result else {}
self.validation_results['statistics']['data_statistics'] = stats
return stats
def validate_learning_path_coherence(self) -> bool:
"""验证学习路径连贯性"""
logger.info("验证学习路径连贯性...")
issues = []
# 检查核心知识点的前置关系完整性
query = """
SELECT kp.code, kp.name, kp.importance
FROM knowledge_points kp
WHERE kp.importance = '核心'
AND EXISTS (
SELECT 1 FROM knowledge_relations kr1
WHERE kr1.target_knowledge_id = kp.id
AND kr1.relation_type = '前置知识'
)
AND NOT EXISTS (
SELECT 1 FROM knowledge_relations kr2
WHERE kr2.source_knowledge_id = kp.id
AND kr2.relation_type = '前置知识'
)
AND kp.level = '二级'
"""
self.cursor.execute(query)
isolated_core_knowledge = self.cursor.fetchall()
for knowledge in isolated_core_knowledge:
self.add_warning(f"核心知识点可能孤立: {knowledge['code']} {knowledge['name']}")
return len(issues) == 0
def run_full_validation(self, json_directory: str) -> Dict:
"""运行完整验证"""
logger.info("开始完整数据验证...")
# 验证JSON文件
json_valid = self.validate_json_files(json_directory)
# 连接数据库
self.connect()
try:
# 验证数据库一致性
db_consistent = self.validate_database_consistency()
# 生成统计信息
stats = self.generate_statistics()
# 验证学习路径连贯性
path_coherent = self.validate_learning_path_coherence()
overall_valid = json_valid and db_consistent and path_coherent
self.validation_results['overall'] = {
'json_files_valid': json_valid,
'database_consistent': db_consistent,
'learning_path_coherent': path_coherent,
'overall_valid': overall_valid
}
return self.validation_results
finally:
self.disconnect()
def generate_report(self, output_file: str = None) -> str:
"""生成验证报告"""
report_lines = []
report_lines.append("# 数据验证报告")
report_lines.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report_lines.append("")
# 总体结果
if 'overall' in self.validation_results:
overall = self.validation_results['overall']
report_lines.append("## 总体验证结果")
report_lines.append(f"- JSON文件验证: {'✓ 通过' if overall['json_files_valid'] else '✗ 失败'}")
report_lines.append(f"- 数据库一致性: {'✓ 通过' if overall['database_consistent'] else '✗ 失败'}")
report_lines.append(f"- 学习路径连贯性: {'✓ 通过' if overall['learning_path_coherent'] else '✗ 失败'}")
report_lines.append(f"- **整体验证**: {'✓ 通过' if overall['overall_valid'] else '✗ 失败'}")
report_lines.append("")
# 错误列表
if self.validation_results['errors']:
report_lines.append("## 错误列表")
for i, error in enumerate(self.validation_results['errors'], 1):
report_lines.append(f"{i}. {error}")
report_lines.append("")
# 警告列表
if self.validation_results['warnings']:
report_lines.append("## 警告列表")
for i, warning in enumerate(self.validation_results['warnings'], 1):
report_lines.append(f"{i}. {warning}")
report_lines.append("")
# 统计信息
if 'statistics' in self.validation_results:
report_lines.append("## 统计信息")
stats = self.validation_results['statistics']
# JSON验证统计
if 'json_validation' in stats:
json_stats = stats['json_validation']
report_lines.append(f"### JSON文件验证")
report_lines.append(f"- 总文件数: {json_stats['total_files']}")
report_lines.append(f"- 有效文件数: {json_stats['valid_files']}")
report_lines.append(f"- 文件类型分布: {json_stats['file_types']}")
report_lines.append("")
# 数据库一致性统计
if 'database_consistency' in stats:
db_stats = stats['database_consistency']
report_lines.append(f"### 数据库一致性")
report_lines.append(f"- 检查项总数: {db_stats['total_checks']}")
report_lines.append(f"- 通过检查数: {db_stats['passed_checks']}")
report_lines.append("")
# 数据统计
if 'data_statistics' in stats:
data_stats = stats['data_statistics']
report_lines.append("### 数据量统计")
report_lines.append(f"- 知识点: {data_stats.get('knowledge_points', 0)}")
report_lines.append(f"- 方法: {data_stats.get('methods', 0)}")
report_lines.append(f"- 题目: {data_stats.get('problems', 0)}")
report_lines.append(f"- 知识关系: {data_stats.get('knowledge_relations', 0)}")
report_lines.append("")
report_content = '\n'.join(report_lines)
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report_content)
logger.info(f"验证报告已保存到: {output_file}")
return report_content
def add_error(self, message: str):
"""添加错误"""
self.validation_results['errors'].append(message)
self.validation_results['failed_checks'] += 1
logger.error(f"错误: {message}")
def add_warning(self, message: str):
"""添加警告"""
self.validation_results['warnings'].append(message)
logger.warning(f"警告: {message}")
def main():
"""主函数"""
# 数据库配置
db_config = {
'host': 'localhost',
'port': 5432,
'database': 'math_knowledge_graph',
'user': 'postgres',
'password': 'your_password' # 请修改为实际密码
}
# JSON文件目录
json_directory = '/Users/robertmaxwell/note/知识图谱/教科书-数学/all'
# 创建验证器并执行验证
validator = DataValidator(db_config)
try:
# 运行完整验证
results = validator.run_full_validation(json_directory)
# 生成报告
report = validator.generate_report('validation_report.md')
# 打印简要结果
if results.get('overall', {}).get('overall_valid', False):
print("✓ 数据验证通过")
else:
print("✗ 数据验证失败")
print(f"错误数量: {len(results['errors'])}")
print(f"警告数量: {len(results['warnings'])}")
except Exception as e:
logger.error(f"验证过程中发生错误: {e}")
print(f"验证失败: {e}")
if __name__ == "__main__":
main()