pytable/copy_videos.py
2024-11-10 14:39:04 +01:00

194 lines
6.6 KiB
Python

import subprocess
import json
import os
import datetime
import glob
from PIL import Image
import shutil
import re
import argparse
class VideoCopyException(Exception):
def __init__(self, video: str, message: str, *args: object) -> None:
self.video = video
self.message = message
super().__init__(*args)
def video_creation_time(fn):
try:
output = subprocess.check_output([
"ffprobe", "-v", "quiet", fn, "-print_format", "json",
"-show_entries",
"stream=index,codec_type:stream_tags=creation_time:format_tags=creation_time"
])
except subprocess.CalledProcessError as e:
raise VideoCopyException(fn, "'ffprobe' failed to extract creation time") from e
fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
data = json.loads(output)
if "streams" in data:
for d in data["streams"]:
if "tags" in d and "creation_time" in d["tags"]:
return datetime.datetime.strptime(d["tags"]["creation_time"], fmt)
if "format" in data and "tags" in data["format"] and "creation_time" in data["format"]["tags"]:
return datetime.datetime.strptime(data["format"]["tags"]["creation_time"], fmt)
raise VideoCopyException(fn, "Could not find creation time in 'ffprobe' output")
#import sys
#import zoneinfo
#dt = datetime.datetime.fromtimestamp(
# os.stat(fn).st_ctime, zoneinfo.ZoneInfo("UTC"))
#dt = dt.replace(tzinfo=None)
#return dt
return ValueError()
def get_time_image(fn):
fmt = "%Y:%m:%d %H:%M:%S"
return datetime.datetime.strptime(Image.open(fn)._getexif()[36867], fmt)
FMT_FOLDER = "%Y%m%d"
FMT_BASE = "%Y%m%d_%H%M"
def get_matching_image(video_fn):
base, ext = os.path.splitext(video_fn)
ext = ext.lower()
image_files = list(
filter(lambda fn: os.path.splitext(fn)[1].lower() in [".jpeg", ".jpg", ".png"],
glob.iglob(glob.escape(base) + "*")))
if len(image_files) >= 2:
raise VideoCopyException(video_fn, "Too many images found that could belong to video")
if len(image_files) == 0:
return None
return image_files[0]
def match_to_destination_image(video_fn: str, dest_dir: str) -> str | None:
image_file = get_matching_image(video_fn)
if image_file is None:
return None
image_ext = os.path.splitext(image_file)[-1]
image_dt = get_time_image(image_file)
image_size = os.path.getsize(image_file)
new_base = os.path.join(
dest_dir,
image_dt.strftime(FMT_FOLDER),
image_dt.strftime(FMT_BASE)
)
g = glob.escape(new_base) + "_*" + image_ext
matches = [g for g in glob.glob(g) if image_size == os.path.getsize(g)]
if len(matches) != 1:
raise VideoCopyException(
video_fn,
"No unique image found at destination that matches the image '%s' "
"(which belongs to video). Found %d images for glob '%s'"
% (image_file, len(matches), g)
)
_, ext = os.path.splitext(video_fn)
nfn: str = os.path.splitext(matches[0])[0] + ext
nfn_parts = list(os.path.split(nfn))
nfn_parts[-1] = "." + nfn_parts[-1]
return os.path.join(*nfn_parts)
def get_new_fn(video_fn: str, dest_dir: str) -> str:
_, ext = os.path.splitext(video_fn)
ext = ext.lower()
dest_fn = match_to_destination_image(video_fn, dest_dir)
if dest_fn:
return dest_fn
time = video_creation_time(video_fn)
new_base = os.path.join(dest_dir, time.strftime(FMT_FOLDER), time.strftime(FMT_BASE))
g = glob.glob(glob.escape(new_base) + "_*" + ext)
if len(g) == 0:
return new_base + "_0000" + ext
n = 0
s = os.path.getsize(video_fn)
for f in g:
if os.path.getsize(f) == s:
return f
p = r".*_(\d+)" + ext
nn = re.match(p, f)
n = max(n, int(nn.group(1)))
return new_base + "_%04d" % (n + 1) + ext
def is_video(fn):
_, ext = os.path.splitext(fn)
ext = ext.lower()
return ext in [".mp4", ".mov", ".mts"]
from dataclasses import dataclass
@dataclass
class DiscoveredVideo:
video_file: str
destination_file: str | None
exits: bool
error: Exception | None
def discover_new(dest_dir, *directories):
for directory in directories:
print("Discovering video files in '%s'" % directory)
n = 0
to_copy = 0
for path, _, files in os.walk(os.path.expanduser(directory)):
for f in sorted(files):
fn = os.path.join(path, f)
if not is_video(fn):
continue
n += 1
try:
nfn = get_new_fn(fn, dest_dir)
except VideoCopyException as e:
yield DiscoveredVideo(fn, None, False, e)
print("ERROR: %s: %s" % (e.video, e.message))
continue
if not os.path.exists(nfn):
if not os.path.exists(os.path.dirname(nfn)):
os.makedirs(os.path.dirname(nfn))
to_copy += 1
yield DiscoveredVideo(fn, nfn, False, None)
else:
if os.path.getsize(fn) != os.path.getsize(nfn):
raise VideoCopyException(fn, "Destination exists but has different size...")
yield DiscoveredVideo(fn, nfn, True, None)
print("Found %d/%d new videos." % (to_copy, n))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("folder", nargs='+')
parser.add_argument("--destination", default=os.path.expanduser("~/Pictures/Darktable"))
parser.add_argument("--dry", action="store_true")
parser.add_argument("--verbose", action="store_true")
options = parser.parse_args()
video_files = list(discover_new(options.destination, *options.folder))
copying = list(filter(lambda r: not r.exits and r.error is None, video_files))
if options.verbose:
for c in video_files:
print("%s => %s%s%s" % (
c.video_file,
c.destination_file,
" (EXISTS)" if c.exits else "",
" (ERROR)" if c.error else ""
))
print()
print("Wants to copy %d/%d video files." % (len(copying), len(video_files)))
if len(copying) == 0:
exit(0)
reply = input("Continue? y/n ")
if reply != 'y':
print("You did not type 'y'. Not doing anything!")
exit(1)
for c in copying:
assert c.destination_file
print("Copying %s => %s" % (c.video_file, c.destination_file))
shutil.copy(c.video_file, c.destination_file)