-
Notifications
You must be signed in to change notification settings - Fork 189
/
Copy pathlpad_run.py
1572 lines (1353 loc) · 63.1 KB
/
lpad_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""A runnable script for managing a FireWorks database (a command-line interface to launchpad.py)."""
from __future__ import annotations
import ast
import copy
import datetime
import json
import os
import re
import sys
import time
from argparse import ArgumentParser, ArgumentTypeError, Namespace
from importlib import metadata
from typing import Any, Sequence
from pymongo import ASCENDING, DESCENDING
from ruamel.yaml import YAML
from fireworks import FW_INSTALL_DIR
from fireworks.core.firework import Firework, Workflow
from fireworks.core.fworker import FWorker
from fireworks.core.launchpad import LaunchPad, WFLock
from fireworks.features.fw_report import FWReport
from fireworks.features.introspect import Introspector
from fireworks.fw_config import (
CONFIG_FILE_DIR,
FWORKER_LOC,
LAUNCHPAD_LOC,
MAINTAIN_INTERVAL,
PW_CHECK_NUM,
RESERVATION_EXPIRATION_SECS,
RUN_EXPIRATION_SECS,
WEBSERVER_HOST,
WEBSERVER_PORT,
)
from fireworks.user_objects.firetasks.script_task import ScriptTask
from fireworks.utilities.fw_serializers import DATETIME_HANDLER, recursive_dict
from ._helpers import _validate_config_file_paths
# Module metadata.
__author__ = "Anubhav Jain"
__credits__ = "Shyue Ping Ong"
__copyright__ = "Copyright 2013, The Materials Project"
__maintainer__ = "Anubhav Jain"
__email__ = "[email protected]"
__date__ = "Feb 7, 2013"
# Filename used as the default of the "init" subcommand's --config-file option,
# i.e. where init_yaml writes the launchpad configuration unless told otherwise.
DEFAULT_LPAD_YAML = "my_launchpad.yaml"
def pw_check(ids: list[int], args: Namespace, skip_pw: bool = False) -> list[int]:
    """Guard bulk modifications behind a confirmation prompt or a date password.

    The check only applies when more than ``PW_CHECK_NUM`` ids are given and
    ``skip_pw`` is False. In that case ``args.password`` must equal today's date
    (``YYYY-MM-DD``); if no password was supplied the user is prompted, and an
    answer starting with "y"/"Y" sets ``args.password`` to today's date.

    Args:
        ids: Firework or Workflow ids that are about to be modified.
        args: Parsed CLI arguments; ``args.password`` is read and may be set.
        skip_pw: If True, never prompt and never check the password.

    Returns:
        list[int]: ``ids``, unchanged.

    Raises:
        ValueError: If the check applies and neither a matching password nor a
            positive answer to the prompt was given.
    """
    if skip_pw or len(ids) <= PW_CHECK_NUM:
        return ids

    # Compute today's date once so the prompt path and the comparison below
    # cannot disagree if the call happens to straddle midnight.
    m_password = datetime.datetime.now().strftime("%Y-%m-%d")

    if not args.password:
        answer = input(f"Are you sure? This will modify {len(ids)} entries. (Y/N)")
        # Slice rather than index: an empty answer (just pressing Enter) is
        # treated as "no" instead of raising IndexError.
        if answer.strip()[:1].upper() == "Y":
            args.password = m_password
        else:
            print("Operation aborted by user.")

    if args.password != m_password:
        raise ValueError(
            f"Modifying more than {PW_CHECK_NUM} entries requires setting the --password parameter! "
            "(Today's date, e.g. 2012-02-25)"
        )
    return ids
def parse_helper(lp: LaunchPad, args: Namespace, wf_mode: bool = False, skip_pw: bool = False) -> list[int]:
    """
    Helper method to parse args that can take either id, name, state or query.

    Explicit ids in ``args.fw_id`` are returned directly (after the password
    check). Otherwise a query dict is built from ``args.query``, ``args.name``
    and ``args.state`` and submitted to the LaunchPad.

    Args:
        lp: LaunchPad that is queried for matching ids.
        args: Namespace of parsed CLI arguments.
        wf_mode (bool): If True, will query lp for workflow instead of fireworks IDs.
        skip_pw (bool): If True, skip PW check. Defaults to False.

    Returns:
        list[int]: Firework or Workflow IDs.

    Raises:
        ValueError: If fw_id is combined with name/state/query, or if pw_check
            rejects the operation.
    """
    if args.fw_id and any([args.name, args.state, args.query]):
        raise ValueError("Cannot specify both fw_id and name/state/query)")

    if args.fw_id:
        return pw_check(args.fw_id, args, skip_pw)

    # Not every subcommand defines --launches_mode; a missing option means "off".
    # Reading it via getattr avoids an AttributeError further down and keeps the
    # name filter from being silently dropped when the option does not exist.
    launches_mode = getattr(args, "launches_mode", False)

    query = ast.literal_eval(args.query) if args.query else {}
    if args.name and not launches_mode:
        query["name"] = args.name
    if args.state:
        query["state"] = args.state

    # sort/rsort/max are optional on the namespace as well
    if getattr(args, "sort", None):
        sort = [(args.sort, ASCENDING)]
    elif getattr(args, "rsort", None):
        sort = [(args.rsort, DESCENDING)]
    else:
        sort = None

    limit = getattr(args, "max", 0)  # local name chosen to avoid shadowing builtin max

    if wf_mode:
        return pw_check(lp.get_wf_ids(query, sort=sort, limit=limit), args, skip_pw)

    return pw_check(lp.get_fw_ids(query, sort=sort, limit=limit, launches_mode=launches_mode), args, skip_pw)
def get_lp(args: Namespace) -> LaunchPad:
    """Build a LaunchPad from the CLI arguments and verify the database answers a ping.

    If ``args.launchpad_file`` is set the LaunchPad is loaded from that file.
    Otherwise a default LaunchPad is created with a short server-selection
    timeout, and ``args.loglvl`` is forced to "CRITICAL" when ``args.silencer``
    is set.

    Args:
        args: Parsed CLI arguments (launchpad_file, and for the default case
            silencer, loglvl and logdir).

    Returns:
        LaunchPad: A LaunchPad whose connection responded to ``ping``.

    Raises:
        ValueError: If the LaunchPad could not be created or the ping failed.
    """
    # Stays None if constructing the LaunchPad itself fails, so the except block
    # can still build its message instead of hitting an unbound local.
    lp = None
    try:
        if args.launchpad_file:
            lp = LaunchPad.from_file(args.launchpad_file)
        else:
            args.loglvl = "CRITICAL" if args.silencer else args.loglvl
            # no lpad file means we try connect to localhost which is fast so use small timeout
            # (default 30s) for quick response to user if no DB is running
            mongo_kwds = {"serverSelectionTimeoutMS": 500}
            lp = LaunchPad(logdir=args.logdir, strm_lvl=args.loglvl, mongoclient_kwargs=mongo_kwds)

        # make sure we can connect to DB, raises pymongo.errors.ServerSelectionTimeoutError if not
        lp.connection.admin.command("ping")
        return lp

    except Exception:
        # host/port are only known once a LaunchPad object exists
        location = f" at {lp.host}:{lp.port}" if lp is not None else ""
        err_message = (
            f"FireWorks was not able to connect to MongoDB{location}. Is the server running? "
            f"The database file specified was {args.launchpad_file}."
        )
        if not args.launchpad_file:
            err_message += (
                ' Type "lpad init" if you would like to set up a file that specifies '
                "location and credentials of your Mongo database (otherwise use default "
                "localhost configuration)."
            )
        # use from None to hide the pymongo ServerSelectionTimeoutError that otherwise clutters up the stack trace
        raise ValueError(err_message) from None
def init_yaml(args: Namespace) -> None:
    """Prompt for LaunchPad connection settings and write them to ``args.config_file``.

    In URI mode (``args.uri_mode``) only host, ssl_ca_file and authsource are
    asked for and ``uri_mode`` is stored as True; otherwise the individual
    connection fields are asked for. An empty answer keeps the field's default.
    """
    # (key, default, help text) shared verbatim by both modes
    authsource_field = (
        "authsource",
        None,
        "Database used for authentication, if not connection db. e.g., for MongoDB Atlas this is sometimes "
        "'admin'.",
    )

    if args.uri_mode:
        fields = (
            ("host", None, "Example: mongodb+srv://USER:[email protected]/fireworks"),
            ("ssl_ca_file", None, "Path to any client certificate to be used for mongodb connection"),
            authsource_field,
        )
        print(
            "Note 1: You are in URI format mode. This means that all database parameters (username, password, host, "
            "port, database name, etc.) must be present in the URI. See: "
            "https://docs.mongodb.com/manual/reference/connection-string/ for details."
        )
        print("(Enter your connection URI through the 'host' parameter)")
    else:
        fields = (
            ("host", "localhost", "Example: 'localhost' or 'mongodb+srv://CLUSTERNAME.mongodb.net'"),
            ("port", 27017, ""),
            ("name", "fireworks", "Database under which to store the fireworks collections"),
            ("username", None, "Username for MongoDB authentication"),
            ("password", None, "Password for MongoDB authentication"),
            ("ssl_ca_file", None, "Path to any client certificate to be used for Mongodb connection"),
            authsource_field,
        )

    print("Please supply the following configuration values")
    print("(press Enter if you want to accept the defaults)\n")

    doc: dict[str, str | int | bool | None] = {}
    for key, default, helptext in fields:
        # the local must stay named "default": the prompt relies on the {default=} form
        answer = input(f"Enter {key} parameter. ({default=}). {helptext}: ")
        doc[key] = answer if answer else default

    if "port" in doc:
        # user input arrives as str; the port is stored as an int
        doc["port"] = int(doc["port"])
    if args.uri_mode:
        doc["uri_mode"] = True

    LaunchPad.from_dict(doc).to_file(args.config_file)
    print(f"\nConfiguration written to {args.config_file}!")
def reset(args: Namespace) -> None:
    """Reset the LaunchPad after confirmation.

    Without ``args.password`` the user must type the database name to confirm;
    on a match today's date is used as the password handed to ``lp.reset``.

    Raises:
        ValueError: If the typed confirmation does not equal the database name.
    """
    lp = get_lp(args)

    if not args.password:
        workflow_count = lp.workflows.count_documents({})
        typed = input(
            f"Are you sure? This will RESET {workflow_count} workflows and all data. "
            f"To confirm, please type the name of this database ({lp.name}) :"
        )
        if typed != lp.name:
            raise ValueError("Incorrect input to confirm database reset, operation aborted.")
        args.password = datetime.datetime.now().strftime("%Y-%m-%d")

    lp.reset(args.password)
def add_wf(args: Namespace) -> None:
    """Load workflows from file(s) and add them to the LaunchPad.

    With ``args.dir`` every entry of ``args.wf_file`` is treated as a directory
    and all files listed in it are loaded; otherwise the entries are the files
    themselves. With ``args.check`` each workflow is checked through DAGFlow
    before it is added.
    """
    lp = get_lp(args)

    if args.dir:
        paths = [os.path.join(folder, entry) for folder in args.wf_file for entry in os.listdir(folder)]
    else:
        paths = args.wf_file

    for path in paths:
        workflow = Workflow.from_file(path)
        if args.check:
            # imported lazily, only needed when checking is requested
            from fireworks.utilities.dagflow import DAGFlow

            DAGFlow.from_fireworks(workflow).check()
        lp.add_wf(workflow)
def append_wf(args: Namespace) -> None:
    """Load the workflow in ``args.wf_file`` and append it at the ids in ``args.fw_id``."""
    launchpad = get_lp(args)
    new_workflow = Workflow.from_file(args.wf_file)
    launchpad.append_wf(
        new_workflow,
        args.fw_id,
        detour=args.detour,
        pull_spec_mods=args.pull_spec_mods,
    )
def dump_wf(args: Namespace) -> None:
    """Write the workflow containing ``args.fw_id`` to the file ``args.wf_file``."""
    launchpad = get_lp(args)
    workflow = launchpad.get_wf_by_fw_id(args.fw_id)
    workflow.to_file(args.wf_file)
def check_wf(args: Namespace) -> None:
    """Run the DAGFlow check on the workflow containing ``args.fw_id``."""
    from fireworks.utilities.dagflow import DAGFlow

    launchpad = get_lp(args)
    workflow = launchpad.get_wf_by_fw_id(args.fw_id)
    DAGFlow.from_fireworks(workflow).check()
def add_wf_dir(args: Namespace) -> None:
    """Add every workflow file found in the directory ``args.wf_dir`` to the LaunchPad.

    Args:
        args: Parsed CLI arguments; ``args.wf_dir`` is the directory to scan.
    """
    lp = get_lp(args)
    for filename in os.listdir(args.wf_dir):
        # os.listdir yields bare entry names; join them with the directory so
        # loading works regardless of the current working directory.
        fwf = Workflow.from_file(os.path.join(args.wf_dir, filename))
        lp.add_wf(fwf)
def print_fws(ids, lp, args: Namespace) -> None:
    """Prints results of some FireWorks query to stdout.

    The representation is chosen by ``args.display_format`` exactly as in
    get_fws_helper (ids, count, or per-Firework dicts, with a single result
    unwrapped from its list), so that helper is reused here instead of
    repeating the same loop.

    Args:
        ids: Firework ids, or a count when ``args.display_format`` is "count".
        lp: LaunchPad used to load the Fireworks.
        args: Parsed CLI arguments (display_format and the output options read
            by get_output).
    """
    get_output(args, get_fws_helper(lp, ids, args))
def get_fw_ids_helper(lp: LaunchPad, args: Namespace, count_only: bool | None = None) -> list[int] | int:
    """Build fws query from command line options and submit.

    Side effects on ``args``: when no selector is given, ``args.query`` becomes
    "{}" and ``args.display_format`` defaults to "ids"; otherwise
    ``args.display_format`` defaults to "more".

    Parameters:
        lp (fireworks.core.firework.Launchpad)
        args (argparse.Namespace)
        count_only (bool): if None, then looked up in args.

    Returns:
        list[int] | int: resulting fw_ids or count of fws in query.

    Raises:
        ValueError: if more than one of (fw_id, name, state, query) or more than
            one of (fw_id, name, qid) is given.
    """
    n_selectors = sum(map(bool, (args.fw_id, args.name, args.state, args.query)))
    if n_selectors > 1:
        raise ValueError("Please specify exactly one of (fw_id, name, state, query)")
    if n_selectors == 0:
        args.query = "{}"
        args.display_format = args.display_format or "ids"
    if sum(map(bool, (args.fw_id, args.name, args.qid))) > 1:
        raise ValueError("Please specify exactly one of (fw_id, name, qid)")
    args.display_format = args.display_format or "more"

    # translate the single active selector into a query document
    if args.fw_id:
        query = {"fw_id": {"$in": args.fw_id}}
    elif args.name and not args.launches_mode:
        query = {"name": args.name}
    elif args.state:
        query = {"state": args.state}
    elif args.query:
        query = ast.literal_eval(args.query)
    else:
        query = None

    sort = None
    if args.sort:
        sort = [(args.sort, ASCENDING)]
    elif args.rsort:
        sort = [(args.rsort, DESCENDING)]

    if count_only is None:
        count_only = args.display_format == "count"

    if not args.qid:
        return lp.get_fw_ids(query, sort, args.max, count_only=count_only, launches_mode=args.launches_mode)

    # reservation id given: start from its fw_ids, then narrow by the query if there is one
    ids = lp.get_fw_ids_from_reservation_id(args.qid)
    if query:
        query["fw_id"] = {"$in": ids}
        ids = lp.get_fw_ids(query, sort, args.max, launches_mode=args.launches_mode)
    return ids
def get_fws_helper(
    lp: LaunchPad, ids: list[int], args: Namespace
) -> list[int] | int | list[dict[str, str | int | bool]] | str | bool:
    """Get fws from ids in a representation according to args.display_format.

    "ids" passes the ids through, "count" wraps the given value in a list, and
    any other format loads each Firework as a dict ("more"/"less" drop
    archived_launches and spec; "less" also drops launches). A result holding
    exactly one element is returned unwrapped.
    """
    fmt = args.display_format

    if fmt == "ids":
        result = ids
    elif fmt == "count":
        result = [ids]
    else:
        trimmed = fmt in ("more", "less")
        result = []
        for fw_id in ids:
            fw_dict = lp.get_fw_by_id(fw_id).to_dict()
            fw_dict.setdefault("state", "WAITING")
            if trimmed:
                fw_dict.pop("archived_launches", None)
                del fw_dict["spec"]
                if fmt == "less":
                    fw_dict.pop("launches", None)
            result.append(fw_dict)

    if len(result) == 1:
        return result[0]
    return result
def get_fws(args: Namespace) -> None:
    """Query Fireworks according to the CLI arguments and print the result."""
    launchpad = get_lp(args)
    matching_ids = get_fw_ids_helper(launchpad, args)
    rendered = get_fws_helper(launchpad, matching_ids, args)
    get_output(args, rendered)
def get_fws_in_wfs(args: Namespace) -> None:
    """Print the Fireworks matching an fw-level selector inside workflows matching a wf-level selector.

    Workflow selectors come from the ``wf_``-prefixed arguments, Firework
    selectors from the ``fw_``-prefixed ones; at most one selector per level may
    be given. Missing selectors default to the empty query "{}".

    Raises:
        ValueError: if more than one selector is given on either level, or more
            than one of (fw_id, name, qid) on the Firework level.
    """
    lp = get_lp(args)

    # ---- workflow-level selection ----
    n_wf_selectors = sum(map(bool, (args.wf_fw_id, args.wf_name, args.wf_state, args.wf_query)))
    if n_wf_selectors > 1:
        raise ValueError("Please specify exactly one of (fw_id, name, state, query)")
    if n_wf_selectors == 0:
        args.wf_query = "{}"

    if args.wf_fw_id:
        wf_query = {"nodes": {"$in": args.wf_fw_id}}
    elif args.wf_name:
        wf_query = {"name": args.wf_name}
    elif args.wf_state:
        wf_query = {"state": args.wf_state}
    else:
        wf_query = ast.literal_eval(args.wf_query)

    # ---- Firework-level selection ----
    n_fw_selectors = sum(map(bool, (args.fw_fw_id, args.fw_name, args.fw_state, args.fw_query)))
    if n_fw_selectors > 1:
        raise ValueError("Please specify exactly one of (fw_id, name, state, query)")
    if n_fw_selectors == 0:
        args.fw_query = "{}"
        args.display_format = args.display_format or "ids"
    if sum(map(bool, (args.fw_fw_id, args.fw_name, args.qid))) > 1:
        raise ValueError("Please specify exactly one of (fw_id, name, qid)")
    args.display_format = args.display_format or "more"

    if args.fw_fw_id:
        fw_query = {"fw_id": {"$in": args.fw_fw_id}}
    elif args.fw_name and not args.launches_mode:
        fw_query = {"name": args.fw_name}
    elif args.fw_state:
        fw_query = {"state": args.fw_state}
    elif args.fw_query:
        fw_query = ast.literal_eval(args.fw_query)
    else:
        fw_query = None

    sort = None
    if args.sort:
        sort = [(args.sort, ASCENDING)]
    elif args.rsort:
        sort = [(args.rsort, DESCENDING)]

    if args.qid:
        # start from the reservation's fw_ids, then narrow by the fw query if there is one
        ids = lp.get_fw_ids_from_reservation_id(args.qid)
        if fw_query:
            fw_query["fw_id"] = {"$in": ids}
            ids = lp.get_fw_ids_in_wfs(
                wf_query=wf_query,
                fw_query=fw_query,
                sort=sort,
                limit=args.max,
                launches_mode=args.launches_mode,
            )
    else:
        wants_count = args.display_format == "count"
        ids = lp.get_fw_ids_in_wfs(
            wf_query=wf_query,
            fw_query=fw_query,
            sort=sort,
            limit=args.max,
            count_only=wants_count,
            launches_mode=args.launches_mode,
        )

    print_fws(ids, lp, args)
def update_fws(args: Namespace) -> None:
    """Apply the JSON document in ``args.update`` to the spec of the selected Fireworks."""
    launchpad = get_lp(args)
    selected_ids = parse_helper(launchpad, args)
    spec_changes = json.loads(args.update)
    launchpad.update_spec(selected_ids, spec_changes, args.mongo)
def get_wfs(args: Namespace) -> None:
    """Query workflows and print them as ids, a count, summary dicts, or a table.

    At most one of fw_id, name, state, query may be given. Without any selector
    the query defaults to "{}" and the display format to "ids"; with a selector
    the display format defaults to "more".

    With ``args.table`` the summary dicts are rendered through PrettyTable. A
    table needs dict rows, so for the "ids" and "count" formats (whose results
    are plain integers) the regular output is used instead of failing.

    Raises:
        ValueError: if more than one selector is given.
    """
    lp = get_lp(args)
    n_selectors = sum(bool(x) for x in [args.fw_id, args.name, args.state, args.query])
    if n_selectors > 1:
        raise ValueError("Please specify exactly one of (fw_id, name, state, query)")
    if n_selectors == 0:
        args.query = "{}"
        args.display_format = args.display_format or "ids"
    else:
        args.display_format = args.display_format or "more"

    if args.fw_id:
        query = {"nodes": {"$in": args.fw_id}}
    elif args.name:
        query = {"name": args.name}
    elif args.state:
        query = {"state": args.state}
    else:
        query = ast.literal_eval(args.query)

    if args.sort:
        sort = [(args.sort, ASCENDING)]
    elif args.rsort:
        sort = [(args.rsort, DESCENDING)]
    else:
        sort = None

    ids = lp.get_wf_ids(query, sort, args.max, count_only=args.display_format == "count")
    if args.display_format == "ids":
        wfs = ids
    elif args.display_format == "count":
        wfs = [ids]
    else:
        wfs = []
        for i in ids:
            d = lp.get_wf_summary_dict(i, args.display_format)
            # make the displayed name unique by appending the id
            d["name"] += f"--{int(i)}"
            wfs.append(d)

    # Only summary dicts can be tabulated; ids/counts are ints and would make
    # list(wfs[0]) raise TypeError, so they go through the plain output path.
    tabular = args.table and args.display_format not in ("ids", "count")
    if tabular:
        if wfs:
            headers = list(wfs[0])
            from prettytable import PrettyTable

            t = PrettyTable(headers)
            for d in wfs:
                t.add_row([d.get(k) for k in headers])
            print(t)
    else:
        if len(wfs) == 1:
            wfs = wfs[0]
        get_output(args, wfs)
def delete_wfs(args: Namespace) -> None:
    """Call ``lp.delete_wf`` for every workflow selected by the CLI arguments."""
    lp = get_lp(args)
    wf_ids = parse_helper(lp, args, wf_mode=True)
    remove_dirs = args.delete_launch_dirs
    for wf_id in wf_ids:
        lp.delete_wf(wf_id, delete_launch_dirs=remove_dirs)
        lp.m_logger.debug(f"Processed fw_id: {wf_id}")
    lp.m_logger.info(f"Finished deleting {len(wf_ids)} WFs")
def get_children(links, start, max_depth):
    """Build a nested dict describing the subtree of ``links`` rooted at ``start``.

    Args:
        links: Mapping of a node to the sequence of its child nodes.
        start: Node whose subtree is wanted.
        max_depth: Accepted and passed down the recursion, but not used to limit
            the depth.

    Returns:
        dict: ``{}`` if ``start`` is not a key of ``links``; ``{start: children}``
        if it has no children; otherwise ``{start: [subtree, ...]}`` with one
        nested dict per child.
    """
    # Direct lookup instead of scanning every entry of links at each level.
    if start not in links:
        return {}
    children = links[start]
    if len(children) > 0:
        return {start: [get_children(links, child, max_depth) for child in children]}
    return {start: children}
def detect_lostruns(args: Namespace) -> None:
    """Run ``lp.detect_lostruns`` with the CLI options and report what it found.

    Lost launches, lost FWs and inconsistent FWs are logged; the FW lists are
    also printed unless the display format is unset or "none". Hints about the
    --rerun/--fizzle/--refresh options are printed when nothing was fixed.
    """
    lp = get_lp(args)
    fw_filter = ast.literal_eval(args.query) if args.query else None
    launch_filter = ast.literal_eval(args.launch_query) if args.launch_query else None

    lost_launches, lost_fws, inconsistent_fws = lp.detect_lostruns(
        expiration_secs=args.time,
        fizzle=args.fizzle,
        rerun=args.rerun,
        max_runtime=args.max_runtime,
        min_runtime=args.min_runtime,
        refresh=args.refresh,
        query=fw_filter,
        launch_query=launch_filter,
    )

    show_details = args.display_format not in (None, "none")

    lp.m_logger.debug(f"Detected {len(lost_launches)} lost launches: {lost_launches}")
    lp.m_logger.info(f"Detected {len(lost_fws)} lost FWs: {lost_fws}")
    if show_details:
        print_fws(lost_fws, lp, args)
    lp.m_logger.info(f"Detected {len(inconsistent_fws)} inconsistent FWs: {inconsistent_fws}")
    if show_details:
        print_fws(inconsistent_fws, lp, args)

    nothing_fixed = not args.fizzle and not args.rerun
    if len(lost_fws) > 0 and nothing_fixed:
        print("You can fix lost FWs using the --rerun or --fizzle arguments to the detect_lostruns command")
    if len(inconsistent_fws) > 0 and not args.refresh:
        print("You can fix inconsistent FWs using the --refresh argument to the detect_lostruns command")
def detect_unreserved(args: Namespace) -> None:
    """Print the result of ``lp.detect_unreserved`` for the given expiration time.

    Unless the display format is unset or "none", the Fireworks belonging to the
    detected launches are printed first (detected with rerun disabled).
    """
    lp = get_lp(args)

    if args.display_format not in (None, "none"):
        launch_ids = lp.detect_unreserved(expiration_secs=args.time, rerun=False)
        # very inefficient, replace by mongo aggregation
        fw_ids = [lp.get_launch_by_id(launch_id).fw_id for launch_id in launch_ids]
        print_fws(fw_ids, lp, args)

    result = lp.detect_unreserved(expiration_secs=args.time, rerun=args.rerun)
    print(result)
def tuneup(args: Namespace) -> None:
    """Call ``lp.tuneup``; ``bkground`` is enabled unless ``args.full`` is set."""
    launchpad = get_lp(args)
    in_background = not args.full
    launchpad.tuneup(bkground=in_background)
def defuse_wfs(args: Namespace) -> None:
    """Call ``lp.defuse_wf`` for every selected workflow.

    ``args.defuse_all_states`` is forwarded to the LaunchPad; when it is not
    set, a note about FIZZLED and COMPLETED FWs is logged afterwards.
    """
    lp = get_lp(args)
    wf_ids = parse_helper(lp, args, wf_mode=True)
    all_states = args.defuse_all_states

    for wf_id in wf_ids:
        lp.defuse_wf(wf_id, defuse_all_states=all_states)
        lp.m_logger.debug(f"Processed fw_id: {wf_id}")
    lp.m_logger.info(f"Finished defusing {len(wf_ids)} FWs.")

    if args.defuse_all_states:
        return
    lp.m_logger.info(
        "Note: FIZZLED and COMPLETED FWs were not defused. "
        "Use the --defuse_all_states option to force this (or rerun FIZZLED FWs first)."
    )
def pause_wfs(args: Namespace) -> None:
    """Call ``lp.pause_wf`` for every workflow selected by the CLI arguments.

    Args:
        args: Parsed CLI arguments used to connect and to select workflows.
    """
    lp = get_lp(args)
    fw_ids = parse_helper(lp, args, wf_mode=True)
    for f in fw_ids:
        lp.pause_wf(f)
        lp.m_logger.debug(f"Processed fw_id: {f}")
    # The summary previously said "defusing ... FWs" (copied from defuse_wfs),
    # which misreported what this command did.
    lp.m_logger.info(f"Finished pausing {len(fw_ids)} WFs")
def archive(args: Namespace) -> None:
    """Call ``lp.archive_wf`` for every workflow selected by the CLI arguments."""
    lp = get_lp(args)
    wf_ids = parse_helper(lp, args, wf_mode=True)
    for wf_id in wf_ids:
        lp.archive_wf(wf_id)
        lp.m_logger.debug(f"Processed fw_id: {wf_id}")
    lp.m_logger.info(f"Finished archiving {len(wf_ids)} WFs")
def reignite_wfs(args: Namespace) -> None:
    """Call ``lp.reignite_wf`` for every workflow selected by the CLI arguments."""
    lp = get_lp(args)
    wf_ids = parse_helper(lp, args, wf_mode=True)
    for wf_id in wf_ids:
        lp.reignite_wf(wf_id)
        lp.m_logger.debug(f"Processed Workflow with fw_id: {wf_id}")
    lp.m_logger.info(f"Finished reigniting {len(wf_ids)} Workflows")
def defuse_fws(args: Namespace) -> None:
    """Call ``lp.defuse_fw`` for every Firework selected by the CLI arguments."""
    lp = get_lp(args)
    selected = parse_helper(lp, args)
    for fw_id in selected:
        lp.defuse_fw(fw_id)
        lp.m_logger.debug(f"Processed fw_id: {fw_id}")
    lp.m_logger.info(f"Finished defusing {len(selected)} FWs")
def pause_fws(args: Namespace) -> None:
    """Call ``lp.pause_fw`` for every Firework selected by the CLI arguments."""
    lp = get_lp(args)
    selected = parse_helper(lp, args)
    for fw_id in selected:
        lp.pause_fw(fw_id)
        lp.m_logger.debug(f"Processed fw_id: {fw_id}")
    lp.m_logger.info(f"Finished pausing {len(selected)} FWs")
def reignite_fws(args: Namespace) -> None:
    """Call ``lp.reignite_fw`` for every Firework selected by the CLI arguments."""
    lp = get_lp(args)
    selected = parse_helper(lp, args)
    for fw_id in selected:
        lp.reignite_fw(fw_id)
        lp.m_logger.debug(f"Processed fw_id: {fw_id}")
    lp.m_logger.info(f"Finished reigniting {len(selected)} FWs")
def resume_fws(args: Namespace) -> None:
    """Call ``lp.resume_fw`` for every Firework selected by the CLI arguments."""
    lp = get_lp(args)
    selected = parse_helper(lp, args)
    for fw_id in selected:
        lp.resume_fw(fw_id)
        lp.m_logger.debug(f"Processed fw_id: {fw_id}")
    lp.m_logger.info(f"Finished resuming {len(selected)} FWs")
def rerun_fws(args: Namespace) -> None:
    """Mark the selected Fireworks for rerun via ``lp.rerun_fw``.

    With ``args.task_level`` each Firework is paired with a launch to recover
    from: the entries of ``args.launch_id`` if given, otherwise "last". Without
    it no launch is recovered (``recover_launch=None``).

    Raises:
        ValueError: if ``args.launch_id`` is given with a different length than
            the number of selected Fireworks.
    """
    lp = get_lp(args)
    fw_ids = parse_helper(lp, args)
    n_fws = len(fw_ids)

    if not args.task_level:
        launch_ids = [None] * n_fws
    elif args.launch_id is None:
        launch_ids = ["last"] * n_fws
    else:
        launch_ids = args.launch_id
        if len(launch_ids) != n_fws:
            raise ValueError("Specify the same number of tasks and launches")

    # the loop variable stays named fw_id: the debug message uses the {fw_id=} form
    for fw_id, launch_id in zip(fw_ids, launch_ids):
        lp.rerun_fw(int(fw_id), recover_launch=launch_id, recover_mode=args.recover_mode)
        lp.m_logger.debug(f"Processed {fw_id=}")
    lp.m_logger.info(f"Finished setting {n_fws} FWs to rerun")
def refresh(args: Namespace) -> None:
    """Refresh every selected workflow by calling ``lp._refresh_wf`` on each of its root Fireworks."""
    lp = get_lp(args)
    wf_ids = parse_helper(lp, args, wf_mode=True)
    for wf_id in wf_ids:
        workflow = lp.get_wf_by_fw_id_lzyfw(wf_id)
        for root_id in workflow.root_fw_ids:
            lp._refresh_wf(root_id)
        lp.m_logger.debug(f"Processed Workflow with fw_id: {wf_id}")
    lp.m_logger.info(f"Finished refreshing {len(wf_ids)} Workflows")
def unlock(args: Namespace) -> None:
    """Enter a ``WFLock`` with ``expire_secs=0, kill=True`` for every selected workflow and log a warning."""
    lp = get_lp(args)
    wf_ids = parse_helper(lp, args, wf_mode=True)
    # the loop variable stays named fw_id: the debug message uses the {fw_id=} form
    for fw_id in wf_ids:
        lock = WFLock(lp, fw_id, expire_secs=0, kill=True)
        with lock:
            lp.m_logger.warning(f"FORCIBLY RELEASING LOCK DUE TO USER COMMAND, WF: {fw_id}")
        lp.m_logger.debug(f"Processed Workflow with {fw_id=}")
    lp.m_logger.info(f"Finished unlocking {len(wf_ids)} Workflows")
def get_qid(args: Namespace) -> None:
    """Print the reservation id of each Firework id in ``args.fw_id``, one per line."""
    launchpad = get_lp(args)
    for fw_id in args.fw_id:
        reservation_id = launchpad.get_reservation_id_from_fw_id(fw_id)
        print(reservation_id)
def cancel_qid(args: Namespace) -> None:
    """Cancel the reservation with id ``args.qid`` in the LaunchPad.

    A warning is logged first that the job itself is not removed from the queue.
    """
    launchpad = get_lp(args)
    reminder = (
        "WARNING: cancel_qid does not actually remove jobs from the queue "
        "(e.g., execute qdel), this must be done manually!"
    )
    launchpad.m_logger.warning(reminder)
    launchpad.cancel_reservation_by_reservation_id(args.qid)
def set_priority(args: Namespace) -> None:
    """Set ``args.priority`` on the selected Fireworks.

    With ``args.wf`` the selection is made on workflows and the priority is
    applied to every Firework id in each selected workflow's ``id_fw``.
    """
    wf_mode = args.wf
    lp = get_lp(args)
    targets = parse_helper(lp, args, wf_mode=wf_mode)

    if wf_mode:
        # expand each workflow into its Firework ids, de-duplicated through a set
        expanded = set()
        for wf_fw_id in targets:
            expanded.update(lp.get_wf_by_fw_id_lzyfw(wf_fw_id).id_fw)
        targets = list(expanded)

    for fw_id in targets:
        lp.set_priority(fw_id, args.priority)
        lp.m_logger.debug(f"Processed fw_id {fw_id}")
    lp.m_logger.info(f"Finished setting priorities of {len(targets)} FWs")
def _open_webbrowser(url) -> None:
    """Wait two seconds, then open ``url`` in a web browser.

    The delay gives the web server more startup time.
    """
    import webbrowser

    startup_delay_secs = 2
    time.sleep(startup_delay_secs)
    webbrowser.open(url)
def webgui(args: Namespace) -> None:
    """Configure the Flask app from the CLI options and start the web GUI.

    Without ``args.server_mode`` the app runs through ``app.run`` and a browser
    is opened from a helper thread; in server mode it runs through the gunicorn
    ``StandaloneApplication``.

    Raises:
        ValueError: if only one of webgui_username / webgui_password is set.
        SystemExit: in server mode when the gunicorn wrapper cannot be imported.
    """
    from fireworks.flask_site.app import app

    app.lp = get_lp(args)

    # exactly one of the two credentials being set is an error
    if bool(args.webgui_username) != bool(args.webgui_password):
        raise ValueError("Must set BOTH a webgui_username and webgui_password!")

    app.config["WEBGUI_USERNAME"] = args.webgui_username
    app.config["WEBGUI_PASSWORD"] = args.webgui_password

    if args.wflowquery:
        app.BASE_Q_WF = json.loads(args.wflowquery)
    if args.fwquery:
        app.BASE_Q = json.loads(args.fwquery)
        if "state" in app.BASE_Q:
            # carry the state filter of the FW query over to the workflow query
            app.BASE_Q_WF["state"] = app.BASE_Q["state"]

    if args.server_mode:
        try:
            from fireworks.flask_site.gunicorn import StandaloneApplication
        except ImportError:
            sys.exit("Gunicorn is required for server mode. Install using `pip install gunicorn`.")

        gunicorn_options = {
            "bind": f"{args.host}:{args.port}",
            "workers": args.nworkers,
        }
        StandaloneApplication(app, gunicorn_options).run()
        return

    from threading import Thread

    browser_thread = Thread(target=_open_webbrowser, args=(f"http://{args.host}:{args.port}",))
    browser_thread.start()
    app.run(host=args.host, port=args.port, debug=args.debug)
    browser_thread.join()
def add_scripts(args: Namespace) -> None:
    """Wrap each script in ``args.scripts`` in a ScriptTask Firework and add them as one workflow.

    Each Firework is linked to the next one in the order given. Names default
    to None, and the workflow name defaults to the first Firework name.
    """
    lp = get_lp(args)
    args.names = args.names or [None] * len(args.scripts)
    args.wf_name = args.wf_name or args.names[0]

    fws = [
        Firework(ScriptTask({"script": script, "use_shell": True}), name=args.names[position], fw_id=position)
        for position, script in enumerate(args.scripts)
    ]
    # chain: Firework i-1 -> Firework i
    links = {position - 1: position for position in range(1, len(fws))}

    lp.add_wf(Workflow(fws, links, args.wf_name))
def recover_offline(args: Namespace) -> None:
    """Call ``lp.recover_offline`` for every offline run that is neither completed nor deprecated.

    If ``args.fworker_file`` is given, only launches recorded for that
    FWorker's name are processed. A truthy return value from
    ``lp.recover_offline`` counts the Firework as failed, anything else as
    recovered; both lists are logged.
    """
    lp = get_lp(args)
    fworker_name = FWorker.from_file(args.fworker_file).name if args.fworker_file else None

    failed_fws = []
    recovered_fws = []
    pending_runs = lp.offline_runs.find({"completed": False, "deprecated": False}, {"launch_id": 1, "fw_id": 1})
    for run in pending_runs:
        if fworker_name:
            n_matching = lp.launches.count_documents({"launch_id": run["launch_id"], "fworker.name": fworker_name})
            if n_matching == 0:
                continue
        outcome = lp.recover_offline(run["launch_id"], args.ignore_errors, args.print_errors)
        bucket = failed_fws if outcome else recovered_fws
        bucket.append(run["fw_id"])

    lp.m_logger.info(f"FINISHED recovering offline runs. {len(recovered_fws)} job(s) recovered: {recovered_fws}")
    if failed_fws:
        lp.m_logger.info(f"FAILED to recover offline fw_ids: {failed_fws}")
def forget_offline(args: Namespace) -> None:
    """Call ``lp.forget_offline`` (with ``launch_mode=False``) for every selected Firework."""
    lp = get_lp(args)
    selected = parse_helper(lp, args)
    for fw_id in selected:
        lp.forget_offline(fw_id, launch_mode=False)
        lp.m_logger.debug(f"Processed fw_id: {fw_id}")
    lp.m_logger.info(f"Finished forget_offline, processed {len(selected)} FWs")
def report(args: Namespace) -> None:
    """Print FWReport statistics for ``args.collection`` under a dashed title banner."""
    lp = get_lp(args)
    extra_query = ast.literal_eval(args.query) if args.query else None

    reporter = FWReport(lp)
    stats = reporter.get_stats(
        coll=args.collection,
        interval=args.interval,
        num_intervals=args.num_intervals,
        additional_query=extra_query,
    )

    title = f"Stats on {args.collection}"
    rule = "-" * len(title)
    for line in (rule, title, rule):
        print(line)
    print(reporter.get_stats_str(stats))
def introspect(args: Namespace) -> None:
    """Print an Introspector "fizzled" report for the launches, tasks, fireworks and workflows collections."""
    print("NOTE: This feature is in beta mode...")
    inspector = Introspector(get_lp(args))
    for collection in ("launches", "tasks", "fireworks", "workflows"):
        print(f"generating report for {collection}...please wait...")
        print()
        report_table = inspector.introspect_fizzled(coll=collection, threshold=args.threshold, limit=args.max)
        inspector.print_report(report_table, collection)
        print()
def get_launchdir(args: Namespace) -> None:
    """Print what ``lp.get_launchdir`` returns for ``args.fw_id`` and ``args.launch_idx``."""
    launchpad = get_lp(args)
    print(launchpad.get_launchdir(args.fw_id, args.launch_idx))
def track_fws(args: Namespace) -> None:
    """Print tracker output for the selected Fireworks.

    Trackers are filtered by filename through ``args.include`` / ``args.exclude``
    (an empty filter matches everything). Each Firework with output gets a
    header line, and blocks after the first are separated by ">------<".
    """
    lp = get_lp(args)
    fw_ids = parse_helper(lp, args, skip_pw=True)
    include = args.include
    exclude = args.exclude
    printed_any = False  # decides whether a separator line is needed

    for fw_id in fw_ids:
        lines = []
        for launch_data in lp.get_tracker_data(fw_id):
            for tracker in launch_data["trackers"]:
                if include and tracker.filename not in include:
                    continue
                if exclude and tracker.filename in exclude:
                    continue
                lines.append(f"## Launch id: {launch_data['launch_id']}")
                lines.append(str(tracker))

        if not lines:
            continue

        # the local stays named "name": the header uses the {name=} form
        name = lp.fireworks.find_one({"fw_id": fw_id}, {"name": 1})["name"]
        lines.insert(0, f"# FW id: {fw_id}, FW {name=}")
        if printed_any:
            lines.insert(0, ">------<")
        printed_any = True
        print("\n".join(lines))
def maintain(args: Namespace) -> None:
    """Call ``lp.maintain`` with ``args.infinite`` and ``args.maintain_interval``."""
    launchpad = get_lp(args)
    run_forever = args.infinite
    interval = args.maintain_interval
    launchpad.maintain(run_forever, interval)
def orphaned(args: Namespace) -> None:
    """Find selected Fireworks that no workflow lists among its nodes.

    With ``args.remove`` the orphaned Fireworks are logged and deleted;
    otherwise they are printed in the requested display format.
    """
    lp = get_lp(args)

    # Fireworks matching the CLI selection (always ids, never a count)
    candidate_ids = get_fw_ids_helper(lp, args, count_only=False)

    # keep those for which no workflow has the id in its "nodes"
    orphaned_fw_ids = [fw_id for fw_id in candidate_ids if len(lp.get_wf_ids({"nodes": fw_id})) == 0]

    fws = get_fws_helper(lp, orphaned_fw_ids, args)
    if not args.remove:
        get_output(args, fws)
        return

    lp.m_logger.info(f"Found {len(orphaned_fw_ids)} orphaned fw_ids: {orphaned_fw_ids}")
    lp.delete_fws(orphaned_fw_ids, delete_launch_dirs=args.delete_launch_dirs)
def get_output(args: Namespace, objs: list[Any]) -> None:
    """Write ``objs`` to stdout as JSON when ``args.output`` is "json", otherwise as YAML.

    A newline is printed after the dump in both cases.
    """
    if args.output != "json":
        emitter = YAML(typ="safe", pure=True)
        emitter.default_flow_style = False
        emitter.dump(recursive_dict(objs, preserve_unicode=False), sys.stdout)
    else:
        json.dump(objs, sys.stdout, default=DATETIME_HANDLER, indent=4)
    print()
def arg_positive_int(value: str) -> int:
    """Convert ``value`` to an int and require it to be at least 1.

    Raises:
        ArgumentTypeError: if ``value`` cannot be converted by ``int``.
        ValueError: if the converted number is smaller than 1.
    """
    try:
        number = int(value)
    except ValueError:
        raise ArgumentTypeError(f"int(value) conversion failed for {value}")
    if number >= 1:
        return number
    raise ValueError(f"{value} is not a positive integer")
def lpad(argv: Sequence[str] | None = None) -> int:
m_description = (
"A command line interface to FireWorks. For more help on a specific command, type 'lpad <command> -h'."
)
parser = ArgumentParser("lpad", description=m_description)
fw_version = metadata.version("fireworks")
v_out = f"%(prog)s v{fw_version} located in {FW_INSTALL_DIR}"
parser.add_argument("-v", "--version", action="version", version=v_out)
parent_parser = ArgumentParser(add_help=False)
parser.add_argument(
"-o",
"--output",
choices=["json", "yaml"],
default="json",
type=lambda s: s.lower(),
help="Set output display format to either json or YAML. "
"YAML is easier to read for long documents. JSON is the default.",
)
subparsers = parser.add_subparsers(help="command", dest="command")
# This makes common argument options easier to maintain. E.g., what if
# there is a new state or disp option?
# NOTE: Those sets of standard options are not used consistently below (jotelha)
fw_id_args = ["-i", "--fw_id"]
fw_id_kwargs = {"type": str, "help": "fw_id"}
state_args = ["-s", "--state"]
state_kwargs = {
"type": lambda s: s.upper(),
"help": "Select by state.",
"choices": Firework.STATE_RANKS,
}
disp_args = ["-d", "--display_format"]
disp_kwargs = {
"type": lambda s: s.lower(),
"help": "Display format.",
"default": "less",
"choices": ["all", "more", "less", "ids", "count", "reservations"],
}
# enhanced display options allow for value 'none' or None (default) for no output
enh_disp_args = copy.deepcopy(disp_args)
enh_disp_kwargs = copy.deepcopy(disp_kwargs)
enh_disp_kwargs["choices"].append("none")
enh_disp_kwargs["default"] = None
query_args = ["-q", "--query"]
query_kwargs = {"help": 'Query (enclose pymongo-style dict in single-quotes, e.g. \'{"state":"COMPLETED"}\')'}
launches_mode_args = ["-lm", "--launches_mode"]
launches_mode_kwargs = {
"action": "store_true",
"help": "Query the launches collection (enclose pymongo-style "
"dict in single-quotes, e.g. '{\"launch_id\": 1}')",
}
qid_args = ["--qid"]
qid_kwargs = {"help": "Query by reservation id of job in queue"}
# for using fw- and wf-specific options on one command line, distinguish by prefix fw and wf
# prefix short one-dash options with 'wf', i.e. '-i' -> '-wfi'
# prefix long two-dash options with 'wf_', i.e. '--fw_id' -> '--wf_fw_id'
wf_prefixed_fw_id_args = [re.sub("^-([^-].*)$", "-wf\\1", s) for s in fw_id_args]
wf_prefixed_fw_id_args = [re.sub("^--(.*)$", "--wf_\\1", s) for s in wf_prefixed_fw_id_args]
wf_prefixed_state_args = [re.sub("^-([^-].*)$", "-wf\\1", s) for s in state_args]
wf_prefixed_state_args = [re.sub("^--(.*)$", "--wf_\\1", s) for s in wf_prefixed_state_args]
wf_prefixed_query_args = [re.sub("^-([^-].*)$", "-wf\\1", s) for s in query_args]
wf_prefixed_query_args = [re.sub("^--(.*)$", "--wf_\\1", s) for s in wf_prefixed_query_args]
# prefix short one-dash options with 'fw', i.e. '-i' -> '-fwi'
# prefix long two-dash options with 'fw_', i.e. '--fw_id' -> '--fw_fw_id'
fw_prefixed_fw_id_args = [re.sub("^-([^-].*)$", "-fw\\1", s) for s in fw_id_args]
fw_prefixed_fw_id_args = [re.sub("^--(.*)$", "--fw_\\1", s) for s in fw_prefixed_fw_id_args]
fw_prefixed_state_args = [re.sub("^-([^-].*)$", "-fw\\1", s) for s in state_args]
fw_prefixed_state_args = [re.sub("^--(.*)$", "--fw_\\1", s) for s in fw_prefixed_state_args]
fw_prefixed_query_args = [re.sub("^-([^-].*)$", "-fw\\1", s) for s in query_args]
fw_prefixed_query_args = [re.sub("^--(.*)$", "--fw_\\1", s) for s in fw_prefixed_query_args]
# filter all long options, i.e. '--fw_id' and strip off preceding '--'
fw_id_options = [
re.sub("^--(.*)$", "\\1", opt)
for opt in [*fw_id_args, *wf_prefixed_fw_id_args, *fw_prefixed_fw_id_args]
if re.match("^--.*$", opt)
]
init_parser = subparsers.add_parser("init", help="Initialize a Fireworks launchpad YAML file.")
init_parser.add_argument(
"-u",
"--uri_mode",
action="store_true",
help="Connect via a URI, see: https://docs.mongodb.com/manual/reference/connection-string/",
)
init_parser.add_argument("--config-file", default=DEFAULT_LPAD_YAML, type=str, help="Filename to write to.")
init_parser.set_defaults(func=init_yaml)
reset_parser = subparsers.add_parser("reset", help="reset and re-initialize the FireWorks database")
reset_parser.add_argument(
"--password",
help="Today's date, e.g. 2012-02-25. "
"Password or positive response to input prompt "
"required to protect against accidental reset.",
)
reset_parser.set_defaults(func=reset)
addwf_parser = subparsers.add_parser("add", help="insert a Workflow from file")
addwf_parser.add_argument(
"-d", "--dir", action="store_true", help="Directory mode. Finds all files in the paths given by wf_file."
)
addwf_parser.add_argument("wf_file", nargs="+", help="Path to a Firework or Workflow file")
addwf_parser.add_argument(
"-c", "--check", help="check the workflow before adding", dest="check", action="store_true"
)