1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
| import os import sys import shutil import base64 import hashlib import pandas as pd from Cryptodome.PublicKey import ECC from Cryptodome.Cipher import AES from Cryptodome.Util.Padding import pad, unpad import qrcode import io import cv2 import numpy as np from flask import Flask, request, jsonify, render_template_string, send_from_directory
ROOT_DIR = r"C:\Users\lenovo\Documents\qrs"
def ensure_root_directory(): """ 确保根目录及其子目录存在。 如果不存在,则创建它们。 """ if not os.path.exists(ROOT_DIR): os.makedirs(ROOT_DIR) print(f"已创建根目录: {ROOT_DIR}")
subdirs = ["uploads", "qrs", "templates", "static"] for subdir in subdirs: subdir_path = os.path.join(ROOT_DIR, subdir) if not os.path.exists(subdir_path): os.makedirs(subdir_path) print(f"已创建子目录: {subdir_path}")
def resource_path(relative_path): """ 动态获取资源文件的路径。 - 在开发环境中,返回相对路径。 - 在打包后的环境中,返回 PyInstaller 解压路径。 """ if hasattr(sys, '_MEIPASS'): return os.path.join(sys._MEIPASS, relative_path) return os.path.join(os.path.abspath("."), relative_path)
def extract_dependencies(): """ 将依赖文件解压到目标目录。 如果文件已存在,则不会重复解压。 """ dependencies = { "key.xlsx": os.path.join(ROOT_DIR, "key.xlsx"), "templates/index.html": os.path.join(ROOT_DIR, "templates/index.html"), }
for src, dst in dependencies.items(): if not os.path.exists(dst): src_path = resource_path(src) shutil.copy(src_path, dst) print(f"已解压文件: {src} -> {dst}")
ensure_root_directory() extract_dependencies()
key_excel_path = os.path.join(ROOT_DIR, "key.xlsx")
template_html_path = os.path.join(ROOT_DIR, "templates/index.html")
app = Flask(__name__, static_folder=resource_path("static"))
def load_keys_from_excel(file_path): """ 从 Excel 文件中加载私钥和临时私钥。 返回两个列表:私钥和临时私钥。 """ if not os.path.exists(file_path): raise FileNotFoundError("Excel file not found") df = pd.read_excel(file_path) private_keys = df['私钥d'].tolist() temp_private_keys = df['临时私钥r'].tolist() return private_keys, temp_private_keys
def encrypt_data_aes(data, shared_key): """ 使用 AES 对称加密加密数据。 返回加密后的数据(IV + 密文)。 """ key = shared_key[:32] cipher = AES.new(key, AES.MODE_CBC) ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size)) iv = cipher.iv return iv.hex() + ct_bytes.hex()
def decrypt_data_aes(encrypted_data, shared_key): """ 使用 AES 对称加密解密数据。 返回解密后的原始数据。 """ key = shared_key[:32] iv = bytes.fromhex(encrypted_data[:32]) ct = bytes.fromhex(encrypted_data[32:]) cipher = AES.new(key, AES.MODE_CBC, iv) decrypted_data = unpad(cipher.decrypt(ct), AES.block_size) return decrypted_data.decode('utf-8')
def compute_shared_key(temp_public_key, private_key): """ 计算共享密钥。 使用 ECC 的规则,基于临时公钥和私钥计算共享密钥。 """ ecc_key = ECC.construct(curve='secp256r1', d=int(private_key, 16)) temp_pub_key = ECC.import_key(temp_public_key) shared_key = ecc_key.d * temp_pub_key.pointQ compressed_shared_key = hashlib.sha256(str(shared_key.x).encode('utf-8')).digest() return compressed_shared_key
def analyze_and_convert(file_path): """ 分析文件并将其转换为 Base64 格式。 返回文件的 SHA256、大小、类型和 Base64 内容。 """ try: with open(file_path, 'rb') as f: content = f.read() sha256_hash = hashlib.sha256(content).hexdigest() file_size = os.path.getsize(file_path) file_type = file_path.split('.')[-1] base64_content = base64.b64encode(content).decode('utf-8') return sha256_hash, file_size, file_type, base64_content except Exception as e: raise ValueError(f"File analysis failed: {str(e)}")
def split_and_encrypt(data, private_key, temp_private_key, chunk_size=1000): """ 将数据分块并加密。 每个组块包含临时公钥、加密数据、字母编号、数字编号、SHA256校验值和组块数量。 返回加密后的组块列表。 """ chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] encrypted_chunks = [] try: ecc_key = ECC.construct(curve='secp256r1', d=int(temp_private_key, 16)) public_key = ecc_key.public_key() shared_key = int(private_key, 16) * int(ecc_key.d) compressed_shared_key = hashlib.sha256(str(shared_key).encode('utf-8')).digest()
for idx, chunk in enumerate(chunks): aes_encrypted_chunk = encrypt_data_aes(chunk, compressed_shared_key) hex_index = f"{idx + 1:02X}" encrypted_chunk = ( str(public_key.export_key(format='PEM')) + '-' + aes_encrypted_chunk + '-' + "A" + hex_index + '-' + hashlib.sha256(chunk.encode('utf-8')).hexdigest() + '-' + str(len(chunks)) + '-' + file_type ) encrypted_chunks.append(encrypted_chunk) except Exception as e: raise ValueError(f"ECC encryption failed: {str(e)}") return encrypted_chunks
def generate_qr_codes_in_memory(chunks, file_name): """ 将加密后的组块生成二维码。 返回二维码的二进制数据列表。 """ qr_images = [] file_qr_folder = os.path.join(ROOT_DIR, "qrs", file_name) if not os.path.exists(file_qr_folder): os.makedirs(file_qr_folder)
for idx, chunk in enumerate(chunks): qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(chunk) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffer = io.BytesIO() img.save(buffer, format="PNG") buffer.seek(0) qr_images.append(buffer.getvalue())
qr_path = os.path.join(file_qr_folder, f"{file_name}_qr_{idx + 1}.png") with open(qr_path, 'wb') as f: f.write(buffer.getvalue())
return qr_images
@app.route('/upload', methods=['POST']) def upload_files(): """ 处理文件上传请求。 用户上传文件后,程序会分析文件、加密内容并生成二维码。 返回 JSON 格式的响应,包含上传结果和二维码路径。 """ try: if 'files' not in request.files or not request.files.getlist('files'): return jsonify({'error': 'No files uploaded'}), 400
files = request.files.getlist('files') saved_files = []
excel_path = os.path.join(ROOT_DIR, "key.xlsx") private_keys, temp_private_keys = load_keys_from_excel(excel_path)
all_qr_paths = []
for i, file in enumerate(files): if file.filename == '': continue file_path = os.path.join(ROOT_DIR, "uploads", file.filename) file.save(file_path) saved_files.append(file.filename)
sha256_hash, file_size, file_type, base64_content = analyze_and_convert(file_path) header = f"{file_type}-{file_size}-{sha256_hash}-" data = header + base64_content encrypted_chunks = split_and_encrypt(data, private_keys[i], temp_private_keys[i]) qr_images = generate_qr_codes_in_memory(encrypted_chunks, file.filename)
file_qr_folder = os.path.join(ROOT_DIR, "qrs", file.filename) qr_paths = [f"/static/qrs/{file.filename}/{file.filename}_qr_{j + 1}.png" for j in range(len(qr_images))] all_qr_paths.extend(qr_paths)
return jsonify({ 'message': 'Files uploaded successfully!', 'files': saved_files, 'qr_paths': all_qr_paths }) except Exception as e: return jsonify({'error': f'Server error: {str(e)}'}), 500
@app.route('/') def index(): """ 渲染首页。 """ return render_template_string(open(template_html_path, 'r', encoding='utf-8').read())
def scan_qr_code(): """ 扫描二维码。 使用 OpenCV 打开摄像头,实时扫描二维码并返回扫描到的数据。 """ cap = cv2.VideoCapture(0) scanned_data = []
while True: ret, frame = cap.read() if not ret: print("无法读取摄像头画面") break
detector = cv2.QRCodeDetector() data, bbox, _ = detector.detectAndDecode(frame)
if data: scanned_data.append(data) print(f"已扫描二维码: {data}")
cv2.putText(frame, f"Scanned: {len(scanned_data)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow("QR Code Scanner", frame)
if cv2.waitKey(1) & 0xFF == ord('q'): break
cap.release() cv2.destroyAllWindows() return scanned_data
def process_scanned_data(scanned_data, private_keys): """ 处理解密和文件还原。 根据扫描到的二维码数据,解密并拼接成原始文件。 返回还原的文件内容和文件扩展名。 """ decrypted_chunks = {} file_extension = None
for data in scanned_data: parts = data.split('-') temp_public_key = parts[0] encrypted_chunk = parts[1] file_info = parts[2] sha256_hash = parts[3] total_chunks = int(parts[4]) extension = parts[5]
if not file_extension: file_extension = extension
private_key = private_keys[ord(file_info[0]) - ord('A')]
shared_key = compute_shared_key(temp_public_key, private_key)
decrypted_data = decrypt_data_aes(encrypted_chunk, shared_key) decrypted_chunks[file_info] = decrypted_data
sorted_keys = sorted(decrypted_chunks.keys()) full_data = ''.join([decrypted_chunks[key] for key in sorted_keys])
file_content = base64.b64decode(full_data) return file_content, file_extension
def input_end_main(): """ 输入端主函数。 启动摄像头扫描二维码,并将扫描到的内容解密还原为文件。 """ excel_path = os.path.join(ROOT_DIR, "key.xlsx") private_keys = load_keys_from_excel(excel_path)
print("开始扫描二维码...") scanned_data = scan_qr_code()
if not scanned_data: print("未扫描到任何二维码") return
print("正在处理扫描数据...") try: file_content, file_extension = process_scanned_data(scanned_data, private_keys) output_file = os.path.join(ROOT_DIR, f"output_file.{file_extension}") with open(output_file, 'wb') as f: f.write(file_content) print(f"文件已保存为: {output_file}") except Exception as e: print(f"处理失败: {str(e)}")
def main(): """ 主程序入口。 用户可以选择运行输出端功能或输入端功能。 """ try: print("请选择功能:") print("1. 输出端功能(上传文件并生成二维码)") print("2. 输入端功能(扫描二维码并还原文件)") choice = input("请输入选项(1 或 2):")
if choice == '1': print("启动 Flask 服务器...") app.run(host='0.0.0.0', debug=False, port=8000) elif choice == '2': print("启动输入端功能...") input_end_main() else: print("无效选项,请重新运行程序。") except Exception as e: print(f"程序运行失败: {str(e)}")
if __name__ == '__main__': main()
|