"""Artifact class."""

from __future__ import annotations

import atexit
import concurrent.futures
import contextlib
import json
import logging
import multiprocessing.dummy
import os
import re
import shutil
import stat
import tempfile
import time
from collections import deque
from copy import copy
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import partial
from itertools import filterfalse
from pathlib import PurePosixPath
from typing import IO, TYPE_CHECKING, Any, Iterator, Literal, Sequence, Type, final
from urllib.parse import quote, urljoin, urlparse

import requests

import wandb
from wandb import data_types, env
from wandb._iterutils import one
from wandb.apis.normalize import normalize_exceptions
from wandb.apis.public import ArtifactCollection, ArtifactFiles, Run
from wandb.apis.public.utils import gql_compat
from wandb.data_types import WBValue
from wandb.errors import CommError
from wandb.errors.term import termerror, termlog, termwarn
from wandb.proto import wandb_internal_pb2 as pb
from wandb.proto.wandb_deprecated import Deprecated
from wandb.sdk import wandb_setup
from wandb.sdk.data_types._dtypes import Type as WBType
from wandb.sdk.data_types._dtypes import TypeRegistry
from wandb.sdk.internal.internal_api import Api as InternalApi
from wandb.sdk.internal.thread_local_settings import _thread_local_api_settings
from wandb.sdk.lib import filesystem, retry, runid, telemetry
from wandb.sdk.lib.deprecate import deprecate
from wandb.sdk.lib.hashutil import B64MD5, b64_to_hex_id, md5_file_b64
from wandb.sdk.lib.paths import FilePathStr, LogicalPath, StrPath, URIStr
from wandb.sdk.lib.runid import generate_id
from wandb.sdk.mailbox import MailboxHandle
from wandb.util import (
    alias_is_version_index,
    artifact_to_json,
    fsync_open,
    json_dumps_safer,
    uri_from_path,
    vendor_setup,
)

from ._generated import (
    ADD_ALIASES_GQL,
    DELETE_ALIASES_GQL,
    FETCH_LINKED_ARTIFACTS_GQL,
    UPDATE_ARTIFACT_GQL,
    ArtifactAliasInput,
    ArtifactCollectionAliasInput,
    FetchLinkedArtifacts,
    TagInput,
    UpdateArtifact,
)
from ._graphql_fragments import _gql_artifact_fragment, omit_artifact_fields
from ._validators import (
    LINKED_ARTIFACT_COLLECTION_TYPE,
    _LinkArtifactFields,
    ensure_logged,
    ensure_not_finalized,
    is_artifact_registry_project,
    validate_aliases,
    validate_artifact_name,
    validate_artifact_type,
    validate_metadata,
    validate_tags,
    validate_ttl_duration_seconds,
)
from .artifact_download_logger import ArtifactDownloadLogger
from .artifact_instance_cache import artifact_instance_cache
from .artifact_manifest import ArtifactManifest
from .artifact_manifest_entry import ArtifactManifestEntry
from .artifact_manifests.artifact_manifest_v1 import ArtifactManifestV1
from .artifact_state import ArtifactState
from .artifact_ttl import ArtifactTTL
from .exceptions import (
    ArtifactNotLoggedError,
    TooFewItemsError,
    TooManyItemsError,
    WaitTimeoutError,
)
from .staging import get_staging_dir
from .storage_handlers.gcs_handler import _GCSIsADirectoryError
from .storage_layout import StorageLayout
from .storage_policies import WANDB_STORAGE_POLICY
from .storage_policy import StoragePolicy

reset_path = vendor_setup()

from wandb_gql import gql  # noqa: E402

reset_path()

if TYPE_CHECKING:
    from wandb.apis.public import RetryingClient

logger = logging.getLogger(__name__)


@final
@dataclass
class _DeferredArtifactManifest:
    """A lightweight wrapper around the manifest URL, used to indicate deferred loading of the actual manifest."""

    url: str


class Artifact:
    """Flexible and lightweight building block for dataset and model versioning.

    Construct an empty W&B Artifact. Populate an artifact's contents with methods that
    begin with `add`. Once the artifact has all the desired files, you can call
    `wandb.log_artifact()` to log it.

    Args:
        name: A human-readable name for the artifact. Use the name to identify
            a specific artifact in the W&B App UI or programmatically. You can
            interactively reference an artifact with the `use_artifact` Public API.
            A name can contain letters, numbers, underscores, hyphens, and dots.
            The name must be unique across a project.
        type: The artifact's type. Use the type of an artifact to both organize
            and differentiate artifacts. You can use any string that contains letters,
            numbers, underscores, hyphens, and dots. Common types include `dataset` or `model`.
            Note: Some types are reserved for internal use and cannot be set by users.
                Such types include `job` and types that start with `wandb-`.
        description: A description of the artifact. For Model or Dataset Artifacts,
            add documentation for your standardized team model or dataset card. View
            an artifact's description programmatically with the `Artifact.description`
            attribute or interactively with the W&B App UI. W&B renders the
            description as markdown in the W&B App.
        metadata: Additional information about an artifact. Specify metadata as a
            dictionary of key-value pairs. You can specify no more than 100 total keys.
        incremental: Use `Artifact.new_draft()` method instead to modify an
            existing artifact.
        use_as: Deprecated.
        is_link: Boolean flag indicating whether the artifact is a linked
            artifact (`True`) or a source artifact (`False`).

    Returns:
        An `Artifact` object.
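
    Examples:
        A minimal sketch of creating and logging an artifact; the file path
        and project name below are illustrative placeholders.

        ```python
        import wandb

        artifact = wandb.Artifact(name="my-dataset", type="dataset")
        artifact.add_file("data/train.csv")  # assumes this local file exists

        with wandb.init(project="my-project") as run:
            run.log_artifact(artifact)
        ```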
    """

    _TMP_DIR = tempfile.TemporaryDirectory("wandb-artifacts")
    atexit.register(_TMP_DIR.cleanup)

    def __init__(
        self,
        name: str,
        type: str,
        description: str | None = None,
        metadata: dict[str, Any] | None = None,
        incremental: bool = False,
        use_as: str | None = None,
    ) -> None:
        if not re.match(r"^[a-zA-Z0-9_\-.]+$", name):
            raise ValueError(
                f"Artifact name may only contain alphanumeric characters, dashes, "
                f"underscores, and dots. Invalid name: {name}"
            )

        from wandb.sdk.artifacts._internal_artifact import InternalArtifact

        if incremental and not isinstance(self, InternalArtifact):
            termwarn("Using experimental arg `incremental`")

        # Internal.
        self._client: RetryingClient | None = None

        storage_policy_cls = StoragePolicy.lookup_by_name(WANDB_STORAGE_POLICY)
        layout = StorageLayout.V1 if env.get_use_v1_artifacts() else StorageLayout.V2
        policy_config = {"storageLayout": layout}
        self._storage_policy = storage_policy_cls.from_config(config=policy_config)

        self._tmp_dir: tempfile.TemporaryDirectory | None = None
        self._added_objs: dict[int, tuple[WBValue, ArtifactManifestEntry]] = {}
        self._added_local_paths: dict[str, ArtifactManifestEntry] = {}
        self._save_handle: MailboxHandle[pb.Result] | None = None
        self._download_roots: set[str] = set()
        # Set by new_draft(), otherwise the latest artifact will be used as the base.
        self._base_id: str | None = None
        # Properties.
        self._id: str | None = None
        self._client_id: str = runid.generate_id(128)
        self._sequence_client_id: str = runid.generate_id(128)
        self._entity: str | None = None
        self._project: str | None = None
        self._name: str = validate_artifact_name(name)  # includes version after saving
        self._version: str | None = None
        self._source_entity: str | None = None
        self._source_project: str | None = None
        self._source_name: str = name  # includes version after saving
        self._source_version: str | None = None
        self._source_artifact: Artifact | None = None
        self._is_link: bool = False
        self._type: str = validate_artifact_type(type, name)
        self._description: str | None = description
        self._metadata: dict[str, Any] = validate_metadata(metadata)
        self._ttl_duration_seconds: int | None = None
        self._ttl_is_inherited: bool = True
        self._ttl_changed: bool = False
        self._aliases: list[str] = []
        self._saved_aliases: list[str] = []
        self._tags: list[str] = []
        self._saved_tags: list[str] = []
        self._distributed_id: str | None = None
        self._incremental: bool = incremental
        if use_as is not None:
            deprecate(
                field_name=Deprecated.artifact__init_use_as,
                warning_message=(
                    "`use_as` argument is deprecated and does not affect the behaviour of `wandb.Artifact()`"
                ),
            )
        self._use_as: str | None = None
        self._state: ArtifactState = ArtifactState.PENDING
        self._manifest: ArtifactManifest | _DeferredArtifactManifest | None = (
            ArtifactManifestV1(self._storage_policy)
        )
        self._commit_hash: str | None = None
        self._file_count: int | None = None
        self._created_at: str | None = None
        self._updated_at: str | None = None
        self._final: bool = False
        self._history_step: int | None = None
        self._linked_artifacts: list[Artifact] = []

        # Cache.
        artifact_instance_cache[self._client_id] = self

    def __repr__(self) -> str:
        return f"<Artifact {self.id or self.name}>"

    @classmethod
    def _from_id(cls, artifact_id: str, client: RetryingClient) -> Artifact | None:
        if (artifact := artifact_instance_cache.get(artifact_id)) is not None:
            return artifact

        query = gql(
            """
            query ArtifactByID($id: ID!) {
                artifact(id: $id) {
                    ...ArtifactFragment
                }
            }
            """
            + _gql_artifact_fragment()
        )
        response = client.execute(
            query,
            variable_values={"id": artifact_id},
        )
        attrs = response.get("artifact")
        if attrs is None:
            return None

        src_collection = attrs["artifactSequence"]
        src_project = src_collection["project"]

        entity_name = src_project["entityName"] if src_project else ""
        project_name = src_project["name"] if src_project else ""

        name = "{}:v{}".format(src_collection["name"], attrs["versionIndex"])
        return cls._from_attrs(entity_name, project_name, name, attrs, client)

    @classmethod
    def _from_name(
        cls,
        *,
        entity: str,
        project: str,
        name: str,
        client: RetryingClient,
        enable_tracking: bool = False,
    ) -> Artifact:
        server_supports_enabling_artifact_usage_tracking = (
            InternalApi().server_project_type_introspection()
        )
        query_vars = ["$entityName: String!", "$projectName: String!", "$name: String!"]
        query_args = ["name: $name"]
        if server_supports_enabling_artifact_usage_tracking:
            query_vars.append("$enableTracking: Boolean")
            query_args.append("enableTracking: $enableTracking")

        vars_str = ", ".join(query_vars)
        args_str = ", ".join(query_args)

        query = gql(
            f"""
            query ArtifactByName({vars_str}) {{
                project(name: $projectName, entityName: $entityName) {{
                    artifact({args_str}) {{
                        ...ArtifactFragment
                    }}
                }}
            }}
            {_gql_artifact_fragment()}
            """
        )
        query_variable_values: dict[str, Any] = {
            "entityName": entity,
            "projectName": project,
            "name": name,
        }
        if server_supports_enabling_artifact_usage_tracking:
            query_variable_values["enableTracking"] = enable_tracking

        response = client.execute(
            query,
            variable_values=query_variable_values,
        )
        project_attrs = response.get("project")
        if not project_attrs:
            raise ValueError(f"project '{project}' not found under entity '{entity}'")
        attrs = project_attrs.get("artifact")
        if not attrs:
            raise ValueError(f"artifact '{name}' not found in '{entity}/{project}'")
        return cls._from_attrs(entity, project, name, attrs, client)

    @classmethod
    def _from_attrs(
        cls,
        entity: str,
        project: str,
        name: str,
        attrs: dict[str, Any],
        client: RetryingClient,
        aliases: list[str] | None = None,
    ) -> Artifact:
        # Placeholder is required to skip validation.
        artifact = cls("placeholder", type="placeholder")
        artifact._client = client
        artifact._entity = entity
        artifact._project = project
        artifact._name = name
        artifact._assign_attrs(attrs, aliases)

        artifact.finalize()

        # Cache.
        assert artifact.id is not None
        artifact_instance_cache[artifact.id] = artifact
        return artifact

    # TODO: Eventually factor out is_link. Currently we have to use it because some
    # forms of fetching the artifact don't make it clear whether it is a link, so
    # we have to set it manually.
    def _assign_attrs(
        self,
        attrs: dict[str, Any],
        aliases: list[str] | None = None,
        is_link: bool | None = None,
    ) -> None:
        """Update this Artifact's attributes using the server response."""
        self._id = attrs["id"]

        src_version = f"v{attrs['versionIndex']}"
        src_collection = attrs["artifactSequence"]
        src_project = src_collection["project"]

        self._source_entity = src_project["entityName"] if src_project else ""
        self._source_project = src_project["name"] if src_project else ""
        self._source_name = f"{src_collection['name']}:{src_version}"
        self._source_version = src_version

        if self._entity is None:
            self._entity = self._source_entity
        if self._project is None:
            self._project = self._source_project

        if self._name is None:
            self._name = self._source_name

        # TODO: Refactor artifact query to fetch artifact via membership instead
        # and get the collection type
        if is_link is None:
            self._is_link = (
                self._entity != self._source_entity
                or self._project != self._source_project
                or self._name != self._source_name
            )
        else:
            self._is_link = is_link

        self._type = attrs["artifactType"]["name"]
        self._description = attrs["description"]

        entity = self._entity
        project = self._project
        collection, *_ = self._name.split(":")

        processed_aliases = []
        # The future of aliases is to move all alias fetches to the membership level
        # so we don't have to do the collection fetches below
        if aliases:
            processed_aliases = aliases
        else:
            processed_aliases = [
                obj["alias"]
                for obj in attrs["aliases"]
                if obj["artifactCollection"]
                and obj["artifactCollection"]["project"]
                and obj["artifactCollection"]["project"]["entityName"] == entity
                and obj["artifactCollection"]["project"]["name"] == project
                and obj["artifactCollection"]["name"] == collection
            ]

        version_aliases = list(filter(alias_is_version_index, processed_aliases))
        other_aliases = list(filterfalse(alias_is_version_index, processed_aliases))

        try:
            version = one(
                version_aliases, too_short=TooFewItemsError, too_long=TooManyItemsError
            )
        except TooFewItemsError:
            version = src_version  # default to the source version
        except TooManyItemsError:
            msg = f"Expected at most one version alias, got {len(version_aliases)}: {version_aliases!r}"
            raise ValueError(msg) from None

        self._version = version

        if ":" not in self._name:
            self._name = f"{self._name}:{version}"

        self._aliases = other_aliases
        self._saved_aliases = copy(other_aliases)

        tags = [obj["name"] for obj in (attrs.get("tags") or [])]
        self._tags = tags
        self._saved_tags = copy(tags)

        metadata_str = attrs["metadata"]
        self._metadata = validate_metadata(
            json.loads(metadata_str) if metadata_str else {}
        )

        self._ttl_duration_seconds = validate_ttl_duration_seconds(
            attrs.get("ttlDurationSeconds")
        )
        self._ttl_is_inherited = (
            True if (attrs.get("ttlIsInherited") is None) else attrs["ttlIsInherited"]
        )

        self._state = ArtifactState(attrs["state"])

        try:
            manifest_url = attrs["currentManifest"]["file"]["directUrl"]
        except (LookupError, TypeError):
            self._manifest = None
        else:
            self._manifest = _DeferredArtifactManifest(manifest_url)

        self._commit_hash = attrs["commitHash"]
        self._file_count = attrs["fileCount"]
        self._created_at = attrs["createdAt"]
        self._updated_at = attrs["updatedAt"]
        self._history_step = attrs.get("historyStep", None)

    @ensure_logged
    def new_draft(self) -> Artifact:
        """Create a new draft artifact with the same content as this committed artifact.

        Modifying an existing artifact creates a new artifact version known
        as an "incremental artifact". The artifact returned can be extended or
        modified and logged as a new version.

        Returns:
            An `Artifact` object.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
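
        Examples:
            An illustrative sketch of extending a logged artifact with one more
            file; the artifact path and file name are placeholders.

            ```python
            import wandb

            api = wandb.Api()
            artifact = api.artifact("entity/project/my-dataset:latest")

            draft = artifact.new_draft()
            draft.add_file("data/extra.csv")  # hypothetical new file
            draft.save()  # logs a new version in the source sequence
            ```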
        """
        # Name, _entity and _project are set to the *source* name/entity/project:
        # if this artifact is saved it must be saved to the source sequence.
        artifact = Artifact(self.source_name.split(":")[0], self.type)
        artifact._entity = self._source_entity
        artifact._project = self._source_project
        artifact._source_entity = self._source_entity
        artifact._source_project = self._source_project

        # This artifact's parent is the one we are making a draft from.
        artifact._base_id = self.id

        # We can reuse the client, and copy over all the attributes that aren't
        # version-dependent and don't depend on having been logged.
        artifact._client = self._client
        artifact._description = self.description
        artifact._metadata = self.metadata
        artifact._manifest = ArtifactManifest.from_manifest_json(
            self.manifest.to_manifest_json()
        )
        return artifact

    # Properties (Python Class managed attributes).

    @property
    def id(self) -> str | None:
        """The artifact's ID."""
        if self.is_draft():
            return None
        assert self._id is not None
        return self._id

    @property
    @ensure_logged
    def entity(self) -> str:
        """The name of the entity that the artifact collection belongs to.

        If the artifact is a link, the entity will be the entity of the linked artifact.
        """
        assert self._entity is not None
        return self._entity

    @property
    @ensure_logged
    def project(self) -> str:
        """The name of the project that the artifact collection belongs to.

        If the artifact is a link, the project will be the project of the linked artifact.
        """
        assert self._project is not None
        return self._project

    @property
    def name(self) -> str:
        """The artifact name and version of the artifact.

        A string with the format `{collection}:{alias}`. If fetched before an artifact is logged/saved, the name won't contain the alias.
        If the artifact is a link, the name will be the name of the linked artifact.
        """
        return self._name

    @property
    def qualified_name(self) -> str:
        """The entity/project/name of the artifact.

        If the artifact is a link, the qualified name will be the qualified name of the linked artifact path.
        """
        return f"{self.entity}/{self.project}/{self.name}"

    @property
    @ensure_logged
    def version(self) -> str:
        """The artifact's version.

        A string with the format `v{number}`.
        If the artifact is a link artifact, the version will be from the linked collection.
        """
        assert self._version is not None
        return self._version

    @property
    @ensure_logged
    def collection(self) -> ArtifactCollection:
        """The collection this artifact was retrieved from.

        A collection is an ordered group of artifact versions.
        If this artifact was retrieved from a portfolio / linked collection, that
        collection will be returned rather than the collection
        that an artifact version originated from. The collection
        that an artifact originates from is known as the source sequence.
        """
        base_name = self.name.split(":")[0]
        return ArtifactCollection(
            self._client, self.entity, self.project, base_name, self.type
        )

    @property
    @ensure_logged
    def source_entity(self) -> str:
        """The name of the entity of the source artifact."""
        assert self._source_entity is not None
        return self._source_entity

    @property
    @ensure_logged
    def source_project(self) -> str:
        """The name of the project of the source artifact."""
        assert self._source_project is not None
        return self._source_project

    @property
    def source_name(self) -> str:
        """The artifact name and version of the source artifact.

        A string with the format `{source_collection}:{alias}`. Before the artifact is saved,
        contains only the name since the version is not yet known.
        """
        return self._source_name

    @property
    def source_qualified_name(self) -> str:
        """The source_entity/source_project/source_name of the source artifact."""
        return f"{self.source_entity}/{self.source_project}/{self.source_name}"

    @property
    @ensure_logged
    def source_version(self) -> str:
        """The source artifact's version.

        A string with the format `v{number}`.
        """
        assert self._source_version is not None
        return self._source_version

    @property
    @ensure_logged
    def source_collection(self) -> ArtifactCollection:
        """The artifact's source collection.

        The source collection is the collection that the artifact was logged from.
        """
        base_name = self.source_name.split(":")[0]
        return ArtifactCollection(
            self._client, self.source_entity, self.source_project, base_name, self.type
        )

    @property
    def is_link(self) -> bool:
        """Boolean flag indicating if the artifact is a link artifact.

        True: The artifact is a link artifact to a source artifact.
        False: The artifact is a source artifact.
        """
        return self._is_link

    @property
    @ensure_logged
    def linked_artifacts(self) -> list[Artifact]:
        """Returns a list of all the linked artifacts of a source artifact.

        If the artifact is a link artifact (`artifact.is_link == True`), it will return an empty list.
        Limited to 500 results.
        """
        if not self.is_link:
            self._linked_artifacts = self._fetch_linked_artifacts()
        return self._linked_artifacts

    @property
    @ensure_logged
    def source_artifact(self) -> Artifact:
        """Returns the source artifact. The source artifact is the original logged artifact.

        If the artifact itself is a source artifact (`artifact.is_link == False`),
        it will return itself.
        """
        if not self.is_link:
            return self
        if self._source_artifact is None:
            if self._client is None:
                raise ValueError("Client is not initialized")

            try:
                artifact = self._from_name(
                    entity=self.source_entity,
                    project=self.source_project,
                    name=self.source_name,
                    client=self._client,
                )
                self._source_artifact = artifact
            except Exception as e:
                raise ValueError(
                    f"Unable to fetch source artifact for linked artifact {self.name}"
                ) from e
        return self._source_artifact

    @property
    def type(self) -> str:
        """The artifact's type. Common types include `dataset` or `model`."""
        return self._type

    @property
    @ensure_logged
    def url(self) -> str:
        """
        Constructs the URL of the artifact.

        Returns:
            str: The URL of the artifact.
        """
        try:
            base_url = self._client.app_url  # type: ignore[union-attr]
        except AttributeError:
            return ""

        if not self.is_link:
            return self._construct_standard_url(base_url)
        if is_artifact_registry_project(self.project):
            return self._construct_registry_url(base_url)
        if self._type == "model" or self.project == "model-registry":
            return self._construct_model_registry_url(base_url)
        return self._construct_standard_url(base_url)

    def _construct_standard_url(self, base_url: str) -> str:
        if not all(
            [
                base_url,
                self.entity,
                self.project,
                self._type,
                self.collection.name,
                self._version,
            ]
        ):
            return ""
        return urljoin(
            base_url,
            f"{self.entity}/{self.project}/artifacts/{quote(self._type)}/{quote(self.collection.name)}/{self._version}",
        )

    def _construct_registry_url(self, base_url: str) -> str:
        if not all(
            [
                base_url,
                self.entity,
                self.project,
                self.collection.name,
                self._version,
            ]
        ):
            return ""

        try:
            org, *_ = InternalApi()._fetch_orgs_and_org_entities_from_entity(
                self.entity
            )
        except ValueError:
            return ""

        selection_path = quote(
            f"{self.entity}/{self.project}/{self.collection.name}", safe=""
        )
        return urljoin(
            base_url,
            f"orgs/{org.display_name}/registry/{self._type}?selectionPath={selection_path}&view=membership&version={self._version}",
        )

    def _construct_model_registry_url(self, base_url: str) -> str:
        if not all(
            [
                base_url,
                self.entity,
                self.project,
                self.collection.name,
                self._version,
            ]
        ):
            return ""
        selection_path = quote(
            f"{self.entity}/{self.project}/{self.collection.name}", safe=""
        )
        return urljoin(
            base_url,
            f"{self.entity}/registry/model?selectionPath={selection_path}&view=membership&version={self._version}",
        )

    @property
    def description(self) -> str | None:
        """A description of the artifact."""
        return self._description

    @description.setter
    def description(self, description: str | None) -> None:
        """Set the description of the artifact.

        For model or dataset Artifacts, add documentation for your
        standardized team model or dataset card. In the W&B UI the
        description is rendered as markdown.

        Editing the description will apply the changes to the source artifact and all linked artifacts associated with it.

        Args:
            description: Free text that offers a description of the artifact.
        """
        if self.is_link:
            wandb.termwarn(
                "Editing the description of this linked artifact will edit the description for the source artifact and it's linked artifacts as well."
            )
        self._description = description

    @property
    def metadata(self) -> dict:
        """User-defined artifact metadata.

        Structured data associated with the artifact.
        """
        return self._metadata

    @metadata.setter
    def metadata(self, metadata: dict) -> None:
        """User-defined artifact metadata.

        Metadata set this way will eventually be queryable and plottable in the UI; e.g.
        the class distribution of a dataset.

        Note: There is currently a limit of 100 total keys.
        Editing the metadata will apply the changes to the source artifact and all linked artifacts associated with it.

        Args:
            metadata: Structured data associated with the artifact.
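
        Examples:
            A sketch of replacing the metadata on a draft or logged artifact;
            the keys shown are illustrative.

            ```python
            artifact.metadata = {"num_classes": 10, "source": "survey-2024"}
            ```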
        """
        if self.is_link:
            wandb.termwarn(
                "Editing the metadata of this linked artifact will edit the metadata for the source artifact and it's linked artifacts as well."
            )
        self._metadata = validate_metadata(metadata)

    @property
    def ttl(self) -> timedelta | None:
        """The time-to-live (TTL) policy of an artifact.

        Artifacts are deleted shortly after a TTL policy's duration passes.
        If set to `None`, the artifact deactivates TTL policies and will not be
        scheduled for deletion, even if there is a team default TTL.
        An artifact inherits a TTL policy from
        the team default if the team administrator defines a default
        TTL and there is no custom policy set on an artifact.

        Raises:
            ArtifactNotLoggedError: Unable to fetch inherited TTL if the artifact has not been logged or saved
        """
        if self._ttl_is_inherited and (self.is_draft() or self._ttl_changed):
            raise ArtifactNotLoggedError(f"{type(self).__name__}.ttl", self)
        if self._ttl_duration_seconds is None:
            return None
        return timedelta(seconds=self._ttl_duration_seconds)

    @ttl.setter
    def ttl(self, ttl: timedelta | ArtifactTTL | None) -> None:
        """The time-to-live (TTL) policy of an artifact.

        Artifacts are deleted shortly after a TTL policy's duration passes.
        If set to `None`, the artifact has no TTL policy set and it is not
        scheduled for deletion. An artifact inherits a TTL policy from
        the team default if the team administrator defines a default
        TTL and there is no custom policy set on an artifact.

        Args:
            ttl: The duration as a positive Python `datetime.timedelta` Type
                that represents how long the artifact will remain active from its creation.
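
        Examples:
            A sketch of assigning a 30-day TTL, assuming `artifact` is a logged
            source (non-link) artifact.

            ```python
            from datetime import timedelta

            artifact.ttl = timedelta(days=30)
            artifact.save()  # persist the TTL change to the backend
            ```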

        """
        if self.type == "wandb-history":
            raise ValueError("Cannot set artifact TTL for type wandb-history")

        if self.is_link:
            raise ValueError(
                "Cannot set TTL for link artifact. "
                "Unlink the artifact first then set the TTL for the source artifact"
            )

        self._ttl_changed = True
        if isinstance(ttl, ArtifactTTL):
            if ttl == ArtifactTTL.INHERIT:
                self._ttl_is_inherited = True
            else:
                raise ValueError(f"Unhandled ArtifactTTL enum {ttl}")
        else:
            self._ttl_is_inherited = False
            if ttl is None:
                self._ttl_duration_seconds = None
            else:
                if ttl.total_seconds() <= 0:
                    raise ValueError(
                        f"Artifact TTL Duration has to be positive. ttl: {ttl.total_seconds()}"
                    )
                self._ttl_duration_seconds = int(ttl.total_seconds())

    @property
    @ensure_logged
    def aliases(self) -> list[str]:
        """List of one or more semantically-friendly references or identifying "nicknames" assigned to an artifact version.

        Aliases are mutable references that you can programmatically reference.
        Change an artifact's alias with the W&B App UI or programmatically.
        See [Create new artifact versions](https://docs.wandb.ai/guides/artifacts/create-a-new-artifact-version)
        for more information.
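
        Examples:
            A sketch of adding an alias to a logged artifact; the alias name is
            a placeholder.

            ```python
            artifact.aliases.append("best")
            artifact.save()  # persist the alias change
            ```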
        """
        return self._aliases

    @aliases.setter
    @ensure_logged
    def aliases(self, aliases: list[str]) -> None:
        """Set the aliases associated with this artifact."""
        self._aliases = validate_aliases(aliases)

    @property
    @ensure_logged
    def tags(self) -> list[str]:
        """List of one or more tags assigned to this artifact version."""
        return self._tags

    @tags.setter
    @ensure_logged
    def tags(self, tags: list[str]) -> None:
        """Set the tags associated with this artifact.

        Editing tags will apply the changes to the source artifact and all linked artifacts associated with it.
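
        Examples:
            A sketch of overwriting the tags of a logged artifact; the tag
            names are placeholders.

            ```python
            artifact.tags = ["baseline", "reviewed"]
            artifact.save()  # persist the tag change
            ```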
        """
        if self.is_link:
            wandb.termwarn(
                "Editing tags will apply the changes to the source artifact and all linked artifacts associated with it."
            )
        self._tags = validate_tags(tags)

    @property
    def distributed_id(self) -> str | None:
        return self._distributed_id

    @distributed_id.setter
    def distributed_id(self, distributed_id: str | None) -> None:
        self._distributed_id = distributed_id

    @property
    def incremental(self) -> bool:
        return self._incremental

    @property
    def use_as(self) -> str | None:
        """Deprecated."""
        deprecate(
            field_name=Deprecated.artifact__use_as,
            warning_message=("The use_as property of Artifact is deprecated."),
        )
        return self._use_as

    @property
    def state(self) -> str:
        """The status of the artifact. One of: "PENDING", "COMMITTED", or "DELETED"."""
        return self._state.value

    @property
    def manifest(self) -> ArtifactManifest:
        """The artifact's manifest.

        The manifest lists all of its contents, and can't be changed once the artifact
        has been logged.
        """
        if isinstance(self._manifest, _DeferredArtifactManifest):
            # A deferred manifest URL flags a deferred download request,
            # so fetch the manifest to override the placeholder object
            self._manifest = self._load_manifest(self._manifest.url)
            return self._manifest

        if self._manifest is None:
            query = gql(
                """
                query ArtifactManifest(
                    $entityName: String!,
                    $projectName: String!,
                    $name: String!
                ) {
                    project(entityName: $entityName, name: $projectName) {
                        artifact(name: $name) {
                            currentManifest {
                                file {
                                    directUrl
                                }
                            }
                        }
                    }
                }
                """
            )
            assert self._client is not None
            response = self._client.execute(
                query,
                variable_values={
                    "entityName": self._entity,
                    "projectName": self._project,
                    "name": self._name,
                },
            )
            attrs = response["project"]["artifact"]
            manifest_url = attrs["currentManifest"]["file"]["directUrl"]
            self._manifest = self._load_manifest(manifest_url)

        return self._manifest

    @property
    def digest(self) -> str:
        """The logical digest of the artifact.

        The digest is the checksum of the artifact's contents. If an artifact has the
        same digest as the current `latest` version, then `log_artifact` is a no-op.
        """
        return self.manifest.digest()

    @property
    def size(self) -> int:
        """The total size of the artifact in bytes.

        Includes any references tracked by this artifact.
        """
        total_size: int = 0
        for entry in self.manifest.entries.values():
            if entry.size is not None:
                total_size += entry.size
        return total_size

    @property
    @ensure_logged
    def commit_hash(self) -> str:
        """The hash returned when this artifact was committed."""
        assert self._commit_hash is not None
        return self._commit_hash

    @property
    @ensure_logged
    def file_count(self) -> int:
        """The number of files (including references)."""
        assert self._file_count is not None
        return self._file_count

    @property
    @ensure_logged
    def created_at(self) -> str:
        """Timestamp when the artifact was created."""
        assert self._created_at is not None
        return self._created_at

    @property
    @ensure_logged
    def updated_at(self) -> str:
        """The time when the artifact was last updated."""
        assert self._created_at is not None
        return self._updated_at or self._created_at

    @property
    @ensure_logged
    def history_step(self) -> int | None:
        """The nearest step at which history metrics were logged for the source run of the artifact.

        Examples:
            ```python
            run = artifact.logged_by()
            if run and (artifact.history_step is not None):
                history = run.sample_history(
                    min_step=artifact.history_step,
                    max_step=artifact.history_step + 1,
                    keys=["my_metric"],
                )
            ```
        """
        if self._history_step is None:
            return None
        return max(0, self._history_step - 1)

    # State management.

    def finalize(self) -> None:
        """Finalize the artifact version.

        You cannot modify an artifact version once it is finalized because the artifact
        is logged as a specific artifact version. Create a new artifact version
        to log more data to an artifact. An artifact is automatically finalized
        when you log the artifact with `log_artifact`.
        """
        self._final = True

    def is_draft(self) -> bool:
        """Check if artifact is not saved.

        Returns:
            Boolean. `False` if artifact is saved. `True` if artifact is not saved.
        """
        return self._state == ArtifactState.PENDING

    def _is_draft_save_started(self) -> bool:
        return self._save_handle is not None

    def save(
        self,
        project: str | None = None,
        settings: wandb.Settings | None = None,
    ) -> None:
        """Persist any changes made to the artifact.

        If currently in a run, that run will log this artifact. If not currently in a
        run, a run of type "auto" is created to track this artifact.

        Args:
            project: A project to use for the artifact in the case that a run is not
                already in context.
            settings: A settings object to use when initializing an automatic run.
                Most commonly used in a testing harness.
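
        Examples:
            An illustrative sketch (assumes a local file `"data.csv"`; all names
            are hypothetical):

            ```python
            import wandb

            artifact = wandb.Artifact(name="my-dataset", type="dataset")
            artifact.add_file("data.csv")
            # Persists the draft; creates an "auto" run if none is active.
            artifact.save(project="my-project")
            ```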
        """
        if self._state != ArtifactState.PENDING:
            return self._update()

        if self._incremental:
            with telemetry.context() as tel:
                tel.feature.artifact_incremental = True

        if run := wandb_setup.singleton().most_recent_active_run:
            # TODO: Deprecate and encourage explicit log_artifact().
            run.log_artifact(self)
        else:
            if settings is None:
                settings = wandb.Settings(silent="true")
            with wandb.init(  # type: ignore
                entity=self._source_entity,
                project=project or self._source_project,
                job_type="auto",
                settings=settings,
            ) as run:
                # redoing this here because in this branch we know we didn't
                # have the run at the beginning of the method
                if self._incremental:
                    with telemetry.context(run=run) as tel:
                        tel.feature.artifact_incremental = True
                run.log_artifact(self)

    def _set_save_handle(
        self,
        save_handle: MailboxHandle[pb.Result],
        client: RetryingClient,
    ) -> None:
        self._save_handle = save_handle
        self._client = client

    def wait(self, timeout: int | None = None) -> Artifact:
        """If needed, wait for this artifact to finish logging.

        Args:
            timeout: The time, in seconds, to wait.

        Returns:
            An `Artifact` object.
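
        Examples:
            A minimal sketch (project and file names are hypothetical):

            ```python
            import wandb

            with wandb.init(project="my-project") as run:
                artifact = wandb.Artifact(name="my-dataset", type="dataset")
                artifact.add_file("data.csv")
                run.log_artifact(artifact)
                # Block until the upload finishes (or 60 seconds elapse).
                artifact.wait(timeout=60)
            ```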
        """
        if self.is_draft():
            if self._save_handle is None:
                raise ArtifactNotLoggedError(type(self).wait.__qualname__, self)

            try:
                result = self._save_handle.wait_or(timeout=timeout)
            except TimeoutError as e:
                raise WaitTimeoutError(
                    "Artifact upload wait timed out, failed to fetch Artifact response"
                ) from e

            response = result.response.log_artifact_response
            if response.error_message:
                raise ValueError(response.error_message)
            self._populate_after_save(response.artifact_id)
        return self

    def _populate_after_save(self, artifact_id: str) -> None:
        query_template = """
            query ArtifactByIDShort($id: ID!) {
                artifact(id: $id) {
                    ...ArtifactFragment
                }
            }
        """ + _gql_artifact_fragment()

        query = gql(query_template)

        assert self._client is not None
        response = self._client.execute(
            query,
            variable_values={"id": artifact_id},
        )

        try:
            attrs = response["artifact"]
        except LookupError:
            raise ValueError(f"Unable to fetch artifact with id: {artifact_id!r}")
        else:
            # _populate_after_save is only called on source artifacts, not linked
            # artifacts. We have to set is_link manually because we aren't fetching
            # the artifact's collection; doing so would require refactoring
            # commitArtifact to return the artifact collection type.
            self._assign_attrs(attrs, is_link=False)

    @normalize_exceptions
    def _update(self) -> None:
        """Persists artifact changes to the wandb backend."""
        if self._client is None:
            raise RuntimeError("Client not initialized for artifact mutations")

        entity = self.entity
        project = self.project
        collection = self.name.split(":")[0]

        aliases = None
        introspect_query = gql(
            """
            query ProbeServerAddAliasesInput {
               AddAliasesInputInfoType: __type(name: "AddAliasesInput") {
                   name
                   inputFields {
                       name
                   }
                }
            }
            """
        )

        data = self._client.execute(introspect_query)
        if data.get("AddAliasesInputInfoType"):  # wandb backend version >= 0.13.0
            alias_props = {
                "entity_name": entity,
                "project_name": project,
                "artifact_collection_name": collection,
            }
            if aliases_to_add := (set(self.aliases) - set(self._saved_aliases)):
                add_mutation = gql(ADD_ALIASES_GQL)
                add_alias_inputs = [
                    ArtifactCollectionAliasInput(**alias_props, alias=alias)
                    for alias in aliases_to_add
                ]
                try:
                    self._client.execute(
                        add_mutation,
                        variable_values={
                            "artifactID": self.id,
                            "aliases": [a.model_dump() for a in add_alias_inputs],
                        },
                    )
                except CommError as e:
                    raise CommError(
                        "You do not have permission to add"
                        f" {'at least one of the following aliases' if len(aliases_to_add) > 1 else 'the following alias'}"
                        f" to this artifact: {aliases_to_add}"
                    ) from e

            if aliases_to_delete := (set(self._saved_aliases) - set(self.aliases)):
                delete_mutation = gql(DELETE_ALIASES_GQL)
                delete_alias_inputs = [
                    ArtifactCollectionAliasInput(**alias_props, alias=alias)
                    for alias in aliases_to_delete
                ]
                try:
                    self._client.execute(
                        delete_mutation,
                        variable_values={
                            "artifactID": self.id,
                            "aliases": [a.model_dump() for a in delete_alias_inputs],
                        },
                    )
                except CommError as e:
                    raise CommError(
                        f"You do not have permission to delete"
                        f" {'at least one of the following aliases' if len(aliases_to_delete) > 1 else 'the following alias'}"
                        f" from this artifact: {aliases_to_delete}"
                    ) from e

            self._saved_aliases = copy(self.aliases)

        else:  # wandb backend version < 0.13.0
            aliases = [
                ArtifactAliasInput(
                    artifact_collection_name=collection, alias=alias
                ).model_dump()
                for alias in self.aliases
            ]

        omit_fields = omit_artifact_fields(api=InternalApi())
        omit_variables = set()

        if {"ttlIsInherited", "ttlDurationSeconds"} & omit_fields:
            if self._ttl_changed:
                termwarn(
                    "Server not compatible with setting Artifact TTLs, please upgrade the server to use Artifact TTL"
                )

            omit_variables |= {"ttlDurationSeconds"}

        tags_to_add = validate_tags(set(self.tags) - set(self._saved_tags))
        tags_to_del = validate_tags(set(self._saved_tags) - set(self.tags))

        if {"tags"} & omit_fields:
            if tags_to_add or tags_to_del:
                termwarn(
                    "Server not compatible with Artifact tags. "
                    "To use Artifact tags, please upgrade the server to v0.85 or higher."
                )

            omit_variables |= {"tagsToAdd", "tagsToDelete"}

        mutation = gql_compat(
            UPDATE_ARTIFACT_GQL, omit_variables=omit_variables, omit_fields=omit_fields
        )

        gql_vars = {
            "artifactID": self.id,
            "description": self.description,
            "metadata": json_dumps_safer(self.metadata),
            "ttlDurationSeconds": self._ttl_duration_seconds_to_gql(),
            "aliases": aliases,
            "tagsToAdd": [TagInput(tag_name=t).model_dump() for t in tags_to_add],
            "tagsToDelete": [TagInput(tag_name=t).model_dump() for t in tags_to_del],
        }

        data = self._client.execute(mutation, variable_values=gql_vars)

        result = UpdateArtifact.model_validate(data).update_artifact
        if not (result and (artifact := result.artifact)):
            raise ValueError("Unable to parse updateArtifact response")
        self._assign_attrs(artifact.model_dump())

        self._ttl_changed = False  # Reset after updating artifact

    # Adding, removing, getting entries.

    def __getitem__(self, name: str) -> WBValue | None:
        """Get the WBValue object located at the artifact relative `name`.

        Args:
            name: The artifact relative name to get.

        Returns:
            W&B object that can be logged with `wandb.log()` and visualized in the W&B UI.

        Raises:
            ArtifactNotLoggedError: If the artifact isn't logged or the run is offline.
        """
        return self.get(name)

    def __setitem__(self, name: str, item: WBValue) -> ArtifactManifestEntry:
        """Add `item` to the artifact at path `name`.

        Args:
            name: The path within the artifact to add the object.
            item: The object to add.

        Returns:
            The added manifest entry

        Raises:
            ArtifactFinalizedError: You cannot make changes to the current artifact
            version because it is finalized. Log a new artifact version instead.
        """
        return self.add(item, name)

    @contextlib.contextmanager
    @ensure_not_finalized
    def new_file(
        self, name: str, mode: str = "x", encoding: str | None = None
    ) -> Iterator[IO]:
        """Open a new temporary file and add it to the artifact.

        Args:
            name: The name of the new file to add to the artifact.
            mode: The file access mode to use to open the new file.
            encoding: The encoding used to open the new file.

        Returns:
            A new file object that can be written to. Upon closing, the file will be
            automatically added to the artifact.

        Raises:
            ArtifactFinalizedError: You cannot make changes to the current artifact
            version because it is finalized. Log a new artifact version instead.
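
        Examples:
            An illustrative sketch (artifact and file names are hypothetical):

            ```python
            artifact = wandb.Artifact(name="my-dataset", type="dataset")
            # The file is added to the artifact when the `with` block closes.
            with artifact.new_file("hello.txt") as f:
                f.write("hello world")
            ```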
        """
        overwrite: bool = "x" not in mode

        if self._tmp_dir is None:
            self._tmp_dir = tempfile.TemporaryDirectory()
        path = os.path.join(self._tmp_dir.name, name.lstrip("/"))

        filesystem.mkdir_exists_ok(os.path.dirname(path))
        try:
            with fsync_open(path, mode, encoding) as f:
                yield f
        except FileExistsError:
            raise ValueError(f"File with name {name!r} already exists at {path!r}")
        except UnicodeEncodeError as e:
            termerror(
                f"Failed to open the provided file ({type(e).__name__}: {e}). Please "
                f"provide the proper encoding."
            )
            raise

        self.add_file(
            path, name=name, policy="immutable", skip_cache=True, overwrite=overwrite
        )

    @ensure_not_finalized
    def add_file(
        self,
        local_path: str,
        name: str | None = None,
        is_tmp: bool | None = False,
        skip_cache: bool | None = False,
        policy: Literal["mutable", "immutable"] | None = "mutable",
        overwrite: bool = False,
    ) -> ArtifactManifestEntry:
        """Add a local file to the artifact.

        Args:
            local_path: The path to the file being added.
            name: The path within the artifact to use for the file being added. Defaults
                to the basename of the file.
            is_tmp: If true, then the file is renamed deterministically to avoid
                collisions.
            skip_cache: If `True`, W&B will not copy files to the cache after uploading.
            policy: "mutable" (default) or "immutable". If "mutable", a temporary
                copy of the file is created to prevent corruption during upload. If
                "immutable", protection is disabled and the user is relied upon not
                to delete or change the file.
            overwrite: If `True`, overwrite the file if it already exists.

        Returns:
            The added manifest entry.

        Raises:
            ArtifactFinalizedError: You cannot make changes to the current artifact
            version because it is finalized. Log a new artifact version instead.
            ValueError: Policy must be "mutable" or "immutable"
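
        Examples:
            A minimal sketch (assumes `"model.pt"` exists locally; names are
            hypothetical):

            ```python
            artifact = wandb.Artifact(name="my-model", type="model")
            # Store the file under "models/model.pt" inside the artifact.
            artifact.add_file("model.pt", name="models/model.pt")
            ```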
        """
        if not os.path.isfile(local_path):
            raise ValueError(f"Path is not a file: {local_path!r}")

        name = LogicalPath(name or os.path.basename(local_path))
        digest = md5_file_b64(local_path)

        if is_tmp:
            file_path, file_name = os.path.split(name)
            file_name_parts = file_name.split(".")
            file_name_parts[0] = b64_to_hex_id(digest)[:20]
            name = os.path.join(file_path, ".".join(file_name_parts))

        return self._add_local_file(
            name,
            local_path,
            digest=digest,
            skip_cache=skip_cache,
            policy=policy,
            overwrite=overwrite,
        )

    @ensure_not_finalized
    def add_dir(
        self,
        local_path: str,
        name: str | None = None,
        skip_cache: bool | None = False,
        policy: Literal["mutable", "immutable"] | None = "mutable",
        merge: bool = False,
    ) -> None:
        """Add a local directory to the artifact.

        Args:
            local_path: The path of the local directory.
            name: The subdirectory name within an artifact. The name you specify
                appears in the W&B App UI nested by the artifact's `type`. Defaults
                to the root of the artifact.
            skip_cache: If set to `True`, W&B will not copy/move files to the cache
                while uploading.
            policy: "mutable" (default) or "immutable".
                "mutable": Create a temporary copy of each file to prevent corruption
                during upload.
                "immutable": Disable protection and rely on the user not to delete or
                change the files.
            merge: If `False` (default), raises a `ValueError` if a file was already
                added in a previous `add_dir` call and its content has changed. If
                `True`, overwrites existing files with changed content. Always adds new
                files and never removes files. To replace an entire directory, pass a
                name when adding it with `add_dir(local_path, name=my_prefix)`, call
                `remove(my_prefix)` to remove the directory, then add it again.

        Raises:
            ArtifactFinalizedError: You cannot make changes to the current artifact
            version because it is finalized. Log a new artifact version instead.
            ValueError: Policy must be "mutable" or "immutable"
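
        Examples:
            An illustrative sketch (assumes a local directory `"images/"`; names
            are hypothetical):

            ```python
            artifact = wandb.Artifact(name="my-images", type="dataset")
            # Files land under the "train/" prefix inside the artifact.
            artifact.add_dir("images/", name="train")
            ```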
        """
        if not os.path.isdir(local_path):
            raise ValueError(f"Path is not a directory: {local_path}")

        termlog(
            "Adding directory to artifact ({})... ".format(
                os.path.join(".", os.path.normpath(local_path))
            ),
            newline=False,
        )
        start_time = time.time()

        paths = []
        for dirpath, _, filenames in os.walk(local_path, followlinks=True):
            for fname in filenames:
                physical_path = os.path.join(dirpath, fname)
                logical_path = os.path.relpath(physical_path, start=local_path)
                if name is not None:
                    logical_path = os.path.join(name, logical_path)
                paths.append((logical_path, physical_path))

        def add_manifest_file(log_phy_path: tuple[str, str]) -> None:
            logical_path, physical_path = log_phy_path
            self._add_local_file(
                name=logical_path,
                path=physical_path,
                skip_cache=skip_cache,
                policy=policy,
                overwrite=merge,
            )

        num_threads = 8
        pool = multiprocessing.dummy.Pool(num_threads)
        pool.map(add_manifest_file, paths)
        pool.close()
        pool.join()

        termlog("Done. %.1fs" % (time.time() - start_time), prefix=False)

    @ensure_not_finalized
    def add_reference(
        self,
        uri: ArtifactManifestEntry | str,
        name: StrPath | None = None,
        checksum: bool = True,
        max_objects: int | None = None,
    ) -> Sequence[ArtifactManifestEntry]:
        """Add a reference denoted by a URI to the artifact.

        Unlike files or directories that you add to an artifact, references are not
        uploaded to W&B. For more information,
        see [Track external files](https://docs.wandb.ai/guides/artifacts/track-external-files).

        By default, the following schemes are supported:

        - http(s): The size and digest of the file will be inferred from the
          `Content-Length` and `ETag` response headers returned by the server.
        - s3: The checksum and size are pulled from the object metadata. If bucket
          versioning is enabled, then the version ID is also tracked.
        - gs: The checksum and size are pulled from the object metadata. If bucket
          versioning is enabled, then the version ID is also tracked.
        - https, domain matching `*.blob.core.windows.net` (Azure): The checksum and
          size are pulled from the blob metadata. If storage account versioning is
          enabled, then the version ID is also tracked.
        - file: The checksum and size are pulled from the file system. This scheme
          is useful if you have an NFS share or other externally mounted volume
          containing files you wish to track but not necessarily upload.

        For any other scheme, the digest is just a hash of the URI and the size is left
        blank.

        Args:
            uri: The URI path of the reference to add. The URI path can be an object
                returned from `Artifact.get_entry` to store a reference to another
                artifact's entry.
            name: The path within the artifact to place the contents of this reference.
            checksum: Whether or not to checksum the resource(s) located at the
                reference URI. Checksumming is strongly recommended as it enables
                automatic integrity validation. Disabling checksumming will speed up
                artifact creation, but reference directories will not be iterated
                through, so the objects in the directory will not be saved to the
                artifact. We recommend setting `checksum=False` when adding reference
                objects, in which case a new version will only be created if the
                reference URI changes.
            max_objects: The maximum number of objects to consider when adding a
                reference that points to a directory or bucket store prefix. By
                default, the maximum number of objects allowed for Amazon S3, GCS,
                Azure, and local files is 10,000,000. Other URI schemes do not have a
                maximum.

        Returns:
            The added manifest entries.

        Raises:
            ArtifactFinalizedError: You cannot make changes to the current artifact
            version because it is finalized. Log a new artifact version instead.
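
        Examples:
            A minimal sketch (the bucket URI is hypothetical):

            ```python
            artifact = wandb.Artifact(name="my-data", type="dataset")
            # Track the objects under the prefix without uploading them.
            artifact.add_reference("s3://my-bucket/datasets/mnist")
            ```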
        """
        if name is not None:
            name = LogicalPath(name)

        # If the uri is an ArtifactManifestEntry, recover its reference URL.
        if isinstance(uri, ArtifactManifestEntry):
            uri_str = uri.ref_url()
        elif isinstance(uri, str):
            uri_str = uri
        url = urlparse(str(uri_str))
        if not url.scheme:
            raise ValueError(
                "References must be URIs. To reference a local file, use file://"
            )

        manifest_entries = self._storage_policy.store_reference(
            self,
            URIStr(uri_str),
            name=name,
            checksum=checksum,
            max_objects=max_objects,
        )
        for entry in manifest_entries:
            self.manifest.add_entry(entry)

        return manifest_entries

    @ensure_not_finalized
    def add(
        self, obj: WBValue, name: StrPath, overwrite: bool = False
    ) -> ArtifactManifestEntry:
        """Add wandb.WBValue `obj` to the artifact.

        Args:
            obj: The object to add. Currently supported types include Bokeh,
                JoinedTable, PartitionedTable, Table, Classes, ImageMask,
                BoundingBoxes2D, Audio, Image, Video, Html, Object3D, and Molecule.
            name: The path within the artifact to add the object.
            overwrite: If True, overwrite existing objects with the same file path (if applicable).

        Returns:
            The added manifest entry

        Raises:
            ArtifactFinalizedError: You cannot make changes to the current artifact
            version because it is finalized. Log a new artifact version instead.
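
        Examples:
            An illustrative sketch (table contents and names are hypothetical):

            ```python
            table = wandb.Table(columns=["a", "b"], data=[[1, 2], [3, 4]])
            artifact = wandb.Artifact(name="my-tables", type="dataset")
            # Serialized to JSON and stored under a suffixed name.
            artifact.add(table, "my_table")
            ```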
        """
        name = LogicalPath(name)

        # This is a "hack" to automatically rename tables added to
        # the wandb /media/tables directory to their sha-based name.
        # TODO: figure out a more appropriate convention.
        is_tmp_name = name.startswith("media/tables")

        # Validate that the object is one of the correct wandb.Media types
        # TODO: move this to checking subclass of wandb.Media once all are
        # generally supported
        allowed_types = (
            data_types.Bokeh,
            data_types.JoinedTable,
            data_types.PartitionedTable,
            data_types.Table,
            data_types.Classes,
            data_types.ImageMask,
            data_types.BoundingBoxes2D,
            data_types.Audio,
            data_types.Image,
            data_types.Video,
            data_types.Html,
            data_types.Object3D,
            data_types.Molecule,
            data_types._SavedModel,
        )
        if not isinstance(obj, allowed_types):
            raise TypeError(
                f"Found object of type {obj.__class__}, expected one of:"
                f" {allowed_types}"
            )

        obj_id = id(obj)
        if obj_id in self._added_objs:
            return self._added_objs[obj_id][1]

        # If the object is coming from another artifact, save it as a reference
        ref_path = obj._get_artifact_entry_ref_url()
        if ref_path is not None:
            return self.add_reference(ref_path, type(obj).with_suffix(name))[0]

        val = obj.to_json(self)
        name = obj.with_suffix(name)
        entry = self.manifest.get_entry_by_path(name)
        if (not overwrite) and (entry is not None):
            return entry

        if is_tmp_name:
            file_path = os.path.join(self._TMP_DIR.name, str(id(self)), name)
            folder_path, _ = os.path.split(file_path)
            os.makedirs(folder_path, exist_ok=True)
            with open(file_path, "w", encoding="utf-8") as tmp_f:
                json.dump(val, tmp_f, sort_keys=True)
        else:
            filemode = "w" if overwrite else "x"
            with self.new_file(name, mode=filemode, encoding="utf-8") as f:
                json.dump(val, f, sort_keys=True)
                file_path = f.name

        # Note: we add the file from our temp directory. It will be added again
        # later on finalize, but that will succeed since the checksum should match.
        entry = self.add_file(file_path, name, is_tmp_name)
        # We store a reference to the obj so that its id doesn't get reused.
        self._added_objs[obj_id] = (obj, entry)
        if obj._artifact_target is None:
            obj._set_artifact_target(self, entry.path)

        if is_tmp_name:
            with contextlib.suppress(FileNotFoundError):
                os.remove(file_path)

        return entry

    def _add_local_file(
        self,
        name: StrPath,
        path: StrPath,
        digest: B64MD5 | None = None,
        skip_cache: bool | None = False,
        policy: Literal["mutable", "immutable"] | None = "mutable",
        overwrite: bool = False,
    ) -> ArtifactManifestEntry:
        policy = policy or "mutable"
        if policy not in ["mutable", "immutable"]:
            raise ValueError(
                f"Invalid policy {policy!r}. Policy may only be `mutable` or `immutable`."
            )
        upload_path = path
        if policy == "mutable":
            with tempfile.NamedTemporaryFile(dir=get_staging_dir(), delete=False) as f:
                staging_path = f.name
                shutil.copyfile(path, staging_path)
                # Set as read-only to prevent changes to the file during upload process
                os.chmod(staging_path, stat.S_IRUSR)
                upload_path = staging_path

        entry = ArtifactManifestEntry(
            path=name,
            digest=digest or md5_file_b64(upload_path),
            size=os.path.getsize(upload_path),
            local_path=upload_path,
            skip_cache=skip_cache,
        )
        self.manifest.add_entry(entry, overwrite=overwrite)
        self._added_local_paths[os.fspath(path)] = entry
        return entry

    @ensure_not_finalized
    def remove(self, item: StrPath | ArtifactManifestEntry) -> None:
        """Remove an item from the artifact.

        Args:
            item: The item to remove. Can be a specific manifest entry or the name of
                an artifact-relative path. If the item matches a directory, all items
                in that directory will be removed.

        Raises:
            ArtifactFinalizedError: You cannot make changes to the current artifact
            version because it is finalized. Log a new artifact version instead.
            FileNotFoundError: If the item isn't found in the artifact.
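
        Examples:
            A minimal sketch (directory and file names are hypothetical):

            ```python
            artifact = wandb.Artifact(name="my-dataset", type="dataset")
            artifact.add_dir("data/")  # adds e.g. "train.csv" and "scratch.txt"
            # Drop an entry by its artifact-relative path before saving.
            artifact.remove("scratch.txt")
            ```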
        """
        if isinstance(item, ArtifactManifestEntry):
            self.manifest.remove_entry(item)
            return

        path = str(PurePosixPath(item))
        entry = self.manifest.get_entry_by_path(path)
        if entry:
            self.manifest.remove_entry(entry)
            return

        entries = self.manifest.get_entries_in_directory(path)
        if not entries:
            raise FileNotFoundError(f"No such file or directory: {path}")
        for entry in entries:
            self.manifest.remove_entry(entry)

    def get_path(self, name: StrPath) -> ArtifactManifestEntry:
        """Deprecated. Use `get_entry(name)`."""
        deprecate(
            field_name=Deprecated.artifact__get_path,
            warning_message="Artifact.get_path(name) is deprecated, use Artifact.get_entry(name) instead.",
        )
        return self.get_entry(name)

    @ensure_logged
    def get_entry(self, name: StrPath) -> ArtifactManifestEntry:
        """Get the entry with the given name.

        Args:
            name: The artifact relative name to get

        Returns:
            The `ArtifactManifestEntry` with the given name.

        Raises:
            ArtifactNotLoggedError: If the artifact isn't logged or the run is offline.
            KeyError: If the artifact doesn't contain an entry with the given name.
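
        Examples:
            A minimal sketch (artifact and file names are hypothetical):

            ```python
            with wandb.init() as run:
                artifact = run.use_artifact("my-dataset:latest")
                entry = artifact.get_entry("data.csv")
                local_path = entry.download()
            ```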
        """
        name = LogicalPath(name)
        entry = self.manifest.entries.get(name) or self._get_obj_entry(name)[0]
        if entry is None:
            raise KeyError(f"Path not contained in artifact: {name}")
        entry._parent_artifact = self
        return entry

    @ensure_logged
    def get(self, name: str) -> WBValue | None:
        """Get the WBValue object located at the artifact relative `name`.

        Args:
            name: The artifact relative name to retrieve.

        Returns:
            W&B object that can be logged with `wandb.log()` and visualized in the W&B UI.

        Raises:
            ArtifactNotLoggedError: If the artifact isn't logged or the run is offline.
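
        Examples:
            An illustrative sketch (names are hypothetical):

            ```python
            with wandb.init() as run:
                artifact = run.use_artifact("my-tables:latest")
                # Deserializes the stored JSON back into a wandb.Table.
                table = artifact.get("my_table")
            ```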
        """
        entry, wb_class = self._get_obj_entry(name)
        if entry is None or wb_class is None:
            return None

        # If the entry is a reference from another artifact, then get it directly from
        # that artifact.
        referenced_id = entry._referenced_artifact_id()
        if referenced_id:
            assert self._client is not None
            artifact = self._from_id(referenced_id, client=self._client)
            assert artifact is not None
            return artifact.get(uri_from_path(entry.ref))

        # Special case for wandb.Table. This is intended to be a short term
        # optimization. Since tables are likely to download many other assets in
        # artifact(s), we eagerly download the artifact using the parallelized
        # `artifact.download`. In the future, we should refactor the deserialization
        # pattern such that this special case is not needed.
        if wb_class == wandb.Table:
            self.download()

        # Get the ArtifactManifestEntry
        item = self.get_entry(entry.path)
        item_path = item.download()

        # Load the object from the JSON blob
        result = None
        json_obj = {}
        with open(item_path) as file:
            json_obj = json.load(file)
        result = wb_class.from_json(json_obj, self)
        result._set_artifact_source(self, name)
        return result

    def get_added_local_path_name(self, local_path: str) -> str | None:
        """Get the artifact relative name of a file added by a local filesystem path.

        Args:
            local_path: The local path to resolve into an artifact relative name.

        Returns:
            The artifact relative name.
        """
        entry = self._added_local_paths.get(local_path, None)
        if entry is None:
            return None
        return entry.path

    def _get_obj_entry(
        self, name: str
    ) -> tuple[ArtifactManifestEntry, Type[WBValue]] | tuple[None, None]:  # noqa: UP006  # `type` shadows `Artifact.type`
        """Return an object entry by name, handling any type suffixes.

        When objects are added with `.add(obj, name)`, the name is typically changed to
        include the suffix of the object type when serializing to JSON. So we need to be
        able to resolve a name, without tasking the user with appending .THING.json.
        This method returns an entry if it exists by a suffixed name.

        Args:
            name: name used when adding
        """
        for wb_class in WBValue.type_mapping().values():
            wandb_file_name = wb_class.with_suffix(name)
            entry = self.manifest.entries.get(wandb_file_name)
            if entry is not None:
                return entry, wb_class
        return None, None

    # Downloading.

    @ensure_logged
    def download(
        self,
        root: StrPath | None = None,
        allow_missing_references: bool = False,
        skip_cache: bool | None = None,
        path_prefix: StrPath | None = None,
        multipart: bool | None = None,
    ) -> FilePathStr:
        """Download the contents of the artifact to the specified root directory.

        Existing files located within `root` are not modified. Explicitly delete `root`
        before you call `download` if you want the contents of `root` to exactly match
        the artifact.

        Args:
            root: The directory W&B stores the artifact's files.
            allow_missing_references: If set to `True`, any invalid reference paths
                will be ignored while downloading referenced files.
            skip_cache: If set to `True`, the artifact cache will be skipped when
                downloading and W&B will download each file into the default root or
                specified download directory.
            path_prefix: If specified, only files with a path that starts with the given
                prefix will be downloaded. Uses unix format (forward slashes).
            multipart: If set to `None` (default), the artifact will be downloaded
                in parallel using multipart download if individual file size is greater than
                2GB. If set to `True` or `False`, the artifact will be downloaded in
                parallel or serially regardless of the file size.

        Returns:
            The path to the downloaded contents.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
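
        Examples:
            A minimal sketch using the public API (the artifact path is
            hypothetical):

            ```python
            import wandb

            artifact = wandb.Api().artifact("my-entity/my-project/my-dataset:latest")
            # Downloads into the default artifact directory unless `root` is given.
            local_dir = artifact.download()
            ```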
        """
        root = FilePathStr(str(root or self._default_root()))
        self._add_download_root(root)

        # TODO: download artifacts using core when implemented
        # if is_require_core():
        #     return self._download_using_core(
        #         root=root,
        #         allow_missing_references=allow_missing_references,
        #         skip_cache=bool(skip_cache),
        #         path_prefix=path_prefix,
        #     )
        return self._download(
            root=root,
            allow_missing_references=allow_missing_references,
            skip_cache=skip_cache,
            path_prefix=path_prefix,
            multipart=multipart,
        )

    def _download_using_core(
        self,
        root: str,
        allow_missing_references: bool = False,
        skip_cache: bool = False,
        path_prefix: StrPath | None = None,
    ) -> FilePathStr:
        import pathlib

        from wandb.sdk.backend.backend import Backend

        # TODO: Create a special stream instead of relying on an existing run.
        if wandb.run is None:
            wl = wandb_setup.singleton()

            stream_id = generate_id()

            settings = wl.settings.to_proto()
            # TODO: remove this
            tmp_dir = pathlib.Path(tempfile.mkdtemp())

            settings.sync_dir.value = str(tmp_dir)
            settings.sync_file.value = str(tmp_dir / f"{stream_id}.wandb")
            settings.files_dir.value = str(tmp_dir / "files")
            settings.run_id.value = stream_id

            service = wl.ensure_service()
            service.inform_init(settings=settings, run_id=stream_id)

            backend = Backend(settings=wl.settings, service=service)
            backend.ensure_launched()

            assert backend.interface
            backend.interface._stream_id = stream_id  # type: ignore
        else:
            assert wandb.run._backend
            backend = wandb.run._backend

        assert backend.interface
        handle = backend.interface.deliver_download_artifact(
            self.id,  # type: ignore
            root,
            allow_missing_references,
            skip_cache,
            path_prefix,  # type: ignore
        )
        # TODO: Start the download process in the user process too, to handle reference downloads
        self._download(
            root=root,
            allow_missing_references=allow_missing_references,
            skip_cache=skip_cache,
            path_prefix=path_prefix,
        )
        result = handle.wait_or(timeout=None)

        response = result.response.download_artifact_response
        if response.error_message:
            raise ValueError(f"Error downloading artifact: {response.error_message}")

        return FilePathStr(root)

    def _download(
        self,
        root: str,
        allow_missing_references: bool = False,
        skip_cache: bool | None = None,
        path_prefix: StrPath | None = None,
        multipart: bool | None = None,
    ) -> FilePathStr:
        nfiles = len(self.manifest.entries)
        size = sum(e.size or 0 for e in self.manifest.entries.values())
        log = False
        if nfiles > 5000 or size > 50 * 1024 * 1024:
            log = True
            termlog(
                f"Downloading large artifact {self.name}, {size / (1024 * 1024):.2f}MB. {nfiles} files... ",
            )
            start_time = datetime.now()
        download_logger = ArtifactDownloadLogger(nfiles=nfiles)

        def _download_entry(
            entry: ArtifactManifestEntry,
            executor: concurrent.futures.Executor,
            api_key: str | None,
            cookies: dict | None,
            headers: dict | None,
        ) -> None:
            _thread_local_api_settings.api_key = api_key
            _thread_local_api_settings.cookies = cookies
            _thread_local_api_settings.headers = headers

            try:
                entry.download(
                    root,
                    skip_cache=skip_cache,
                    executor=executor,
                    multipart=multipart,
                )
            except FileNotFoundError as e:
                if allow_missing_references:
                    wandb.termwarn(str(e))
                    return
                raise
            except _GCSIsADirectoryError as e:
                logger.debug(str(e))
                return
            download_logger.notify_downloaded()

        with concurrent.futures.ThreadPoolExecutor(64) as executor:
            download_entry = partial(
                _download_entry,
                executor=executor,
                api_key=_thread_local_api_settings.api_key,
                cookies=_thread_local_api_settings.cookies,
                headers=_thread_local_api_settings.headers,
            )
            active_futures = set()
            has_next_page = True
            cursor = None
            while has_next_page:
                fetch_url_batch_size = env.get_artifact_fetch_file_url_batch_size()
                attrs = self._fetch_file_urls(cursor, fetch_url_batch_size)
                has_next_page = attrs["pageInfo"]["hasNextPage"]
                cursor = attrs["pageInfo"]["endCursor"]
                for edge in attrs["edges"]:
                    entry = self.get_entry(edge["node"]["name"])
                    # TODO: uncomment once artifact downloads are supported in core
                    # if require_core and entry.ref is None:
                    #     # Handled by core
                    #     continue
                    entry._download_url = edge["node"]["directUrl"]
                    if (not path_prefix) or entry.path.startswith(str(path_prefix)):
                        active_futures.add(executor.submit(download_entry, entry))
                # Wait for download threads to catch up.
                max_backlog = fetch_url_batch_size
                if len(active_futures) > max_backlog:
                    for future in concurrent.futures.as_completed(active_futures):
                        future.result()  # check for errors
                        active_futures.remove(future)
                        if len(active_futures) <= max_backlog:
                            break
            # Check for errors.
            for future in concurrent.futures.as_completed(active_futures):
                future.result()

        if log:
            now = datetime.now()
            delta = abs((now - start_time).total_seconds())
            hours = int(delta // 3600)
            minutes = int((delta - hours * 3600) // 60)
            seconds = delta - hours * 3600 - minutes * 60
            speed = size / 1024 / 1024 / delta
            termlog(
                f"Done. {hours}:{minutes}:{seconds:.1f} ({speed:.1f}MB/s)",
                prefix=False,
            )
        return FilePathStr(root)

    @retry.retriable(
        retry_timedelta=timedelta(minutes=3),
        retryable_exceptions=(requests.RequestException,),
    )
    def _fetch_file_urls(self, cursor: str | None, per_page: int | None = 5000) -> Any:
        if InternalApi()._server_supports(
            pb.ServerFeature.ARTIFACT_COLLECTION_MEMBERSHIP_FILES
        ):
            query = gql(
                """
                query ArtifactCollectionMembershipFileURLs($entityName: String!, $projectName: String!, \
                        $artifactName: String!, $artifactVersionIndex: String!, $cursor: String, $perPage: Int) {
                    project(name: $projectName, entityName: $entityName) {
                        artifactCollection(name: $artifactName) {
                            artifactMembership(aliasName: $artifactVersionIndex) {
                                files(after: $cursor, first: $perPage) {
                                    pageInfo {
                                        hasNextPage
                                        endCursor
                                    }
                                    edges {
                                        node {
                                            name
                                            directUrl
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                """
            )
            assert self._client is not None
            response = self._client.execute(
                query,
                variable_values={
                    "entityName": self.entity,
                    "projectName": self.project,
                    "artifactName": self.name.split(":")[0],
                    "artifactVersionIndex": self.version,
                    "cursor": cursor,
                    "perPage": per_page,
                },
                timeout=60,
            )
            return response["project"]["artifactCollection"]["artifactMembership"][
                "files"
            ]
        else:
            query = gql(
                """
                query ArtifactFileURLs($id: ID!, $cursor: String, $perPage: Int) {
                    artifact(id: $id) {
                        files(after: $cursor, first: $perPage) {
                            pageInfo {
                                hasNextPage
                                endCursor
                            }
                            edges {
                                node {
                                    name
                                    directUrl
                                }
                            }
                        }
                    }
                }
                """
            )
            assert self._client is not None
            response = self._client.execute(
                query,
                variable_values={"id": self.id, "cursor": cursor, "perPage": per_page},
                timeout=60,
            )
            return response["artifact"]["files"]

    @ensure_logged
    def checkout(self, root: str | None = None) -> str:
        """Replace the specified root directory with the contents of the artifact.

        WARNING: This will delete all files in `root` that are not included in the
        artifact.

        Args:
            root: The directory to replace with this artifact's files.

        Returns:
           The path of the checked out contents.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
        """
        root = root or self._default_root(include_version=False)

        for dirpath, _, files in os.walk(root):
            for file in files:
                full_path = os.path.join(dirpath, file)
                artifact_path = os.path.relpath(full_path, start=root)
                try:
                    self.get_entry(artifact_path)
                except KeyError:
                    # File is not part of the artifact, remove it.
                    os.remove(full_path)

        return self.download(root=root)

    @ensure_logged
    def verify(self, root: str | None = None) -> None:
        """Verify that the contents of an artifact match the manifest.

        All files in the directory are checksummed and the checksums are then
        cross-referenced against the artifact's manifest. References are not verified.

        Args:
            root: The directory to verify. If `None`, the artifact will be downloaded
                to `./artifacts/self.name/`.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
            ValueError: If the verification fails.
        """
        root = root or self._default_root()

        for dirpath, _, files in os.walk(root):
            for file in files:
                full_path = os.path.join(dirpath, file)
                artifact_path = os.path.relpath(full_path, start=root)
                try:
                    self.get_entry(artifact_path)
                except KeyError:
                    raise ValueError(
                        f"Found file {full_path} which is not a member of artifact {self.name}"
                    )

        ref_count = 0
        for entry in self.manifest.entries.values():
            if entry.ref is None:
                if md5_file_b64(os.path.join(root, entry.path)) != entry.digest:
                    raise ValueError(f"Digest mismatch for file: {entry.path}")
            else:
                ref_count += 1
        if ref_count > 0:
            termwarn(f"skipped verification of {ref_count} refs")

    @ensure_logged
    def file(self, root: str | None = None) -> StrPath:
        """Download a single file artifact to the directory you specify with `root`.

        Args:
            root: The root directory to store the file. Defaults to
                './artifacts/self.name/'.

        Returns:
            The full path of the downloaded file.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
            ValueError: If the artifact contains more than one file.
        """
        if root is None:
            root = os.path.join(".", "artifacts", self.name)

        if len(self.manifest.entries) > 1:
            raise ValueError(
                "This artifact contains more than one file, call `.download()` to get "
                'all files or call `.get_entry("filename").download()`'
            )

        return self.get_entry(list(self.manifest.entries)[0]).download(root)

    @ensure_logged
    def files(
        self, names: list[str] | None = None, per_page: int = 50
    ) -> ArtifactFiles:
        """Iterate over all files stored in this artifact.

        Args:
            names: The filename paths relative to the root of the artifact you wish to
                list.
            per_page: The number of files to return per request.

        Returns:
            An iterator containing `File` objects.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
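
        Examples:
            An illustrative sketch (assumes the artifact has been logged):

            ```python
            for f in artifact.files():
                print(f.name, f.size)
            ```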
        """
        return ArtifactFiles(self._client, self, names, per_page)

    def _default_root(self, include_version: bool = True) -> FilePathStr:
        name = self.source_name if include_version else self.source_name.split(":")[0]
        root = os.path.join(env.get_artifact_dir(), name)
        # In case we're on a system where the artifact dir has a name corresponding to
        # an unexpected filesystem, we'll check for alternate roots. If one exists we'll
        # use that, otherwise we'll fall back to the system-preferred path.
        path = filesystem.check_exists(root) or filesystem.system_preferred_path(root)
        return FilePathStr(str(path))

    def _add_download_root(self, dir_path: str) -> None:
        self._download_roots.add(os.path.abspath(dir_path))

    def _local_path_to_name(self, file_path: str) -> str | None:
        """Convert a local file path to a path entry in the artifact."""
        abs_file_path = os.path.abspath(file_path)
        abs_file_parts = abs_file_path.split(os.sep)
        for i in range(len(abs_file_parts) + 1):
            if os.path.join(os.sep, *abs_file_parts[:i]) in self._download_roots:
                return os.path.join(*abs_file_parts[i:])
        return None

    # Others.

    @ensure_logged
    def delete(self, delete_aliases: bool = False) -> None:
        """Delete an artifact and its files.

        If called on a linked artifact (i.e. a member of a portfolio collection): only the link is deleted, and the
        source artifact is unaffected.

        Use `artifact.unlink()` instead of `artifact.delete()` to remove a link between a source artifact and a linked artifact.

        Args:
            delete_aliases: If set to `True`, deletes all aliases associated with the artifact.
                Otherwise, this raises an exception if the artifact has existing
                aliases.
                This parameter is ignored if the artifact is linked (i.e. a member of a portfolio collection).

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
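
        Examples:
            A minimal sketch (the artifact path is hypothetical):

            ```python
            artifact = wandb.Api().artifact("my-entity/my-project/my-dataset:v0")
            # Also removes any aliases pointing at this version.
            artifact.delete(delete_aliases=True)
            ```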
        """
        if self.is_link:
            wandb.termwarn(
                "Deleting a linked artifact only removes the link; the source "
                "artifact and its data are unaffected."
            )
            self._unlink()
        else:
            self._delete(delete_aliases)

    @normalize_exceptions
    def _delete(self, delete_aliases: bool = False) -> None:
        mutation = gql(
            """
            mutation DeleteArtifact($artifactID: ID!, $deleteAliases: Boolean) {
                deleteArtifact(input: {
                    artifactID: $artifactID
                    deleteAliases: $deleteAliases
                }) {
                    artifact {
                        id
                    }
                }
            }
            """
        )
        assert self._client is not None
        self._client.execute(
            mutation,
            variable_values={
                "artifactID": self.id,
                "deleteAliases": delete_aliases,
            },
        )

    @normalize_exceptions
    def link(
        self, target_path: str, aliases: list[str] | None = None
    ) -> Artifact | None:
        """Link this artifact to a portfolio (a promoted collection of artifacts).

        Args:
            target_path: The path to the portfolio inside a project.
                The target path must adhere to one of the following
                schemas `{portfolio}`, `{project}/{portfolio}` or
                `{entity}/{project}/{portfolio}`.
                To link the artifact to the Model Registry, rather than to a generic
                portfolio inside a project, set `target_path` to the following
                schema `{"model-registry"}/{Registered Model Name}` or
                `{entity}/{"model-registry"}/{Registered Model Name}`.
            aliases: A list of strings that uniquely identifies the artifact inside the
                specified portfolio.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.

        Returns:
            The linked artifact if linking was successful, otherwise None.
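
        Examples:
            An illustrative sketch (registry and alias names are hypothetical):

            ```python
            # Promote this artifact version into the Model Registry.
            artifact.link("model-registry/My Registered Model", aliases=["staging"])
            ```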
        """
        if self.is_link:
            wandb.termwarn(
                "Linking a linked artifact links directly to its source artifact "
                "instead."
            )

        if run := wandb_setup.singleton().most_recent_active_run:
            # TODO: Deprecate and encourage explicit link_artifact().
            return run.link_artifact(self, target_path, aliases)

        else:
            with wandb.init(
                entity=self._source_entity,
                project=self._source_project,
                job_type="auto",
                settings=wandb.Settings(silent="true"),
            ) as run:
                return run.link_artifact(self, target_path, aliases)

    @ensure_logged
    def unlink(self) -> None:
        """Unlink this artifact if it is currently a member of a portfolio (a promoted collection of artifacts).

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
            ValueError: If the artifact is not linked, i.e. it is not a member of a portfolio collection.
        """
        # Fail early if this isn't a linked artifact to begin with
        if not self.is_link:
            raise ValueError(
                f"Artifact {self.qualified_name!r} is not a linked artifact and cannot be unlinked.  "
                f"To delete it, use {self.delete.__qualname__!r} instead."
            )

        self._unlink()

    @normalize_exceptions
    def _unlink(self) -> None:
        mutation = gql(
            """
            mutation UnlinkArtifact($artifactID: ID!, $artifactPortfolioID: ID!) {
                unlinkArtifact(
                    input: { artifactID: $artifactID, artifactPortfolioID: $artifactPortfolioID }
                ) {
                    artifactID
                    success
                    clientMutationId
                }
            }
            """
        )
        assert self._client is not None
        try:
            self._client.execute(
                mutation,
                variable_values={
                    "artifactID": self.id,
                    "artifactPortfolioID": self.collection.id,
                },
            )
        except CommError as e:
            raise CommError(
                f"You do not have permission to unlink the artifact {self.qualified_name}"
            ) from e

    @ensure_logged
    def used_by(self) -> list[Run]:
        """Get a list of the runs that have used this artifact and its linked artifacts.

        Returns:
            A list of `Run` objects.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
        """
        query = gql(
            """
            query ArtifactUsedBy(
                $id: ID!,
            ) {
                artifact(id: $id) {
                    usedBy {
                        edges {
                            node {
                                name
                                project {
                                    name
                                    entityName
                                }
                            }
                        }
                    }
                }
            }
            """
        )
        assert self._client is not None
        response = self._client.execute(
            query,
            variable_values={"id": self.id},
        )
        return [
            Run(
                self._client,
                edge["node"]["project"]["entityName"],
                edge["node"]["project"]["name"],
                edge["node"]["name"],
            )
            for edge in response.get("artifact", {}).get("usedBy", {}).get("edges", [])
        ]

    @ensure_logged
    def logged_by(self) -> Run | None:
        """Get the W&B run that originally logged the artifact.

        Returns:
            The W&B run that originally logged the artifact, or `None` if it cannot
            be determined.

        Raises:
            ArtifactNotLoggedError: If the artifact is not logged.
        """
        query = gql(
            """
            query ArtifactCreatedBy(
                $id: ID!
            ) {
                artifact(id: $id) {
                    createdBy {
                        ... on Run {
                            name
                            project {
                                name
                                entityName
                            }
                        }
                    }
                }
            }
        """
        )
        assert self._client is not None
        response = self._client.execute(
            query,
            variable_values={"id": self.id},
        )
        creator = response.get("artifact", {}).get("createdBy", {})
        if creator.get("name") is None:
            return None
        return Run(
            self._client,
            creator["project"]["entityName"],
            creator["project"]["name"],
            creator["name"],
        )

    @ensure_logged
    def json_encode(self) -> dict[str, Any]:
        """Returns the artifact encoded to the JSON format.

        Returns:
            A `dict` with `string` keys representing attributes of the artifact.
        """
        return artifact_to_json(self)

    @staticmethod
    def _expected_type(
        entity_name: str, project_name: str, name: str, client: RetryingClient
    ) -> str | None:
        """Returns the expected type for a given artifact name and project."""
        query = gql(
            """
            query ArtifactType(
                $entityName: String,
                $projectName: String,
                $name: String!
            ) {
                project(name: $projectName, entityName: $entityName) {
                    artifact(name: $name) {
                        artifactType {
                            name
                        }
                    }
                }
            }
            """
        )
        if ":" not in name:
            name += ":latest"
        response = client.execute(
            query,
            variable_values={
                "entityName": entity_name,
                "projectName": project_name,
                "name": name,
            },
        )
        return (
            ((response.get("project") or {}).get("artifact") or {}).get("artifactType")
            or {}
        ).get("name")

    def _load_manifest(self, url: str) -> ArtifactManifest:
        with requests.get(url) as response:
            response.raise_for_status()
            return ArtifactManifest.from_manifest_json(response.json())

    def _ttl_duration_seconds_to_gql(self) -> int | None:
        # Send the user-set ttl_duration_seconds if one was provided; otherwise,
        # send a status flag telling the backend the TTL is INHERIT (-1) or
        # DISABLED (-2) when the TTL is None. Returning None is a no-op, since
        # nothing changed.
        INHERIT = -1  # noqa: N806
        DISABLED = -2  # noqa: N806
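
        # Illustrative mapping (assuming the TTL was changed unless noted):
        #   _ttl_changed is False                    -> None (no-op)
        #   _ttl_is_inherited is True                -> INHERIT (-1)
        #   _ttl_duration_seconds == 86400           -> 86400
        #   _ttl_duration_seconds is None (disabled) -> DISABLED (-2)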

        if not self._ttl_changed:
            return None
        if self._ttl_is_inherited:
            return INHERIT
        return self._ttl_duration_seconds or DISABLED

    def _fetch_linked_artifacts(self) -> list[Artifact]:
        """Fetches all linked artifacts from the server."""
        if self.id is None:
            raise ValueError(
                "Unable to find any artifact memberships for artifact without an ID"
            )
        if self._client is None:
            raise ValueError("Client is not initialized")
        response = self._client.execute(
            gql_compat(FETCH_LINKED_ARTIFACTS_GQL),
            variable_values={"artifactID": self.id},
        )
        result = FetchLinkedArtifacts.model_validate(response)

        if not (
            (artifact := result.artifact)
            and (memberships := artifact.artifact_memberships)
            and (membership_edges := memberships.edges)
        ):
            raise ValueError("Unable to find any artifact memberships for artifact")

        linked_artifacts: deque[Artifact] = deque()
        linked_nodes = (
            node
            for edge in membership_edges
            if (
                (node := edge.node)
                and (col := node.artifact_collection)
                and (col.typename__ == LINKED_ARTIFACT_COLLECTION_TYPE)
            )
        )
        for node in linked_nodes:
            # Validate the fields needed to identify the link before using them.
            # The generator above already guarantees `node` is truthy.
            if not (
                (col := node.artifact_collection)
                and (proj := col.project)
                and (proj.entity_name and proj.name)
            ):
                raise ValueError("Unable to fetch fields for linked artifact")

            # Trick for O(1) membership check that maintains order
            alias_names = dict.fromkeys(a.alias for a in node.aliases)
            version = f"v{node.version_index}"
            aliases = (
                [*alias_names, version]
                if version not in alias_names
                else [*alias_names]
            )

            link_fields = _LinkArtifactFields(
                entity_name=proj.entity_name,
                project_name=proj.name,
                name=f"{col.name}:{version}",
                version=version,
                aliases=aliases,
            )
            link = self._create_linked_artifact_using_source_artifact(link_fields)
            linked_artifacts.append(link)
        return list(linked_artifacts)

    def _create_linked_artifact_using_source_artifact(
        self,
        link_fields: _LinkArtifactFields,
    ) -> Artifact:
        """Copies the source artifact to a linked artifact."""
        linked_artifact = copy(self)
        linked_artifact._version = link_fields.version
        linked_artifact._aliases = link_fields.aliases
        linked_artifact._saved_aliases = copy(link_fields.aliases)
        linked_artifact._name = link_fields.name
        linked_artifact._entity = link_fields.entity_name
        linked_artifact._project = link_fields.project_name
        linked_artifact._is_link = link_fields.is_link
        linked_artifact._linked_artifacts = link_fields.linked_artifacts
        return linked_artifact


class _ArtifactVersionType(WBType):
    name = "artifactVersion"
    types = [Artifact]


TypeRegistry.add(_ArtifactVersionType)
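
# Registering the version type lets W&B's internal type system
# (`WBType`/`TypeRegistry`) resolve `Artifact` instances where typed values
# are expected.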