"""Helpers for cropping a face region out of video frames and pasting an
inferred (generated) face video back onto the scaled source video.

Running the module as a script performs only the final "paste back" step
(:func:`face_merge_main`); the other functions are preprocessing utilities.
"""

import argparse
import csv
import glob
import json
import os
import re
import shutil
import subprocess

import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm

# Executable used for the overlay step; resolved through the normal
# executable search path unless an absolute path is given here.
ffmpeg_path = 'ffmpeg.exe'


def atoi(text):
    """Return ``int(text)`` when *text* consists only of digits, else *text*."""
    return int(text) if text.isdigit() else text


def natural_keys(text):
    """Build a sort key that orders embedded numbers numerically.

    The string is split on digit runs, so ``"img10.jpg"`` sorts after
    ``"img2.jpg"``.
    """
    return [atoi(c) for c in re.split(r'(\d+)', text)]


def remove_first_five_columns(file_path):
    """Drop the first five columns of a CSV file, rewriting it in place.

    If the file has fewer than five columns an error message is printed and
    the file is left untouched.
    NOTE(review): the message suggests the input is an OpenFace CSV -- confirm.
    """
    df = pd.read_csv(file_path)
    if df.shape[1] < 5:
        print("Openface_csv文件出错")
        return
    df = df.iloc[:, 5:]
    df.to_csv(file_path, index=False)


def _write_landmark_file(output_dir, index, lines):
    """Write *lines* to ``<output_dir>/<index>.txt`` joined by newlines."""
    output_filename = os.path.join(output_dir, f"{index}.txt")
    with open(output_filename, 'w') as txt_file:
        txt_file.write('\n'.join(lines))


def csv_to_txt(csv_filename, output_dir):
    """Convert each CSV data row into a 68-line landmark text file.

    For every row after the header, column ``i`` (0-67) is paired with column
    ``68 + i`` and written as ``"<a> <b>"`` with six decimals. Files are named
    ``0.txt``, ``1.txt``, ... in row order. Lines from rows that do not yield
    exactly 68 pairs are carried over and flushed into a final file.

    Args:
        csv_filename: Path of the CSV file to read.
        output_dir: Directory receiving the ``.txt`` files; created if missing.

    Raises:
        ValueError: If a cell in the first 136 columns is not a number.
    """
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    file_count = 0
    txt_lines = []
    with open(csv_filename, newline='') as csv_file:
        csv_reader = csv.reader(csv_file)
        # Skip the header; the default avoids StopIteration on an empty file.
        next(csv_reader, None)
        for row in csv_reader:
            txt_lines.extend(
                f"{float(left):.6f} {float(right):.6f}"
                for left, right in zip(row[0:68], row[68:136])
            )
            if len(txt_lines) == 68:
                _write_landmark_file(output_dir, file_count, txt_lines)
                txt_lines = []
                file_count += 1

    if txt_lines:
        _write_landmark_file(output_dir, file_count, txt_lines)
    print("Landmarks文件处理完成")


def convert_images_to_video(input_folder, output_file, fps=25):
    """Encode the ``.jpg`` images of a folder into an ``mp4v`` video.

    Images are taken in natural (numeric-aware) filename order and the frame
    size is taken from the first image. Unreadable images are reported and
    skipped.

    Args:
        input_folder: Directory containing the ``.jpg`` frames.
        output_file: Path of the video file to create.
        fps: Frame rate of the resulting video.

    Returns:
        *output_file* on success, ``None`` when there is nothing to encode,
        the first image cannot be read, or the writer cannot be opened.
    """
    print("正在将图片转化为视频中..")
    image_files = sorted(
        (
            os.path.join(input_folder, f)
            for f in os.listdir(input_folder)
            # Case-insensitive, matching analysis_landmarks().
            if f.lower().endswith('.jpg')
        ),
        key=natural_keys,
    )
    if not image_files:
        print(f"没有找到图片文件在 {input_folder}")
        return

    first_image_path = image_files[0]
    frame = cv2.imread(first_image_path)
    if frame is None:
        print(f"无法读取图片文件: {first_image_path}")
        return

    height, width = frame.shape[:2]
    video_writer = cv2.VideoWriter(
        output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)
    )
    if not video_writer.isOpened():
        # Without this check write() would silently discard every frame.
        print(f"无法创建视频文件: {output_file}")
        video_writer.release()
        return

    try:
        for file in image_files:
            frame = cv2.imread(file)
            if frame is None:
                print(f"无法读取图片文件: {file}")
                continue
            video_writer.write(frame)
    finally:
        # Always finalize the container, even if reading a frame raised.
        video_writer.release()

    print(f'视频 {output_file} 已经被创建.')
    return output_file


def crop_image_and_draw_landmarks(image, center, side_length, landmarks,
                                  initial_padding=0):
    """Crop a square region around *center* and draw landmarks into it.

    The square has half-size ``side_length // 2 + initial_padding`` and is
    clamped to the image bounds. Each landmark is drawn as a filled green
    circle of radius 2. The crop is a slice of *image*, so the circles are
    drawn into the caller's image as well.

    Args:
        image: Image array as returned by ``cv2.imread``.
        center: ``(x, y)`` centre of the crop in image coordinates.
        side_length: Side length of the square before padding.
        landmarks: Iterable of ``(x, y)`` points in image coordinates.
        initial_padding: Extra pixels added to each half-side.

    Returns:
        ``(cropped_image, (top_left_x, top_left_y))``.
    """
    image_height, image_width = image.shape[:2]
    half_side = (side_length // 2) + initial_padding

    top_left_x = max(int(center[0] - half_side), 0)
    top_left_y = max(int(center[1] - half_side), 0)
    bottom_right_x = min(int(center[0] + half_side), image_width)
    bottom_right_y = min(int(center[1] + half_side), image_height)

    cropped_image = image[top_left_y:bottom_right_y, top_left_x:bottom_right_x]
    for x, y in landmarks:
        # Shift from full-image coordinates to crop-local coordinates.
        cv2.circle(cropped_image,
                   (int(x) - top_left_x, int(y) - top_left_y),
                   2, (0, 255, 0), -1)
    return cropped_image, (top_left_x, top_left_y)


def analysis_landmarks(img_folder_path, save_folder_path, extrema,
                       initial_padding=0):
    """Crop every image of a folder to the global landmark bounding square.

    For each ``<name>.jpg`` with a sibling ``<name>.txt`` landmark file, the
    image is cropped to the square derived from *extrema*, the landmarks are
    drawn on it, and the result is saved as ``<name>.jpg`` in
    *save_folder_path*. Images that cannot be read or have no landmark file
    are reported and skipped.

    Args:
        img_folder_path: Directory holding the images and landmark files.
        save_folder_path: Output directory; created if missing.
        extrema: Mapping with keys ``global_x_min``, ``global_x_max``,
            ``global_y_min`` and ``global_y_max``.
        initial_padding: Extra pixels added around the crop square.

    Returns:
        The ``(x, y)`` top-left corner of the first processed crop, or
        ``None`` if no image was processed.
    """
    # Natural sort makes "first image" deterministic (os.listdir order is not).
    jpeg_files = sorted(
        (f for f in os.listdir(img_folder_path) if f.lower().endswith('.jpg')),
        key=natural_keys,
    )

    # The crop geometry depends only on the global extrema, not on the image.
    side_length = max(extrema['global_x_max'] - extrema['global_x_min'],
                      extrema['global_y_max'] - extrema['global_y_min'])
    center = ((extrema['global_x_min'] + extrema['global_x_max']) / 2,
              (extrema['global_y_min'] + extrema['global_y_max']) / 2)

    if save_folder_path:
        # cv2.imwrite fails silently when the target directory is missing.
        os.makedirs(save_folder_path, exist_ok=True)

    first_image_top_left_coordinates = None
    for file_name in tqdm(jpeg_files, desc="Processing images", unit="image"):
        base_name = os.path.splitext(file_name)[0]
        img_path = os.path.join(img_folder_path, file_name)
        landmark_path = os.path.join(img_folder_path, base_name + '.txt')

        image = cv2.imread(img_path)
        if image is None:
            print(f'Cannot read image: {img_path}')
            continue
        if not os.path.exists(landmark_path):
            print(f'Landmarks file not found: {landmark_path}')
            continue

        with open(landmark_path, 'r') as f:
            # Blank lines (e.g. a trailing newline) are ignored.
            landmarks = [
                (float(x), float(y))
                for x, y in (line.split() for line in f if line.strip())
            ]

        cropped_image, (top_left_x, top_left_y) = crop_image_and_draw_landmarks(
            image, center, side_length, landmarks,
            initial_padding=initial_padding,
        )
        if first_image_top_left_coordinates is None:
            first_image_top_left_coordinates = (top_left_x, top_left_y)

        save_path = os.path.join(save_folder_path, f"{base_name}.jpg")
        cv2.imwrite(save_path, cropped_image)

    return first_image_top_left_coordinates


def calculate_global_extreme_points(directory):
    """Return the bounding box of all landmark points in a directory.

    Every ``.txt`` file in *directory* is loaded as a whitespace-separated
    numeric table whose first column is x and second column is y.

    Note that :func:`analysis_landmarks` expects these four values as a dict
    keyed ``global_x_min`` / ``global_x_max`` / ``global_y_min`` /
    ``global_y_max``; the caller has to pack the tuple accordingly.

    Returns:
        ``(global_x_min, global_x_max, global_y_min, global_y_max)``. With no
        usable files the minima stay ``inf`` and the maxima ``-inf``.
    """
    global_x_min = float('inf')
    global_x_max = float('-inf')
    global_y_min = float('inf')
    global_y_max = float('-inf')

    # Natural sort keeps numeric order without failing on non-numeric names.
    txt_files = sorted(
        (f for f in os.listdir(directory) if f.endswith('.txt')),
        key=natural_keys,
    )
    for txt_file in txt_files:
        landmarks_txt_file = os.path.join(directory, txt_file)
        # ndmin=2 keeps a single-row file two-dimensional for column slicing.
        points = np.loadtxt(landmarks_txt_file, ndmin=2)
        if points.size == 0:
            continue
        global_x_min = min(global_x_min, points[:, 0].min())
        global_x_max = max(global_x_max, points[:, 0].max())
        global_y_min = min(global_y_min, points[:, 1].min())
        global_y_max = max(global_y_max, points[:, 1].max())

    return global_x_min, global_x_max, global_y_min, global_y_max


def calculate_scale_and_position(image_path, target_size=512):
    """Compute the scale that fits an image's longer side into *target_size*.

    Args:
        image_path: Path of the image to measure.
        target_size: Desired length of the longer side, in pixels.

    Returns:
        ``(scale, new_width, new_height)`` where *scale* is rounded to two
        decimals and the new dimensions are truncated to integers.

    Raises:
        ValueError: If the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f'无法读取: {image_path}')
    original_height, original_width = image.shape[:2]
    scale = round(target_size / max(original_width, original_height), 2)
    new_width = int(original_width * scale)
    new_height = int(original_height * scale)
    return scale, new_width, new_height


def update_coordinates(first_image_coords, scale):
    """Scale an ``(x, y)`` coordinate pair and truncate it to integers."""
    new_x = int(first_image_coords[0] * scale)
    new_y = int(first_image_coords[1] * scale)
    return new_x, new_y


def get_video_duration(video_path):
    """Query a video's container duration with ``ffprobe``.

    Returns:
        ffprobe's standard output with surrounding whitespace stripped, as a
        string. The string is empty when ffprobe printed nothing to stdout.
    """
    command = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    return result.stdout.strip()


def face_merge_main(infer_video_path, crop_params):
    """Overlay the inferred face video onto the scaled source video.

    The crop parameters are read from a JSON file providing ``video_name``,
    ``scale_move_path``, ``crop_x`` and ``crop_y``. The base video is trimmed
    to the duration of the inferred video and the inferred video is overlaid
    at ``(crop_x, crop_y)``. The result is written to
    ``data/<video_name>/<video_name>_combine.mp4``.

    Args:
        infer_video_path: Path of the inferred (generated) video.
        crop_params: Path of the JSON file with the crop parameters.

    Returns:
        The path of the combined video, or ``""`` if it could not be created.
    """
    print("read crop params to json file!")
    # Read the JSON parameter file.
    with open(crop_params, 'r') as f:
        crop_json = json.load(f)

    base_video_name = crop_json["video_name"]
    scale_move_path = crop_json["scale_move_path"]
    crop_x = crop_json["crop_x"]
    crop_y = crop_json["crop_y"]

    # Duration of the inferred video; the base video is trimmed to match it.
    duration = get_video_duration(infer_video_path)
    try:
        float(duration)
    except ValueError:
        # An empty or non-numeric duration would yield a broken filtergraph.
        print(f"无法获取视频时长: {infer_video_path}")
        return ""

    # The output directory is derived from the video name; any
    # "train_directory_path" entry in the JSON file is not used.
    train_directory_path = os.path.join('data', base_video_name)
    os.makedirs(train_directory_path, exist_ok=True)
    combine_move_path = os.path.join(
        train_directory_path, f"{base_video_name}_combine.mp4"
    )

    # Paste the cropped (inferred) video back onto the base video.
    filter_complex = (
        f"[0:v]trim=duration={duration}[v0];"
        "[1:v]setpts=PTS-STARTPTS[v1];"
        f"[v0][v1]overlay={crop_x}:{crop_y}[outv]"
    )
    # An argument list (no shell) keeps paths with spaces or quotes intact.
    overlay_command = [
        ffmpeg_path, "-y",
        "-i", scale_move_path,
        "-i", infer_video_path,
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-c:v", "libx264",
        "-crf", "18",
        "-pix_fmt", "yuv420p",
        combine_move_path,
    ]

    # Run ffmpeg; a non-zero exit code means the output cannot be trusted,
    # even if a file from an earlier run is still present.
    result = subprocess.run(overlay_command)
    if result.returncode != 0 or not os.path.exists(combine_move_path):
        print(f"未能生成合并视频: {combine_move_path}")
        return ""

    return combine_move_path


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='视频合成处理')
    parser.add_argument('-v', '--video_name', default='infer.mp4', help='输入数字人名称')
    parser.add_argument('-c', '--crop_params', default='', help='原始始视频裁剪参数')
    args = parser.parse_args()

    if not args.crop_params:
        # Fail with a usage message instead of a traceback from open('').
        parser.error('必须通过 -c/--crop_params 指定裁剪参数文件')

    face_merge_main(args.video_name, args.crop_params)