21"""Move SFTs between directories."""
27from contextlib
import contextmanager
28from concurrent.futures
import ProcessPoolExecutor, as_completed
31from lal
import LALERRORBIT, LALWARNINGBIT, LALINFOBIT, LALTRACEBIT
32from lal
import GetDebugLevel, ClobberDebugLevel
34from lalpulsar
import git_version
35from lalpulsar
import ValidateSFTFile, SFTErrorMessage
# Module metadata; version and date come from the lalpulsar build information.
__author__ = "Evan Goetz <evan.goetz@ligo.org>"
__credits__ = "Karl Wette <karl.wette@ligo.org>"
__version__ = git_version.id
__date__ = git_version.date
@contextmanager
def silence_xlal_error_messages():
    """Context manager that masks LAL diagnostic output for its duration.

    The current LAL debug level is saved, the error, warning, info and trace
    bits are cleared, and the saved level is written back on exit.

    Yields:
        None.
    """
    saveDebugLevel = GetDebugLevel()
    silentDebugLevel = saveDebugLevel & ~(
        LALERRORBIT | LALWARNINGBIT | LALINFOBIT | LALTRACEBIT
    )
    ClobberDebugLevel(silentDebugLevel)
    # NOTE(review): the lines between the two ClobberDebugLevel() calls are
    # missing from the listing this was recovered from; the try/yield/finally
    # shape is inferred from the contextmanager import -- confirm upstream.
    try:
        yield None
    finally:
        # Always restore the caller's debug level, even if the body raised.
        ClobberDebugLevel(saveDebugLevel)
58 parser = argparse.ArgumentParser(description=__doc__)
60 "-p",
"--processes", type=int, default=1, help=
"number of moving processes"
67 help=
"do not validate destination SFTs",
70 "-s",
"--source-directory", type=str, help=
"SFT source directory"
77 help=
"Channel names (must be the same number as " "--dest-directory arguments)",
84 help=
"SFT destination directory (must be the same "
85 "number as --channels arguments)",
87 args = parser.parse_args()
90 if args.processes <= 0:
91 parser.error(
"--processes must be strictly positive")
92 if not os.path.isdir(args.source_directory):
93 parser.error(
"source_directory is not a directory")
94 if len(args.channels) != len(args.dest_directory):
96 "Number of channel arguments must equal number of " "directory arguments"
def find_SFT_files(source_directory, channel, dest_directory):
    """Pair every SFT of one channel under a directory tree with its destination.

    Args:
        source_directory: Root directory that is walked recursively.
        channel: Channel name containing a ":"; the part after the first ":"
            (up to any second ":") is used, with "-" and "_" removed.
        dest_directory: Directory joined with each matching file name.

    Returns:
        List of ``(src_path, dest_path)`` tuples, one per file whose name ends
        in ".sft" and contains the stripped channel string.

    Raises:
        IndexError: If ``channel`` contains no ":".
    """
    # Channel as matched against file names: text after ":", no "-" or "_".
    chan = channel.split(":")[1].replace("-", "").replace("_", "")

    src_dest_paths = []
    for src_root, _, src_files in os.walk(source_directory):
        for src_file in src_files:
            if not (src_file.endswith(".sft") and chan in src_file):
                continue
            # os.walk() yields bare file names, so the destination is simply
            # that name placed in the destination directory (flat layout).
            src_path = os.path.join(src_root, src_file)
            dest_path = os.path.join(dest_directory, src_file)
            src_dest_paths.append((src_path, dest_path))

    return src_dest_paths
def make_dest_dirs(dest_dirs):
    """Create every destination directory, reporting progress on stdout.

    Args:
        dest_dirs: Sized iterable of directory paths. Missing parents are
            created and directories that already exist are left alone.
    """
    print(f"{__file__}: making {len(dest_dirs)} directories ...", flush=True)
    for dest_dir in dest_dirs:
        os.makedirs(dest_dir, exist_ok=True)
    print(f"{__file__}: making {len(dest_dirs)} directories ... done\n", flush=True)
def move_SFT_file(src_path, dest_path, validate):
    """Move one SFT to its destination, optionally validating it first.

    The file is first moved to ``dest_path + "_TO_BE_VALIDATED"`` and only
    renamed to ``dest_path`` once it has passed validation (or validation was
    not requested), so a file that fails validation never appears under its
    final name.

    Args:
        src_path: Path of the SFT to move.
        dest_path: Final path of the SFT.
        validate: If true, validate the moved file before the final rename.

    Returns:
        None on success, or ``(tmp_dest_path, validate_errorstr)`` when
        validation fails; the file is then left at the temporary path.
    """
    tmp_dest_path = dest_path + "_TO_BE_VALIDATED"
    shutil.move(src_path, tmp_dest_path)

    # NOTE(review): the lines that produce validate_errorcode and
    # validate_errorstr are missing from the listing this was recovered from;
    # they are reconstructed from the imported ValidateSFTFile() and
    # SFTErrorMessage() and the debug-level silencing helper -- confirm.
    if validate:
        with silence_xlal_error_messages():
            validate_errorcode = ValidateSFTFile(tmp_dest_path)
        if validate_errorcode != 0:
            validate_errorstr = SFTErrorMessage(validate_errorcode)
            return (tmp_dest_path, validate_errorstr)

    os.rename(tmp_dest_path, dest_path)

    return None
def move_all_SFT_files(src_dest_paths, validate, processes):
    """Move all SFTs using a pool of worker processes and report failures.

    Args:
        src_dest_paths: Sized iterable of ``(src_path, dest_path)`` tuples.
        validate: Passed to ``move_SFT_file`` for every file.
        processes: Number of worker processes in the pool.

    Raises:
        SystemExit: With status 1 after listing the files that failed
            validation (see the review note in the body).
    """
    validate_errors = []

    print(f"{__file__}: copying {len(src_dest_paths)} SFTs ...", flush=True)
    # Size the pool from the ``processes`` parameter. The previous code read
    # the module-level ``args.processes``, which ignored the argument and
    # only worked when the file was run as a script.
    with ProcessPoolExecutor(max_workers=processes) as executor:
        pool = [
            executor.submit(move_SFT_file, src_path, dest_path, validate)
            for src_path, dest_path in src_dest_paths
        ]

        # Collect results as tasks finish; a non-None result is a failure.
        for task in tqdm(as_completed(pool), total=len(pool)):
            validate_error = task.result()
            if validate_error is not None:
                validate_errors.append(validate_error)

    if validate_errors:
        print(
            f"{__file__}: failed to validate {len(validate_errors)} SFTs after copying:",
            flush=True,
        )
        for tmp_dest_path, validate_errorstr in validate_errors:
            print(f" {tmp_dest_path}\n {validate_errorstr}", flush=True)
        # NOTE(review): the statement following this report is missing from
        # the listing this was recovered from; a non-zero exit is assumed so
        # that failures are not followed by the "done" message -- confirm.
        raise SystemExit(1)

    print(f"{__file__}: copying {len(src_dest_paths)} SFTs ... done\n", flush=True)
185if __name__ ==
"__main__":
190 for idx, c
in enumerate(args.channels):
193 dest_dirs.add(args.dest_directory[idx])
194 src_dest_paths.extend(src_dest)
200 print(f
"{__file__}: DONE", flush=
True)
const char * SFTErrorMessage(int errorcode)
int ValidateSFTFile(const char *fname)
Verify that the contents of a SFT file are valid.
def move_SFT_file(src_path, dest_path, validate)
def move_all_SFT_files(src_dest_paths, validate, processes)
def find_SFT_files(source_directory, channel, dest_directory)
def make_dest_dirs(dest_dirs)
def silence_xlal_error_messages()