194 lines
6.6 KiB
Python
194 lines
6.6 KiB
Python
import subprocess
|
|
import json
|
|
import os
|
|
import datetime
|
|
import glob
|
|
from PIL import Image
|
|
import shutil
|
|
import re
|
|
import argparse
|
|
|
|
class VideoCopyException(Exception):
|
|
|
|
def __init__(self, video: str, message: str, *args: object) -> None:
|
|
self.video = video
|
|
self.message = message
|
|
super().__init__(*args)
|
|
|
|
def video_creation_time(fn):
|
|
try:
|
|
output = subprocess.check_output([
|
|
"ffprobe", "-v", "quiet", fn, "-print_format", "json",
|
|
"-show_entries",
|
|
"stream=index,codec_type:stream_tags=creation_time:format_tags=creation_time"
|
|
])
|
|
except subprocess.CalledProcessError as e:
|
|
raise VideoCopyException(fn, "'ffprobe' failed to extract creation time") from e
|
|
fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
|
|
data = json.loads(output)
|
|
if "streams" in data:
|
|
for d in data["streams"]:
|
|
if "tags" in d and "creation_time" in d["tags"]:
|
|
return datetime.datetime.strptime(d["tags"]["creation_time"], fmt)
|
|
if "format" in data and "tags" in data["format"] and "creation_time" in data["format"]["tags"]:
|
|
return datetime.datetime.strptime(data["format"]["tags"]["creation_time"], fmt)
|
|
|
|
raise VideoCopyException(fn, "Could not find creation time in 'ffprobe' output")
|
|
#import sys
|
|
#import zoneinfo
|
|
#dt = datetime.datetime.fromtimestamp(
|
|
# os.stat(fn).st_ctime, zoneinfo.ZoneInfo("UTC"))
|
|
#dt = dt.replace(tzinfo=None)
|
|
#return dt
|
|
return ValueError()
|
|
def get_time_image(fn):
|
|
fmt = "%Y:%m:%d %H:%M:%S"
|
|
return datetime.datetime.strptime(Image.open(fn)._getexif()[36867], fmt)
|
|
|
|
|
|
FMT_FOLDER = "%Y%m%d"
|
|
FMT_BASE = "%Y%m%d_%H%M"
|
|
|
|
def get_matching_image(video_fn):
|
|
base, ext = os.path.splitext(video_fn)
|
|
ext = ext.lower()
|
|
image_files = list(
|
|
filter(lambda fn: os.path.splitext(fn)[1].lower() in [".jpeg", ".jpg", ".png"],
|
|
glob.iglob(glob.escape(base) + "*")))
|
|
if len(image_files) >= 2:
|
|
raise VideoCopyException(video_fn, "Too many images found that could belong to video")
|
|
|
|
if len(image_files) == 0:
|
|
return None
|
|
return image_files[0]
|
|
|
|
def match_to_destination_image(video_fn: str, dest_dir: str) -> str | None:
|
|
image_file = get_matching_image(video_fn)
|
|
if image_file is None:
|
|
return None
|
|
|
|
image_ext = os.path.splitext(image_file)[-1]
|
|
image_dt = get_time_image(image_file)
|
|
image_size = os.path.getsize(image_file)
|
|
new_base = os.path.join(
|
|
dest_dir,
|
|
image_dt.strftime(FMT_FOLDER),
|
|
image_dt.strftime(FMT_BASE)
|
|
)
|
|
|
|
g = glob.escape(new_base) + "_*" + image_ext
|
|
matches = [g for g in glob.glob(g) if image_size == os.path.getsize(g)]
|
|
if len(matches) != 1:
|
|
raise VideoCopyException(
|
|
video_fn,
|
|
"No unique image found at destination that matches the image '%s' "
|
|
"(which belongs to video). Found %d images for glob '%s'"
|
|
% (image_file, len(matches), g)
|
|
)
|
|
|
|
_, ext = os.path.splitext(video_fn)
|
|
nfn: str = os.path.splitext(matches[0])[0] + ext
|
|
nfn_parts = list(os.path.split(nfn))
|
|
nfn_parts[-1] = "." + nfn_parts[-1]
|
|
return os.path.join(*nfn_parts)
|
|
|
|
def get_new_fn(video_fn: str, dest_dir: str) -> str:
|
|
_, ext = os.path.splitext(video_fn)
|
|
ext = ext.lower()
|
|
|
|
dest_fn = match_to_destination_image(video_fn, dest_dir)
|
|
if dest_fn:
|
|
return dest_fn
|
|
|
|
time = video_creation_time(video_fn)
|
|
new_base = os.path.join(dest_dir, time.strftime(FMT_FOLDER), time.strftime(FMT_BASE))
|
|
g = glob.glob(glob.escape(new_base) + "_*" + ext)
|
|
if len(g) == 0:
|
|
return new_base + "_0000" + ext
|
|
n = 0
|
|
s = os.path.getsize(video_fn)
|
|
for f in g:
|
|
if os.path.getsize(f) == s:
|
|
return f
|
|
p = r".*_(\d+)" + ext
|
|
nn = re.match(p, f)
|
|
n = max(n, int(nn.group(1)))
|
|
return new_base + "_%04d" % (n + 1) + ext
|
|
|
|
def is_video(fn):
|
|
_, ext = os.path.splitext(fn)
|
|
ext = ext.lower()
|
|
return ext in [".mp4", ".mov", ".mts"]
|
|
|
|
from dataclasses import dataclass
|
|
|
|
@dataclass
|
|
class DiscoveredVideo:
|
|
video_file: str
|
|
destination_file: str | None
|
|
exits: bool
|
|
error: Exception | None
|
|
|
|
def discover_new(dest_dir, *directories):
|
|
for directory in directories:
|
|
print("Discovering video files in '%s'" % directory)
|
|
n = 0
|
|
to_copy = 0
|
|
for path, _, files in os.walk(os.path.expanduser(directory)):
|
|
for f in sorted(files):
|
|
fn = os.path.join(path, f)
|
|
if not is_video(fn):
|
|
continue
|
|
n += 1
|
|
try:
|
|
nfn = get_new_fn(fn, dest_dir)
|
|
except VideoCopyException as e:
|
|
yield DiscoveredVideo(fn, None, False, e)
|
|
print("ERROR: %s: %s" % (e.video, e.message))
|
|
continue
|
|
if not os.path.exists(nfn):
|
|
if not os.path.exists(os.path.dirname(nfn)):
|
|
os.makedirs(os.path.dirname(nfn))
|
|
to_copy += 1
|
|
yield DiscoveredVideo(fn, nfn, False, None)
|
|
else:
|
|
if os.path.getsize(fn) != os.path.getsize(nfn):
|
|
raise VideoCopyException(fn, "Destination exists but has different size...")
|
|
yield DiscoveredVideo(fn, nfn, True, None)
|
|
print("Found %d/%d new videos." % (to_copy, n))
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("folder", nargs='+')
|
|
parser.add_argument("--destination", default=os.path.expanduser("~/Pictures/Darktable"))
|
|
parser.add_argument("--dry", action="store_true")
|
|
parser.add_argument("--verbose", action="store_true")
|
|
options = parser.parse_args()
|
|
|
|
video_files = list(discover_new(options.destination, *options.folder))
|
|
copying = list(filter(lambda r: not r.exits and r.error is None, video_files))
|
|
|
|
if options.verbose:
|
|
for c in video_files:
|
|
print("%s => %s%s%s" % (
|
|
c.video_file,
|
|
c.destination_file,
|
|
" (EXISTS)" if c.exits else "",
|
|
" (ERROR)" if c.error else ""
|
|
))
|
|
|
|
print()
|
|
print("Wants to copy %d/%d video files." % (len(copying), len(video_files)))
|
|
|
|
if len(copying) == 0:
|
|
exit(0)
|
|
reply = input("Continue? y/n ")
|
|
if reply != 'y':
|
|
print("You did not type 'y'. Not doing anything!")
|
|
exit(1)
|
|
|
|
for c in copying:
|
|
assert c.destination_file
|
|
print("Copying %s => %s" % (c.video_file, c.destination_file))
|
|
shutil.copy(c.video_file, c.destination_file)
|