From e26a859659400a38fbc1897d43591889edc5d1f8 Mon Sep 17 00:00:00 2001 From: har0ke Date: Sun, 10 Nov 2024 14:39:04 +0100 Subject: [PATCH] Polish copy_videos.py --- copy_videos.py | 308 +++++++++++++++++++++++++++++-------------------- 1 file changed, 184 insertions(+), 124 deletions(-) diff --git a/copy_videos.py b/copy_videos.py index 730681f..04a7f3a 100644 --- a/copy_videos.py +++ b/copy_videos.py @@ -6,128 +6,188 @@ import glob from PIL import Image import shutil import re - -def get_time(fn): - try: - output = subprocess.check_output([ - "ffprobe", "-v", "quiet", fn, "-print_format", "json", - "-show_entries", - "stream=index,codec_type:stream_tags=creation_time:format_tags=creation_time" - ]) - except subprocess.CalledProcessError as e: - print("FFPROBE failed for ", fn) - return ValueError() - fmt = "%Y-%m-%dT%H:%M:%S.%fZ" - data = json.loads(output) - if "streams" in data: - for d in data["streams"]: - if "tags" in d and "creation_time" in d["tags"]: - return datetime.datetime.strptime(d["tags"]["creation_time"], fmt) - if "format" in data and "tags" in data["format"] and "creation_time" in data["format"]["tags"]: - return datetime.datetime.strptime(data["format"]["tags"]["creation_time"], fmt) - #import sys - #import zoneinfo - #dt = datetime.datetime.fromtimestamp( - # os.stat(fn).st_ctime, zoneinfo.ZoneInfo("UTC")) - #dt = dt.replace(tzinfo=None) - #return dt - return ValueError() -def get_time_image(fn): - fmt = "%Y:%m:%d %H:%M:%S" - return datetime.datetime.strptime(Image.open(fn)._getexif()[36867], fmt) - -def get_new_fn(fn): - base, ext = os.path.splitext(fn) - directory, base_fn = os.path.split(base) - ext = ext.lower() - - image_files = list( - filter(lambda fn: os.path.splitext(fn)[1].lower() in [".jpeg", ".jpg", ".png"], - glob.iglob(glob.escape(base) + "*"))) - assert len(image_files) < 2 - fmt_folder = "%Y%m%d" - fmt_base = "%Y%m%d_%H%M" - if image_files: - f = image_files[0] - image_ext = os.path.splitext(f)[-1] - image_dt = get_time_image(f) - image_size = os.path.getsize(f) - new_base = os.path.join(os.path.expanduser("~/Pictures/Darktable"), image_dt.strftime(fmt_folder), image_dt.strftime(fmt_base)) - - g = glob.escape(new_base) + "_*" + image_ext - matches = [g for g in glob.glob(g) if image_size == os.path.getsize(g)] - if len(matches) != 1: - print() - print("NO MATCH") - print(image_files) - print(matches) - print(g) - print(image_dt) - - return None - assert len(matches) == 1, fn - nfn = os.path.splitext(matches[0])[0] + ext - nfn = list(os.path.split(nfn)) - nfn[-1] = "." + nfn[-1] - return os.path.join(*nfn) - - time = get_time(fn) - if isinstance(time, ValueError): - print("NO TIME") - return None - new_base = os.path.join(os.path.expanduser("~/Pictures/Darktable"), time.strftime(fmt_folder), time.strftime(fmt_base)) - g = glob.glob(glob.escape(new_base) + "_*" + ext) - if len(g) == 0: - return new_base + "_0000" + ext - n = 0 - s = os.path.getsize(fn) - for f in g: - if os.path.getsize(f) == s: - return f - p = r".*_(\d+)" + ext - nn = re.match(p, f) - n = max(n, int(nn.group(1))) - n = new_base + "_%04d" % (n + 1) + ext - return n - -def main(dir, dry): - print(dir) - for p, folders, files in os.walk(os.path.expanduser(dir)): - for f in sorted(files): - base, ext = os.path.splitext(f) - - ext = ext.lower() - if ext not in [".mp4", ".mov", ".mts"]: - if ext not in [".jpg", ".jpeg", ".png"]: - pass # print("Not handled: ", os.path.join(p, f)) - continue - fn = os.path.join(p, f) - nfn = get_new_fn(fn) - if not nfn: - print("SKIP: ", fn) - continue - assert nfn - - if not os.path.exists(nfn): - if not os.path.exists(os.path.dirname(nfn)): - os.makedirs(os.path.dirname(nfn)) - print(fn, nfn) - if not dry: - shutil.copy(fn, nfn) - else: - print(fn, nfn, "EXIST") - - - - import argparse -parser = argparse.ArgumentParser() -parser.add_argument("--dry", action="store_true") -options = parser.parse_args() -main("/run/media/oke/3634-3063", options.dry) -main("/run/media/oke/3138-3162", options.dry) -main("/run/media/oke/01D5-9878/DCIM/", options.dry) -main("/run/media/oke/E5B5-DBF0/DCIM", options.dry) -main("/run/media/oke/disk/", options.dry) -main("~/Nextcloud/InstantUpload", options.dry) -main("/home/oke/Desktop/360/", options.dry) + +class VideoCopyException(Exception): + + def __init__(self, video: str, message: str, *args: object) -> None: + self.video = video + self.message = message + super().__init__(*args) + +def video_creation_time(fn): + try: + output = subprocess.check_output([ + "ffprobe", "-v", "quiet", fn, "-print_format", "json", + "-show_entries", + "stream=index,codec_type:stream_tags=creation_time:format_tags=creation_time" + ]) + except subprocess.CalledProcessError as e: + raise VideoCopyException(fn, "'ffprobe' failed to extract creation time") from e + fmt = "%Y-%m-%dT%H:%M:%S.%fZ" + data = json.loads(output) + if "streams" in data: + for d in data["streams"]: + if "tags" in d and "creation_time" in d["tags"]: + return datetime.datetime.strptime(d["tags"]["creation_time"], fmt) + if "format" in data and "tags" in data["format"] and "creation_time" in data["format"]["tags"]: + return datetime.datetime.strptime(data["format"]["tags"]["creation_time"], fmt) + + raise VideoCopyException(fn, "Could not find creation time in 'ffprobe' output") + #import sys + #import zoneinfo + #dt = datetime.datetime.fromtimestamp( + # os.stat(fn).st_ctime, zoneinfo.ZoneInfo("UTC")) + #dt = dt.replace(tzinfo=None) + #return dt + return ValueError() +def get_time_image(fn): + fmt = "%Y:%m:%d %H:%M:%S" + return datetime.datetime.strptime(Image.open(fn)._getexif()[36867], fmt) + + +FMT_FOLDER = "%Y%m%d" +FMT_BASE = "%Y%m%d_%H%M" + +def get_matching_image(video_fn): + base, ext = os.path.splitext(video_fn) + ext = ext.lower() + image_files = list( + filter(lambda fn: os.path.splitext(fn)[1].lower() in [".jpeg", ".jpg", ".png"], + glob.iglob(glob.escape(base) + "*"))) + if len(image_files) >= 2: + raise VideoCopyException(video_fn, "Too many images found that could belong to video") + + if len(image_files) == 0: + return None + return image_files[0] + +def match_to_destination_image(video_fn: str, dest_dir: str) -> str | None: + image_file = get_matching_image(video_fn) + if image_file is None: + return None + + image_ext = os.path.splitext(image_file)[-1] + image_dt = get_time_image(image_file) + image_size = os.path.getsize(image_file) + new_base = os.path.join( + dest_dir, + image_dt.strftime(FMT_FOLDER), + image_dt.strftime(FMT_BASE) + ) + + g = glob.escape(new_base) + "_*" + image_ext + matches = [g for g in glob.glob(g) if image_size == os.path.getsize(g)] + if len(matches) != 1: + raise VideoCopyException( + video_fn, + "No unique image found at destination that matches the image '%s' " + "(which belongs to video). Found %d images for glob '%s'" + % (image_file, len(matches), g) + ) + + _, ext = os.path.splitext(video_fn) + nfn: str = os.path.splitext(matches[0])[0] + ext + nfn_parts = list(os.path.split(nfn)) + nfn_parts[-1] = "." + nfn_parts[-1] + return os.path.join(*nfn_parts) + +def get_new_fn(video_fn: str, dest_dir: str) -> str: + _, ext = os.path.splitext(video_fn) + ext = ext.lower() + + dest_fn = match_to_destination_image(video_fn, dest_dir) + if dest_fn: + return dest_fn + + time = video_creation_time(video_fn) + new_base = os.path.join(dest_dir, time.strftime(FMT_FOLDER), time.strftime(FMT_BASE)) + g = glob.glob(glob.escape(new_base) + "_*" + ext) + if len(g) == 0: + return new_base + "_0000" + ext + n = 0 + s = os.path.getsize(video_fn) + for f in g: + if os.path.getsize(f) == s: + return f + p = r".*_(\d+)" + ext + nn = re.match(p, f) + n = max(n, int(nn.group(1))) + return new_base + "_%04d" % (n + 1) + ext + +def is_video(fn): + _, ext = os.path.splitext(fn) + ext = ext.lower() + return ext in [".mp4", ".mov", ".mts"] + +from dataclasses import dataclass + +@dataclass +class DiscoveredVideo: + video_file: str + destination_file: str | None + exits: bool + error: Exception | None + +def discover_new(dest_dir, *directories): + for directory in directories: + print("Discovering video files in '%s'" % directory) + n = 0 + to_copy = 0 + for path, _, files in os.walk(os.path.expanduser(directory)): + for f in sorted(files): + fn = os.path.join(path, f) + if not is_video(fn): + continue + n += 1 + try: + nfn = get_new_fn(fn, dest_dir) + except VideoCopyException as e: + yield DiscoveredVideo(fn, None, False, e) + print("ERROR: %s: %s" % (e.video, e.message)) + continue + if not os.path.exists(nfn): + if not os.path.exists(os.path.dirname(nfn)): + os.makedirs(os.path.dirname(nfn)) + to_copy += 1 + yield DiscoveredVideo(fn, nfn, False, None) + else: + if os.path.getsize(fn) != os.path.getsize(nfn): + raise VideoCopyException(fn, "Destination exists but has different size...") + yield DiscoveredVideo(fn, nfn, True, None) + print("Found %d/%d new videos." % (to_copy, n)) + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("folder", nargs='+') + parser.add_argument("--destination", default=os.path.expanduser("~/Pictures/Darktable")) + parser.add_argument("--dry", action="store_true") + parser.add_argument("--verbose", action="store_true") + options = parser.parse_args() + + video_files = list(discover_new(options.destination, *options.folder)) + copying = list(filter(lambda r: not r.exits and r.error is None, video_files)) + + if options.verbose: + for c in video_files: + print("%s => %s%s%s" % ( + c.video_file, + c.destination_file, + " (EXISTS)" if c.exits else "", + " (ERROR)" if c.error else "" + )) + + print() + print("Wants to copy %d/%d video files." % (len(copying), len(video_files))) + + if len(copying) == 0: + exit(0) + reply = input("Continue? y/n ") + if reply != 'y': + print("You did not type 'y'. Not doing anything!") + exit(1) + + for c in copying: + assert c.destination_file + print("Copying %s => %s" % (c.video_file, c.destination_file)) + shutil.copy(c.video_file, c.destination_file)