open Core
open Bistro_engine

(** Timing information kept for one finished step. *)
type stats = {
  elapsed_time : float ;
  (** Difference between the timestamps received with the step's
      [Task_ended] and [Task_started] events. *)
}

(** [create ()] builds a fresh object that listens to scheduler events,
    records how long each step took, and can write those timings to a
    tab-separated report. Use its [logger] method to obtain the view of
    the object typed as a [Scheduler.logger]. *)
let create () =
  object (self)
    (* Task id -> timestamp received with its [Task_started] event. *)
    val started = String.Table.create ()

    (* Step id -> timing recorded when its [Step_result] was received. *)
    val table = String.Table.create ()

    (* Event callback. [t] is the timestamp that comes with the event;
       the first argument is not used here. Only task starts and step
       results are recorded, every other event is ignored. *)
    method event _config t = function
      | Scheduler.Task_started (task, _) ->
        String.Table.set started ~key:(Task.id task) ~data:t
      | Task_ended (Step_result { step ; _ }) ->
        let id = step.id in
        (* A step result without a recorded start is skipped rather than
           raising from inside the logger callback; [report] prints such
           steps as "NA". *)
        Option.iter (String.Table.find started id) ~f:(fun start_time ->
            String.Table.set table ~key:id
              ~data:{ elapsed_time = t -. start_time }
          )
      | Task_ended (Input_check _ | Select_check _ | Map_command_result _)
      | Init _
      | Task_ready _
      | Task_skipped _ -> ()

    (* Elapsed time recorded for workflow [w]. Raises (via [find_exn])
       when no timing has been recorded under the workflow's id. *)
    method elapsed_time : 'a. 'a Bistro.workflow -> float = fun w ->
      (String.Table.find_exn table (Bistro.Workflow.id w)).elapsed_time

    (* Workflow id carried by a detection result, paired with its
       recorded elapsed time, or [None] when nothing was recorded. *)
    method elapsed_time_of_result
      : Convergence_detection.result -> string * float option =
      (* [f] is let-bound so that it can be applied to the workflow of
         every variant below. *)
      let f w =
        let id = Bistro.Workflow.id w in
        id, Option.map (String.Table.find table id) ~f:(fun x -> x.elapsed_time)
      in
      function
      | `Pcoc w | `Pcoc_gamma w | `Pcoc_C60 w -> f w
      | `Diffsel w -> f w
      | `Identical_LG w | `Identical_WAG w -> f w
      | `Topological_LG w | `Topological_WAG w -> f w
      | `Tdg09 w -> f w
      | `Multinomial w -> f w
      | `Msd (w, _) -> f w

    (* Nothing to release when the logger is stopped. *)
    method stop = ()

    (* Shutdown completes immediately. *)
    method wait4shutdown = Lwt.return ()

    (* This same object, coerced to [Scheduler.logger]. *)
    method logger = (self :> Scheduler.logger)

    (* Writes to [fn] one line per tool result of every dataset in
       [detection_results]. Columns are tab-separated: tree prefix, model
       prefix, method name, workflow id, elapsed time ("NA" when no
       timing was recorded). *)
    method report detection_results fn =
      List.concat_map detection_results
        ~f:(fun (dataset_res : Convergence_detection.dataset_res) ->
            let tree_name = dataset_res.tree_prefix in
            let model_name = dataset_res.model_prefix in
            List.map dataset_res.res_by_tools ~f:(fun r ->
                let meth_name = Convergence_detection.meth_string_of_result r in
                let id, elapsed_time = self#elapsed_time_of_result r in
                String.concat ~sep:"\t" [
                  tree_name ;
                  model_name ;
                  meth_name ;
                  id ;
                  Option.value_map ~f:Float.to_string ~default:"NA" elapsed_time ;
                ]
              )
          )
      |> Out_channel.write_lines fn
  end