707 lines
25 KiB
Python
707 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
数据验证和测试脚本
|
|
用于验证导入的数据质量和完整性
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import psycopg2
|
|
from psycopg2.extras import Json, RealDictCursor
|
|
from typing import Dict, List, Any, Tuple, Optional
|
|
import logging
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
|
|
# Configure logging: every record is written both to a log file in the
# current working directory and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # The messages logged by this script are Chinese text; pin the file
        # encoding so the log does not depend on the platform locale.
        logging.FileHandler('data_validation.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
|
|
|
|
class DataValidator:
    """Validate imported knowledge-graph data.

    Two sources are checked: the JSON files on disk (structure and code
    formats) and the PostgreSQL tables queried through ``psycopg2``
    (duplicate codes, dangling references, completeness, prerequisite cycles
    and code formats).

    Findings accumulate in ``validation_results``:

    * ``errors`` / ``warnings`` -- human-readable messages,
    * ``statistics`` -- one summary dictionary per validation stage,
    * ``failed_checks`` -- incremented once per recorded error,
    * ``passed_checks`` -- initialised to 0 and not updated by this class,
    * ``overall`` -- added by :meth:`run_full_validation`.
    """

    # Minimum share of JSON files that must validate for a directory to pass.
    FILE_PASS_RATE = 0.9
    # Minimum share of items inside one JSON file that must validate.
    ITEM_PASS_RATE = 0.8
    # Minimum share of database consistency checks that must pass.
    CHECK_PASS_RATE = 0.8

    # Code formats; always applied with ``fullmatch`` so that nothing
    # (not even a trailing newline) may follow the code.
    KNOWLEDGE_CODE_RE = re.compile(r'K\d+-\d+-\d+-\d+')
    METHOD_CODE_RE = re.compile(r'M\d+-\d+-\d+-\d+')
    PROBLEM_CODE_RE = re.compile(r'T\d+-\d+-\d+-[A-Z]\d+')

    # Accepted values of the 层次 / 类型 fields.
    KNOWLEDGE_LEVELS = ('二级', '三级')
    KNOWLEDGE_TYPES = ('概念/定义', '定理/性质', '公式')
    METHOD_TYPES = ('解题方法', '计算技巧', '证明方法')

    def __init__(self, db_config: Dict[str, Any]):
        """Create a validator.

        Args:
            db_config: Keyword arguments passed to ``psycopg2.connect``.
        """
        self.db_config = db_config
        self.conn = None
        self.cursor = None
        self.validation_results = {
            'errors': [],
            'warnings': [],
            'statistics': {},
            'passed_checks': 0,
            'failed_checks': 0,
        }

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------

    def connect(self):
        """Open the database connection and a dict-returning cursor.

        Raises:
            Exception: Whatever ``psycopg2`` raised; it is logged and
                re-raised unchanged.
        """
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
            logger.info("数据库连接成功")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self):
        """Close the cursor and the connection if they are open."""
        if self.cursor:
            self.cursor.close()
            self.cursor = None
        if self.conn:
            self.conn.close()
            # Drop the handle so a repeated call is a no-op.
            self.conn = None
            logger.info("数据库连接已关闭")

    # ------------------------------------------------------------------
    # JSON file validation
    # ------------------------------------------------------------------

    def validate_json_files(self, directory: str) -> bool:
        """Validate the format of every ``*.json`` file in ``directory``.

        Files are dispatched on their name prefix (``knowledge-``,
        ``method-``, ``problems-``); files with any other prefix produce a
        warning and count as valid. A summary is stored under
        ``statistics['json_validation']``.

        Args:
            directory: Path of the directory holding the JSON files.

        Returns:
            True when at least ``FILE_PASS_RATE`` of the files validate;
            False otherwise, or when the directory is missing or contains
            no JSON file.
        """
        logger.info("开始验证JSON文件格式...")

        # isdir (not exists): a path pointing at a regular file would pass
        # an existence test and then make os.listdir raise.
        if not os.path.isdir(directory):
            self.add_error(f"目录不存在: {directory}")
            return False

        # Sorted so that the order of recorded errors is reproducible.
        files = sorted(f for f in os.listdir(directory) if f.endswith('.json'))
        total_files = len(files)
        if total_files == 0:
            self.add_error("目录中没有找到JSON文件")
            return False

        # (filename prefix, statistics key, validator)
        dispatch = (
            ('knowledge-', 'knowledge', self.validate_knowledge_json),
            ('method-', 'method', self.validate_method_json),
            ('problems-', 'problem', self.validate_problem_json),
        )

        valid_files = 0
        file_types = defaultdict(int)

        for filename in files:
            file_path = os.path.join(directory, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                for prefix, type_name, validate in dispatch:
                    if filename.startswith(prefix):
                        success = validate(data, filename)
                        file_types[type_name] += 1
                        break
                else:
                    self.add_warning(f"未知文件类型: {filename}")
                    success = True

                if success:
                    valid_files += 1
            except json.JSONDecodeError as e:
                self.add_error(f"JSON格式错误 {filename}: {e}")
            except Exception as e:
                # Per-file boundary: one unreadable file must not stop the
                # validation of the remaining files.
                self.add_error(f"验证文件失败 {filename}: {e}")

        self.validation_results['statistics']['json_validation'] = {
            'total_files': total_files,
            'valid_files': valid_files,
            'file_types': dict(file_types),
        }

        logger.info(f"JSON文件验证完成: {valid_files}/{total_files} 文件通过验证")
        return valid_files / total_files >= self.FILE_PASS_RATE

    def _validate_collection(self, data: Any, filename: str, list_key: str,
                             label: str, validate_item) -> bool:
        """Validate one JSON document that wraps a list of items.

        Args:
            data: Parsed JSON document.
            filename: File name, used in messages and item contexts.
            list_key: Required top-level key holding the item list.
            label: Chinese label of the document kind, used in messages.
            validate_item: Callable ``(item, context) -> bool``.

        Returns:
            True when at least ``ITEM_PASS_RATE`` of the items validate.
            A non-object document, a missing key, a non-list value or an
            empty list all record an error and return False.
        """
        if not isinstance(data, dict) or list_key not in data:
            self.add_error(f"{label}JSON缺少必需字段: {filename}")
            return False

        items = data[list_key]
        if not isinstance(items, list):
            self.add_error(f"{list_key}不是数组: {filename}")
            return False

        if not items:
            # An empty list fails the file; say so instead of failing silently.
            self.add_error(f"{list_key}为空: {filename}")
            return False

        valid_count = sum(
            1 for i, item in enumerate(items)
            if validate_item(item, f"{filename}[{i}]")
        )
        return valid_count / len(items) >= self.ITEM_PASS_RATE

    def validate_knowledge_json(self, data: Dict, filename: str) -> bool:
        """Validate a knowledge-point document (top-level ``knowledge_list``)."""
        return self._validate_collection(
            data, filename, 'knowledge_list', '知识点', self.validate_knowledge_item
        )

    def validate_method_json(self, data: Dict, filename: str) -> bool:
        """Validate a method document (top-level ``method_list``)."""
        return self._validate_collection(
            data, filename, 'method_list', '方法', self.validate_method_item
        )

    def validate_problem_json(self, data: Dict, filename: str) -> bool:
        """Validate a problem document (top-level ``problem_list``)."""
        return self._validate_collection(
            data, filename, 'problem_list', '题目', self.validate_problem_item
        )

    @staticmethod
    def _code_matches(pattern, code: Any) -> bool:
        """Return True when ``code`` is a string that fully matches ``pattern``."""
        return isinstance(code, str) and pattern.fullmatch(code) is not None

    def validate_knowledge_item(self, knowledge: Dict, context: str) -> bool:
        """Validate one knowledge-point entry.

        Checks the required fields, the code format and the allowed values
        of 层次 and 类型. The first failure is recorded as an error.

        Args:
            knowledge: The entry to validate.
            context: Location string used in error messages.

        Returns:
            True when the entry passes every check.
        """
        if not isinstance(knowledge, dict):
            self.add_error(f"知识点不是对象 {context}: {type(knowledge).__name__}")
            return False

        missing_fields = [
            field for field in ('编号', '名称', '层次', '类型')
            if field not in knowledge
        ]
        if missing_fields:
            self.add_error(f"知识点缺少必需字段 {context}: {missing_fields}")
            return False

        code = knowledge['编号']
        if not self._code_matches(self.KNOWLEDGE_CODE_RE, code):
            self.add_error(f"知识点编号格式错误 {context}: {code}")
            return False

        level = knowledge['层次']
        if level not in self.KNOWLEDGE_LEVELS:
            self.add_error(f"知识点层次错误 {context}: {level}")
            return False

        type_field = knowledge['类型']
        if type_field not in self.KNOWLEDGE_TYPES:
            self.add_error(f"知识点类型错误 {context}: {type_field}")
            return False

        return True

    def validate_method_item(self, method: Dict, context: str) -> bool:
        """Validate one method entry.

        Checks the required fields, the code format and the allowed values
        of 类型. The first failure is recorded as an error.

        Args:
            method: The entry to validate.
            context: Location string used in error messages.

        Returns:
            True when the entry passes every check.
        """
        if not isinstance(method, dict):
            self.add_error(f"方法不是对象 {context}: {type(method).__name__}")
            return False

        missing_fields = [
            field for field in ('编号', '名称', '类型') if field not in method
        ]
        if missing_fields:
            self.add_error(f"方法缺少必需字段 {context}: {missing_fields}")
            return False

        code = method['编号']
        if not self._code_matches(self.METHOD_CODE_RE, code):
            self.add_error(f"方法编号格式错误 {context}: {code}")
            return False

        type_field = method['类型']
        if type_field not in self.METHOD_TYPES:
            self.add_error(f"方法类型错误 {context}: {type_field}")
            return False

        return True

    def validate_problem_item(self, problem: Dict, context: str) -> bool:
        """Validate one problem entry.

        The entry must contain a 题目基本信息 object whose 编号 has the
        problem code format. The first failure is recorded as an error.

        Args:
            problem: The entry to validate.
            context: Location string used in error messages.

        Returns:
            True when the entry passes every check.
        """
        if not isinstance(problem, dict) or '题目基本信息' not in problem:
            self.add_error(f"题目缺少基本信息 {context}")
            return False

        basic_info = problem['题目基本信息']
        if not isinstance(basic_info, dict):
            self.add_error(f"题目基本信息不是对象 {context}")
            return False

        if '编号' not in basic_info:
            self.add_error(f"题目缺少编号 {context}")
            return False

        code = basic_info['编号']
        if not self._code_matches(self.PROBLEM_CODE_RE, code):
            self.add_error(f"题目编号格式错误 {context}: {code}")
            return False

        return True

    # ------------------------------------------------------------------
    # Database validation
    # ------------------------------------------------------------------

    def validate_database_consistency(self) -> bool:
        """Run every database consistency check.

        A check that raises is recorded as an error and counted as failed.
        A summary is stored under ``statistics['database_consistency']``.

        Returns:
            True when at least ``CHECK_PASS_RATE`` of the checks pass.
        """
        logger.info("开始验证数据库一致性...")

        checks = [
            self.check_duplicate_codes,
            self.check_foreign_key_consistency,
            self.check_data_completeness,
            self.check_relationship_integrity,
            self.check_code_format_consistency,
        ]

        passed_checks = 0
        for check in checks:
            try:
                if check():
                    passed_checks += 1
            except Exception as e:
                self.add_error(f"数据库一致性检查失败: {e}")
                # A failed statement leaves the PostgreSQL transaction in
                # the aborted state; without a rollback every later query
                # on this connection would fail as well.
                if self.conn is not None:
                    self.conn.rollback()

        self.validation_results['statistics']['database_consistency'] = {
            'total_checks': len(checks),
            'passed_checks': passed_checks,
        }

        logger.info(f"数据库一致性验证完成: {passed_checks}/{len(checks)} 检查通过")
        return passed_checks / len(checks) >= self.CHECK_PASS_RATE

    def check_duplicate_codes(self) -> bool:
        """Report codes that occur more than once in a table.

        Returns:
            True when no table contains a duplicated code.
        """
        logger.info("检查重复编号...")

        has_duplicates = False
        # Table names are fixed constants, so formatting them into the
        # statement is safe.
        for table in ("knowledge_points", "methods", "problems"):
            self.cursor.execute(
                f"SELECT code, COUNT(*) as count FROM {table} "
                "GROUP BY code HAVING COUNT(*) > 1"
            )
            for dup in self.cursor.fetchall():
                has_duplicates = True
                self.add_error(f"{table}表中存在重复编号: {dup['code']} (数量: {dup['count']})")

        return not has_duplicates

    def check_foreign_key_consistency(self) -> bool:
        """Report relations and tags that reference missing knowledge points.

        Returns:
            True when no dangling reference was found.
        """
        logger.info("检查外键一致性...")

        found_issue = False

        # Knowledge relations: both ends are LEFT JOINed so that a missing
        # source is detected as well as a missing target. The raw ids are
        # selected because the code of a missing row is necessarily NULL.
        query = """
            SELECT kr.source_knowledge_id, kr.target_knowledge_id, kr.relation_type,
                   kp_source.code AS source_code, kp_target.code AS target_code
            FROM knowledge_relations kr
            LEFT JOIN knowledge_points kp_source ON kr.source_knowledge_id = kp_source.id
            LEFT JOIN knowledge_points kp_target ON kr.target_knowledge_id = kp_target.id
            WHERE kp_source.id IS NULL OR kp_target.id IS NULL
        """
        self.cursor.execute(query)
        for relation in self.cursor.fetchall():
            source = relation['source_code'] or f"未知知识点ID {relation['source_knowledge_id']}"
            target = relation['target_code'] or f"未知知识点ID {relation['target_knowledge_id']}"
            self.add_error(f"知识点关系引用不完整: {source} -> {target} ({relation['relation_type']})")
            found_issue = True

        # Problem tags pointing at a knowledge point that does not exist.
        query = """
            SELECT p.code, pkt.knowledge_id
            FROM problem_knowledge_tags pkt
            JOIN problems p ON pkt.problem_id = p.id
            LEFT JOIN knowledge_points kp ON pkt.knowledge_id = kp.id
            WHERE kp.id IS NULL
        """
        self.cursor.execute(query)
        for tag in self.cursor.fetchall():
            self.add_error(f"题目知识点标签引用不完整: 题目 {tag['code']} 引用未知知识点ID {tag['knowledge_id']}")
            found_issue = True

        return not found_issue

    def check_data_completeness(self) -> bool:
        """Warn about rows with empty mandatory text and untagged problems.

        Every finding is recorded as a warning only, so this check never
        fails.

        Returns:
            Always True.
        """
        logger.info("检查数据完整性...")

        count_checks = [
            ("SELECT COUNT(*) as count FROM knowledge_points WHERE name IS NULL OR name = ''", "知识点缺少名称"),
            ("SELECT COUNT(*) as count FROM methods WHERE name IS NULL OR name = ''", "方法缺少名称"),
            ("SELECT COUNT(*) as count FROM problems WHERE stem IS NULL OR stem = ''", "题目缺少题干"),
        ]
        for query, description in count_checks:
            self.cursor.execute(query)
            result = self.cursor.fetchone()
            if result['count'] > 0:
                self.add_warning(f"{description}: {result['count']} 条记录")

        # Problems without any knowledge tag.
        query = """
            SELECT COUNT(*) as count FROM problems p
            WHERE NOT EXISTS (SELECT 1 FROM problem_knowledge_tags pkt WHERE pkt.problem_id = p.id)
        """
        self.cursor.execute(query)
        result = self.cursor.fetchone()
        if result['count'] > 0:
            self.add_warning(f"{result['count']} 个题目没有知识点标签")

        return True

    def check_relationship_integrity(self) -> bool:
        """Report cycles among the 前置知识 (prerequisite) relations.

        Returns:
            True when no cycle was found.
        """
        logger.info("检查关系完整性...")

        # Walk the prerequisite edges forward from every edge's source and
        # remember the origin: a walk that arrives back at its origin is a
        # cycle. ``path`` holds the nodes already left, which both stops the
        # recursion on revisits and yields the cycle for the message.
        # Each cycle is reported once per node it contains (one walk per
        # starting point).
        query = """
            WITH RECURSIVE walk AS (
                SELECT source_knowledge_id AS start_id,
                       target_knowledge_id AS current_id,
                       ARRAY[source_knowledge_id] AS path
                FROM knowledge_relations
                WHERE relation_type = '前置知识'

                UNION ALL

                SELECT w.start_id,
                       kr.target_knowledge_id,
                       w.path || kr.source_knowledge_id
                FROM walk w
                JOIN knowledge_relations kr ON kr.source_knowledge_id = w.current_id
                WHERE kr.relation_type = '前置知识'
                  AND kr.source_knowledge_id <> ALL(w.path)
            )
            SELECT DISTINCT array_to_string(path || current_id, ' -> ') AS cycle_path
            FROM walk
            WHERE current_id = start_id
            ORDER BY cycle_path
        """
        self.cursor.execute(query)
        cycles = self.cursor.fetchall()

        for cycle in cycles:
            self.add_error(f"检测到知识点循环依赖: {cycle['cycle_path']}")

        return not cycles

    def check_code_format_consistency(self) -> bool:
        """Report stored codes that do not match the expected format.

        Returns:
            True when every code in the three tables is well-formed.
        """
        logger.info("检查编号格式一致性...")

        format_checks = [
            (r"SELECT id, code FROM knowledge_points WHERE code !~ '^K\d+-\d+-\d+-\d+$'", "知识点编号格式"),
            (r"SELECT id, code FROM methods WHERE code !~ '^M\d+-\d+-\d+-\d+$'", "方法编号格式"),
            (r"SELECT id, code FROM problems WHERE code !~ '^T\d+-\d+-\d+-[A-Z]\d+$'", "题目编号格式"),
        ]

        found_issue = False
        for query, description in format_checks:
            self.cursor.execute(query)
            for record in self.cursor.fetchall():
                self.add_error(f"{description}错误: ID {record['id']}, 编号 {record['code']}")
                found_issue = True

        return not found_issue

    def generate_statistics(self) -> Dict:
        """Collect row counts and grouped distributions from the database.

        The result is also stored under ``statistics['data_statistics']``.

        Returns:
            A dictionary with one integer per table, a list of row
            dictionaries per grouped query, and a single-row dictionary for
            ``average_relations_per_knowledge``.
        """
        logger.info("生成数据统计信息...")

        stats = {}

        # Plain row counts; table names are fixed constants.
        count_tables = (
            'knowledge_points',
            'methods',
            'problems',
            'knowledge_relations',
            'problem_knowledge_tags',
            'problem_method_tags',
        )
        for table in count_tables:
            self.cursor.execute(f"SELECT COUNT(*) as total FROM {table}")
            stats[table] = self.cursor.fetchone()['total']

        # Grouped distributions: one output row per group, so all rows are
        # fetched. ``relation_types`` belongs here too; reading it with
        # fetchone() would keep only the first relation type.
        grouped_queries = {
            'knowledge_by_importance': """
                SELECT importance, COUNT(*) as count
                FROM knowledge_points
                GROUP BY importance
            """,
            'knowledge_by_level': """
                SELECT level, COUNT(*) as count
                FROM knowledge_points
                GROUP BY level
            """,
            'methods_by_type': """
                SELECT type, COUNT(*) as count
                FROM methods
                GROUP BY type
            """,
            'problems_by_type': """
                SELECT problem_type, COUNT(*) as count
                FROM problems
                GROUP BY problem_type
            """,
            'relation_types': """
                SELECT relation_type, COUNT(*) as count
                FROM knowledge_relations
                GROUP BY relation_type
            """,
        }
        for key, query in grouped_queries.items():
            self.cursor.execute(query)
            stats[key] = [dict(row) for row in self.cursor.fetchall()]

        # Single-row aggregate.
        self.cursor.execute("""
            SELECT AVG(relation_count) as avg_relations
            FROM (
                SELECT COUNT(*) as relation_count
                FROM knowledge_relations
                GROUP BY source_knowledge_id
            ) as relation_counts
        """)
        result = self.cursor.fetchone()
        stats['average_relations_per_knowledge'] = dict(result) if result else {}

        self.validation_results['statistics']['data_statistics'] = stats
        return stats

    def validate_learning_path_coherence(self) -> bool:
        """Warn about core knowledge points that end a prerequisite chain.

        Selects level-二级 knowledge points of importance 核心 that are the
        target of at least one 前置知识 relation but the source of none.
        Findings are recorded as warnings only, so this check never fails.

        Returns:
            Always True.
        """
        logger.info("验证学习路径连贯性...")

        query = """
            SELECT kp.code, kp.name, kp.importance
            FROM knowledge_points kp
            WHERE kp.importance = '核心'
            AND EXISTS (
                SELECT 1 FROM knowledge_relations kr1
                WHERE kr1.target_knowledge_id = kp.id
                AND kr1.relation_type = '前置知识'
            )
            AND NOT EXISTS (
                SELECT 1 FROM knowledge_relations kr2
                WHERE kr2.source_knowledge_id = kp.id
                AND kr2.relation_type = '前置知识'
            )
            AND kp.level = '二级'
        """
        self.cursor.execute(query)

        for knowledge in self.cursor.fetchall():
            self.add_warning(f"核心知识点可能孤立: {knowledge['code']} {knowledge['name']}")

        return True

    # ------------------------------------------------------------------
    # Orchestration and reporting
    # ------------------------------------------------------------------

    def run_full_validation(self, json_directory: str) -> Dict:
        """Run the JSON, database and learning-path validations.

        The database connection is opened after the JSON stage and always
        closed again, even when a later stage raises.

        Args:
            json_directory: Directory holding the JSON files.

        Returns:
            ``validation_results``, extended with an ``overall`` entry.

        Raises:
            Exception: When the database connection cannot be opened or a
                stage outside the guarded consistency checks fails.
        """
        logger.info("开始完整数据验证...")

        json_valid = self.validate_json_files(json_directory)

        self.connect()
        try:
            db_consistent = self.validate_database_consistency()
            self.generate_statistics()
            path_coherent = self.validate_learning_path_coherence()

            self.validation_results['overall'] = {
                'json_files_valid': json_valid,
                'database_consistent': db_consistent,
                'learning_path_coherent': path_coherent,
                'overall_valid': json_valid and db_consistent and path_coherent,
            }
            return self.validation_results
        finally:
            self.disconnect()

    def generate_report(self, output_file: Optional[str] = None) -> str:
        """Render ``validation_results`` as a Markdown report.

        Args:
            output_file: When given, the report is also written to this
                path as UTF-8.

        Returns:
            The report text.
        """
        def mark(ok) -> str:
            return '✓ 通过' if ok else '✗ 失败'

        results = self.validation_results
        lines = [
            "# 数据验证报告",
            f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
        ]

        # Overall verdict (present only after run_full_validation).
        if 'overall' in results:
            overall = results['overall']
            lines += [
                "## 总体验证结果",
                f"- JSON文件验证: {mark(overall['json_files_valid'])}",
                f"- 数据库一致性: {mark(overall['database_consistent'])}",
                f"- 学习路径连贯性: {mark(overall['learning_path_coherent'])}",
                f"- **整体验证**: {mark(overall['overall_valid'])}",
                "",
            ]

        # Numbered error and warning lists.
        for title, key in (("## 错误列表", 'errors'), ("## 警告列表", 'warnings')):
            if results[key]:
                lines.append(title)
                lines.extend(f"{i}. {message}" for i, message in enumerate(results[key], 1))
                lines.append("")

        if 'statistics' in results:
            lines.append("## 统计信息")
            stats = results['statistics']

            if 'json_validation' in stats:
                json_stats = stats['json_validation']
                lines += [
                    "### JSON文件验证",
                    f"- 总文件数: {json_stats['total_files']}",
                    f"- 有效文件数: {json_stats['valid_files']}",
                    f"- 文件类型分布: {json_stats['file_types']}",
                    "",
                ]

            if 'database_consistency' in stats:
                db_stats = stats['database_consistency']
                lines += [
                    "### 数据库一致性",
                    f"- 检查项总数: {db_stats['total_checks']}",
                    f"- 通过检查数: {db_stats['passed_checks']}",
                    "",
                ]

            if 'data_statistics' in stats:
                data_stats = stats['data_statistics']
                lines += [
                    "### 数据量统计",
                    f"- 知识点: {data_stats.get('knowledge_points', 0)} 个",
                    f"- 方法: {data_stats.get('methods', 0)} 个",
                    f"- 题目: {data_stats.get('problems', 0)} 个",
                    f"- 知识关系: {data_stats.get('knowledge_relations', 0)} 条",
                    "",
                ]

        report_content = '\n'.join(lines)

        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report_content)
            logger.info(f"验证报告已保存到: {output_file}")

        return report_content

    def add_error(self, message: str):
        """Record an error, bump ``failed_checks`` and log the message."""
        self.validation_results['errors'].append(message)
        self.validation_results['failed_checks'] += 1
        logger.error(f"错误: {message}")

    def add_warning(self, message: str):
        """Record a warning and log the message."""
        self.validation_results['warnings'].append(message)
        logger.warning(f"警告: {message}")
|
|
|
|
def main():
    """Run the full validation, write the Markdown report and print a summary.

    Connection settings and the JSON directory can be overridden through
    environment variables, so credentials do not have to be edited into the
    source. Without overrides the built-in defaults below are used:

    * ``PGHOST``, ``PGPORT``, ``PGDATABASE``, ``PGUSER``, ``PGPASSWORD``
    * ``KG_JSON_DIR`` -- directory holding the JSON files

    Any exception raised during validation is logged and printed rather
    than propagated.
    """
    # Database configuration (environment value first, default second).
    db_config = {
        'host': os.environ.get('PGHOST', 'localhost'),
        'port': os.environ.get('PGPORT', 5432),
        'database': os.environ.get('PGDATABASE', 'math_knowledge_graph'),
        'user': os.environ.get('PGUSER', 'postgres'),
        # Placeholder default: set PGPASSWORD or replace with the real password.
        'password': os.environ.get('PGPASSWORD', 'your_password'),
    }

    # Directory holding the JSON files.
    json_directory = os.environ.get(
        'KG_JSON_DIR', '/Users/robertmaxwell/note/知识图谱/教科书-数学/all'
    )

    validator = DataValidator(db_config)

    try:
        results = validator.run_full_validation(json_directory)

        # The report is written to disk; the returned text is not needed here.
        validator.generate_report('validation_report.md')

        # Short console summary.
        if results.get('overall', {}).get('overall_valid', False):
            print("✓ 数据验证通过")
        else:
            print("✗ 数据验证失败")
            print(f"错误数量: {len(results['errors'])}")
            print(f"警告数量: {len(results['warnings'])}")

    except Exception as e:
        logger.error(f"验证过程中发生错误: {e}")
        print(f"验证失败: {e}")
|
|
|
|
# Run the validation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()