274 lines
8.4 KiB
Python
274 lines
8.4 KiB
Python
from pytable.models import Image, ImageFlags, FilmRoll
|
|
|
|
from datetime import datetime
|
|
import peewee as pw
|
|
import os
|
|
from dataclasses import dataclass
|
|
from typing import Tuple, Optional, Generic, TypeVar, Iterable
|
|
from vmgr.actions import Trash
|
|
import subprocess
|
|
import shlex
|
|
|
|
T = TypeVar('T')
|
|
Range = Tuple[Optional[T], Optional[T]]
|
|
|
|
|
|
class Filter:
|
|
|
|
def __call__(self, image: Image):
|
|
return self.matches(image)
|
|
|
|
def matches(self, image: Image) -> bool:
|
|
return True
|
|
|
|
def __str__(self) -> str:
|
|
return 'All'
|
|
|
|
|
|
@dataclass
|
|
class LocalRegion(Filter):
|
|
|
|
date_range: Range[datetime] = (None, None)
|
|
stars_range: Range[int] = (None, None)
|
|
|
|
def matches(self, image: Image) -> bool:
|
|
|
|
stars = image.stars if not image.flag(ImageFlags.REJECTED) else -1
|
|
if self.stars_range[0] is not None and stars < self.stars_range[0]:
|
|
return False
|
|
if self.stars_range[1] is not None and stars > self.stars_range[1]:
|
|
return False
|
|
|
|
dt = (image.datetime_taken or image.datetime_imported)
|
|
if self.date_range[0] is not None and dt < self.date_range[0]:
|
|
return False
|
|
if self.date_range[1] is not None and dt > self.date_range[1]:
|
|
return False
|
|
return True
|
|
|
|
def __str__(self) -> str:
|
|
return f"LocalRange[({self.date_range[0] and self.date_range[0].strftime("%Y%m%d")}, {self.date_range[1] and self.date_range[1].strftime("%Y%m%d")}), ({self.stars_range[0]}, {self.stars_range[1]})]"
|
|
|
|
@dataclass
|
|
class Or(Filter):
|
|
|
|
filters: Iterable[Filter]
|
|
|
|
def matches(self, image: Image) -> bool:
|
|
return any(f(image) for f in self.filters)
|
|
|
|
def __str__(self) -> str:
|
|
return f"Or([\n\t{"\n\t".join((str(f) + ',' for f in self.filters))}\n])"
|
|
|
|
@dataclass
|
|
class And(Filter):
|
|
|
|
filters: Iterable[Filter]
|
|
|
|
def matches(self, image: Image) -> bool:
|
|
return all(f(image) for f in self.filters)
|
|
|
|
def __str__(self) -> str:
|
|
return f"And([\n\t{"\n\t".join(('<' + str(f) + '>,' for f in self.filters))}\n])"
|
|
|
|
|
|
class Config:
|
|
|
|
def __init__(self, d_root: str | None = None, d_local: str | None=None, dry_run=False) -> None:
|
|
self.d_root = d_root or "/home/oke/Pictures/Darktable"
|
|
self.d_local = d_local or "/home/oke/Pictures/DarktableLocal"
|
|
self.dry_run = dry_run
|
|
|
|
def image_local_folder(self, image: Image) -> str:
|
|
folder = image.film.folder
|
|
if not folder.startswith(self.d_root):
|
|
raise NotImplementedError()
|
|
assert folder.startswith(self.d_root)
|
|
return folder.replace(self.d_root, self.d_local)
|
|
|
|
def image_local_image(self, image: Image) -> str:
|
|
return os.path.join(self.image_local_folder(image), image.filename)
|
|
|
|
def image_local_image_xmp(self, image: Image) -> str:
|
|
base, ext = os.path.splitext(image.filename)
|
|
version = ("_%02d" % image.version) if image.version != 0 else ""
|
|
xmp_filename = base + version + ext + '.xmp'
|
|
return os.path.join(self.image_local_folder(image), xmp_filename)
|
|
|
|
def image_local_iphone_movie(self, image: Image) -> str:
|
|
base, _ = os.path.splitext(image.filename)
|
|
return os.path.join(self.image_local_folder(image), "." + base + ".mov")
|
|
|
|
def potential_extras(self, image):
|
|
yield self.image_local_iphone_movie(image)
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional
|
|
from enum import Enum
|
|
|
|
class SyncReq(Enum):
|
|
EXISTS = 0
|
|
REQUIRED_SOFT = 1
|
|
REQUIRED = 2
|
|
|
|
@staticmethod
|
|
def max(a: "SyncReq", b: "SyncReq"):
|
|
if a.value < b.value:
|
|
return b
|
|
return a
|
|
@staticmethod
|
|
def required(v: 'SyncReq'):
|
|
return v == SyncReq.REQUIRED or v == SyncReq.REQUIRED_SOFT
|
|
|
|
@staticmethod
|
|
def make(required, optional):
|
|
if required:
|
|
if optional:
|
|
return SyncReq.REQUIRED_SOFT
|
|
return SyncReq.REQUIRED
|
|
return SyncReq.EXISTS
|
|
|
|
@dataclass
|
|
class FileInfo:
|
|
required: bool = False
|
|
optional: bool = False
|
|
|
|
class SyncManager:
|
|
|
|
def __init__(self, config: Config, filter: Filter) -> None:
|
|
self._config = config
|
|
self._filter = filter
|
|
self._files: Dict[str, SyncReq] = {}
|
|
|
|
def register_file(self, fn: str, requirement: SyncReq):
|
|
if fn in self._files:
|
|
requirement = SyncReq.max(self._files[fn], requirement)
|
|
self._files[fn] = requirement
|
|
|
|
def register_image(self, image: Image):
|
|
required = self._filter(image)
|
|
self.register_file(
|
|
self._config.image_local_image(image),
|
|
SyncReq.make(required, False)
|
|
)
|
|
self.register_file(
|
|
self._config.image_local_image_xmp(image),
|
|
SyncReq.make(required, False)
|
|
)
|
|
|
|
for extra in self._config.potential_extras(image):
|
|
self.register_file(
|
|
extra,
|
|
SyncReq.make(required, True)
|
|
)
|
|
|
|
def partition(self):
|
|
result = {k: [] for k in SyncReq}
|
|
for file, req in self._files.items():
|
|
result[req].append(file)
|
|
return result
|
|
|
|
def write_to(fn, file_list, realtive_to):
|
|
lines = list(sorted(map(
|
|
lambda fn: os.path.relpath(fn, realtive_to) + "\n",
|
|
file_list
|
|
)))
|
|
with open(fn, "w") as f:
|
|
f.writelines(lines)
|
|
|
|
def synchronize_from_remote(config, required_files):
|
|
write_to("required.txt", required_files, config.d_local)
|
|
user = "oke"
|
|
host = "192.168.20.2"
|
|
args = [
|
|
"rsync",
|
|
"--info=progress",
|
|
"--ignore-existing",
|
|
"-av",
|
|
"--files-from",
|
|
"required.txt",
|
|
f"{user}@{host}:lenovo-darktable{config.d_local}",
|
|
os.path.abspath(config.d_local)
|
|
]
|
|
if config.dry_run:
|
|
args.append('--dry-run')
|
|
print(shlex.join(args))
|
|
subprocess.call(args)
|
|
|
|
def remove_unnessecary(config: Config, to_remove):
|
|
remove_list_fn = "sync_remove.txt"
|
|
try:
|
|
|
|
reply = input("There are %d files to remove. Want to see them [type: y]? " % len(to_remove))
|
|
if reply == 'y':
|
|
write_to(remove_list_fn, to_remove, config.d_local)
|
|
subprocess.call(['vim', '-R', remove_list_fn])
|
|
if not config.dry_run:
|
|
reply = input("There are %d files to remove. Want to DELETE them [type: YES]? " % len(to_remove))
|
|
if reply == 'YES':
|
|
for f in to_remove:
|
|
Trash(f).run()
|
|
finally:
|
|
if os.path.exists(remove_list_fn):
|
|
os.remove(remove_list_fn)
|
|
|
|
def main():
|
|
import argparse
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--dry-run", action='store_true')
|
|
options, filter_args = parser.parse_known_args()
|
|
|
|
filter = None
|
|
while filter_args:
|
|
if filter_args[0] != '--sync':
|
|
print(f"Unexpected arg {filter_args[0]}")
|
|
exit(1)
|
|
try:
|
|
next_sync = filter_args.index('--sync', 1)
|
|
args = filter_args[:next_sync]
|
|
filter_args = filter_args[next_sync:]
|
|
except ValueError:
|
|
args = filter_args
|
|
filter_args = []
|
|
|
|
filter_parser = argparse.ArgumentParser()
|
|
filter_parser.add_argument("--from", dest='from_date', type=lambda s: datetime.strptime(s, '%Y%m%d'), default=None)
|
|
filter_parser.add_argument("--to", dest='to_date', type=lambda s: datetime.strptime(s, '%Y%m%d'), default=None)
|
|
filter_parser.add_argument("--min-stars", type=int, default=None)
|
|
filter_parser.add_argument("--max-stars", type=int, default=None)
|
|
|
|
filter_options = filter_parser.parse_args(args[1:])
|
|
parsed_filter = LocalRegion((filter_options.from_date, filter_options.to_date), (filter_options.min_stars, filter_options.max_stars))
|
|
if filter:
|
|
filter = Or([
|
|
filter,
|
|
parsed_filter
|
|
])
|
|
else:
|
|
filter = parsed_filter
|
|
|
|
if not filter:
|
|
print("Nothing to be synchronized")
|
|
exit()
|
|
print(filter)
|
|
config = Config(dry_run=options.dry_run)
|
|
|
|
sync_manager = SyncManager(config, filter)
|
|
|
|
query = Image.filter()
|
|
pw.prefetch(query, FilmRoll)
|
|
for image in query:
|
|
sync_manager.register_image(image)
|
|
|
|
|
|
actions = sync_manager.partition()
|
|
|
|
required_files = actions[SyncReq.REQUIRED]
|
|
synchronize_from_remote(config, required_files)
|
|
|
|
to_remove = [l for l in actions[SyncReq.EXISTS] if os.path.exists(l)]
|
|
remove_unnessecary(config, to_remove)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|