
Adds Python tooling for running filval-based analyses in parallel.

Caleb Fangmeier, 7 years ago
commit 1ac29f6026
3 changed files with 140 additions and 1 deletion
  1. .flake8             (+1 / -1)
  2. filval_merge.py     (+76 / -0)
  3. process_parallel.py (+63 / -0)
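
Usage sketch (the analysis executable name and the paths are placeholders; the flags are the ones defined by argparse in the scripts below):

    ./process_parallel.py ./my_filval_analysis --dir ./data --jobs 8 --mergeinto output.root
    ./filval_merge.py merged.root output_00.root output_01.root --preserve

The first command splits the .root files under ./data across 8 worker processes and merges the per-job outputs into output.root; the second merges a list of files by hand, leaving the inputs intact because of --preserve.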

+ 1 - 1
.flake8

@@ -1,4 +1,4 @@
 
 [flake8]
-ignore = E248, E241, E226
+ignore = E248, E241, E226, E402, E701
 max_line_length=120

+ 76 - 0
filval_merge.py

@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+import argparse
+import re
+import os
+import ROOT
+
+
+def merge_stl_obj(obj_key, output_file, input1, input_rest, merge_func=None):
+    """ Merges STL objects and saves the result into the output file, user
+        must supply the merging function.
+    """
+    obj = input1.Get(obj_key)
+    type_name_raw = str(type(obj))
+    try:
+        type_name = re.findall("<class 'ROOT.([^']+)'>", type_name_raw)[0]
+    except IndexError:
+        raise ValueError(f"Couldn't extract stl type name from {type_name_raw}")
+    if merge_func is not None:
+        for input_file in input_rest:
+            obj_ = input_file.Get(obj_key)
+            merge_func(obj, obj_)
+    output_file.WriteObjectAny(obj, type_name, obj_key)
+
+
+def merge_obj(obj_key, output_file, input1, input_rest):
+    obj = input1.Get(obj_key)
+    print('='*80)
+    print(f'Merging object {obj_key} of type {type(obj)}')
+    if isinstance(obj, ROOT.TH1):
+        obj.SetDirectory(output_file)  # detach from input file
+        for input_file in input_rest:
+            obj_ = input_file.Get(obj_key)
+            obj.Add(obj_)
+        obj.Write()
+    else:
+        print(f"I don't know how to merge object of type {type(obj)}, but "
+              "you can add a case in merge_obj to handle it!")
+
+
+def merge_files(input_filenames, output_filename, preserve=False):
+    print(f"Merging files {', '.join(input_filenames)} into {output_filename}")
+
+    input1, *input_rest = [ROOT.TFile.Open(input_file, "READ") for input_file in input_filenames]
+    output_file = ROOT.TFile.Open(output_filename, "RECREATE")
+    output_file.cd()
+
+    obj_keys = [k.GetName() for k in input1.GetListOfKeys()]
+    for obj_key in obj_keys:
+        if obj_key in {"_value_lookup", "_function_impl_lookup"}:
+            # lookup tables are taken from the first input only (assumed identical across jobs)
+            merge_stl_obj(obj_key, output_file, input1, [])
+        else:
+            merge_obj(obj_key, output_file, input1, input_rest)
+
+    for file_ in [input1, *input_rest, output_file]:
+        file_.Close()
+    print(f"Merge finished! Results have been saved into {output_filename}")
+    if preserve:
+        print("Preserve specified, leaving input files intact")
+    else:
+        print("Removing input files...", end='', flush=True)
+        for filename in input_filenames:
+            os.remove(filename)
+        print("Done!")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+
+    add = parser.add_argument
+
+    add('output_file')
+    add('input_files', nargs='+')
+    add('--preserve', '-p', action='store_true')
+
+    args = parser.parse_args()
+    merge_files(args.input_files, args.output_file, args.preserve)
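
A possible merge_func to pass to merge_stl_obj, as a minimal sketch: it assumes the stored lookups are std::map objects and that PyROOT exposes its usual std::map conveniences (iteration over pairs, item assignment).

    def merge_maps(target, other):
        # copy every entry of other into target (assumes PyROOT exposes
        # std::map iteration and item assignment); later inputs win on collisions
        for pair in other:
            target[pair.first] = pair.second

    merge_stl_obj(obj_key, output_file, input1, input_rest, merge_func=merge_maps)

Likewise, the fallback branch of merge_obj could grow a case for any class implementing ROOT's Merge(TCollection*) interface, TGraph for example. A hedged sketch of such a helper, not part of this commit:

    def merge_via_tlist(obj, obj_key, input_rest):
        # collect the sibling objects and let ROOT's Merge(TCollection*) combine them
        others = ROOT.TList()
        for input_file in input_rest:
            others.Add(input_file.Get(obj_key))
        obj.Merge(others)
        obj.Write(obj_key)  # output_file is the current directory, so this lands there

merge_obj would then call it from an extra elif isinstance(obj, ROOT.TGraph) branch.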

+ 63 - 0
process_parallel.py

@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+from os import listdir
+from os.path import join, isdir
+import argparse
+import subprocess
+
+import multiprocessing
+from multiprocessing.pool import Pool
+
+from filval_merge import merge_files
+
+
+def run_job(job_number, executable, files):
+    file_list = f'file_list_{job_number:02d}.txt'
+    with open(file_list, 'w') as f:
+        f.write("\n".join(files))
+
+    output_filename = f'output_{job_number:02d}.root'
+    ret = subprocess.run([executable, '-s', '-F', file_list,
+                          '-o', output_filename])
+    retcode = ret.returncode
+    if retcode != 0:
+        raise RuntimeError(f'Job {job_number} encountered errors! '
+                           f'(retcode: {retcode}), check the log file.')
+    return (job_number, output_filename)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    add = parser.add_argument
+
+    add('executable')
+    add('--jobs', '-j', type=int, default=multiprocessing.cpu_count())
+    add('--dir', '-d', default='./data')
+    add('--mergeinto', '-m', default='output.root')
+    args = parser.parse_args()
+
+    if not isdir(args.dir):
+        raise ValueError(f'Directory {args.dir} does not exist')
+
+    files = sorted([join(args.dir, fname) for fname in listdir(args.dir) if fname.endswith('.root')])
+
+    # distribute the files round-robin so each job gets roughly the same number
+    job_files = [files[i::args.jobs] for i in range(args.jobs)]
+    output_files = []
+
+    def job_callback(result):
+        job_num, job_file = result
+        print(f'job {job_num} finished')
+        output_files.append(job_file)
+
+    with Pool(args.jobs) as pool:
+        print(f'Starting {args.jobs} processes to process {len(files)} files')
+        results = []
+        for i, job_file_list in enumerate(job_files):
+            results.append(pool.apply_async(run_job, (i, args.executable, job_file_list),
+                           callback=job_callback))
+        # calling get() re-raises any exception thrown inside a worker
+        for result in results: result.get()
+        pool.close()
+        pool.join()
+        print('Finished processing nTuples.')
+    print('Begin merging job files')
+    merge_files(output_files, args.mergeinto)
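
For completeness, the round-robin split used to build job_files assigns every file to exactly one job even when the file count does not divide evenly by the job count (interpreter check, hypothetical file names):

    >>> files = [f'f{i}.root' for i in range(10)]
    >>> [files[i::3] for i in range(3)]
    [['f0.root', 'f3.root', 'f6.root', 'f9.root'],
     ['f1.root', 'f4.root', 'f7.root'],
     ['f2.root', 'f5.root', 'f8.root']]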