digitalhumantalk/face_merge.py
2024-12-10 17:05:37 +08:00

227 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import argparse
import subprocess
import pandas as pd
import csv
import shutil
import cv2
from tqdm import tqdm
import numpy as np
import glob
import re
# Executable name/path of ffmpeg; interpolated into the overlay command
# that face_merge_main builds and runs.
ffmpeg_path='ffmpeg.exe'
def atoi(text):
    """Return ``int(text)`` when *text* consists only of decimal digits.

    Any other string (including the empty string) is returned unchanged.

    ``str.isdecimal`` is used rather than ``str.isdigit`` because
    ``isdigit`` also accepts characters such as superscript digits
    ("²") or circled digits ("①"), which ``int()`` rejects with
    ``ValueError``.
    """
    return int(text) if text.isdecimal() else text
def natural_keys(text):
    """Build a sort key that orders embedded numbers numerically.

    The text is split on runs of digits (the capturing group keeps the
    digit runs in the result) and every piece is passed through ``atoi``,
    so the key is a list alternating between string and converted pieces.
    """
    pieces = re.split(r'(\d+)', text)
    return list(map(atoi, pieces))
def remove_first_five_columns(file_path):
    """Drop the first five columns of a CSV file, rewriting it in place.

    The file is loaded with pandas. When it has fewer than five columns a
    message is printed and the file is left untouched; otherwise the
    remaining columns are written back to the same path without an index
    column. Always returns ``None``.
    """
    table = pd.read_csv(file_path)
    column_count = len(table.columns)
    if column_count >= 5:
        trimmed = table.iloc[:, 5:]
        trimmed.to_csv(file_path, index=False)
        return
    print("Openface_csv文件出错")
def _write_landmark_file(output_dir, index, lines):
    """Write *lines* joined by newlines to ``<output_dir>/<index>.txt``."""
    output_filename = os.path.join(output_dir, f"{index}.txt")
    with open(output_filename, 'w') as txt_file:
        txt_file.write('\n'.join(lines))


def csv_to_txt(csv_filename, output_dir):
    """Convert a landmark CSV into numbered text files of "x y" pairs.

    The first CSV row is treated as a header and skipped. For every
    following row, fields 0-67 are paired with fields 68-135 and each
    pair is formatted as ``"<a> <b>"`` with six decimals. Formatted lines
    are accumulated and, whenever exactly 68 have been collected, written
    to ``<output_dir>/<n>.txt`` (n starts at 0 and increases per file).
    Any lines still pending after the last row are written to one final
    file.

    Args:
        csv_filename: Path of the CSV file to read.
        output_dir: Directory receiving the ``.txt`` files; created if it
            does not exist yet.

    Raises:
        ValueError: If a paired field cannot be parsed as a float.
    """
    # An empty string means "current directory" for os.path.join, but
    # os.makedirs('') would fail, so only create a named directory.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    file_count = 0
    txt_lines = []
    with open(csv_filename, newline='') as csv_file:
        csv_reader = csv.reader(csv_file)
        # Skip the header row; the default keeps a completely empty file
        # from raising StopIteration.
        next(csv_reader, None)
        for row in csv_reader:
            txt_lines.extend(
                f"{float(left):.6f} {float(right):.6f}"
                for left, right in zip(row[0:68], row[68:136])
            )
            if len(txt_lines) == 68:
                _write_landmark_file(output_dir, file_count, txt_lines)
                txt_lines = []
                file_count += 1

    # Flush whatever did not add up to a full set of 68 lines.
    if txt_lines:
        _write_landmark_file(output_dir, file_count, txt_lines)
    print("Landmarks文件处理完成")
def convert_images_to_video(input_folder, output_file, fps=25):
    """Encode the ``.jpg`` files of a folder into an ``mp4v`` video.

    Files are ordered with ``natural_keys`` so that numbered frames are
    written in numeric order. The frame size of the video is taken from
    the first image. Images that cannot be read are reported and skipped.

    Args:
        input_folder: Directory that is scanned for ``.jpg`` files.
        output_file: Path of the video file to create.
        fps: Frame rate passed to the video writer.

    Returns:
        ``output_file`` after the writer has been released, or ``None``
        when the folder has no ``.jpg`` files or the first image cannot
        be read.
    """
    print("正在将图片转化为视频中..")
    image_files = [os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith('.jpg')]
    image_files.sort(key=natural_keys)
    if not image_files:
        print(f"没有找到图片文件在 {input_folder}")
        return None

    first_image_path = image_files[0]
    frame = cv2.imread(first_image_path)
    if frame is None:
        print(f"无法读取图片文件: {first_image_path}")
        return None

    # Only the spatial size is needed; the channel count is not used.
    height, width = frame.shape[:2]
    video_writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    try:
        for file in image_files:
            frame = cv2.imread(file)
            if frame is None:
                print(f"无法读取图片文件: {file}")
                continue
            video_writer.write(frame)
    finally:
        # Release the writer even if reading or writing a frame raises,
        # so the output file handle is not leaked.
        video_writer.release()

    print(f'视频 {output_file} 已经被创建.')
    return output_file
def crop_image_and_draw_landmarks(image, center, side_length, landmarks, initial_padding=0):
    """Cut a square window around *center* and mark landmarks inside it.

    The window extends ``side_length // 2 + initial_padding`` pixels from
    the centre in each direction and is clamped to the image bounds. Each
    landmark is drawn as a filled green circle of radius 2 at its
    position relative to the window's top-left corner.

    NOTE(review): the window is obtained by slicing ``image``; for a
    NumPy array that is presumably a view, so the circles would also
    appear in ``image`` itself - confirm before reusing ``image``.

    Returns:
        ``(window, (left, top))`` where ``(left, top)`` is the window's
        top-left corner in coordinates of the full image.
    """
    rows, cols = image.shape[:2]
    reach = (side_length // 2) + initial_padding
    center_x, center_y = center[0], center[1]

    left = max(int(center_x - reach), 0)
    top = max(int(center_y - reach), 0)
    right = min(int(center_x + reach), cols)
    bottom = min(int(center_y + reach), rows)

    window = image[top:bottom, left:right]
    for point_x, point_y in landmarks:
        local_point = (int(point_x) - left, int(point_y) - top)
        cv2.circle(window, local_point, 2, (0, 255, 0), -1)

    return window, (left, top)
def analysis_landmarks(img_folder_path, save_folder_path, extrema, initial_padding=0):
    """Crop every ``.jpg`` in a folder to the global landmark box.

    For each image, the landmark file with the same base name and a
    ``.txt`` extension is read from the same folder (one "x y" pair per
    line). The image is cropped with ``crop_image_and_draw_landmarks``
    using a square whose side is the larger extent of ``extrema`` and
    whose centre is the middle of ``extrema``; the result is saved under
    the same base name in ``save_folder_path``. Images that cannot be
    read, or that have no landmark file, are reported and skipped.

    Args:
        img_folder_path: Folder holding the images and landmark files.
        save_folder_path: Folder receiving the cropped images.
        extrema: Mapping with the keys ``global_x_min``, ``global_x_max``,
            ``global_y_min`` and ``global_y_max``.
        initial_padding: Extra pixels added to each side of the crop.

    Returns:
        The crop's top-left ``(x, y)`` for the first image that was
        processed, or ``None`` if no image was processed.

    Raises:
        KeyError: If ``extrema`` lacks one of the required keys.
    """
    jpeg_files = [f for f in os.listdir(img_folder_path) if f.lower().endswith('.jpg')]
    first_image_top_left_coordinates = None

    # The crop geometry depends only on ``extrema``, so compute it once
    # instead of once per image.
    x_min, x_max = extrema['global_x_min'], extrema['global_x_max']
    y_min, y_max = extrema['global_y_min'], extrema['global_y_max']
    side_length = max(x_max - x_min, y_max - y_min)
    center = ((x_min + x_max) / 2, (y_min + y_max) / 2)

    for file_name in tqdm(jpeg_files, desc="Processing images", unit="image"):
        base_name = os.path.splitext(file_name)[0]
        img_path = os.path.join(img_folder_path, file_name)
        landmark_path = os.path.join(img_folder_path, base_name + '.txt')

        image = cv2.imread(img_path)
        if image is None:
            print(f'Cannot read image: {img_path}')
            continue
        if not os.path.exists(landmark_path):
            print(f'Landmarks file not found: {landmark_path}')
            continue

        with open(landmark_path, 'r') as f:
            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no point and would break the two-value unpacking.
            landmarks = [
                (float(x), float(y))
                for x, y in (line.split() for line in f if line.strip())
            ]

        cropped_image, (top_left_x, top_left_y) = crop_image_and_draw_landmarks(
            image, center, side_length, landmarks, initial_padding=initial_padding
        )
        if first_image_top_left_coordinates is None:
            first_image_top_left_coordinates = (top_left_x, top_left_y)

        save_path = os.path.join(save_folder_path, f"{base_name}.jpg")
        cv2.imwrite(save_path, cropped_image)

    return first_image_top_left_coordinates
def calculate_global_extreme_points(directory):
    """Find the smallest and largest x / y over all landmark files.

    Every ``.txt`` file in *directory* is loaded with ``numpy.loadtxt``;
    column 0 is treated as x and column 1 as y. File names (without the
    extension) must be integers because they are used as the sort key.

    Args:
        directory: Folder containing the numbered landmark ``.txt`` files.

    Returns:
        ``(x_min, x_max, y_min, y_max)``. When no points are found the
        initial values ``(inf, -inf, inf, -inf)`` are returned.

    Raises:
        ValueError: If a ``.txt`` file name is not an integer.
    """
    global_x_min = float('inf')
    global_x_max = float('-inf')
    global_y_min = float('inf')
    global_y_max = float('-inf')

    txt_files = sorted(
        (f for f in os.listdir(directory) if f.endswith('.txt')),
        key=lambda name: int(os.path.splitext(name)[0]),
    )
    for txt_file in txt_files:
        landmarks_txt_file = os.path.join(directory, txt_file)
        # ndmin=2 keeps a file with a single "x y" row two-dimensional;
        # without it loadtxt returns a 1-D array and the column indexing
        # below raises IndexError.
        points = np.loadtxt(landmarks_txt_file, ndmin=2)
        if points.size == 0:
            # An empty file contributes no points.
            continue
        global_x_min = min(global_x_min, points[:, 0].min())
        global_x_max = max(global_x_max, points[:, 0].max())
        global_y_min = min(global_y_min, points[:, 1].min())
        global_y_max = max(global_y_max, points[:, 1].max())

    return global_x_min, global_x_max, global_y_min, global_y_max
def calculate_scale_and_position(image_path, target_size=512):
    """Compute the scale that fits an image's longer side to a target.

    The image is read with OpenCV, the ratio ``target_size / longer side``
    is rounded to two decimals, and the scaled width and height are
    truncated to integers.

    Returns:
        ``(scale, new_width, new_height)``.

    Raises:
        ValueError: If the image cannot be read.
    """
    picture = cv2.imread(image_path)
    if picture is None:
        raise ValueError(f'无法读取: {image_path}')

    source_height, source_width = picture.shape[:2]
    longest_edge = max(source_width, source_height)
    scale = round(target_size / longest_edge, 2)
    return scale, int(source_width * scale), int(source_height * scale)
def update_coordinates(first_image_coords, scale):
    """Scale an ``(x, y)`` pair and truncate both values to integers.

    Only the first two items of *first_image_coords* are used.
    """
    source_x, source_y = first_image_coords[0], first_image_coords[1]
    return int(source_x * scale), int(source_y * scale)
def get_video_duration(video_path):
    """Ask ``ffprobe`` for the container duration of a video.

    ``ffprobe`` is invoked with error-only logging and an output format
    that prints just the ``format=duration`` value. The captured standard
    output is returned as a stripped string; the exit status is not
    inspected.
    """
    probe_options = [
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
    ]
    completed = subprocess.run(
        ["ffprobe", *probe_options, video_path],
        capture_output=True,
        text=True,
    )
    duration_text = completed.stdout.strip()
    return duration_text
def face_merge_main(infer_video_path, crop_params):
    """Overlay an inferred (cropped) video back onto the scaled video.

    The crop parameters are read from a JSON file. The scaled video is
    trimmed to the duration of the inferred video, the inferred video is
    overlaid at ``(crop_x, crop_y)``, and the result is encoded with
    libx264 into ``data/<video_name>/<video_name>_combine.mp4``.

    Args:
        infer_video_path: Path of the inferred video to paste back.
        crop_params: Path of a JSON file providing the keys
            ``video_name``, ``scale_move_path``, ``crop_x`` and
            ``crop_y``.

    Returns:
        The path of the combined video, or an empty string when the
        duration cannot be determined or ffmpeg does not produce the
        output.

    Raises:
        KeyError: If a required key is missing from the JSON file.
    """
    print("read crop params to json file!")
    # Read the crop parameters from the JSON file.
    with open(crop_params, 'r') as f:
        crop_json = json.load(f)
    base_video_name = crop_json["video_name"]
    scale_move_path = crop_json["scale_move_path"]
    crop_x = crop_json["crop_x"]
    crop_y = crop_json["crop_y"]

    # Duration of the inferred video; the base video is trimmed to it.
    duration = get_video_duration(infer_video_path)
    if not duration:
        # ffprobe printed nothing, so a valid trim filter cannot be built.
        print(f"无法获取视频时长: {infer_video_path}")
        return ""

    train_directory_path = os.path.join('data', base_video_name)
    combine_move_path = os.path.join(train_directory_path, f"{base_video_name}_combine.mp4")

    # Filter graph that pastes the inferred video back at the crop origin.
    filter_graph = (
        f"[0:v]trim=duration={duration}[v0];"
        "[1:v]setpts=PTS-STARTPTS[v1];"
        f"[v0][v1]overlay={crop_x}:{crop_y}[outv]"
    )
    # Pass the arguments as a list (no shell) so that paths containing
    # spaces, quotes or shell metacharacters are handed to ffmpeg verbatim.
    overlay_command = [
        ffmpeg_path, "-y",
        "-i", scale_move_path,
        "-i", infer_video_path,
        "-filter_complex", filter_graph,
        "-map", "[outv]",
        "-c:v", "libx264",
        "-crf", "18",
        "-pix_fmt", "yuv420p",
        combine_move_path,
    ]
    try:
        completed = subprocess.run(overlay_command)
    except OSError:
        # The ffmpeg executable could not be started; report it the same
        # way as any other failed merge.
        print(f"未能生成合并视频: {combine_move_path}")
        return ""

    # Check the exit status as well as the file: an output left over from
    # an earlier run must not be reported as the result of a failed run.
    if completed.returncode != 0 or not os.path.exists(combine_move_path):
        print(f"未能生成合并视频: {combine_move_path}")
        return ""
    return combine_move_path
if __name__ == "__main__":
    # Command-line entry point: parse the two options and run the merge.
    cli = argparse.ArgumentParser(description='视频合成处理')
    cli.add_argument('-v', '--video_name', default='infer.mp4', help='输入数字人名称')
    cli.add_argument('-c', '--crop_params', default='', help='原始始视频裁剪参数')
    options = cli.parse_args()
    face_merge_main(
        infer_video_path=options.video_name,
        crop_params=options.crop_params,
    )