227 lines
9.0 KiB
Python
227 lines
9.0 KiB
Python
import json
|
||
import os
|
||
import argparse
|
||
import subprocess
|
||
import pandas as pd
|
||
import csv
|
||
import shutil
|
||
import cv2
|
||
from tqdm import tqdm
|
||
import numpy as np
|
||
import glob
|
||
import re
|
||
ffmpeg_path='ffmpeg.exe'
|
||
|
||
def atoi(text):
|
||
return int(text) if text.isdigit() else text
|
||
|
||
def natural_keys(text):
|
||
return [atoi(c) for c in re.split(r'(\d+)', text)]
|
||
|
||
def remove_first_five_columns(file_path):
|
||
df = pd.read_csv(file_path)
|
||
if df.shape[1] < 5:
|
||
print("Openface_csv文件出错")
|
||
return
|
||
df = df.iloc[:, 5:]
|
||
df.to_csv(file_path, index=False)
|
||
|
||
def csv_to_txt(csv_filename, output_dir):
|
||
with open(csv_filename, newline='') as csv_file:
|
||
csv_reader = csv.reader(csv_file)
|
||
next(csv_reader)
|
||
|
||
file_count = 0
|
||
txt_lines = []
|
||
|
||
for row in csv_reader:
|
||
left_column = row[0:68]
|
||
right_column = row[68:136]
|
||
|
||
formatted_lines = [
|
||
f"{float(left):.6f} {float(right):.6f}"
|
||
for left, right in zip(left_column, right_column)
|
||
]
|
||
|
||
txt_lines.extend(formatted_lines)
|
||
|
||
if len(txt_lines) == 68:
|
||
output_filename = os.path.join(output_dir, f"{file_count}.txt")
|
||
with open(output_filename, 'w') as txt_file:
|
||
txt_file.write('\n'.join(txt_lines))
|
||
txt_lines = []
|
||
file_count += 1
|
||
|
||
if txt_lines:
|
||
output_filename = os.path.join(output_dir, f"{file_count}.txt")
|
||
with open(output_filename, 'w') as txt_file:
|
||
txt_file.write('\n'.join(txt_lines))
|
||
print("Landmarks文件处理完成")
|
||
|
||
def convert_images_to_video(input_folder, output_file, fps=25):
|
||
print("正在将图片转化为视频中..")
|
||
|
||
image_files = [os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith('.jpg')]
|
||
image_files.sort(key=natural_keys)
|
||
if not image_files:
|
||
print(f"没有找到图片文件在 {input_folder}")
|
||
return
|
||
|
||
first_image_path = image_files[0]
|
||
frame = cv2.imread(first_image_path)
|
||
if frame is None:
|
||
print(f"无法读取图片文件: {first_image_path}")
|
||
return
|
||
|
||
height, width, layers = frame.shape
|
||
video_writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
|
||
|
||
for file in image_files:
|
||
frame = cv2.imread(file)
|
||
if frame is None:
|
||
print(f"无法读取图片文件: {file}")
|
||
continue
|
||
video_writer.write(frame)
|
||
|
||
video_writer.release()
|
||
|
||
print(f'视频 {output_file} 已经被创建.')
|
||
return output_file
|
||
|
||
def crop_image_and_draw_landmarks(image, center, side_length, landmarks, initial_padding=0):
|
||
image_height, image_width = image.shape[:2]
|
||
half_side = (side_length // 2) + initial_padding
|
||
top_left_x = max(int(center[0] - half_side), 0)
|
||
top_left_y = max(int(center[1] - half_side), 0)
|
||
bottom_right_x = min(int(center[0] + half_side), image_width)
|
||
bottom_right_y = min(int(center[1] + half_side), image_height)
|
||
cropped_image = image[top_left_y:bottom_right_y, top_left_x:bottom_right_x]
|
||
for x, y in landmarks:
|
||
cv2.circle(cropped_image, (int(x) - top_left_x, int(y) - top_left_y), 2, (0, 255, 0), -1)
|
||
|
||
return cropped_image, (top_left_x, top_left_y)
|
||
|
||
|
||
def analysis_landmarks(img_folder_path, save_folder_path, extrema, initial_padding=0):
|
||
jpeg_files = [f for f in os.listdir(img_folder_path) if f.lower().endswith('.jpg')]
|
||
first_image_top_left_coordinates = None
|
||
|
||
for file_name in tqdm(jpeg_files, desc="Processing images", unit="image"):
|
||
base_name = os.path.splitext(file_name)[0]
|
||
img_path = os.path.join(img_folder_path, file_name)
|
||
landmark_path = os.path.join(img_folder_path, base_name + '.txt')
|
||
image = cv2.imread(img_path)
|
||
if image is None:
|
||
print(f'Cannot read image: {img_path}')
|
||
continue
|
||
if not os.path.exists(landmark_path):
|
||
print(f'Landmarks file not found: {landmark_path}')
|
||
continue
|
||
|
||
with open(landmark_path, 'r') as f:
|
||
landmarks = [(float(x), float(y)) for x, y in (line.split() for line in f)]
|
||
|
||
side_length = max(extrema['global_x_max'] - extrema['global_x_min'], extrema['global_y_max'] - extrema['global_y_min'])
|
||
center = ((extrema['global_x_min'] + extrema['global_x_max']) / 2, (extrema['global_y_min'] + extrema['global_y_max']) / 2)
|
||
cropped_image, (top_left_x, top_left_y) = crop_image_and_draw_landmarks(image, center, side_length, landmarks, initial_padding=initial_padding)
|
||
if first_image_top_left_coordinates is None:
|
||
first_image_top_left_coordinates = (top_left_x, top_left_y)
|
||
|
||
save_path = os.path.join(save_folder_path, f"{base_name}.jpg")
|
||
cv2.imwrite(save_path, cropped_image)
|
||
|
||
return first_image_top_left_coordinates
|
||
|
||
def calculate_global_extreme_points(directory):
|
||
global_x_min = float('inf')
|
||
global_x_max = float('-inf')
|
||
global_y_min = float('inf')
|
||
global_y_max = float('-inf')
|
||
|
||
txt_files = sorted([f for f in os.listdir(directory) if f.endswith('.txt')], key=lambda x: int(os.path.splitext(x)[0]))
|
||
|
||
for txt_file in txt_files:
|
||
landmarks_txt_file = os.path.join(directory, txt_file)
|
||
points = np.loadtxt(landmarks_txt_file)
|
||
|
||
global_x_min = min(global_x_min, points[:, 0].min())
|
||
global_x_max = max(global_x_max, points[:, 0].max())
|
||
global_y_min = min(global_y_min, points[:, 1].min())
|
||
global_y_max = max(global_y_max, points[:, 1].max())
|
||
|
||
return global_x_min, global_x_max, global_y_min, global_y_max
|
||
|
||
def calculate_scale_and_position(image_path, target_size=512):
|
||
image = cv2.imread(image_path)
|
||
if image is None:
|
||
raise ValueError(f'无法读取: {image_path}')
|
||
original_height, original_width = image.shape[:2]
|
||
scale = round(target_size / max(original_width, original_height), 2)
|
||
new_width = int(original_width * scale)
|
||
new_height = int(original_height * scale)
|
||
|
||
return scale, new_width, new_height
|
||
|
||
def update_coordinates(first_image_coords, scale):
|
||
new_x = int(first_image_coords[0] * scale)
|
||
new_y = int(first_image_coords[1] * scale)
|
||
return new_x, new_y
|
||
|
||
def get_video_duration(video_path):
|
||
command = [
|
||
"ffprobe",
|
||
"-v", "error",
|
||
"-show_entries", "format=duration",
|
||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||
video_path
|
||
]
|
||
result = subprocess.run(command, capture_output=True, text=True)
|
||
return result.stdout.strip()
|
||
|
||
def face_merge_main(infer_video_path, crop_params):
|
||
print("read crop params to json file!")
|
||
# 读取json文件
|
||
with open(crop_params, 'r') as f:
|
||
crop_json = json.load(f)
|
||
train_directory_path = crop_json["train_directory_path"]
|
||
base_video_name = crop_json["video_name"]
|
||
scale_move_path = crop_json["scale_move_path"]
|
||
train_move_path = crop_json["train_move_path"]
|
||
crop_x = crop_json["crop_x"]
|
||
crop_y = crop_json["crop_y"]
|
||
scale = crop_json["scale"]
|
||
# 获取推理视频的时长
|
||
duration = get_video_duration(infer_video_path)
|
||
# 假设你有以下变量:
|
||
# ffmpeg_path: FFmpeg可执行文件的路径
|
||
# video_name: 原始视频的名称(不包括扩展名)
|
||
# video_ext: 原始视频的文件扩展名(例如.mp4)
|
||
# original_video_path: 原始视频的完整路径
|
||
# train_video_path: 裁剪后视频的完整路径
|
||
# output_video_path: 最终输出视频的完整路径
|
||
# crop_x, crop_y: 裁剪区域的左上角坐标(在原始视频中的位置)
|
||
# crop_width, crop_height: 裁剪区域的宽度和高度(在这个例子中是512x512)
|
||
# train_directory_path = os.path.join('data', 'raw', 'videos')
|
||
train_directory_path = os.path.join('data', base_video_name)
|
||
combine_move_path = os.path.join(train_directory_path, f"{base_video_name}_combine.mp4")
|
||
# 贴回裁剪视频的命令
|
||
# 构建 ffmpeg 命令
|
||
overlay_command = f"""{ffmpeg_path} -y -i "{scale_move_path}" -i "{infer_video_path}" \
|
||
-filter_complex "[0:v]trim=duration={duration}[v0]; \
|
||
[1:v]setpts=PTS-STARTPTS[v1]; \
|
||
[v0][v1]overlay={crop_x}:{crop_y}[outv]" \
|
||
-map "[outv]" -c:v libx264 -crf 18 -pix_fmt yuv420p "{combine_move_path}\""""
|
||
# 执行命令
|
||
subprocess.run(overlay_command, shell=True)
|
||
|
||
if not os.path.exists(combine_move_path):
|
||
print(f"未能生成合并视频: {combine_move_path}")
|
||
return ""
|
||
return combine_move_path
|
||
|
||
if __name__ == "__main__":
|
||
parser = argparse.ArgumentParser(description='视频合成处理')
|
||
parser.add_argument('-v', '--video_name', default='infer.mp4', help='输入数字人名称')
|
||
parser.add_argument('-c', '--crop_params', default='', help='原始始视频裁剪参数')
|
||
args = parser.parse_args()
|
||
face_merge_main(args.video_name, args.crop_params) |