Loading [MathJax]/extensions/TeX/AMSsymbols.js
LALPulsar 7.1.1.1-b246709
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
lalpulsar_MoveSFTs.py
Go to the documentation of this file.
1##python
2# Copyright (C) 2024 Evan Goetz
3#
4# This program is free software; you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation; either version 2 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with with program; see the file COPYING. If not, write to the
16# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17# MA 02110-1301 USA
18
19## \file
20## \ingroup lalpulsar_bin_SFTTools
21"""Move SFTs between directories."""
22
23import argparse
24import os
25import sys
26import shutil
27from contextlib import contextmanager
28from concurrent.futures import ProcessPoolExecutor, as_completed
29from tqdm import tqdm
30
31from lal import LALERRORBIT, LALWARNINGBIT, LALINFOBIT, LALTRACEBIT
32from lal import GetDebugLevel, ClobberDebugLevel
33
34from lalpulsar import git_version
35from lalpulsar import ValidateSFTFile, SFTErrorMessage
36
37__author__ = "Evan Goetz <evan.goetz@ligo.org>"
38__credits__ = "Karl Wette <karl.wette@ligo.org>"
39__version__ = git_version.id
40__date__ = git_version.date
41
42
43@contextmanager
45 saveDebugLevel = GetDebugLevel()
46 silentDebugLevel = saveDebugLevel & ~(
47 LALERRORBIT | LALWARNINGBIT | LALINFOBIT | LALTRACEBIT
48 )
49 ClobberDebugLevel(silentDebugLevel)
50 try:
51 yield None
52 finally:
53 ClobberDebugLevel(saveDebugLevel)
54
55
57 # parse command line
58 parser = argparse.ArgumentParser(description=__doc__)
59 parser.add_argument(
60 "-p", "--processes", type=int, default=1, help="number of moving processes"
61 )
62 parser.add_argument(
63 "-n",
64 "--no-validate",
65 dest="validate",
66 action="store_false",
67 help="do not validate destination SFTs",
68 )
69 parser.add_argument(
70 "-s", "--source-directory", type=str, help="SFT source directory"
71 )
72 parser.add_argument(
73 "-c",
74 "--channels",
75 type=str,
76 nargs="+",
77 help="Channel names (must be the same number as " "--dest-directory arguments)",
78 )
79 parser.add_argument(
80 "-d",
81 "--dest-directory",
82 type=str,
83 nargs="+",
84 help="SFT destination directory (must be the same "
85 "number as --channels arguments)",
86 )
87 args = parser.parse_args()
88
89 # check arguments
90 if args.processes <= 0:
91 parser.error("--processes must be strictly positive")
92 if not os.path.isdir(args.source_directory):
93 parser.error("source_directory is not a directory")
94 if len(args.channels) != len(args.dest_directory):
95 parser.error(
96 "Number of channel arguments must equal number of " "directory arguments"
97 )
98
99 return args
100
101
102def find_SFT_files(source_directory, channel, dest_directory):
103 # find SFT files for a specific channel and return the source-destiation tuple
104 src_dest_paths = []
105
106 # channel format in filename
107 chan = channel.split(":")[1].replace("-", "").replace("_", "")
108
109 # find source SFT files
110 for src_root, _, src_files in os.walk(source_directory):
111 for src_file in src_files:
112 if src_file.endswith(".sft") and chan in src_file:
113 src_path = os.path.join(src_root, src_file)
114 _, src_name = os.path.split(src_path)
115
116 # build SFT destination path
117 dest_path = os.path.join(dest_directory, src_name)
118
119 # add to outputs
120 src_dest_paths.append((src_path, dest_path))
121
122 return src_dest_paths
123
124
125def make_dest_dirs(dest_dirs):
126 # make destination SFT directories
127 print(f"{__file__}: making {len(dest_dirs)} directories ...", flush=True)
128 for dest_dir in dest_dirs:
129 os.makedirs(dest_dir, exist_ok=True)
130 print(f"{__file__}: making {len(dest_dirs)} directories ... done\n", flush=True)
131
132
133def move_SFT_file(src_path, dest_path, validate):
134 # move SFT with a temporary extension
135 tmp_dest_path = dest_path + "_TO_BE_VALIDATED"
136 shutil.move(src_path, tmp_dest_path)
137
138 # validate SFT if requested
139 if validate:
140 with silence_xlal_error_messages() as _:
141 validate_errorcode = ValidateSFTFile(tmp_dest_path)
142 if validate_errorcode != 0:
143 validate_errorstr = SFTErrorMessage(validate_errorcode)
144 return (tmp_dest_path, validate_errorstr)
145
146 # move destination SFT to final location
147 os.rename(tmp_dest_path, dest_path)
148
149 return None
150
151
152def move_all_SFT_files(src_dest_paths, validate, processes):
153 validate_errors = []
154
155 # create executor
156 print(f"{__file__}: copying {len(src_dest_paths)} SFTs ...", flush=True)
157 with ProcessPoolExecutor(max_workers=args.processes) as executor:
158 # submit tasks
159 pool = [
160 executor.submit(move_SFT_file, src_path, dest_path, validate)
161 for src_path, dest_path in src_dest_paths
162 ]
163
164 # collect tasks
165 for task in tqdm(as_completed(pool), total=len(pool)):
166 validate_error = task.result()
167 if validate_error is not None:
168 validate_errors.append(validate_error)
169
170 print("")
171
172 # show any validation errors
173 if validate_errors:
174 print(
175 f"{__file__}: failed to validate {len(validate_errors)} SFTs after copying:",
176 flush=True,
177 )
178 for tmp_dest_path, validate_errorstr in validate_errors:
179 print(f" {tmp_dest_path}\n {validate_errorstr}", flush=True)
180 sys.exit(1)
181
182 print(f"{__file__}: copying {len(src_dest_paths)} SFTs ... done\n", flush=True)
183
184
185if __name__ == "__main__":
187
188 dest_dirs = set()
189 src_dest_paths = []
190 for idx, c in enumerate(args.channels):
191 src_dest = find_SFT_files(args.source_directory, c, args.dest_directory[idx])
192
193 dest_dirs.add(args.dest_directory[idx])
194 src_dest_paths.extend(src_dest)
195
196 make_dest_dirs(dest_dirs)
197
198 move_all_SFT_files(src_dest_paths, args.validate, args.processes)
199
200 print(f"{__file__}: DONE", flush=True)
const char * SFTErrorMessage(int errorcode)
int ValidateSFTFile(const char *fname)
Verify that the contents of a SFT file are valid.
def move_SFT_file(src_path, dest_path, validate)
def move_all_SFT_files(src_dest_paths, validate, processes)
def find_SFT_files(source_directory, channel, dest_directory)
def make_dest_dirs(dest_dirs)