"""
Provides functions to sync folder to replica.
"""
import logging
from collections import deque
from hashlib import md5
from pathlib import Path
from shutil import copy2, copytree, rmtree
from time import sleep
from timeit import default_timer
from typing import Literal, TypeAlias
import click
__author__ = "George Murga"
__copyright__ = "George Murga"
__license__ = "MIT"
_logger: logging.Logger = logging.getLogger(__name__)
LOGLEVEL: TypeAlias = Literal["debug", "info", "warn", "error", "critical"]
[docs]def sync_folder(source: Path, replica: Path, syncinterval: int, logfile: Path, loglevel: LOGLEVEL) -> None:
"""Synchronizes SOURCE folder to REPLICA folder.
Args:
source (pathlib.Path): path of the source folder
replica (pathlib.Path): path of the target folder
syncinterval (int): period with which to repeat the sync in seconds
logfile (pathlib.Path): path of the logfile
loglevel (LOGLEVEL): level of the log
Returns:
None
"""
source = Path(source).resolve()
replica = Path(replica).resolve()
logfile = Path(logfile).resolve()
sync_count: int = 1
setup_logging(loglevel, logfile=logfile)
validate_source(source)
validate_replica(replica)
_logger.info("Starting sync every %s seconds. SOURCE: %s -> REPLICA: %s" % (syncinterval, source.resolve(), replica.resolve()))
try:
while True:
_logger.info("Syncing round %d (every %d seconds)" % (sync_count, syncinterval))
start_time: float = default_timer()
files_count: int = 0
folders_count: int = 0
files_copied: int = 0
files_updated: int = 0
folders_copied: int = 0
files_deleted: int = 0
folders_deleted: int = 0
folders_deleted1: int = 0
folders_deleted2: int = 0
files_count, folders_count, files_copied, files_updated, folders_copied, folders_deleted1 = sync_source_to_replica(
source, replica
)
files_deleted, folders_deleted2 = sync_replica_to_source(source, replica)
folders_deleted = folders_deleted1 + folders_deleted2
_logger.info(
"processed: total files = %d, total folders = %d, files copied = %d, files_updated = %d, folders_copied = %d, "
"files_deleted = %d, folders_deleted = %d"
% (files_count, folders_count, files_copied, files_updated, folders_copied, files_deleted, folders_deleted)
)
if syncinterval == 0:
break
end_time: float = default_timer()
_logger.info("Waiting for next sync round...")
while True:
if end_time - start_time < syncinterval:
sleep(1)
end_time = default_timer()
else:
break
sync_count += 1
except KeyboardInterrupt:
_logger.warn("Sync interrupted by keyboard")
except FileNotFoundError as e:
_logger.exception(e, exc_info=True)
except ExpectedFileIsAFolder as e:
_logger.exception(e, exc_info=True)
except PermissionError as e:
_logger.exception(e, exc_info=True)
finally:
_logger.info("Syncing stopped.")
[docs]def sync_source_to_replica(source: Path, replica: Path) -> tuple[int, int, int, int, int, int]:
"""Sync source contents to replica.
Args:
source (pathlib.Path): path of the source folder
replica (pathlib.Path): path of the target folder
Returns:
Tuple[int, int, int, int]:
files_count - how many files were processed,
folders_count - how many folders were processed,
files_copied - how many files were copied to the replica,
folders_copied - how many folders were copied to the replica.
"""
_logging: logging.Logger = logging.getLogger(__name__)
files_count: int = 0
folders_count: int = 0
files_copied: int = 0
files_updated: int = 0
folders_copied: int = 0
folders_deleted: int = 0
folder_queue: deque = deque()
folder_queue.append(source)
_logging.debug("added source folder to queue: %s" % source.as_posix())
while len(folder_queue) > 0:
current_folder: Path = folder_queue.popleft()
for el in current_folder.glob("*"):
if el.is_file():
_logging.debug("processing file: %s" % el.as_posix())
files_count += 1
if is_file_in_other_as_folder(el, source, replica):
replica_folder: Path = replica / (el.as_posix().replace(source.as_posix(), "").lstrip("/"))
replica_folder.rmdir()
_logging.info("deleted folder %s" % replica_folder.as_posix())
folders_deleted += 1
if is_file_in_other(el, source, replica):
if is_file_in_other_modified(el, source, replica):
replica_file: Path = Path(
copy2(
el,
replica / (el.as_posix().replace(source.as_posix(), "").lstrip("/")),
)
)
_logging.info("updated file %s to %s" % (el.as_posix(), replica_file.as_posix()))
files_updated += 1
else:
replica_file = Path(
copy2(
el,
replica / (el.as_posix().replace(source.as_posix(), "").lstrip("/")),
)
)
_logging.info("copied file %s to %s" % (el.as_posix(), replica_file.as_posix()))
files_copied += 1
elif el.is_dir():
folders_count += 1
if is_folder_in_other_as_folder(el, source, replica):
folder_queue.append(el)
_logging.debug("added to queue: %s" % el.as_posix())
else:
if is_folder_in_other_as_file(el, source, replica):
replica_file = Path(
copy2(
el,
replica / (el.as_posix().replace(source.as_posix(), "").lstrip("/")),
)
)
replica_file.unlink()
_logging.info("deleted file %s" % replica_file.as_posix())
destination_folder = Path(el.as_posix().replace(source.as_posix(), replica.as_posix()))
replica_folder = Path(copytree(el, destination_folder))
_logging.info("copied whole folder to replica %s" % replica_folder.as_posix())
folders_copied += 1
return files_count, folders_count, files_copied, files_updated, folders_copied, folders_deleted
[docs]def sync_replica_to_source(source: Path, replica: Path) -> tuple[int, int]:
"""Remove files and folders from replica which are not in source.
Args:
source (pathlib.Path): path of the source folder
replica (pathlib.Path): path of the target folder
Returns:
Tuple[int, int, int, int]:
files_deleted - how many files were deleted,
folders_deleted - how many folders were deleted,
"""
_logging: logging.Logger = logging.getLogger(__name__)
files_deleted: int = 0
folders_deleted: int = 0
folder_queue: deque = deque()
folder_queue.append(replica)
_logging.debug("added replica to folder queue: %s" % replica.as_posix())
while len(folder_queue) > 0:
current_folder: Path = folder_queue.popleft()
for el in current_folder.glob("*"):
if el.is_file():
_logging.debug("processing file: %s" % el.as_posix())
if not is_file_in_other(el, replica, source):
el.unlink()
_logger.info("deleted file from replica: %s" % el.as_posix())
files_deleted += 1
elif el.is_dir():
if is_folder_in_other_as_folder(el, replica, source):
pass
else:
rmtree(el)
_logger.info("deleted folder from replica: %s" % el.as_posix())
folders_deleted += 1
return files_deleted, folders_deleted
[docs]def is_folder_in_other_as_folder(folder_to_check: Path, source: Path, destination: Path) -> bool:
"""Return true if the folder_to_check path is in destination and is a folder.
Args:
folder_to_check (pathlib.Path): the folder to search for in destination (relative path must match)
source (pathlib.Path): path of the soruce folder
destination (pathlib.Path): path of the destination
Returns:
bool: True if the folder searched is in the destination folder and is a file. False otherwise.
"""
glob_str: str = folder_to_check.as_posix().replace(source.as_posix(), "").lstrip("/")
potential_matches = list(destination.glob(glob_str))
if len(potential_matches) > 0:
match_folder: Path = potential_matches[0]
if match_folder.is_dir():
return True
return False
[docs]def is_folder_in_other_as_file(folder_to_check: Path, source: Path, destination: Path) -> bool:
"""Return true if the folder_to_check path is in destination but it's a file not a folder.
Args:
folder_to_check (pathlib.Path): the folder to search for in destination (relative path must match)
source (pathlib.Path): path of the soruce folder
destination (pathlib.Path): path of the destination
Returns:
bool: True if the folder searched is in the destination folder but it's a file. False otherwise.
"""
glob_str: str = folder_to_check.as_posix().replace(source.as_posix(), "").lstrip("/")
potential_matches = list(destination.glob(glob_str))
if len(potential_matches) > 0:
match_folder: Path = potential_matches[0]
if match_folder.is_file():
return True
return False
[docs]def is_file_in_other_as_folder(file_to_check: Path, source: Path, destination: Path) -> bool:
"""Check if file_to_check is in destination folder but it's a folder.
Args:
file_to_check (pathlib.Path): path of the file to check from the source folder
source (pathlib.Path): path of the source folder
destination (pathlib.Path): path of the destination folder
Returns:
bool: True if the file is found in the destination and is a folder. False otherwise
"""
glob_str: str = file_to_check.as_posix().replace(source.as_posix(), "").lstrip("/")
potential_matches = list(destination.glob(glob_str))
if len(potential_matches) > 0:
match_file: Path = potential_matches[0]
if match_file.is_dir():
return True
return False
[docs]def is_file_in_other(file_to_check: Path, source: Path, destination: Path) -> bool:
"""Check if file_to_check is in destination folder.
Args:
file_to_check (pathlib.Path): path of the file to check from the source folder
source (pathlib.Path): path of the source folder
destination (pathlib.Path): path of the destination folder
Returns:
bool: True if the file is found in the destination False otherwise
Raises:
ExpectedFileIsAFolder custom exception if the file is found but it's a folder
"""
glob_str: str = file_to_check.as_posix().replace(source.as_posix(), "").lstrip("/")
potential_matches = list(destination.glob(glob_str))
if len(potential_matches) > 0:
match_file: Path = potential_matches[0]
if match_file.is_file():
return True
else:
raise ExpectedFileIsAFolder(f"Expected {match_file.as_posix()} to be a file but it's a folder.")
return False
[docs]class ExpectedFileIsAFolder(Exception):
pass
[docs]def is_file_in_other_modified(file_to_check: Path, source: Path, destination: Path) -> bool:
"""Check if file_to_check is in destination folder and it's the same file.
Given there is a file with the same name in the destination folder (same relative path)
assume if modification times are the same the files are the same.
If the modifications time are different compare the files' content using md5.
Args:
file_to_check (pathlib.Path): path of the file to check from the source folder
source (pathlib.Path): path of the source folder
destination (pathlib.Path): path of the destination folder
Returns:
bool: False of the file is found in the destination at the same relative path and either
the modification times are the same or the md5 hash of the contents are the same. True otherwise
Raises:
FileNotFoundError if the file is not found in the destination folder
"""
glob_str: str = file_to_check.as_posix().replace(source.as_posix(), "").lstrip("/")
potential_matches = list(destination.glob(glob_str))
if len(potential_matches) > 0:
match_file: Path = potential_matches[0]
destination_mdate: float = match_file.stat().st_mtime
source_mdate: float = file_to_check.stat().st_mtime
if source_mdate == destination_mdate:
return False
source_hash: str = compute_hash(file_to_check)
destination_hash: str = compute_hash(match_file)
if source_hash == destination_hash:
return False
else:
raise FileNotFoundError
return True
[docs]def compute_hash(file_to_check: Path) -> str:
hash = md5()
with file_to_check.open("rb") as f:
chunk: bytes = f.read(4096)
while chunk:
hash.update(chunk)
chunk = f.read(4096)
return hash.hexdigest()
[docs]def validate_source(path: Path) -> bool:
"""Validate source folder
Args:
path (pathlib.Path): source folder path
Returns:
bool: True if path exists and is a folder
Raises:
SystemExit: path does not exist
SystemExit: path is not a folder
"""
_logger: logging.Logger = logging.getLogger(__name__)
if not path.exists():
_logger.error("SOURCE folder: %s doesn't exist" % path.as_posix())
raise SystemExit(1)
if not path.is_dir():
_logger.error("SOURCE: %s is not a folder" % path.as_posix())
raise SystemExit(1)
return True
[docs]def validate_replica(path: Path) -> bool:
"""Validate replica folder.
If it doesn't exist, create it.
If it exists but it's not a folder raise SystemExit.
If it can't create it raise PermissionError
Args:
path (pathlib.Path): replica folder path
Returns:
bool: True if path exists and is a folder or if it doesn't exist but it created it successfuelly.
Raises:
SystemExit: path is not a folder
SystemExit: could not create replica folder
"""
_logger: logging.Logger = logging.getLogger(__name__)
if not path.exists():
try:
path.mkdir(parents=True)
except PermissionError:
_logger.error("Permission denied trying to create REPLICA folder: %s" % path.as_posix())
raise SystemExit()
if not path.is_dir():
_logger.error("REPLICA: %s is not a folder" % path.as_posix())
raise SystemExit(1)
return True
[docs]def setup_logging(loglevel: LOGLEVEL, logfile: str | Path) -> None:
"""Setup logging
Args:
loglevel (logging._Level): minimum loglevel for emitting messages
Returns:
None
"""
_logger: logging.Logger = logging.getLogger(__name__)
loglevels: dict[str, int] = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warn": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}
_logger.setLevel(loglevels[loglevel])
logformat = "[%(asctime)s.%(msecs)03d] %(levelname)s:%(name)s:- %(message)s"
formatter = logging.Formatter(fmt=logformat, datefmt="%Y-%m-%d %H:%M:%S")
# setup file logging
fh = logging.FileHandler(logfile, encoding="utf-8", errors="replace")
fh.setLevel(loglevels[loglevel])
fh.setFormatter(formatter)
# setup console logging
ch = logging.StreamHandler() # type: ignore
ch.setLevel(loglevels[loglevel])
ch.setFormatter(formatter)
_logger.addHandler(fh)
_logger.addHandler(ch)
@click.command()
@click.argument("source", type=click.Path(path_type=Path))
@click.argument("replica", type=click.Path(path_type=Path))
@click.option(
"--syncinterval",
type=click.IntRange(min=0, max=2_678_400),
help="Seconds bettwen synchronizations.\nmin = 0 (sync only once), \nmax = 2678400 (31 days). Default = 0",
required=True,
default=0,
)
@click.option("--logfile", type=click.Path(path_type=Path), help="path to log file", required=True)
@click.option(
"--loglevel",
type=click.Choice(["debug", "info", "warn", "error", "critical"], case_sensitive=False),
default="info",
help="Default = info",
)
@click.version_option()
@click.help_option("-h", "--help")
def main(
source: Path,
replica: Path,
syncinterval: int,
logfile: Path,
loglevel: LOGLEVEL,
# loglevel: Literal["debug", "info", "warn", "error", "critical"],
) -> None:
"""Main entrypoint"""
sync_folder(source, replica, syncinterval, logfile, loglevel)
if __name__ == "__main__":
main()