Polish copy_videos.py
This commit is contained in:
parent
03201efc3b
commit
e26a859659
194
copy_videos.py
194
copy_videos.py
@ -6,8 +6,16 @@ import glob
|
||||
from PIL import Image
|
||||
import shutil
|
||||
import re
|
||||
import argparse
|
||||
|
||||
def get_time(fn):
|
||||
class VideoCopyException(Exception):
|
||||
|
||||
def __init__(self, video: str, message: str, *args: object) -> None:
|
||||
self.video = video
|
||||
self.message = message
|
||||
super().__init__(*args)
|
||||
|
||||
def video_creation_time(fn):
|
||||
try:
|
||||
output = subprocess.check_output([
|
||||
"ffprobe", "-v", "quiet", fn, "-print_format", "json",
|
||||
@ -15,8 +23,7 @@ def get_time(fn):
|
||||
"stream=index,codec_type:stream_tags=creation_time:format_tags=creation_time"
|
||||
])
|
||||
except subprocess.CalledProcessError as e:
|
||||
print("FFPROBE failed for ", fn)
|
||||
return ValueError()
|
||||
raise VideoCopyException(fn, "'ffprobe' failed to extract creation time") from e
|
||||
fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
data = json.loads(output)
|
||||
if "streams" in data:
|
||||
@ -25,6 +32,8 @@ def get_time(fn):
|
||||
return datetime.datetime.strptime(d["tags"]["creation_time"], fmt)
|
||||
if "format" in data and "tags" in data["format"] and "creation_time" in data["format"]["tags"]:
|
||||
return datetime.datetime.strptime(data["format"]["tags"]["creation_time"], fmt)
|
||||
|
||||
raise VideoCopyException(fn, "Could not find creation time in 'ffprobe' output")
|
||||
#import sys
|
||||
#import zoneinfo
|
||||
#dt = datetime.datetime.fromtimestamp(
|
||||
@ -36,98 +45,149 @@ def get_time_image(fn):
|
||||
fmt = "%Y:%m:%d %H:%M:%S"
|
||||
return datetime.datetime.strptime(Image.open(fn)._getexif()[36867], fmt)
|
||||
|
||||
def get_new_fn(fn):
|
||||
base, ext = os.path.splitext(fn)
|
||||
directory, base_fn = os.path.split(base)
|
||||
ext = ext.lower()
|
||||
|
||||
FMT_FOLDER = "%Y%m%d"
|
||||
FMT_BASE = "%Y%m%d_%H%M"
|
||||
|
||||
def get_matching_image(video_fn):
|
||||
base, ext = os.path.splitext(video_fn)
|
||||
ext = ext.lower()
|
||||
image_files = list(
|
||||
filter(lambda fn: os.path.splitext(fn)[1].lower() in [".jpeg", ".jpg", ".png"],
|
||||
glob.iglob(glob.escape(base) + "*")))
|
||||
assert len(image_files) < 2
|
||||
fmt_folder = "%Y%m%d"
|
||||
fmt_base = "%Y%m%d_%H%M"
|
||||
if image_files:
|
||||
f = image_files[0]
|
||||
image_ext = os.path.splitext(f)[-1]
|
||||
image_dt = get_time_image(f)
|
||||
image_size = os.path.getsize(f)
|
||||
new_base = os.path.join(os.path.expanduser("~/Pictures/Darktable"), image_dt.strftime(fmt_folder), image_dt.strftime(fmt_base))
|
||||
if len(image_files) >= 2:
|
||||
raise VideoCopyException(video_fn, "Too many images found that could belong to video")
|
||||
|
||||
if len(image_files) == 0:
|
||||
return None
|
||||
return image_files[0]
|
||||
|
||||
def match_to_destination_image(video_fn: str, dest_dir: str) -> str | None:
|
||||
image_file = get_matching_image(video_fn)
|
||||
if image_file is None:
|
||||
return None
|
||||
|
||||
image_ext = os.path.splitext(image_file)[-1]
|
||||
image_dt = get_time_image(image_file)
|
||||
image_size = os.path.getsize(image_file)
|
||||
new_base = os.path.join(
|
||||
dest_dir,
|
||||
image_dt.strftime(FMT_FOLDER),
|
||||
image_dt.strftime(FMT_BASE)
|
||||
)
|
||||
|
||||
g = glob.escape(new_base) + "_*" + image_ext
|
||||
matches = [g for g in glob.glob(g) if image_size == os.path.getsize(g)]
|
||||
if len(matches) != 1:
|
||||
print()
|
||||
print("NO MATCH")
|
||||
print(image_files)
|
||||
print(matches)
|
||||
print(g)
|
||||
print(image_dt)
|
||||
raise VideoCopyException(
|
||||
video_fn,
|
||||
"No unique image found at destination that matches the image '%s' "
|
||||
"(which belongs to video). Found %d images for glob '%s'"
|
||||
% (image_file, len(matches), g)
|
||||
)
|
||||
|
||||
return None
|
||||
assert len(matches) == 1, fn
|
||||
nfn = os.path.splitext(matches[0])[0] + ext
|
||||
nfn = list(os.path.split(nfn))
|
||||
nfn[-1] = "." + nfn[-1]
|
||||
return os.path.join(*nfn)
|
||||
_, ext = os.path.splitext(video_fn)
|
||||
nfn: str = os.path.splitext(matches[0])[0] + ext
|
||||
nfn_parts = list(os.path.split(nfn))
|
||||
nfn_parts[-1] = "." + nfn_parts[-1]
|
||||
return os.path.join(*nfn_parts)
|
||||
|
||||
time = get_time(fn)
|
||||
if isinstance(time, ValueError):
|
||||
print("NO TIME")
|
||||
return None
|
||||
new_base = os.path.join(os.path.expanduser("~/Pictures/Darktable"), time.strftime(fmt_folder), time.strftime(fmt_base))
|
||||
def get_new_fn(video_fn: str, dest_dir: str) -> str:
|
||||
_, ext = os.path.splitext(video_fn)
|
||||
ext = ext.lower()
|
||||
|
||||
dest_fn = match_to_destination_image(video_fn, dest_dir)
|
||||
if dest_fn:
|
||||
return dest_fn
|
||||
|
||||
time = video_creation_time(video_fn)
|
||||
new_base = os.path.join(dest_dir, time.strftime(FMT_FOLDER), time.strftime(FMT_BASE))
|
||||
g = glob.glob(glob.escape(new_base) + "_*" + ext)
|
||||
if len(g) == 0:
|
||||
return new_base + "_0000" + ext
|
||||
n = 0
|
||||
s = os.path.getsize(fn)
|
||||
s = os.path.getsize(video_fn)
|
||||
for f in g:
|
||||
if os.path.getsize(f) == s:
|
||||
return f
|
||||
p = r".*_(\d+)" + ext
|
||||
nn = re.match(p, f)
|
||||
n = max(n, int(nn.group(1)))
|
||||
n = new_base + "_%04d" % (n + 1) + ext
|
||||
return n
|
||||
|
||||
def main(dir, dry):
|
||||
print(dir)
|
||||
for p, folders, files in os.walk(os.path.expanduser(dir)):
|
||||
for f in sorted(files):
|
||||
base, ext = os.path.splitext(f)
|
||||
return new_base + "_%04d" % (n + 1) + ext
|
||||
|
||||
def is_video(fn):
|
||||
_, ext = os.path.splitext(fn)
|
||||
ext = ext.lower()
|
||||
if ext not in [".mp4", ".mov", ".mts"]:
|
||||
if ext not in [".jpg", ".jpeg", ".png"]:
|
||||
pass # print("Not handled: ", os.path.join(p, f))
|
||||
continue
|
||||
fn = os.path.join(p, f)
|
||||
nfn = get_new_fn(fn)
|
||||
if not nfn:
|
||||
print("SKIP: ", fn)
|
||||
continue
|
||||
assert nfn
|
||||
return ext in [".mp4", ".mov", ".mts"]
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class DiscoveredVideo:
|
||||
video_file: str
|
||||
destination_file: str | None
|
||||
exits: bool
|
||||
error: Exception | None
|
||||
|
||||
def discover_new(dest_dir, *directories):
|
||||
for directory in directories:
|
||||
print("Discovering video files in '%s'" % directory)
|
||||
n = 0
|
||||
to_copy = 0
|
||||
for path, _, files in os.walk(os.path.expanduser(directory)):
|
||||
for f in sorted(files):
|
||||
fn = os.path.join(path, f)
|
||||
if not is_video(fn):
|
||||
continue
|
||||
n += 1
|
||||
try:
|
||||
nfn = get_new_fn(fn, dest_dir)
|
||||
except VideoCopyException as e:
|
||||
yield DiscoveredVideo(fn, None, False, e)
|
||||
print("ERROR: %s: %s" % (e.video, e.message))
|
||||
continue
|
||||
if not os.path.exists(nfn):
|
||||
if not os.path.exists(os.path.dirname(nfn)):
|
||||
os.makedirs(os.path.dirname(nfn))
|
||||
print(fn, nfn)
|
||||
if not dry:
|
||||
shutil.copy(fn, nfn)
|
||||
to_copy += 1
|
||||
yield DiscoveredVideo(fn, nfn, False, None)
|
||||
else:
|
||||
print(fn, nfn, "EXIST")
|
||||
if os.path.getsize(fn) != os.path.getsize(nfn):
|
||||
raise VideoCopyException(fn, "Destination exists but has different size...")
|
||||
yield DiscoveredVideo(fn, nfn, True, None)
|
||||
print("Found %d/%d new videos." % (to_copy, n))
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("folder", nargs='+')
|
||||
parser.add_argument("--destination", default=os.path.expanduser("~/Pictures/Darktable"))
|
||||
parser.add_argument("--dry", action="store_true")
|
||||
parser.add_argument("--verbose", action="store_true")
|
||||
options = parser.parse_args()
|
||||
|
||||
video_files = list(discover_new(options.destination, *options.folder))
|
||||
copying = list(filter(lambda r: not r.exits and r.error is None, video_files))
|
||||
|
||||
if options.verbose:
|
||||
for c in video_files:
|
||||
print("%s => %s%s%s" % (
|
||||
c.video_file,
|
||||
c.destination_file,
|
||||
" (EXISTS)" if c.exits else "",
|
||||
" (ERROR)" if c.error else ""
|
||||
))
|
||||
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--dry", action="store_true")
|
||||
options = parser.parse_args()
|
||||
main("/run/media/oke/3634-3063", options.dry)
|
||||
main("/run/media/oke/3138-3162", options.dry)
|
||||
main("/run/media/oke/01D5-9878/DCIM/", options.dry)
|
||||
main("/run/media/oke/E5B5-DBF0/DCIM", options.dry)
|
||||
main("/run/media/oke/disk/", options.dry)
|
||||
main("~/Nextcloud/InstantUpload", options.dry)
|
||||
main("/home/oke/Desktop/360/", options.dry)
|
||||
print()
|
||||
print("Wants to copy %d/%d video files." % (len(copying), len(video_files)))
|
||||
|
||||
if len(copying) == 0:
|
||||
exit(0)
|
||||
reply = input("Continue? y/n ")
|
||||
if reply != 'y':
|
||||
print("You did not type 'y'. Not doing anything!")
|
||||
exit(1)
|
||||
|
||||
for c in copying:
|
||||
assert c.destination_file
|
||||
print("Copying %s => %s" % (c.video_file, c.destination_file))
|
||||
shutil.copy(c.video_file, c.destination_file)
|
||||
|
Loading…
x
Reference in New Issue
Block a user