Skip to content

cloudmesh-ai-hpc API Reference

This page provides the API reference for the cloudmesh-ai-hpc library.

API Documentation

Attributes

console (module attribute)

console = Console()

Classes

Hpc

Bases: SSHBase

Source code in src/cloudmesh/ai/hpc.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
class Hpc(SSHBase):
    def __init__(self, host: str = "uva", debug: bool = False) -> None:
        """
        Initialize the Hpc class.

        Loads the partition table shipped with the package
        (``partitions.yaml`` next to this module) and then merges the user's
        overrides from ``~/.cloudmesh/hpc.yaml`` on top of it.

        A missing or unreadable base file (FileNotFoundError/RuntimeError)
        and any error while reading the local file are printed to the
        console instead of being raised.

        Args:
            host: Default remote host used by the other methods.
            debug: Forwarded to ``SSHBase.__init__``; ``login`` and ``edit``
                skip execution when ``self.debug`` is set.
        """
        super().__init__(debug=debug)
        self.host = host

        # 1. Load base partitions from the package.
        try:
            path = os.path.join(os.path.dirname(__file__), "partitions.yaml")
            # An empty YAML document presumably loads as None (as with
            # yaml.safe_load); treat missing/empty sections as empty
            # mappings instead of crashing on .get().
            data = load_yaml(path) or {}
            self.ai_config: Dict[str, Any] = (data.get("cloudmesh") or {}).get("ai") or {}
            # Mapping host -> partition key -> {sbatch option: value}.
            self.directive: Dict[str, Any] = self.ai_config.get("partition") or {}
        except (FileNotFoundError, RuntimeError) as e:
            console.error(f"Failed to load base partitions.yaml: {e}")
            self.ai_config = {}
            self.directive = {}

        # 2. Load local overrides from ~/.cloudmesh/hpc.yaml.
        local_path = os.path.expanduser("~/.cloudmesh/hpc.yaml")
        if os.path.exists(local_path):
            try:
                local_data = load_yaml(local_path) or {}
                # The file may hold the full cloudmesh.ai structure or just
                # the contents of the 'ai' section.
                ai_overrides = (local_data.get("cloudmesh") or {}).get("ai", local_data) or {}

                # Top-level keys (e.g. default, aliases, tutorials) replace
                # the packaged values wholesale.
                self.ai_config.update(ai_overrides)

                # Partitions are merged per host: a local partition key
                # replaces the packaged entry of the same name, others stay.
                for h, p in (ai_overrides.get("partition") or {}).items():
                    self.directive.setdefault(h, {}).update(p or {})

                # update() above may have replaced ai_config["partition"];
                # point it back at the merged table.
                self.ai_config["partition"] = self.directive

            except Exception as e:
                console.error(f"Failed to load local config {local_path}: {e}")

    def _suggest_match(self, word: str, possibilities: List[str]) -> Optional[str]:
        """Suggest the closest match from a list of possibilities."""
        matches = difflib.get_close_matches(word, possibilities, n=1, cutoff=0.6)
        return matches[0] if matches else None

    def parse_sbatch_parameter(self, parameters: str) -> Dict[str, str]:
        """
        Parse the parameters string and convert it to a dictionary.
        Expected format: "key1:val1,key2:val2"
        Supports aliases defined in local config.
        Raises ValueError if the format is invalid.
        """
        # Common valid Slurm parameters for basic validation
        valid_params = {
            "nodes", "ntasks", "cpus-per-task", "time", "partition", 
            "mem", "gres", "job-name", "output", "error", "mail-type", "mail-user"
        }

        result = {}
        if not parameters:
            return result

        # Handle aliases from local config
        aliases = self.ai_config.get("aliases", {})

        data = parameters.split(",")
        for line in data:
            line = line.strip()
            if not line:
                continue

            # Check if the line is an alias
            if line in aliases:
                alias_val = aliases[line]
                # Recursively parse the alias value
                result.update(self.parse_sbatch_parameter(alias_val))
                continue

            if ":" not in line:
                raise ValueError(f"Invalid sbatch parameter format: '{line}'. Expected 'key:value' or a defined alias.")

            key, value = line.split(":", 1)
            key = key.strip()
            value = value.strip()

            if not key or not value:
                raise ValueError(f"Invalid sbatch parameter: '{line}'. Both key and value must be provided.")

            if key not in valid_params:
                console.warn(f"Unknown sbatch parameter: '{key}'. It may be ignored by Slurm.")

            result[key] = value
        return result

    def create_slurm_directives(self, host: Optional[str] = None, key: Optional[str] = None) -> str:
        """Create Slurm directives based on the provided host and key."""
        host = host or self.host
        try:
            directives = self.directive[host][key]
        except KeyError:
            suggestion = self._suggest_match(key, list(self.directive.get(host, {}).keys()))
            msg = f"In directive searching for:\n  host {host}\n  key {key}\nNot found"
            if suggestion:
                msg += f"\nDid you mean '{suggestion}'?"
            raise ValueError(msg)

        block = ""
        for k, v in directives.items():
            block += f"#SBATCH --{k}={v}\n"
        return block

    def get_partition_data(self, host: str) -> Tuple[Optional[List[str]], Optional[List[List[str]]]]:
        """
        Return raw partition data for table display.
        Returns a tuple of (header_list, data_list).
        """
        partitions = self.directive.get(host, {})
        if not partitions:
            return None, None

        display_partitions = {k: v for k, v in partitions.items() if k != "default"}
        if not display_partitions:
            return None, None

        default_key = self.get_default_partition(host)

        all_directive_keys = set()
        for v in display_partitions.values():
            all_directive_keys.update(v.keys())
        sorted_keys = sorted(list(all_directive_keys))

        header = ["Default", "Key"] + sorted_keys

        data = []
        for k, v in display_partitions.items():
            is_default = "*" if k == default_key else " "
            row = [is_default, k] + [str(v.get(dk, "")) for dk in sorted_keys]
            data.append(row)

        return header, data

    def get_partition_static_data(self, host: str) -> Tuple[Optional[str], Optional[List[Dict[str, Any]]]]:
        """
        Prepare basic partition data for interactive selection.
        Returns a tuple of (header_string, choices_list).
        """
        header_list, data_list = self.get_partition_data(host)
        if not header_list or not data_list:
            return None, None

        # Add resource columns to header (initially empty)
        header_list = header_list + ["Idle Nodes", "GPU Usage (Avail/Used/Total)"]

        # Update data rows with placeholders for resource counts
        updated_data = []
        for row in data_list:
            updated_row = row + ["...", "..."]
            updated_data.append(updated_row)

        # Calculate column widths for alignment
        col_widths = {name: len(name) for name in header_list}
        for row in updated_data:
            for i, val in enumerate(row):
                col_widths[header_list[i]] = max(col_widths[header_list[i]], len(val))

        # Create formatted header
        header = " | ".join([name.ljust(col_widths[name]) for name in header_list])

        # Create formatted rows as choices
        choices = []
        for row in updated_data:
            row_str = " | ".join([val.ljust(col_widths[header_list[i]]) for i, val in enumerate(row)])
            choices.append({"name": row_str, "value": row[1]})

        return header, choices

    def get_partition_realtime_data(self, host: str) -> Dict[str, Dict[str, Any]]:
        """
        Fetch real-time resource availability for all partitions on the host.
        Returns a map of partition_name -> resource_stats.
        """
        resource_map = {}
        try:
            # 1. Get all node states and GRES
            sinfo_cmd = "sinfo -N -o \"%P %N %G %t\""
            sinfo_output = self._run_remote(host, sinfo_cmd).stdout

            if not sinfo_output:
                return resource_map

            mix_nodes = []
            node_data = []
            for line in sinfo_output.strip().split("\n"):
                parts = line.split()
                if len(parts) < 4: continue
                partitions, node, gres, state = parts[0].split(","), parts[1], parts[2], parts[3]
                node_data.append((partitions, node, gres, state))
                if state == "mix":
                    mix_nodes.append(node)

            node_allocations = {}
            if mix_nodes:
                nodes_list = ",".join(mix_nodes)
                sctrl_cmd = f"scontrol show node {nodes_list}"
                sctrl_output = self._run_remote(host, sctrl_cmd).stdout

                if sctrl_output:
                    node_blocks = sctrl_output.split("NodeName=")[1:]
                    for block in node_blocks:
                        node_name = block.split()[0]
                        alloc_match = re.search(r'AllocTRES=[^=]*gres/gpu=([^,\s]+)', block)
                        if alloc_match:
                            val = alloc_match.group(1)
                            num_match = re.search(r'(\d+)$', val)
                            node_allocations[node_name] = int(num_match.group(1)) if num_match else 1
                        else:
                            node_allocations[node_name] = 0

            for partitions, node, gres, state in node_data:
                if state in ["idle", "mix"]:
                    total_gpus = 0
                    if "gpu" in gres:
                        # Match gpu:X or gpu:type:X
                        match = re.search(r'gpu:([^:]*:)?(\d+)$', gres)
                        total_gpus = int(match.group(2)) if match else 1

                    if state == "idle":
                        available_gpus, is_idle_node = total_gpus, True
                    else: # state == "mix"
                        used_gpus = node_allocations.get(node, 0)
                        available_gpus, is_idle_node = max(0, total_gpus - used_gpus), False

                    for p in partitions:
                        if p not in resource_map:
                            resource_map[p] = {"nodes": 0, "gpus": 0, "used_gpus": 0, "total_nodes": 0, "total_gpus": 0}
                        if is_idle_node:
                            resource_map[p]["nodes"] += 1
                        resource_map[p]["gpus"] += available_gpus
                        resource_map[p]["used_gpus"] += (total_gpus - available_gpus)
                        resource_map[p]["total_nodes"] += 1
                        resource_map[p]["total_gpus"] += total_gpus
        except Exception as e:
            console.error(f"Failed to fetch real-time resources: {e}")

        return resource_map


    def get_default_partition(self, host: str) -> Optional[str]:
        """Return the default partition for the host if it exists."""
        if host not in self.directive:
            return None

        host_partitions = self.directive.get(host, {})

        # 1. Check for host-specific default
        if "default" in host_partitions:
            # Return the actual partition name pointed to by 'default'
            default_val = host_partitions["default"].get("partition")
            if default_val:
                return default_val

        # 2. Check for global default
        global_default = self.ai_config.get("default", {}).get("partition")
        if global_default:
            # Return only the short key (e.g., 'a100-dgx' from 'cloudmesh.ai.partition.uva.a100-dgx')
            return global_default.split(".")[-1]

        # 3. Fallback to the first available partition for the host
        # Filter out 'default' key if it exists
        keys = [k for k in host_partitions.keys() if k != "default"]
        return next(iter(keys)) if keys else None

    def get_tutorial_url(self, keyword: Optional[str] = None) -> str:
        """Return the tutorial URL for the given keyword."""
        tutorials = self.ai_config.get("tutorials", {})
        return tutorials.get(keyword, tutorials.get("default", "https://infomall.org/uva/docs/tutorial/"))

    def get_config_path(self) -> str:
        """Return the path to the config.csv file."""
        # The config.csv is located in the same directory as this file
        return os.path.join(os.path.dirname(__file__), "config.csv")

    def get_login_command(self, host: Optional[str], key: Optional[str], sbatch_params: Optional[Dict[str, str]] = None) -> Optional[str]:
        """Construct the SSH ijob command without executing it."""
        host = host or self.host
        if not key:
            key = "default"

        try:
            directives = self.directive[host][key].copy()
        except KeyError:
            return None

        if sbatch_params:
            directives.update(sbatch_params)

        parameters = "".join([f" --{k}={v}" for k, v in directives.items()])
        return f'ssh -tt {host} "/opt/rci/bin/ijob{parameters}"'

    def get_node_gpu_usage(self, node_name: str) -> Dict[str, Any]:
        """
        Get the GPU usage for a specific node.
        Returns a dictionary with total, used, and available GPUs.
        """
        cmd = f"scontrol show node {node_name}"
        output = self.run_command(cmd)

        if not output:
            return {"error": f"No output received for node {node_name}"}

        try:
            # Parse CfgTRES for total GPUs
            cfg_match = re.search(r'CfgTRES=[^=]*gres/gpu=([^,\s]+)', output)
            total_gpus = 0
            if cfg_match:
                val = cfg_match.group(1)
                num_match = re.search(r'(\d+)$', val)
                total_gpus = int(num_match.group(1)) if num_match else 1

            # Parse AllocTRES for used GPUs
            alloc_match = re.search(r'AllocTRES=[^=]*gres/gpu=([^,\s]+)', output)
            used_gpus = 0
            if alloc_match:
                val = alloc_match.group(1)
                num_match = re.search(r'(\d+)$', val)
                used_gpus = int(num_match.group(1)) if num_match else 1

            return {
                "node": node_name,
                "total": total_gpus,
                "used": used_gpus,
                "available": max(0, total_gpus - used_gpus)
            }
        except Exception as e:
            return {"error": f"Failed to parse GPU usage: {e}"}

    def get_cluster_gpu_usage(self) -> List[Dict[str, Any]]:
        """
        Get GPU usage for all nodes across all partitions on the host.
        Returns a list of nodes sorted by available GPUs (descending).
        """
        all_nodes_usage = []
        resource_map = self.get_partition_realtime_data(self.host)

        # We need to get the actual node data. 
        # get_partition_realtime_data already does the sinfo and scontrol calls.
        # However, it aggregates by partition. We want a node-centric view.

        try:
            # Re-run the sinfo command to get the raw node list
            sinfo_cmd = "sinfo -N -o \"%P %N %G %t\""
            sinfo_output = self._run_remote(self.host, sinfo_cmd).stdout
            if not sinfo_output:
                return []

            # Get all mix nodes for scontrol lookup
            mix_nodes = []
            node_data = []
            for line in sinfo_output.strip().split("\n"):
                parts = line.split()
                if len(parts) < 4: continue
                partitions, node, gres, state = parts[0].split(","), parts[1], parts[2], parts[3]
                node_data.append((partitions, node, gres, state))
                if state == "mix":
                    mix_nodes.append(node)

            node_allocations = {}
            if mix_nodes:
                nodes_list = ",".join(mix_nodes)
                sctrl_output = self._run_remote(self.host, f"scontrol show node {nodes_list}").stdout
                if sctrl_output:
                    node_blocks = sctrl_output.split("NodeName=")[1:]
                    for block in node_blocks:
                        node_name = block.split()[0]
                        alloc_match = re.search(r'AllocTRES=[^=]*gres/gpu=([^,\s]+)', block)
                        if alloc_match:
                            val = alloc_match.group(1)
                            num_match = re.search(r'(\d+)$', val)
                            node_allocations[node_name] = int(num_match.group(1)) if num_match else 1
                        else:
                            node_allocations[node_name] = 0

            for partitions, node, gres, state in node_data:
                total_gpus = 0
                if "gpu" in gres:
                    match = re.search(r'gpu:([^:]*:)?(\d+)$', gres)
                    total_gpus = int(match.group(2)) if match else 1

                if state == "idle":
                    available_gpus = total_gpus
                elif state == "mix":
                    used_gpus = node_allocations.get(node, 0)
                    available_gpus = max(0, total_gpus - used_gpus)
                else:
                    available_gpus = 0

                all_nodes_usage.append({
                    "node": node,
                    "partition": partitions[0], # Primary partition
                    "total": total_gpus,
                    "used": total_gpus - available_gpus,
                    "available": available_gpus,
                    "state": state
                })
        except Exception as e:
            logger.error(f"Failed to get cluster GPU usage: {e}")

        # Sort by available GPUs descending
        return sorted(all_nodes_usage, key=lambda x: x["available"], reverse=True)

    def get_reservation_gpu_usage(self, reservation_name: str) -> Dict[str, Any]:
        """
        Get the aggregate GPU usage for a Slurm reservation.
        """
        cmd = f"scontrol show reservation {reservation_name}"
        output = self.run_command(cmd)

        if not output or "Invalid reservation" in output:
            return {"error": f"Reservation {reservation_name} not found or no output received."}

        try:
            # Find the Nodes=... part of the reservation output
            nodes_match = re.search(r'Nodes=([^,\s]+)', output)
            if not nodes_match:
                return {"error": f"No nodes found for reservation {reservation_name}"}

            nodes_str = nodes_match.group(1)
            # Slurm node lists can be complex (e.g., node[01-04,06])
            # We use scontrol show node to expand them or just query the list
            # A simpler way to get the expanded list is to use scontrol show node with the list
            nodes_output = self.run_command(f"scontrol show node {nodes_str}")

            total_gpus = 0
            used_gpus = 0

            # Split by NodeName= to process each node in the reservation
            node_blocks = nodes_output.split("NodeName=")[1:]
            for block in node_blocks:
                # Total GPUs
                cfg_match = re.search(r'CfgTRES=[^=]*gres/gpu=([^,\s]+)', block)
                if cfg_match:
                    val = cfg_match.group(1)
                    num_match = re.search(r'(\d+)$', val)
                    total_gpus += int(num_match.group(1)) if num_match else 1

                # Used GPUs
                alloc_match = re.search(r'AllocTRES=[^=]*gres/gpu=([^,\s]+)', block)
                if alloc_match:
                    val = alloc_match.group(1)
                    num_match = re.search(r'(\d+)$', val)
                    used_gpus += int(num_match.group(1)) if num_match else 1

            return {
                "reservation": reservation_name,
                "total": total_gpus,
                "used": used_gpus,
                "available": max(0, total_gpus - used_gpus)
            }
        except Exception as e:
            return {"error": f"Failed to parse reservation GPU usage: {e}"}

    def run_command(self, command: str, host: Optional[str] = None) -> str:
        """Execute an arbitrary command on the HPC."""
        target_host = host or self.host
        try:
            result = self._run_remote(target_host, command)
            return result.stdout
        except Exception as e:
            console.error(f"Failed to execute command on {target_host}: {e}")
            return ""

    def login(self, host: Optional[str], key: Optional[str], sbatch_params: Optional[Dict[str, str]] = None) -> str:
        """
        SSH on HPC by executing an interactive job command.

        Builds the command with ``get_login_command`` and prints it. Unless
        ``self.debug`` is set, the command is then run with ``os.system`` so
        the interactive session is attached to the current terminal.

        Args:
            host: Target host; ``self.host`` is used when falsy.
            key: Partition key; ``"default"`` is used when falsy.
            sbatch_params: Extra ijob options overriding the partition's.

        Returns:
            Always ``""``. If the host/key is unknown, an error listing the
            available keys (and a close match, if any) is printed instead.
        """
        command = self.get_login_command(host, key, sbatch_params)
        if not command:
            target_host = host or self.host
            # get_login_command looks up "default" when no key is given;
            # name the key that was actually tried.
            tried_key = key or "default"
            available_keys = list(self.directive.get(target_host, {}).keys())
            suggestion = self._suggest_match(key, available_keys) if key else None
            msg = f"Key {tried_key} not found for host {target_host}. Available keys: {', '.join(available_keys)}"
            if suggestion:
                msg += f"\nDid you mean '{suggestion}'?"
            console.error(msg)
            # Honour the declared str return type instead of returning None.
            return ""

        console.msg(command)
        if not self.debug:
            try:
                # Use os.system for interactive sessions to ensure the terminal
                # is connected directly to the process. Shell.run captures output,
                # which causes interactive shells to hang.
                os.system(command)
            except Exception as e:
                console.error(f"Login failed: {e}")
        return ""

    def create_apptainer_image(self, name: str) -> None:
        """Create an apptainer image on HPC."""
        try:
            cache = os.environ.get("APPTAINER_CACHEDIR", "/scratch/$USER/.apptainer/")
            console.banner("Cloudmesh HPC Apptainer Build")

            image = os.path.basename(name.replace(".def", ".sif"))
            logger.debug(f"Building image {image} from {name}")

            console.print(f"Image name       : {image}")
            console.print(f"Singularity cache: {cache}")
            console.print(f"Definition       : {name}")
            console.print()
            StopWatch.start("build image")
            Shell.run(f"apptainer build {image} {name}")
            StopWatch.stop("build image")

            # Use Shell.run to get size
            size_output = Shell.run(f"du -sh {image}")
            size = size_output.split()[0] if size_output else "unknown"
            timer = StopWatch.get("build image")
            console.print()
            console.print(f"Time to build {image}s ({size}) {timer}s")
            console.print()

        except (RuntimeError, OSError) as e:
            console.error(f"Apptainer build failed: {e}")

    def jupyter(self, port: int = 8000) -> None:
        """
        Start a Jupyter notebook on HPC.

        Nothing is executed here. It only prints the notebook command for
        *port* and an ``ssh -L`` tunnel that forwards local port 8080 to
        *port* through the host named ``hpc``.
        """
        instructions = (
            f"Starting Jupyter on port {port}...",
            "Note: This requires an active VPN connection.",
            f"Command: jupyter notebook --no-browser --port={port}",
            f"Tunnel: ssh -L 8080:localhost:{port} hpc",
        )
        for text in instructions:
            console.print(text)

    def cancel(self, job_id: str) -> str:
        """
        Cancel a Slurm job.

        Prints a notice, then runs ``scancel <job_id>`` on ``self.host``.

        Returns:
            The command's stdout, or ``""`` if running it raised (the error
            is printed to the console).
        """
        console.msg(f"Canceling job {job_id}...")
        scancel_cmd = f"scancel {job_id}"
        try:
            return self._run_remote(self.host, scancel_cmd).stdout
        except Exception as exc:
            console.error(f"Failed to cancel job {job_id}: {exc}")
        return ""

    def get_job_status(self, job_id: str) -> str:
        """Get the status of a specific Slurm job."""
        try:
            result = self._run_remote(self.host, f"squeue -j {job_id}")
            return result.stdout
        except Exception as e:
            console.error(f"Failed to get status for job {job_id}: {e}")
            return ""

    def list_jobs(self) -> str:
        """List all active Slurm jobs for the current user."""
        try:
            result = self._run_remote(self.host, "squeue -u $USER")
            return result.stdout
        except Exception as e:
            console.error(f"Failed to list jobs: {e}")
            return ""

    def search_jobs_by_node(self, node_regex: str) -> List[Dict[str, str]]:
        """
        Find jobs running on nodes that match the given regex.

        Args:
            node_regex: Regex to match node names.

        Returns:
            A list of dictionaries containing detailed job information.
        """
        # Comprehensive format: User|JobID|Name|State|TimeUsed|TimeLeft|Node|CPUs|Memory|QOS|Partition
        fmt = "%u|%i|%j|%T|%M|%L|%N|%C|%m|%q|%P"
        cmd = f"squeue -o \"{fmt}\""
        try:
            result = self._run_remote(self.host, cmd)
            output = result.stdout
            if not output:
                return []

            matches = []
            pattern = re.compile(node_regex)

            lines = output.strip().split("\n")
            # Skip header
            for line in lines[1:]:
                if not line.strip():
                    continue
                parts = line.split("|")
                if len(parts) < 11:
                    continue

                # The node list is at index 6
                node = parts[6]
                if pattern.search(node):
                    matches.append({
                        "User": parts[0],
                        "JobID": parts[1],
                        "Name": parts[2],
                        "State": parts[3],
                        "TimeUsed": parts[4],
                        "TimeLeft": parts[5],
                        "Node": node,
                        "CPUs": parts[7],
                        "Memory": parts[8],
                        "QOS": parts[9],
                        "Partition": parts[10]
                    })
            return matches
        except Exception as e:
            console.error(f"Failed to search jobs by node: {e}")
            return []

    def storage(self, directory: str) -> str:
        """Get storage information for a directory."""
        try:
            result = self._run_remote(self.host, f"du -sh {directory}")
            if result.stdout:
                # du -sh returns "size directory", we only want the size
                return result.stdout.split()[0]
        except Exception as e:
            console.error(f"Failed to get storage info for {directory}: {e}")
        return "unknown"

    def edit(self, filename: str, editor: str = "emacs") -> str:
        """
        Edit a file on the remote host.

        Runs ``ssh -t <host> '<editor> <filename>'``. ``-t`` allocates a TTY,
        which full-screen editors need. When ``self.debug`` is set, only the
        notice is printed and nothing is executed.

        Args:
            filename: Path on the remote host. It is passed unquoted to the
                remote shell, so ``~`` and ``$VARS`` expand there. Names
                containing a single quote are not escaped.
            editor: Editor command to run remotely; it may include options.

        Returns:
            Always ``""``.
        """
        command = f"ssh -t {self.host} '{editor} {filename}'"
        console.msg(f"Editing {filename} with {editor}...")
        if not self.debug:
            # Same approach as login(): os.system keeps the terminal attached
            # to the editor, whereas Shell.run captures output and makes
            # interactive programs hang.
            os.system(command)
        return ""

    def set_default(self, host: str, partition: Optional[str] = None) -> None:
        """
        Set the default host and partition in the local config file.

        Updates ``cloudmesh.ai.default`` in ``~/.cloudmesh/hpc.yaml``. The
        file and its directory are created if needed, and all other content
        is preserved. The partition is stored fully qualified as
        ``cloudmesh.ai.partition.<host>.<partition>``; an existing partition
        entry is kept when *partition* is not given.

        If an existing file cannot be read, an error is printed and the file
        is left untouched rather than replaced by a fresh config. Errors
        while saving are printed as well. The in-memory ``ai_config`` of
        this instance is not changed.

        Args:
            host: Host to record as the default.
            partition: Optional partition key on that host.
        """
        local_path = os.path.expanduser("~/.cloudmesh/hpc.yaml")

        # Load existing config or start fresh.
        config: Dict[str, Any] = {}
        if os.path.exists(local_path):
            try:
                # An empty file presumably loads as None (like yaml.safe_load).
                config = load_yaml(local_path) or {}
            except Exception as e:
                # Don't clobber a file the user may only have mistyped.
                console.error(f"Failed to read local config {local_path}: {e}")
                return

        # Ensure structure: cloudmesh -> ai -> default. Sections that are
        # present but empty in YAML load as None, hence the "or {}".
        cloudmesh_section = config.get("cloudmesh") or {}
        config["cloudmesh"] = cloudmesh_section
        ai_config = cloudmesh_section.get("ai") or {}
        cloudmesh_section["ai"] = ai_config
        defaults = ai_config.get("default") or {}
        ai_config["default"] = defaults

        defaults["host"] = host
        if partition:
            # Store as full path; get_default_partition accepts this form
            # as well as short keys.
            defaults["partition"] = f"cloudmesh.ai.partition.{host}.{partition}"

        # Save back to file.
        try:
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            with open(local_path, "w") as f:
                yaml.dump(config, f)
            console.msg(f"Default host set to {host}" + (f" and partition to {partition}" if partition else ""))
        except Exception as e:
            console.error(f"Failed to save local config: {e}")

    def submit(self, script_path: str, key: Optional[str] = None, sbatch_params: Optional[Dict[str, str]] = None) -> str:
        """Upload a script with generated #SBATCH directives and submit it with sbatch.

        The directives for partition ``key``, followed by any ``sbatch_params``,
        are inserted directly after the script's ``#!`` interpreter line.
        sbatch requires the first line of a batch script to be a shebang. If
        the script has none, ``#!/bin/bash`` is added in front of the directives.

        Args:
            script_path: Local path of the script to submit.
            key: Partition key; defaults to ``get_default_partition(self.host)``.
            sbatch_params: Extra options rendered as ``#SBATCH --name=value``.

        Returns:
            The stdout of ``sbatch``, or "" if the upload or submission failed.

        Raises:
            ValueError: If no key is given and no default partition exists.
        """
        import shlex

        host = self.host
        key = key or self.get_default_partition(host)
        if not key:
            raise ValueError(f"No partition key provided and no default found for host {host}")

        # 1. Generate directives
        directives = self.create_slurm_directives(host, key)
        if sbatch_params:
            directives += "".join(f"#SBATCH --{k}={v}\n" for k, v in sbatch_params.items())

        # 2. Read the script and insert the directives right after the shebang
        with open(script_path, "r") as f:
            script_content = f.read()

        if script_content.startswith("#!"):
            shebang, _, body = script_content.partition("\n")
            full_script = f"{shebang}\n{directives}{body}"
        else:
            full_script = f"#!/bin/bash\n{directives}{script_content}"

        # Create a temporary local file to upload
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp:
            tmp.write(full_script)
            tmp_path = tmp.name

        remote_script = f"/tmp/job_{os.path.basename(script_path)}"

        try:
            # 3. Upload script
            self.put(tmp_path, remote_script, host)

            # 4. Submit job (path quoted for the remote shell)
            result = self._run_remote(host, f"sbatch {shlex.quote(remote_script)}")
            return result.stdout
        except Exception as e:
            console.error(f"Failed to submit job: {e}")
            return ""
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def logs(self, job_id: str, tail: bool = False, follow: bool = False) -> str:
        """Show the stdout/stderr logs of a Slurm job.

        Log paths are taken from the ``StdOut=``/``StdErr=`` fields of
        ``job_info`` (``scontrol show job``). They fall back to
        ``slurm-<job_id>.out`` and ``slurm-<job_id>.err``. When both streams
        point to the same file, which Slurm does when ``--error`` is not set,
        the file is shown only once.

        Args:
            job_id: Slurm job id.
            tail: Show only the last 20 lines of each log.
            follow: Stream the logs with ``tail -f``; takes precedence over ``tail``.

        Returns:
            "" normally, or "Log output (debug)" for ``follow`` in debug mode.
        """
        import shlex

        stdout_path = f"slurm-{job_id}.out"
        stderr_path = f"slurm-{job_id}.err"

        info = self.job_info(job_id)
        if info:
            out_match = re.search(r'StdOut=([^\s]+)', info)
            if out_match:
                stdout_path = out_match.group(1)
            err_match = re.search(r'StdErr=([^\s]+)', info)
            if err_match:
                stderr_path = err_match.group(1)

        # Show a shared stdout/stderr file only once.
        streams = [("STDOUT", stdout_path)]
        if stderr_path != stdout_path:
            streams.append(("STDERR", stderr_path))

        if follow:
            # Streaming mode: tail -f, quoted for both the local and remote shell
            paths = " ".join(shlex.quote(path) for _, path in streams)
            cmd = f"ssh -t {self.host} {shlex.quote('tail -f ' + paths)}"
            if not self.debug:
                console.banner(f"Streaming logs for job {job_id} (Out: {stdout_path}, Err: {stderr_path})")
                os.system(cmd)
                return ""
            console.msg(f"Debug: {cmd}")
            return "Log output (debug)"

        # Snapshot (tail -n 20) or full (cat) read of each distinct log file
        for label, path in streams:
            try:
                if tail:
                    res = self._run_remote(self.host, f"tail -n 20 {shlex.quote(path)}")
                    title = f"{label} (Last 20 lines)"
                else:
                    res = self._run_remote(self.host, f"cat {shlex.quote(path)}")
                    title = label
                if res.stdout:
                    console.banner(title, path)
                    console.print(res.stdout)
            except Exception as e:
                console.error(f"Error reading {label} from {path}: {e}")
        return ""

    def job_info(self, job_id: str) -> str:
        """Return the raw ``scontrol show job <job_id>`` output from ``self.host``.

        Any failure is reported on the console and yields an empty string.
        """
        command = f"scontrol show job {job_id}"
        try:
            output = self._run_remote(self.host, command).stdout
        except Exception as e:
            console.error(f"Failed to get job info for {job_id}: {e}")
            output = ""
        return output

    def quota(self) -> str:
        """Return the output of ``hdquota`` run on ``self.host``.

        ``hdquota`` is used because ``quota -s`` fails on some clusters
        (e.g. UVA). Any failure is reported on the console and yields "".
        """
        try:
            output = self._run_remote(self.host, "hdquota").stdout
        except Exception as e:
            console.error(f"Failed to get quota: {e}")
            output = ""
        return output

    def get_billing_usage(self, username: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return sreport usage rows (billing / core-hours) for one user.

        ``username`` falls back to the ``USER`` environment variable. An empty
        list is returned when no user can be determined or when sreport fails
        (the failure is logged).
        """
        user = username if username else os.environ.get("USER")
        if not user:
            return []

        try:
            # 'user' entity report filtered to this user
            return self.sreport(entity="user", filter_val=user)
        except Exception as e:
            logger.error(f"Failed to get billing usage for {user}: {e}")
        return []

    def nodes(self, partition: Optional[str] = None) -> str:
        """Return raw ``sinfo`` output, restricted to ``partition`` when one is given.

        Any failure is reported on the console and yields "".
        """
        command = f"sinfo -p {partition}" if partition else "sinfo"
        try:
            return self._run_remote(self.host, command).stdout
        except Exception as e:
            console.error(f"Failed to get node status: {e}")
        return ""

    def sinfo(self, partition: Optional[str] = None, json_support: bool = False, format: str = "%all") -> List[Dict[str, Any]]:
        """Return sinfo rows as dictionaries, with summary metrics merged into each node.

        Thin wrapper: all arguments are forwarded to ``Slurm.sinfo`` on a
        ``Slurm`` instance bound to ``self.host`` and ``self.debug``.
        """
        return Slurm(host=self.host, debug=self.debug).sinfo(
            partition=partition,
            json_support=json_support,
            format=format,
        )

    def sreport(self, entity: str = "user", filter_val: Optional[str] = None,
                start: Optional[str] = None, end: Optional[str] = None,
                stat: bool = False) -> List[Dict[str, Any]]:
        """Return an sreport usage report for ``entity`` as a list of dictionaries.

        Thin wrapper: all arguments are forwarded unchanged to ``Slurm.sreport``
        on a ``Slurm`` instance bound to ``self.host`` and ``self.debug``.
        """
        report_args = dict(entity=entity, filter_val=filter_val, start=start, end=end, stat=stat)
        return Slurm(host=self.host, debug=self.debug).sreport(**report_args)

    def check_resource_availability(self, key: str) -> Dict[str, Any]:
        """Summarize node availability for the Slurm partition behind ``key``.

        Runs ``sinfo -h -p <partition> -N -o "%N %G %t"`` on ``self.host``.
        The ``-h`` (no header) flag matters: without it, sinfo's header line
        ("NODELIST GRES STATE") would be parsed as an extra node.

        Args:
            key: Partition key in ``self.directive[self.host]``.

        Returns:
            A dict with ``partition``, ``total_nodes``, ``idle_nodes``,
            ``idle_details`` and ``all_details``, where each detail is
            ``{"node", "gres", "state"}``. On failure, ``{"error": message}``.
        """
        host = self.host
        try:
            # Get the actual Slurm partition name from the key
            entry = (self.directive.get(host) or {}).get(key)
            if entry is None:
                return {"error": f"Partition key '{key}' not found for host {host}"}
            partition_name = entry.get("partition")
            if not partition_name:
                return {"error": "Partition name not found for key"}

            # %N: Node name, %G: Generic Resources (GRES), %t: compact state
            cmd = f"sinfo -h -p {partition_name} -N -o \"%N %G %t\""
            output = self._run_remote(host, cmd).stdout
            if not output:
                return {"error": "No nodes found in partition"}

            all_nodes = []
            for line in output.strip().splitlines():
                parts = line.split()
                if len(parts) < 3:
                    continue
                node, gres, state = parts[:3]
                all_nodes.append({"node": node, "gres": gres, "state": state})

            # Only 'idle' (completely free) nodes count; 'mix' nodes are partially used.
            idle_nodes = [info for info in all_nodes if info["state"] == "idle"]

            return {
                "partition": partition_name,
                "total_nodes": len(all_nodes),
                "idle_nodes": len(idle_nodes),
                "idle_details": idle_nodes,
                "all_details": all_nodes
            }
        except Exception as e:
            return {"error": str(e)}

    def wait(self, job_id: str, interval: int = 30) -> bool:
        """Block until a Slurm job is no longer active, printing status updates.

        Polls ``get_job_status`` every ``interval`` seconds. The squeue output
        is parsed by column: the header line is skipped and the state is read
        from its ``ST`` column. A substring test on the raw text would match
        the "R" in header words such as ``PARTITION``/``USER`` and never stop.
        Parsing assumes whitespace-separated columns (no spaces inside fields
        before ``ST``).

        Args:
            job_id: Slurm job id.
            interval: Seconds between polls.

        Returns:
            True once the job has left the queue or reached a non-active state.
        """
        # squeue compact state codes that mean the job is not done yet.
        progress = {
            "R": "is currently RUNNING...",
            "PD": "is PENDING...",
            "CF": "is CONFIGURING...",
            "CG": "is COMPLETING...",
        }

        def queue_states(output: str) -> List[str]:
            """Return the ST value of every job row in squeue output ([] if none)."""
            lines = [ln for ln in output.splitlines() if ln.strip()]
            if not lines:
                return []
            header = lines[0].split()
            column = header.index("ST") if "ST" in header else None
            # Without a recognizable header every line is treated as a job row.
            rows = lines[1:] if column is not None else lines
            states = []
            for row in rows:
                fields = row.split()
                if column is not None and column < len(fields):
                    states.append(fields[column])
                else:
                    states.append(next((f for f in fields if f in progress), row.strip()))
            return states

        console.msg(f"Waiting for job {job_id} to complete...")
        while True:
            status = self.get_job_status(job_id)
            states = queue_states(status) if status else []
            if not states:
                console.msg(f"Job {job_id} is no longer in the queue (finished or failed).")
                return True

            # Any active row (e.g. one array task) keeps us waiting.
            active = next((s for s in states if s in progress), None)
            if active is None:
                console.msg(f"Job {job_id} has finished (Status: {states[0]}).")
                return True

            console.msg(f"Job {job_id} {progress[active]}")
            time.sleep(interval)

    def monitor_job(self, job_id: str, interval: int = 10) -> None:
        """Print a banner and follow ``job_id`` via ``wait`` until it completes.

        A Ctrl-C (KeyboardInterrupt) ends monitoring with a message instead
        of propagating.
        """
        title = f"Monitoring Job {job_id}"
        console.banner(title)
        try:
            self.wait(job_id, interval)
        except KeyboardInterrupt:
            # The user aborted monitoring; the job itself keeps running.
            console.msg("Monitoring stopped by user.")

    def check(self) -> None:
        """Run a health check of the HPC environment, printing one line per check.

        Checks, in order:
        1. VPN state via ``Vpn(service="hpc").enabled()``.
        2. SSH access, by running ``hostname`` remotely.
        3. Disk-quota access via ``self.quota()``.

        Each check reports its own failure and does not stop the others.
        """
        from cloudmesh.ai.vpn.vpn import Vpn

        ok = "[green]✓[/green]"
        fail = "[red]✗[/red]"
        # The warning marker was mis-encoded ("âš "); it is U+26A0 WARNING SIGN.
        warn = "[yellow]⚠[/yellow]"

        console.banner("HPC Health Check")

        # 1. VPN Check
        try:
            vpn_ok = Vpn(service="hpc").enabled()
            console.print(f"{ok if vpn_ok else fail} VPN Connected")
        except Exception as e:
            console.print(f"{fail} VPN Check Failed: {e}")

        # 2. SSH Check
        try:
            res = self._run_remote(self.host, "hostname")
            if res.stdout:
                console.print(f"{ok} SSH Access Verified ({res.stdout.strip()})")
            else:
                console.print(f"{fail} SSH Access: No response from host")
        except Exception as e:
            console.print(f"{fail} SSH Access Failed: {e}")

        # 3. Quota Check
        try:
            if self.quota():
                console.print(f"{ok} Disk Quota Accessible")
            else:
                console.print(f"{warn} Disk Quota: No data returned")
        except Exception as e:
            console.print(f"{fail} Disk Quota Check Failed: {e}")

        console.print()

    def template(self, key: Optional[str] = None) -> str:
        """Return a boilerplate sbatch script for partition ``key``.

        ``#!/bin/bash`` comes first, because sbatch requires the first line
        of a batch script to be a shebang. The partition directives follow,
        then default ``--output``/``--error`` directives and example commands.

        Args:
            key: Partition key; defaults to ``get_default_partition(self.host)``.

        Returns:
            The script text, or a comment line if no partition can be determined.
        """
        key = key or self.get_default_partition(self.host)
        if not key:
            return "# No default partition found. Please specify a key."

        directives = self.create_slurm_directives(self.host, key)
        return (
            "#!/bin/bash\n"
            f"{directives}"
            "#SBATCH --output=slurm-%j.out\n"
            "#SBATCH --error=slurm-%j.err\n\n"
            "# Your commands here\n"
            f"echo 'Hello from Slurm job on {self.host} partition {key}'\n"
            "hostname\n"
        )

Functions

__init__
__init__(host='uva', debug=False)

Initialize the Hpc class.

Source code in src/cloudmesh/ai/hpc.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(self, host: str = "uva", debug: bool = False) -> None:
    """
    Initialize the Hpc class.

    Loads the ``cloudmesh.ai`` section of the packaged ``partitions.yaml``.
    It then merges overrides from ``~/.cloudmesh/hpc.yaml``, which may hold
    either the full ``cloudmesh: ai: ...`` structure or just the ``ai``
    section. Partition entries are merged per host, with local keys winning.
    Other top-level ``ai`` keys (such as ``default``) replace the base values.
    Empty documents and null sections are treated as empty mappings.

    Args:
        host: Host used by methods that do not take an explicit host.
        debug: Forwarded to the base-class constructor.
    """
    super().__init__(debug=debug)
    self.host = host

    # 1. Load base partitions from the package
    try:
        path = os.path.join(os.path.dirname(__file__), "partitions.yaml")
        # An empty YAML document may load as None (assumed safe_load semantics).
        data = load_yaml(path) or {}
        self.ai_config: Dict[str, Any] = (data.get("cloudmesh") or {}).get("ai") or {}
        partitions = self.ai_config.get("partition")
        # Keep the same dict object as ai_config["partition"] when present.
        self.directive: Dict[str, Any] = partitions if partitions is not None else {}
    except (FileNotFoundError, RuntimeError) as e:
        console.error(f"Failed to load base partitions.yaml: {e}")
        self.ai_config = {}
        self.directive = {}

    # 2. Load local overrides from ~/.cloudmesh/hpc.yaml
    local_path = os.path.expanduser("~/.cloudmesh/hpc.yaml")
    if os.path.exists(local_path):
        try:
            local_data = load_yaml(local_path) or {}
            # Local data can be the full structure or just the 'ai' part
            if "cloudmesh" in local_data:
                ai_overrides = (local_data.get("cloudmesh") or {}).get("ai") or {}
            else:
                ai_overrides = local_data

            # Merge global ai_config (e.g., default host/partition)
            self.ai_config.update(ai_overrides)

            # Merge partitions host by host; empty host sections are skipped
            for h, p in (ai_overrides.get("partition") or {}).items():
                if not p:
                    continue
                merged = self.directive.get(h) or {}
                merged.update(p)
                self.directive[h] = merged

            # update() above may have replaced ai_config["partition"]; point it
            # back at the merged partitions.
            self.ai_config["partition"] = self.directive

        except Exception as e:
            console.error(f"Failed to load local config {local_path}: {e}")
cancel
cancel(job_id)

Cancel a Slurm job.

Source code in src/cloudmesh/ai/hpc.py
554
555
556
557
558
559
560
561
562
def cancel(self, job_id: str) -> str:
    """Cancel ``job_id`` by running ``scancel`` on ``self.host``.

    Returns scancel's stdout. Any failure is reported on the console and
    yields "".
    """
    console.msg(f"Canceling job {job_id}...")
    command = f"scancel {job_id}"
    try:
        output = self._run_remote(self.host, command).stdout
    except Exception as e:
        console.error(f"Failed to cancel job {job_id}: {e}")
        output = ""
    return output
check
check()

Perform a health check of the HPC environment.

Source code in src/cloudmesh/ai/hpc.py
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
def check(self) -> None:
    """Run a health check of the HPC environment, printing one line per check.

    Checks, in order:
    1. VPN state via ``Vpn(service="hpc").enabled()``.
    2. SSH access, by running ``hostname`` remotely.
    3. Disk-quota access via ``self.quota()``.

    Each check reports its own failure and does not stop the others.
    """
    from cloudmesh.ai.vpn.vpn import Vpn

    ok = "[green]✓[/green]"
    fail = "[red]✗[/red]"
    # The warning marker was mis-encoded ("âš "); it is U+26A0 WARNING SIGN.
    warn = "[yellow]⚠[/yellow]"

    console.banner("HPC Health Check")

    # 1. VPN Check
    try:
        vpn_ok = Vpn(service="hpc").enabled()
        console.print(f"{ok if vpn_ok else fail} VPN Connected")
    except Exception as e:
        console.print(f"{fail} VPN Check Failed: {e}")

    # 2. SSH Check
    try:
        res = self._run_remote(self.host, "hostname")
        if res.stdout:
            console.print(f"{ok} SSH Access Verified ({res.stdout.strip()})")
        else:
            console.print(f"{fail} SSH Access: No response from host")
    except Exception as e:
        console.print(f"{fail} SSH Access Failed: {e}")

    # 3. Quota Check
    try:
        if self.quota():
            console.print(f"{ok} Disk Quota Accessible")
        else:
            console.print(f"{warn} Disk Quota: No data returned")
    except Exception as e:
        console.print(f"{fail} Disk Quota Check Failed: {e}")

    console.print()
check_resource_availability
check_resource_availability(key)

Check whether resources are available for the given partition key. Returns a dictionary with availability details.

Source code in src/cloudmesh/ai/hpc.py
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
def check_resource_availability(self, key: str) -> Dict[str, Any]:
    """Summarize node availability for the Slurm partition behind ``key``.

    Runs ``sinfo -h -p <partition> -N -o "%N %G %t"`` on ``self.host``.
    The ``-h`` (no header) flag matters: without it, sinfo's header line
    ("NODELIST GRES STATE") would be parsed as an extra node.

    Args:
        key: Partition key in ``self.directive[self.host]``.

    Returns:
        A dict with ``partition``, ``total_nodes``, ``idle_nodes``,
        ``idle_details`` and ``all_details``, where each detail is
        ``{"node", "gres", "state"}``. On failure, ``{"error": message}``.
    """
    host = self.host
    try:
        # Get the actual Slurm partition name from the key
        entry = (self.directive.get(host) or {}).get(key)
        if entry is None:
            return {"error": f"Partition key '{key}' not found for host {host}"}
        partition_name = entry.get("partition")
        if not partition_name:
            return {"error": "Partition name not found for key"}

        # %N: Node name, %G: Generic Resources (GRES), %t: compact state
        cmd = f"sinfo -h -p {partition_name} -N -o \"%N %G %t\""
        output = self._run_remote(host, cmd).stdout
        if not output:
            return {"error": "No nodes found in partition"}

        all_nodes = []
        for line in output.strip().splitlines():
            parts = line.split()
            if len(parts) < 3:
                continue
            node, gres, state = parts[:3]
            all_nodes.append({"node": node, "gres": gres, "state": state})

        # Only 'idle' (completely free) nodes count; 'mix' nodes are partially used.
        idle_nodes = [info for info in all_nodes if info["state"] == "idle"]

        return {
            "partition": partition_name,
            "total_nodes": len(all_nodes),
            "idle_nodes": len(idle_nodes),
            "idle_details": idle_nodes,
            "all_details": all_nodes
        }
    except Exception as e:
        return {"error": str(e)}
create_apptainer_image
create_apptainer_image(name)

Create an apptainer image on HPC.

Source code in src/cloudmesh/ai/hpc.py
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
def create_apptainer_image(self, name: str) -> None:
    """Build an Apptainer image from the definition file ``name``.

    The image path is the definition file's basename with a trailing ``.def``
    replaced by ``.sif`` (``/path/app.def`` -> ``app.sif``). If there is no
    ``.def`` suffix, ``.sif`` is appended, so the build never targets the
    definition file itself. The image name, cache directory, image size
    (from ``du -sh``) and build time are printed.

    Args:
        name: Path to the Apptainer definition file.

    ``RuntimeError`` and ``OSError`` raised during the build are reported on
    the console instead of propagating.
    """
    try:
        cache = os.environ.get("APPTAINER_CACHEDIR", "/scratch/$USER/.apptainer/")
        console.banner("Cloudmesh HPC Apptainer Build")

        # Replace only a trailing ".def"; str.replace would also rewrite ".def"
        # elsewhere in the name (e.g. "app.default.def" -> "app.sifault.sif").
        definition = os.path.basename(name)
        stem, ext = os.path.splitext(definition)
        image = f"{stem}.sif" if ext == ".def" else f"{definition}.sif"
        logger.debug(f"Building image {image} from {name}")

        console.print(f"Image name       : {image}")
        console.print(f"Singularity cache: {cache}")
        console.print(f"Definition       : {name}")
        console.print()
        StopWatch.start("build image")
        Shell.run(f"apptainer build {image} {name}")
        StopWatch.stop("build image")

        # du -sh prints "<size>\t<path>"; take the size column if present.
        size_output = Shell.run(f"du -sh {image}")
        size_fields = size_output.split() if size_output else []
        size = size_fields[0] if size_fields else "unknown"
        timer = StopWatch.get("build image")
        console.print()
        console.print(f"Time to build {image} ({size}): {timer}s")
        console.print()

    except (RuntimeError, OSError) as e:
        console.error(f"Apptainer build failed: {e}")
create_slurm_directives
create_slurm_directives(host=None, key=None)

Create Slurm directives based on the provided host and key.

Source code in src/cloudmesh/ai/hpc.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def create_slurm_directives(self, host: Optional[str] = None, key: Optional[str] = None) -> str:
    """Render the directive entry ``self.directive[host][key]`` as #SBATCH lines.

    ``host`` defaults to ``self.host``. Each ``name: value`` pair becomes one
    ``#SBATCH --name=value`` line, newline-terminated.

    Raises:
        ValueError: If ``host``/``key`` is not configured. The message names
            both and adds a suggestion when ``_suggest_match`` finds one.
    """
    target_host = host or self.host
    try:
        entries = self.directive[target_host][key]
    except KeyError:
        known_keys = list(self.directive.get(target_host, {}).keys())
        hint = self._suggest_match(key, known_keys)
        message = f"In directive searching for:\n  host {target_host}\n  key {key}\nNot found"
        if hint:
            message += f"\nDid you mean '{hint}'?"
        raise ValueError(message)

    return "".join(f"#SBATCH --{name}={value}\n" for name, value in entries.items())
edit
edit(filename, editor='emacs')

Edit a file on the remote host.

Source code in src/cloudmesh/ai/hpc.py
645
646
647
648
649
650
651
652
def edit(self, filename: str, editor: str = "emacs") -> str:
    """Open ``filename`` on the remote host in an interactive editor.

    The command runs through ``ssh -t`` so the editor gets a TTY. The
    filename is shell-quoted for the remote shell, so paths containing spaces
    or shell metacharacters are passed through literally. The editor is
    deliberately left unquoted so it may carry flags (e.g. ``"emacs -nw"``).
    In debug mode the command is only announced, not executed.

    Args:
        filename: Path of the file on the remote host.
        editor: Editor command to run remotely.

    Returns:
        Always an empty string.
    """
    import shlex

    # Quote the filename for the remote shell, then quote the whole remote
    # command once more for the local shell that launches ssh.
    remote_cmd = f"{editor} {shlex.quote(filename)}"
    command = f"ssh -t {self.host} {shlex.quote(remote_cmd)}"
    console.msg(f"Editing {filename} with {editor}...")
    if not self.debug:
        Shell.run(command)
    return ""
get_billing_usage
get_billing_usage(username=None)

Get billing and core-hour usage using sreport.

Source code in src/cloudmesh/ai/hpc.py
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
def get_billing_usage(self, username: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return sreport usage rows (billing / core-hours) for one user.

    ``username`` falls back to the ``USER`` environment variable. An empty
    list is returned when no user can be determined or when sreport fails
    (the failure is logged).
    """
    user = username if username else os.environ.get("USER")
    if not user:
        return []

    try:
        # 'user' entity report filtered to this user
        return self.sreport(entity="user", filter_val=user)
    except Exception as e:
        logger.error(f"Failed to get billing usage for {user}: {e}")
    return []
get_cluster_gpu_usage
get_cluster_gpu_usage()

Get GPU usage for all nodes across all partitions on the host. Returns a list of nodes sorted by available GPUs (descending).

Source code in src/cloudmesh/ai/hpc.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def get_cluster_gpu_usage(self) -> List[Dict[str, Any]]:
    """
    Get GPU usage for all nodes across all partitions on the host.
    Returns a list of nodes sorted by available GPUs (descending).

    Each entry is ``{"node", "partition", "total", "used", "available",
    "state"}``. Idle nodes have all GPUs available. For mix nodes, used GPUs
    are read from ``scontrol show node``. Other states report 0 available.

    Parsing details:

    * ``sinfo -h`` suppresses the header line, which would otherwise be
      reported as a node named ``NODELIST``.
    * ``sinfo -N`` prints one line per (node, partition) pair. Each node is
      reported once, under the first partition listed, with a trailing ``*``
      (Slurm's default-partition marker) removed.
    * GRES such as ``gpu:a100:8(S:0-1)`` or ``gpu:a:4,gpu:b:2`` are summed.
    * ``gres/gpu=`` is searched anywhere in ``AllocTRES``, where it normally
      follows ``cpu=``/``mem=`` entries.

    Errors are logged, and the nodes collected so far are returned.
    """

    def total_gpus_from_gres(gres: str) -> int:
        """Sum the GPU counts in a sinfo %G string; 1 if 'gpu' has no parsable count."""
        if "gpu" not in gres:
            return 0
        counts = [int(m.group(1)) for m in re.finditer(r'gpu:(?:[^:,(]+:)?(\d+)', gres)]
        return sum(counts) if counts else 1

    def allocated_gpus(block: str) -> int:
        """GPU count from a node's AllocTRES (0 when it has no gres/gpu entry)."""
        match = re.search(r'AllocTRES=\S*?gres/gpu=([^,\s]+)', block)
        if not match:
            return 0
        digits = re.search(r'(\d+)$', match.group(1))
        return int(digits.group(1)) if digits else 1

    all_nodes_usage = []
    try:
        sinfo_output = self._run_remote(self.host, "sinfo -h -N -o \"%P %N %G %t\"").stdout
        if not sinfo_output:
            return []

        # node -> (partition, gres, state); the first line seen for a node wins.
        nodes: Dict[str, Tuple[str, str, str]] = {}
        for line in sinfo_output.strip().splitlines():
            parts = line.split()
            if len(parts) < 4:
                continue
            partition = parts[0].split(",")[0].rstrip("*")
            nodes.setdefault(parts[1], (partition, parts[2], parts[3]))

        # Only partially allocated ('mix') nodes need an scontrol lookup.
        mix_nodes = [node for node, (_, _, state) in nodes.items() if state == "mix"]
        node_allocations: Dict[str, int] = {}
        if mix_nodes:
            sctrl_output = self._run_remote(self.host, f"scontrol show node {','.join(mix_nodes)}").stdout
            if sctrl_output:
                for block in sctrl_output.split("NodeName=")[1:]:
                    node_allocations[block.split()[0]] = allocated_gpus(block)

        for node, (partition, gres, state) in nodes.items():
            total_gpus = total_gpus_from_gres(gres)
            if state == "idle":
                available_gpus = total_gpus
            elif state == "mix":
                available_gpus = max(0, total_gpus - node_allocations.get(node, 0))
            else:
                available_gpus = 0

            all_nodes_usage.append({
                "node": node,
                "partition": partition,
                "total": total_gpus,
                "used": total_gpus - available_gpus,
                "available": available_gpus,
                "state": state
            })
    except Exception as e:
        logger.error(f"Failed to get cluster GPU usage: {e}")

    # Sort by available GPUs descending
    return sorted(all_nodes_usage, key=lambda x: x["available"], reverse=True)
get_config_path
get_config_path()

Return the path to the config.csv file.

Source code in src/cloudmesh/ai/hpc.py
300
301
302
303
def get_config_path(self) -> str:
    """Return the path of ``config.csv``, which ships next to this module."""
    module_dir = os.path.dirname(__file__)
    config_path = os.path.join(module_dir, "config.csv")
    return config_path
get_default_partition
get_default_partition(host)

Return the default partition for the host if it exists.

Source code in src/cloudmesh/ai/hpc.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def get_default_partition(self, host: str) -> Optional[str]:
    """Return the default partition key for ``host``, or None.

    Resolution order:

    1. ``self.directive[host]["default"]["partition"]``, if set.
    2. The global ``self.ai_config["default"]["partition"]``. It may be a
       full path ``cloudmesh.ai.partition.<host>.<key>`` (as written by
       ``set_default``) or a short key. It is used only if it names a key
       configured for ``host``. A full path recorded for a different host is
       ignored, and dotted keys are kept intact.
    3. The first configured key for ``host`` other than ``default``.

    Returns None if ``host`` is not configured or has no partitions.
    """
    if host not in self.directive:
        return None

    host_partitions = self.directive.get(host) or {}

    # 1. Check for host-specific default
    host_default = host_partitions.get("default")
    if isinstance(host_default, dict):
        default_val = host_default.get("partition")
        if default_val:
            return default_val

    # 2. Check for global default
    global_default = (self.ai_config.get("default") or {}).get("partition")
    if isinstance(global_default, str) and global_default:
        host_prefix = f"cloudmesh.ai.partition.{host}."
        if global_default.startswith(host_prefix):
            candidate = global_default[len(host_prefix):]
        elif global_default.startswith("cloudmesh.ai.partition."):
            candidate = None  # recorded for a different host
        else:
            candidate = global_default.split(".")[-1]
        if candidate and candidate in host_partitions:
            return candidate

    # 3. Fallback to the first available partition for the host
    keys = [k for k in host_partitions if k != "default"]
    return keys[0] if keys else None
get_job_status
get_job_status(job_id)

Get the status of a specific Slurm job.

Source code in src/cloudmesh/ai/hpc.py
564
565
566
567
568
569
570
571
def get_job_status(self, job_id: str) -> str:
    """Return the raw stdout of ``squeue -j <job_id>`` run on ``self.host``.

    Any failure is reported on the console and yields an empty string.
    """
    command = f"squeue -j {job_id}"
    try:
        output = self._run_remote(self.host, command).stdout
    except Exception as e:
        console.error(f"Failed to get status for job {job_id}: {e}")
        output = ""
    return output
get_login_command
get_login_command(host, key, sbatch_params=None)

Construct the SSH ijob command without executing it.

Source code in src/cloudmesh/ai/hpc.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def get_login_command(self, host: Optional[str], key: Optional[str], sbatch_params: Optional[Dict[str, str]] = None) -> Optional[str]:
    """Build, without running it, the ssh command that starts an interactive ijob.

    The directives of ``self.directive[host][key]`` are copied, overlaid with
    ``sbatch_params``, and rendered as `` --name=value`` options. ``host``
    defaults to ``self.host`` and ``key`` to ``"default"``. The stored
    directives are not modified.

    Returns None if ``host``/``key`` is not configured.
    """
    target_host = host or self.host
    target_key = key if key else "default"

    try:
        options = self.directive[target_host][target_key].copy()
    except KeyError:
        return None

    options.update(sbatch_params or {})
    rendered = "".join(f" --{name}={value}" for name, value in options.items())
    return f'ssh -tt {target_host} "/opt/rci/bin/ijob{rendered}"'
get_node_gpu_usage
get_node_gpu_usage(node_name)

Get the GPU usage for a specific node. Returns a dictionary with total, used, and available GPUs.

Source code in src/cloudmesh/ai/hpc.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def get_node_gpu_usage(self, node_name: str) -> Dict[str, Any]:
    """
    Get the GPU usage for a specific node.
    Returns a dictionary with total, used, and available GPUs.

    Parses ``scontrol show node <node_name>``. The total is read from the
    ``gres/gpu=`` entry of ``CfgTRES`` and the used count from ``AllocTRES``.
    The entry is searched anywhere in the comma-separated TRES list, because
    it normally follows ``cpu=``/``mem=`` entries. A pattern that forbids
    ``=`` before ``gres/gpu`` would never match. A missing entry counts as 0.

    Returns:
        ``{"node", "total", "used", "available"}``, or ``{"error": message}``
        when there is no output or parsing fails.
    """
    output = self.run_command(f"scontrol show node {node_name}")

    if not output:
        return {"error": f"No output received for node {node_name}"}

    def tres_gpus(field: str) -> int:
        """GPU count from '<field>=...gres/gpu=N...'; 0 if absent, 1 if N has no digits."""
        match = re.search(rf'{field}=\S*?gres/gpu=([^,\s]+)', output)
        if not match:
            return 0
        digits = re.search(r'(\d+)$', match.group(1))
        return int(digits.group(1)) if digits else 1

    try:
        total_gpus = tres_gpus("CfgTRES")
        used_gpus = tres_gpus("AllocTRES")
        return {
            "node": node_name,
            "total": total_gpus,
            "used": used_gpus,
            "available": max(0, total_gpus - used_gpus)
        }
    except Exception as e:
        return {"error": f"Failed to parse GPU usage: {e}"}
get_partition_data
get_partition_data(host)

Return raw partition data for table display. Returns a tuple of (header_list, data_list).

Source code in src/cloudmesh/ai/hpc.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def get_partition_data(self, host: str) -> Tuple[Optional[List[str]], Optional[List[List[str]]]]:
    """Return ``(header, rows)`` describing the partitions of ``host`` for a table.

    The header is ``["Default", "Key"]`` followed by the sorted union of all
    directive names. Each row starts with ``"*"`` for the default key
    (``" "`` otherwise), then the key, then the stringified directive values.
    Missing values are ``""``. The ``default`` entry itself is not listed.
    Returns ``(None, None)`` when there is nothing to show.
    """
    configured = self.directive.get(host, {})
    if not configured:
        return None, None

    entries = {name: spec for name, spec in configured.items() if name != "default"}
    if not entries:
        return None, None

    default_key = self.get_default_partition(host)
    columns = sorted({directive for spec in entries.values() for directive in spec.keys()})

    header = ["Default", "Key", *columns]
    rows = [
        ["*" if name == default_key else " ", name, *(str(spec.get(col, "")) for col in columns)]
        for name, spec in entries.items()
    ]
    return header, rows
get_partition_realtime_data
get_partition_realtime_data(host)

Fetch real-time resource availability for all partitions on the host. Returns a map of partition_name -> resource_stats.

Source code in src/cloudmesh/ai/hpc.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def get_partition_realtime_data(self, host: str) -> Dict[str, Dict[str, Any]]:
    """
    Fetch real-time resource availability for all partitions on the host.

    Runs ``sinfo -N`` on ``host`` to list every (partition, node) pair with its
    GRES and compact state. For nodes in the ``mix`` state it then runs
    ``scontrol show node`` to learn how many of their GPUs are allocated.
    Only ``idle`` and ``mix`` nodes are counted.

    Args:
        host: Host on which the Slurm commands are run.

    Returns:
        A map of partition name -> stats with the keys ``nodes`` (idle nodes),
        ``gpus`` (available GPUs), ``used_gpus``, ``total_nodes`` and
        ``total_gpus``. The map is empty when nothing could be fetched; errors
        are reported via ``console.error``.
    """
    resource_map: Dict[str, Dict[str, Any]] = {}
    # sinfo %G values look like "gpu:8", "gpu:a100:8(S:0-1)" or several
    # comma-separated entries. The optional socket suffix must not hide the
    # count, and every GPU entry is summed.
    gres_gpu_re = re.compile(r'gpu:(?:[^:,()]*:)?(\d+)')
    # AllocTRES is a comma-separated list ("cpu=8,mem=64G,gres/gpu=2"), so
    # gres/gpu is usually not its first entry.
    alloc_gpu_re = re.compile(r'AllocTRES=\S*?gres/gpu=([^,\s]+)')
    try:
        # 1. Get all node states and GRES (one line per partition/node pair)
        sinfo_cmd = "sinfo -N -o \"%P %N %G %t\""
        sinfo_output = self._run_remote(host, sinfo_cmd).stdout

        if not sinfo_output:
            return resource_map

        mix_nodes = []
        node_data = []
        for line in sinfo_output.strip().split("\n"):
            parts = line.split()
            if len(parts) < 4:
                continue
            # sinfo marks the cluster's default partition with a trailing "*".
            partitions = [p.rstrip("*") for p in parts[0].split(",")]
            node, gres, state = parts[1], parts[2], parts[3]
            node_data.append((partitions, node, gres, state))
            if state == "mix":
                mix_nodes.append(node)

        # 2. Allocated GPUs on partially used nodes
        node_allocations: Dict[str, int] = {}
        if mix_nodes:
            # A node is listed once per partition it belongs to; query it once.
            nodes_list = ",".join(dict.fromkeys(mix_nodes))
            sctrl_output = self._run_remote(host, f"scontrol show node {nodes_list}").stdout

            if sctrl_output:
                for block in sctrl_output.split("NodeName=")[1:]:
                    node_name = block.split()[0]
                    alloc_match = alloc_gpu_re.search(block)
                    if alloc_match:
                        num_match = re.search(r'(\d+)$', alloc_match.group(1))
                        node_allocations[node_name] = int(num_match.group(1)) if num_match else 1
                    else:
                        node_allocations[node_name] = 0

        # 3. Aggregate per partition
        for partitions, node, gres, state in node_data:
            if state not in ("idle", "mix"):
                continue

            total_gpus = 0
            if "gpu" in gres:
                counts = gres_gpu_re.findall(gres)
                # Fall back to 1 when a GPU is named without a parsable count.
                total_gpus = sum(int(c) for c in counts) if counts else 1

            if state == "idle":
                available_gpus, is_idle_node = total_gpus, True
            else:  # state == "mix"
                used_gpus = node_allocations.get(node, 0)
                available_gpus, is_idle_node = max(0, total_gpus - used_gpus), False

            for p in partitions:
                stats = resource_map.setdefault(
                    p, {"nodes": 0, "gpus": 0, "used_gpus": 0, "total_nodes": 0, "total_gpus": 0}
                )
                if is_idle_node:
                    stats["nodes"] += 1
                stats["gpus"] += available_gpus
                stats["used_gpus"] += total_gpus - available_gpus
                stats["total_nodes"] += 1
                stats["total_gpus"] += total_gpus
    except Exception as e:
        console.error(f"Failed to fetch real-time resources: {e}")

    return resource_map
get_partition_static_data
get_partition_static_data(host)

Prepare basic partition data for interactive selection. Returns a tuple of (header_string, choices_list).

Source code in src/cloudmesh/ai/hpc.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def get_partition_static_data(self, host: str) -> Tuple[Optional[str], Optional[List[Dict[str, Any]]]]:
    """
    Build an aligned text table of partitions for interactive selection.

    Two resource columns ("Idle Nodes" and the GPU usage column) are appended
    with "..." placeholders, to be filled in once live data is available.

    Returns:
        ``(header_string, choices)``. Each choice is a dict whose "name" is
        the padded row text and whose "value" is the partition key. Returns
        ``(None, None)`` when there is no partition data for ``host``.
    """
    base_header, base_rows = self.get_partition_data(host)
    if not base_header or not base_rows:
        return None, None

    columns = base_header + ["Idle Nodes", "GPU Usage (Avail/Used/Total)"]
    rows = [row + ["...", "..."] for row in base_rows]

    # Width is tracked per column name, starting from the name's own length.
    widths = {}
    for name in columns:
        widths[name] = len(name)
    for row in rows:
        for idx, cell in enumerate(row):
            name = columns[idx]
            widths[name] = max(widths[name], len(cell))

    def pad(cells: List[str]) -> str:
        # Left-justify each cell to its column width and join with separators.
        return " | ".join(cell.ljust(widths[columns[idx]]) for idx, cell in enumerate(cells))

    header = pad(columns)
    choices = [{"name": pad(row), "value": row[1]} for row in rows]
    return header, choices
get_reservation_gpu_usage
get_reservation_gpu_usage(reservation_name)

Get the aggregate GPU usage for a Slurm reservation.

Source code in src/cloudmesh/ai/hpc.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
def get_reservation_gpu_usage(self, reservation_name: str) -> Dict[str, Any]:
    """
    Get the aggregate GPU usage for a Slurm reservation.

    Reads the reservation's node list with ``scontrol show reservation``.
    It then sums the configured (``CfgTRES``) and allocated (``AllocTRES``)
    ``gres/gpu`` counts that ``scontrol show node`` reports for those nodes.

    Args:
        reservation_name: Name of the Slurm reservation.

    Returns:
        A dict with ``reservation``, ``total``, ``used`` and ``available`` on
        success. Returns ``{"error": <message>}`` when the reservation is
        unknown, has no nodes, or its output could not be parsed.
    """
    output = self.run_command(f"scontrol show reservation {reservation_name}")

    if not output or "Invalid reservation" in output:
        return {"error": f"Reservation {reservation_name} not found or no output received."}

    def gpu_count(block: str, field: str) -> int:
        # TRES fields are comma-separated lists ("cpu=8,mem=64G,gres/gpu=2"),
        # so gres/gpu may appear anywhere in the value, not only first.
        match = re.search(rf'{field}=\S*?gres/gpu=([^,\s]+)', block)
        if not match:
            return 0
        num_match = re.search(r'(\d+)$', match.group(1))
        return int(num_match.group(1)) if num_match else 1

    try:
        # Slurm hostlists can contain commas inside brackets
        # (e.g. "node[01-04,06]"), so take the whole whitespace-delimited value.
        nodes_match = re.search(r'\bNodes=(\S+)', output)
        if not nodes_match or nodes_match.group(1) == "(null)":
            return {"error": f"No nodes found for reservation {reservation_name}"}

        nodes_str = nodes_match.group(1)
        # scontrol accepts the compressed hostlist expression directly.
        nodes_output = self.run_command(f"scontrol show node {nodes_str}")

        total_gpus = 0
        used_gpus = 0
        # Each node's record starts with "NodeName=".
        for block in nodes_output.split("NodeName=")[1:]:
            total_gpus += gpu_count(block, "CfgTRES")
            used_gpus += gpu_count(block, "AllocTRES")

        return {
            "reservation": reservation_name,
            "total": total_gpus,
            "used": used_gpus,
            "available": max(0, total_gpus - used_gpus)
        }
    except Exception as e:
        return {"error": f"Failed to parse reservation GPU usage: {e}"}
get_tutorial_url
get_tutorial_url(keyword=None)

Return the tutorial URL for the given keyword.

Source code in src/cloudmesh/ai/hpc.py
295
296
297
298
def get_tutorial_url(self, keyword: Optional[str] = None) -> str:
    """
    Look up a tutorial link by keyword.

    When the keyword has no entry, the configured "default" tutorial is used,
    and failing that the built-in infomall tutorial URL.
    """
    tutorials = self.ai_config.get("tutorials", {})
    fallback = tutorials.get("default", "https://infomall.org/uva/docs/tutorial/")
    return tutorials.get(keyword, fallback)
job_info
job_info(job_id)

Get detailed information about a Slurm job.

Source code in src/cloudmesh/ai/hpc.py
785
786
787
788
789
790
791
792
def job_info(self, job_id: str) -> str:
    """
    Return the raw ``scontrol show job`` output for ``job_id``.

    Failures are reported via ``console.error`` and yield an empty string.
    """
    try:
        command = f"scontrol show job {job_id}"
        return self._run_remote(self.host, command).stdout
    except Exception as err:
        console.error(f"Failed to get job info for {job_id}: {err}")
    return ""
jupyter
jupyter(port=8000)

Start a Jupyter notebook on HPC.

Source code in src/cloudmesh/ai/hpc.py
547
548
549
550
551
552
def jupyter(self, port: int = 8000) -> None:
    """
    Print instructions for starting a Jupyter notebook on the HPC.

    Nothing is executed. The notebook command and the SSH tunnel command for
    ``port`` are only displayed.
    """
    messages = (
        f"Starting Jupyter on port {port}...",
        "Note: This requires an active VPN connection.",
        f"Command: jupyter notebook --no-browser --port={port}",
        f"Tunnel: ssh -L 8080:localhost:{port} hpc",
    )
    for message in messages:
        console.print(message)
list_jobs
list_jobs()

List all active Slurm jobs for the current user.

Source code in src/cloudmesh/ai/hpc.py
573
574
575
576
577
578
579
580
def list_jobs(self) -> str:
    """
    Return the ``squeue`` listing of the current user's jobs.

    Failures are reported via ``console.error`` and yield an empty string.
    """
    try:
        return self._run_remote(self.host, "squeue -u $USER").stdout
    except Exception as err:
        console.error(f"Failed to list jobs: {err}")
    return ""
login
login(host, key, sbatch_params=None)

Open an SSH session on the HPC by running an interactive job command.

Source code in src/cloudmesh/ai/hpc.py
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def login(self, host: Optional[str], key: Optional[str], sbatch_params: Optional[Dict[str, str]] = None) -> str:
    """
    Open an SSH session on the HPC by running an interactive job command.

    The command comes from ``get_login_command``. It is printed, and unless
    debug mode is on it is run with ``os.system`` so the terminal stays
    attached to the interactive session.

    Args:
        host: Target host; ``self.host`` is used when it is falsy.
        key: Partition key from the host's directives.
        sbatch_params: Extra sbatch options passed to the command builder.

    Returns:
        Always an empty string, matching the declared ``str`` return type.
        An unknown key is reported via ``console.error``, with a suggestion
        when a close match exists.
    """
    command = self.get_login_command(host, key, sbatch_params)
    if not command:
        # No command could be built: report the unknown key and the options.
        target_host = host or self.host
        available_keys = list(self.directive.get(target_host, {}).keys())
        suggestion = self._suggest_match(key, available_keys) if key else None
        msg = f"Key {key} not found for host {target_host}. Available keys: {', '.join(available_keys)}"
        if suggestion:
            msg += f"\nDid you mean '{suggestion}'?"
        console.error(msg)
        # Previously a bare `return` (None); keep the str contract.
        return ""

    console.msg(command)
    if not self.debug:
        try:
            # Use os.system for interactive sessions to ensure the terminal
            # is connected directly to the process. Shell.run captures output,
            # which causes interactive shells to hang.
            os.system(command)
        except Exception as e:
            console.error(f"Login failed: {e}")
    return ""
logs
logs(job_id, tail=False, follow=False)

Read the Slurm output logs for a job.

Source code in src/cloudmesh/ai/hpc.py
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
def logs(self, job_id: str, tail: bool = False, follow: bool = False) -> str:
    """
    Read the Slurm output logs for a job.

    The StdOut/StdErr paths are taken from ``scontrol show job`` when
    available; otherwise ``slurm-<job_id>.out`` and ``slurm-<job_id>.err``
    are assumed. When both paths are the same file (as when a job writes
    stdout and stderr together), the file is read only once.

    Args:
        job_id: Slurm job id.
        tail: Print only the last 20 lines of each log.
        follow: Stream the logs with ``tail -f`` over an interactive SSH
            session. Takes precedence over ``tail``.

    Returns:
        An empty string, since log content is printed to the console. In
        debug follow mode the command is only printed and
        ``"Log output (debug)"`` is returned.
    """
    import shlex

    # Fallbacks used when scontrol does not report the paths.
    stdout_path = f"slurm-{job_id}.out"
    stderr_path = f"slurm-{job_id}.err"

    info = self.job_info(job_id)
    if info:
        out_match = re.search(r'StdOut=(\S+)', info)
        if out_match:
            stdout_path = out_match.group(1)

        err_match = re.search(r'StdErr=(\S+)', info)
        if err_match:
            stderr_path = err_match.group(1)

    # Avoid showing (or tailing) the same file twice.
    sources = [("STDOUT", stdout_path)]
    if stderr_path != stdout_path:
        sources.append(("STDERR", stderr_path))

    if follow:
        # Streaming mode: quote each path, then quote the whole remote command
        # as a single ssh argument.
        remote_cmd = "tail -f " + " ".join(shlex.quote(path) for _, path in sources)
        cmd = f"ssh -t {self.host} {shlex.quote(remote_cmd)}"
        if self.debug:
            console.msg(f"Debug: {cmd}")
            return "Log output (debug)"
        console.banner(f"Streaming logs for job {job_id} (Out: {stdout_path}, Err: {stderr_path})")
        os.system(cmd)
        return ""

    # Snapshot (tail -n 20) or full read (cat) of each distinct log file.
    if tail:
        read_cmd, title_suffix = "tail -n 20", " (Last 20 lines)"
    else:
        read_cmd, title_suffix = "cat", ""

    for label, path in sources:
        try:
            res = self._run_remote(self.host, f"{read_cmd} {shlex.quote(path)}")
            if res.stdout:
                console.banner(f"{label}{title_suffix}", path)
                console.print(res.stdout)
        except Exception as e:
            console.error(f"Error reading {label} from {path}: {e}")
    return ""
monitor_job
monitor_job(job_id, interval=10)

Actively monitor a job and print status updates.

Source code in src/cloudmesh/ai/hpc.py
919
920
921
922
923
924
925
def monitor_job(self, job_id: str, interval: int = 10) -> None:
    """
    Watch a job until it leaves the queue, printing its status as it goes.

    Polling is delegated to ``wait``. Pressing Ctrl-C ends monitoring with a
    message instead of propagating the interrupt.
    """
    title = f"Monitoring Job {job_id}"
    console.banner(title)
    try:
        self.wait(job_id, interval)
    except KeyboardInterrupt:
        # Ctrl-C stops monitoring quietly rather than raising to the caller.
        console.msg("Monitoring stopped by user.")
nodes
nodes(partition=None)

Check node status for a partition.

Source code in src/cloudmesh/ai/hpc.py
821
822
823
824
825
826
827
828
829
830
831
832
def nodes(self, partition: Optional[str] = None) -> str:
    """
    Return raw ``sinfo`` output, optionally limited to one partition.

    Failures are reported via ``console.error`` and yield an empty string.
    """
    cmd = f"sinfo -p {partition}" if partition else "sinfo"
    try:
        return self._run_remote(self.host, cmd).stdout
    except Exception as err:
        console.error(f"Failed to get node status: {err}")
    return ""
parse_sbatch_parameter
parse_sbatch_parameter(parameters)

Parse the parameters string and convert it to a dictionary. Expected format: "key1:val1,key2:val2". Aliases defined in the local config are also supported. Raises ValueError if the format is invalid.

Source code in src/cloudmesh/ai/hpc.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def parse_sbatch_parameter(self, parameters: str) -> Dict[str, str]:
    """
    Parse the parameters string and convert it to a dictionary.

    Expected format: ``"key1:val1,key2:val2"``. An item may instead be the
    name of an alias from ``ai_config["aliases"]``. The alias value is
    another parameter string and is parsed the same way, so aliases may
    nest. Later items override earlier ones. Unknown keys only produce a
    warning.

    Args:
        parameters: Comma-separated ``key:value`` items and/or alias names.

    Returns:
        Mapping of sbatch option name to value; empty for empty input.

    Raises:
        ValueError: If an item is neither ``key:value`` nor a known alias,
            has an empty key or value, or an alias (directly or indirectly)
            refers back to itself.
    """
    # Common valid Slurm parameters for basic validation
    valid_params = {
        "nodes", "ntasks", "cpus-per-task", "time", "partition",
        "mem", "gres", "job-name", "output", "error", "mail-type", "mail-user"
    }

    if not parameters:
        return {}

    # Handle aliases from local config
    aliases = self.ai_config.get("aliases", {})

    def expand(text: str, active: Tuple[str, ...]) -> Dict[str, str]:
        # Parse one parameter string. ``active`` is the chain of aliases
        # currently being expanded and is used to detect cycles, which
        # would otherwise recurse until RecursionError.
        result: Dict[str, str] = {}
        if not text:
            return result
        for line in text.split(","):
            line = line.strip()
            if not line:
                continue

            if line in aliases:
                if line in active:
                    chain = " -> ".join(active + (line,))
                    raise ValueError(f"Circular sbatch alias definition: {chain}")
                result.update(expand(aliases[line], active + (line,)))
                continue

            if ":" not in line:
                raise ValueError(f"Invalid sbatch parameter format: '{line}'. Expected 'key:value' or a defined alias.")

            # Split only on the first ":" so values like "01:00:00" survive.
            key, value = (part.strip() for part in line.split(":", 1))

            if not key or not value:
                raise ValueError(f"Invalid sbatch parameter: '{line}'. Both key and value must be provided.")

            if key not in valid_params:
                console.warn(f"Unknown sbatch parameter: '{key}'. It may be ignored by Slurm.")

            result[key] = value
        return result

    return expand(parameters, ())
quota
quota()

Check disk quota on the HPC.

Source code in src/cloudmesh/ai/hpc.py
794
795
796
797
798
799
800
801
802
def quota(self) -> str:
    """
    Return the user's disk quota report from the HPC.

    ``hdquota`` is used because ``quota -s`` does not work on some clusters
    (e.g. UVA). Failures are reported via ``console.error`` and yield "".
    """
    quota_cmd = "hdquota"
    try:
        return self._run_remote(self.host, quota_cmd).stdout
    except Exception as err:
        console.error(f"Failed to get quota: {err}")
    return ""
run_command
run_command(command, host=None)

Execute an arbitrary command on the HPC.

Source code in src/cloudmesh/ai/hpc.py
484
485
486
487
488
489
490
491
492
def run_command(self, command: str, host: Optional[str] = None) -> str:
    """
    Run ``command`` remotely and return its stdout.

    The command runs on ``host``, or on ``self.host`` when ``host`` is falsy.
    Failures are reported via ``console.error`` and yield an empty string.
    """
    target_host = host if host else self.host
    try:
        return self._run_remote(target_host, command).stdout
    except Exception as err:
        console.error(f"Failed to execute command on {target_host}: {err}")
    return ""
search_jobs_by_node
search_jobs_by_node(node_regex)

Find jobs running on nodes that match the given regex.

Parameters:

Name Type Description Default
node_regex str

Regex to match node names.

required

Returns:

Type Description
List[Dict[str, str]]

A list of dictionaries containing detailed job information.

Source code in src/cloudmesh/ai/hpc.py
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
def search_jobs_by_node(self, node_regex: str) -> List[Dict[str, str]]:
    """
    Find jobs running on nodes that match the given regex.

    The regex is applied with ``re.search`` to squeue's ``%N`` field as
    printed. That field is Slurm's (possibly compressed) node list and is
    empty for pending jobs.

    Args:
        node_regex: Regex to match node names.

    Returns:
        A list of dictionaries containing detailed job information, keyed by
        User, JobID, Name, State, TimeUsed, TimeLeft, Node, CPUs, Memory, QOS
        and Partition. Errors, including an invalid regex, are reported via
        ``console.error`` and yield an empty list.
    """
    # Field names in the same order as the squeue format string below.
    fields = ["User", "JobID", "Name", "State", "TimeUsed", "TimeLeft",
              "Node", "CPUs", "Memory", "QOS", "Partition"]
    fmt = "%u|%i|%j|%T|%M|%L|%N|%C|%m|%q|%P"
    cmd = f"squeue -o \"{fmt}\""
    try:
        output = self._run_remote(self.host, cmd).stdout
        if not output:
            return []

        pattern = re.compile(node_regex)
        matches = []
        # Skip the header line.
        for line in output.strip().split("\n")[1:]:
            if not line.strip():
                continue
            parts = line.split("|")
            if len(parts) < len(fields):
                continue

            # The job name (%j) is free text and may itself contain "|".
            # Re-join the surplus pieces into it so later fields stay aligned.
            extra = len(parts) - len(fields)
            if extra:
                parts = parts[:2] + ["|".join(parts[2:3 + extra])] + parts[3 + extra:]

            job = dict(zip(fields, parts))
            if pattern.search(job["Node"]):
                matches.append(job)
        return matches
    except Exception as e:
        console.error(f"Failed to search jobs by node: {e}")
        return []
set_default
set_default(host, partition=None)

Set the default host and partition in the local config file.

Source code in src/cloudmesh/ai/hpc.py
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
def set_default(self, host: str, partition: Optional[str] = None) -> None:
    """
    Set the default host and partition in the local config file.

    Updates ``cloudmesh.ai.default`` in ``~/.cloudmesh/hpc.yaml`` and creates
    the file and any missing or empty sections. The partition is stored as
    ``cloudmesh.ai.partition.<host>.<partition>``. When ``partition`` is not
    given, any previously stored partition is left untouched.

    Args:
        host: Host name to store as the default.
        partition: Optional partition key for that host.

    Errors while saving are reported via ``console.error``. An unreadable
    existing file is treated as empty.
    """
    local_path = os.path.expanduser("~/.cloudmesh/hpc.yaml")

    # Load existing config or start fresh
    config = {}
    if os.path.exists(local_path):
        try:
            # Guard against a falsy result; an empty YAML document typically
            # loads as None rather than a dict.
            config = load_yaml(local_path) or {}
        except Exception:
            config = {}

    def section(parent: dict, name: str) -> dict:
        # A YAML key without a value loads as None; treat it as an empty
        # section instead of failing with TypeError.
        if parent.get(name) is None:
            parent[name] = {}
        return parent[name]

    # Ensure structure: cloudmesh -> ai -> default
    ai_config = section(section(config, "cloudmesh"), "ai")
    defaults = section(ai_config, "default")

    defaults["host"] = host
    if partition:
        # Store as full path to match expected format if needed,
        # but the Hpc class handles short keys too.
        defaults["partition"] = f"cloudmesh.ai.partition.{host}.{partition}"

    # Save back to file
    try:
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with open(local_path, "w") as f:
            yaml.dump(config, f)
        console.msg(f"Default host set to {host}" + (f" and partition to {partition}" if partition else ""))
    except Exception as e:
        console.error(f"Failed to save local config: {e}")
sinfo
sinfo(partition=None, json_support=False, format='%all')

Return sinfo output as a list of dictionaries, with summary metrics merged into each node. Delegates to the Slurm class.

Source code in src/cloudmesh/ai/hpc.py
834
835
836
837
838
839
840
def sinfo(self, partition: Optional[str] = None, json_support: bool = False, format: str = "%all") -> List[Dict[str, Any]]:
    """
    Return node information from ``sinfo`` as a list of dictionaries.

    Summary metrics are merged into each node entry. The work is done by a
    ``Slurm`` helper bound to this instance's host and debug setting.
    """
    options = {"partition": partition, "json_support": json_support, "format": format}
    backend = Slurm(host=self.host, debug=self.debug)
    return backend.sinfo(**options)
sreport
sreport(
    entity="user",
    filter_val=None,
    start=None,
    end=None,
    stat=False,
)

Get usage report using sreport for a specific entity. Delegates to the Slurm class.

Source code in src/cloudmesh/ai/hpc.py
842
843
844
845
846
847
848
849
850
def sreport(self, entity: str = "user", filter_val: Optional[str] = None,
            start: Optional[str] = None, end: Optional[str] = None,
            stat: bool = False) -> List[Dict[str, Any]]:
    """
    Return an ``sreport`` usage report for ``entity``.

    All arguments are forwarded unchanged to a ``Slurm`` helper bound to
    this instance's host and debug setting.
    """
    report_options = dict(entity=entity, filter_val=filter_val, start=start, end=end, stat=stat)
    backend = Slurm(host=self.host, debug=self.debug)
    return backend.sreport(**report_options)
storage
storage(directory)

Get storage information for a directory.

Source code in src/cloudmesh/ai/hpc.py
634
635
636
637
638
639
640
641
642
643
def storage(self, directory: str) -> str:
    """
    Return the human-readable size of ``directory`` as reported by ``du -sh``.

    Yields "unknown" when the command prints nothing or fails; failures are
    also reported via ``console.error``.
    """
    size = "unknown"
    try:
        output = self._run_remote(self.host, f"du -sh {directory}").stdout
        if output:
            # du -sh prints "<size> <directory>"; keep only the size.
            size = output.split()[0]
    except Exception as err:
        console.error(f"Failed to get storage info for {directory}: {err}")
    return size
submit
submit(script_path, key=None, sbatch_params=None)

Upload a script and submit it as a Slurm job.

Source code in src/cloudmesh/ai/hpc.py
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
def submit(self, script_path: str, key: Optional[str] = None, sbatch_params: Optional[Dict[str, str]] = None) -> str:
    """
    Upload a script and submit it as a Slurm job.

    The partition's ``#SBATCH`` directives, plus one line per entry in
    ``sbatch_params``, are inserted into the script. The result is uploaded
    to ``/tmp/job_<basename>`` on the host and passed to ``sbatch``. sbatch
    requires a ``#!`` interpreter line as the first line, so the script's
    own shebang (else one from the directives, else ``#!/bin/bash``) is
    kept first and the directives follow it.

    Args:
        script_path: Local path of the job script.
        key: Partition key; defaults to the host's default partition.
        sbatch_params: Extra ``--key=value`` sbatch options.

    Returns:
        sbatch's stdout, or "" if the upload or submission failed (the error
        is reported via ``console.error``).

    Raises:
        ValueError: If no key is given and the host has no default partition.
    """
    def split_shebang(text: str) -> Tuple[Optional[str], str]:
        # Separate a leading "#!..." line from the rest of the text.
        if text.startswith("#!"):
            first, _, rest = text.partition("\n")
            return first, rest
        return None, text

    host = self.host
    key = key or self.get_default_partition(host)
    if not key:
        raise ValueError(f"No partition key provided and no default found for host {host}")

    # 1. Generate directives
    directives = self.create_slurm_directives(host, key)
    if sbatch_params:
        for k, v in sbatch_params.items():
            directives += f"#SBATCH --{k}={v}\n"

    # 2. Read script and insert the directives right after the shebang
    with open(script_path, "r") as f:
        script_content = f.read()

    script_shebang, script_body = split_shebang(script_content)
    directive_shebang, directive_body = split_shebang(directives)
    shebang = script_shebang or directive_shebang or "#!/bin/bash"
    full_script = f"{shebang}\n{directive_body}{script_body}"

    # Create a temporary local file to upload
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp:
        tmp.write(full_script)
        tmp_path = tmp.name

    remote_script = f"/tmp/job_{os.path.basename(script_path)}"

    try:
        # 3. Upload script using Fabric put
        self.put(tmp_path, remote_script, host)

        # 4. Submit job
        result = self._run_remote(host, f"sbatch {remote_script}")
        return result.stdout
    except Exception as e:
        console.error(f"Failed to submit job: {e}")
        return ""
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
template
template(key=None)

Generate a boilerplate .sbatch script.

Source code in src/cloudmesh/ai/hpc.py
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
def template(self, key: Optional[str] = None) -> str:
    """
    Generate a boilerplate .sbatch script.

    The script contains the partition's directives, stdout/stderr file
    directives and a small example command. The interpreter line comes first
    because sbatch rejects scripts whose first line is not ``#!``. If the
    directives already begin with a ``#!`` line, that line is used instead
    of ``#!/bin/bash``.

    Args:
        key: Partition key; defaults to the host's default partition.

    Returns:
        The script text, or a comment line when no partition key is available.
    """
    key = key or self.get_default_partition(self.host)
    if not key:
        return "# No default partition found. Please specify a key."

    directives = self.create_slurm_directives(self.host, key)
    if directives.startswith("#!"):
        shebang, _, directives = directives.partition("\n")
    else:
        shebang = "#!/bin/bash"

    return (
        f"{shebang}\n"
        f"{directives}"
        "#SBATCH --output=slurm-%j.out\n"
        "#SBATCH --error=slurm-%j.err\n\n"
        "# Your commands here\n"
        f"echo 'Hello from Slurm job on {self.host} partition {key}'\n"
        "hostname\n"
    )
wait
wait(job_id, interval=30)

Wait for a Slurm job to complete with detailed status updates.

Source code in src/cloudmesh/ai/hpc.py
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
def wait(self, job_id: str, interval: int = 30) -> bool:
    """
    Block until ``job_id`` is no longer pending or running.

    Polls ``get_job_status`` every ``interval`` seconds and prints the
    current state. A status containing "R" counts as running and one
    containing "PD" as pending. An empty status (the job left the queue) or
    any other status ends the wait.

    Returns:
        Always True once the wait ends.
    """
    console.msg(f"Waiting for job {job_id} to complete...")
    while True:
        status = self.get_job_status(job_id)
        if not status:
            console.msg(f"Job {job_id} is no longer in the queue (finished or failed).")
            return True

        if "R" in status:
            progress = f"Job {job_id} is currently RUNNING..."
        elif "PD" in status:
            progress = f"Job {job_id} is PENDING..."
        else:
            console.msg(f"Job {job_id} has finished (Status: {status.strip()}).")
            return True

        console.msg(progress)
        time.sleep(interval)

JobMonitorApp

Bases: App

A Textual app to monitor Slurm jobs interactively.

Source code in src/cloudmesh/ai/command/hpc.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
class JobMonitorApp(App):
    """A Textual app to monitor Slurm jobs interactively.

    Shows the current user's ``squeue`` listing, refreshed every 30 seconds
    or on demand, and the ``scontrol show job`` details of the selected row.
    Viewing logs or cancelling a job exits the app with the job id, prefixed
    by ``"CANCEL:"`` for cancellation, so the caller can act on it.
    """

    CSS = """
    Screen {
        background: #f0f0f0;
        color: #333333;
    }
    DataTable {
        background: #ffffff;
        color: #333333;
        height: 1fr;
    }
    #details-panel {
        background: #e0e0e0;
        color: #333333;
        border: solid #cccccc;
        padding: 1;
        height: 10;
    }
    Footer {
        background: #000000;
        color: #ffffff;
    }
    """
    BINDINGS = [
        ("q", "quit", "- Quit"),
        ("l", "view_logs", "- View Logs"),
        ("c", "cancel_job", "- Cancel Job"),
        ("r", "refresh", "- Refresh List"),
    ]

    def __init__(self, hpc_instance):
        super().__init__()
        # Object providing list_jobs() and job_info(job_id) (an Hpc instance).
        self.hpc = hpc_instance
        self.selected_job_id = None

    def compose(self) -> ComposeResult:
        yield DataTable()
        yield Static("Select a job to see details", id="details-panel")
        yield Footer()

    @work
    async def refresh_jobs(self) -> None:
        """Periodically refresh the job list."""
        while True:
            self.update_job_table()
            await asyncio.sleep(30)

    def update_job_table(self) -> None:
        """Reload the table from the ``squeue`` output of ``hpc.list_jobs()``.

        Cells are whitespace-separated. The last column (NODELIST(REASON) in
        the default format) keeps any remaining text, because pending reasons
        can contain spaces and would otherwise produce more cells than there
        are columns. Errors are shown in the details panel.
        """
        table = self.query_one(DataTable)
        try:
            output = self.hpc.list_jobs()
            if not output or not output.strip():
                return

            lines = output.strip().split("\n")
            if len(lines) < 2:
                return

            header = lines[0].split()

            # Clear rows
            table.clear()

            # Only add columns if they haven't been added yet
            if not table.columns:
                table.add_columns(*header)

            for line in lines[1:]:
                # Never produce more cells than header columns.
                row = line.split(maxsplit=len(header) - 1)
                if not row:
                    continue

                # In standard squeue, JOBID is the first column (index 0)
                job_id = row[0]
                table.add_row(*row, key=job_id)
        except Exception as e:
            self.query_one("#details-panel", Static).update(f"Error updating jobs: {e}")

    def on_mount(self) -> None:
        table = self.query_one(DataTable)
        table.cursor_type = "row"
        # The refresh worker updates the table as its first step, so a
        # separate initial update would query squeue twice at startup.
        self.refresh_jobs()

    def on_data_table_row_selected(self, event) -> None:
        """Remember the selected job and show its scontrol details."""
        job_id = event.row_key.value if hasattr(event.row_key, "value") else event.row_key
        self.selected_job_id = job_id

        # Fetch detailed info
        info = self.hpc.job_info(job_id)
        self.query_one("#details-panel", Static).update(info)

    def action_view_logs(self) -> None:
        """Exit, returning the selected job id so the caller can show its logs."""
        if not self.selected_job_id:
            return
        self.exit(self.selected_job_id)

    def action_cancel_job(self) -> None:
        """Exit with "CANCEL:<job_id>" so the caller can cancel the job."""
        if not self.selected_job_id:
            return
        # We exit with a special signal to indicate cancellation
        self.exit(f"CANCEL:{self.selected_job_id}")

    def action_refresh(self) -> None:
        """Reload the job table immediately."""
        self.update_job_table()

Functions

refresh_jobs async
refresh_jobs()

Periodically refresh the job list.

Source code in src/cloudmesh/ai/command/hpc.py
702
703
704
705
706
707
@work
async def refresh_jobs(self) -> None:
    """Keep the job table current by reloading it every 30 seconds, forever."""
    refresh_period = 30  # seconds between squeue polls
    while True:
        self.update_job_table()
        await asyncio.sleep(refresh_period)

PartitionSelectorApp

Bases: App

A Textual app to select an HPC partition from a table.

Source code in src/cloudmesh/ai/command/hpc.py
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
class PartitionSelectorApp(App):
    """A Textual app to select an HPC partition from a table.

    Rows are built from pre-fetched ``choices`` (dicts with a ``"value"``
    partition key and a ``" | "``-separated ``"name"`` row) under a
    ``" | "``-separated ``header``. Choosing a row exits the app with
    ``(partition_key, modified_sbatch_params)``, where
    ``modified_sbatch_params`` maps partition keys to ``"gres=<value>"``
    overrides made with the ``+``/``-`` bindings.
    """

    CSS = """
    Screen {
        background: #f0f0f0;
        color: #333333;
    }
    DataTable {
        background: #ffffff;
        color: #333333;
    }
    DataTable > .dp-row {
        color: #333333;
    }
    DataTable > .dp-row:focus {
        background: #e0e0e0;
        color: #000000;
    }
    Footer {
        background: #dddddd;
        color: #333333;
    }
    """
    BINDINGS = [
        ("q", "quit", "Quit"),
        ("enter", "select", "Select"),
        ("d", "select_default", "Default"),
        ("+", "increase_gres", "Increase GPU"),
        ("-", "decrease_gres", "Decrease GPU"),
    ]

    # Seconds to wait between real-time resource polls.
    RESOURCE_REFRESH_SECONDS = 30

    def __init__(self, host, hpc_instance, header, choices):
        """Store the inputs; ``hpc_instance`` supplies the default partition and live data."""
        super().__init__()
        self.host = host
        self.hpc = hpc_instance
        self.header = header
        self.choices = choices
        self.selected_key = None
        self.default_key = self.hpc.get_default_partition(host)
        self.modified_sbatch_params = {}
        # Stores {partition_key: {"row": [], "gres_count": int, "gres_prefix": str, "gres_idx": int}}
        self.model = {}

    def compose(self) -> ComposeResult:
        yield DataTable()
        yield Footer()

    def _header_columns(self):
        """Return the stripped column labels of the ``" | "``-separated header."""
        return [col.strip() for col in self.header.split(" | ")]

    @staticmethod
    def _row_key_value(row_key):
        """Unwrap a Textual ``RowKey`` into the plain partition key it was created with."""
        return row_key.value if hasattr(row_key, "value") else row_key

    def _cursor_partition_key(self, table):
        """Return the partition key of the row under the cursor, or None if there is none."""
        # DataTable.rows is a dict keyed by RowKey, so positional lookup must go
        # through ordered_rows (display order) rather than rows[index].
        rows = table.ordered_rows
        idx = table.cursor_row
        if idx is None or not 0 <= idx < len(rows):
            return None
        return self._row_key_value(rows[idx].key)

    def _update_gres(self, delta: int) -> None:
        """Change the GRES count of the highlighted partition by *delta* (minimum 1)."""
        table = self.query_one(DataTable)
        partition_key = self._cursor_partition_key(table)
        if partition_key is None or partition_key not in self.model:
            return

        model_data = self.model[partition_key]
        gres_idx = model_data["gres_idx"]
        # No GRES column, or this row is too short to contain one.
        if gres_idx == -1 or gres_idx >= len(model_data["row"]):
            return

        # Update Model
        model_data["gres_count"] = max(1, model_data["gres_count"] + delta)
        new_val = f"{model_data['gres_prefix']}{model_data['gres_count']}"
        model_data["row"][gres_idx] = new_val

        # Update View (columns are keyed by their label in on_mount)
        columns = self._header_columns()
        if gres_idx < len(columns):
            table.update_cell(partition_key, columns[gres_idx], new_val)

        # Remember the override so the caller can pass it to sbatch.
        self.modified_sbatch_params[partition_key] = f"gres={new_val}"

    def action_increase_gres(self) -> None:
        self._update_gres(1)

    def action_decrease_gres(self) -> None:
        self._update_gres(-1)

    @work
    async def refresh_resources(self) -> None:
        """Asynchronously fetch and update real-time resource data periodically.

        The cluster query runs in a worker thread via ``asyncio.to_thread`` so
        the event loop stays responsive. Results are written into the last two
        header columns (idle nodes and GPUs) of every row whose partition
        appears in the returned map.
        """
        # Small delay to ensure the DataTable is fully mounted and rows are registered
        await asyncio.sleep(1.0)

        columns = self._header_columns()
        if len(columns) < 2:
            # No room for the two trailing resource columns; nothing to refresh.
            return
        idle_col_idx = len(columns) - 2
        gpu_col_idx = len(columns) - 1
        idle_col_key = columns[idle_col_idx]
        gpu_col_key = columns[gpu_col_idx]

        while True:
            # Fetch data from cluster in a separate thread to avoid blocking the event loop
            resource_map = await asyncio.to_thread(
                self.hpc.get_partition_realtime_data, self.host
            )
            if resource_map:
                table = self.query_one(DataTable)
                directives = self.hpc.directive.get(self.host, {})
                for partition_key, model_data in self.model.items():
                    # Ensure the row is actually present in the table before updating
                    if partition_key not in table.rows:
                        continue

                    # Map our partition key to the actual Slurm partition name
                    partition_name = directives.get(partition_key, {}).get(
                        "partition", ""
                    )
                    res = resource_map.get(partition_name)
                    if not res:
                        continue

                    nodes_str = f"{res['nodes']}/{res['total_nodes']}"
                    gpus_str = f"{res['gpus']}/{res['used_gpus']}/{res['total_gpus']}"

                    # Update View
                    try:
                        table.update_cell(partition_key, idle_col_key, nodes_str)
                        table.update_cell(partition_key, gpu_col_key, gpus_str)
                    except Exception:
                        # Deliberate best-effort: a cell that is not ready yet
                        # is simply refreshed on the next poll.
                        pass

                    # Update Model (rows may be shorter than the header)
                    row = model_data["row"]
                    if gpu_col_idx < len(row):
                        row[idle_col_idx] = nodes_str
                        row[gpu_col_idx] = gpus_str

            # Always wait before the next poll so the cluster is not queried in a tight loop.
            await asyncio.sleep(self.RESOURCE_REFRESH_SECONDS)

    def on_mount(self) -> None:
        """Create the columns and rows from the pre-fetched header and choices."""
        import re

        table = self.query_one(DataTable)
        table.cursor_type = "row"

        # Key each column by its label so update_cell() can address it by name;
        # add_columns() would assign opaque generated keys instead. A repeated
        # label falls back to a generated key to avoid a duplicate-key error.
        columns = self._header_columns()
        seen = set()
        for label in columns:
            table.add_column(label, key=None if label in seen else label)
            seen.add(label)

        # Index of the first column whose label mentions GRES, or -1.
        gres_col_idx = next(
            (i for i, col in enumerate(columns) if "gres" in col.lower()), -1
        )

        # Populate model and table
        for choice in self.choices:
            partition_key = choice["value"]
            row_data = choice["name"].split(" | ")

            # Split a GRES value such as "gpu:a100:4" into prefix "gpu:a100:" and count 4.
            gres_prefix = ""
            gres_count = 0
            if 0 <= gres_col_idx < len(row_data):
                val = row_data[gres_col_idx].strip()
                match = re.search(r"(\d+)$", val)
                if match:
                    gres_count = int(match.group(1))
                    gres_prefix = val[: match.start()]
                else:
                    gres_prefix = val

            self.model[partition_key] = {
                "row": row_data,
                "gres_count": gres_count,
                "gres_prefix": gres_prefix,
                "gres_idx": gres_col_idx,
            }
            # Use partition_key as the explicit row key
            table.add_row(*row_data, key=partition_key)

        # Start asynchronous resource refresh
        self.refresh_resources()

    def on_data_table_row_selected(self, event) -> None:
        """Exit with the partition whose row was selected in the table."""
        self.selected_key = self._row_key_value(event.row_key)
        self.exit((self.selected_key, self.modified_sbatch_params))

    def action_select(self) -> None:
        """Exit with the partition under the cursor, if any."""
        table = self.query_one(DataTable)
        partition_key = self._cursor_partition_key(table)
        if partition_key is not None:
            self.selected_key = partition_key
            self.exit((self.selected_key, self.modified_sbatch_params))

    def action_select_default(self) -> None:
        """Select the default partition."""
        if not self.default_key:
            return

        table = self.query_one(DataTable)
        for row_key in table.rows:
            # Match on the explicit row key first, then on the second column.
            if self._row_key_value(row_key) == self.default_key:
                matched = True
            else:
                row_data = table.get_row(row_key)
                matched = (
                    len(row_data) > 1 and row_data[1].strip() == self.default_key
                )
            if matched:
                self.selected_key = self.default_key
                self.exit((self.selected_key, self.modified_sbatch_params))
                break

Functions

action_select_default
action_select_default()

Select the default partition.

Source code in src/cloudmesh/ai/command/hpc.py
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
def action_select_default(self) -> None:
    """Select the default partition."""
    target = self.default_key
    if not target:
        return

    table = self.query_one(DataTable)
    for row_key in table.rows:
        # Prefer the explicit row key; otherwise compare against the second column.
        matched = getattr(row_key, "value", row_key) == target
        if not matched:
            cells = table.get_row(row_key)
            matched = len(cells) > 1 and cells[1].strip() == target
        if matched:
            self.selected_key = target
            self.exit((self.selected_key, self.modified_sbatch_params))
            break
refresh_resources async
refresh_resources()

Asynchronously fetch and update real-time resource data periodically.

Source code in src/cloudmesh/ai/command/hpc.py
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
@work
async def refresh_resources(self) -> None:
    """Asynchronously fetch and update real-time resource data periodically.

    The cluster query runs in a worker thread via ``asyncio.to_thread``.
    Results are written into the last two header columns (idle nodes and
    GPUs) for every row whose partition appears in the returned map, then the
    worker waits 30 seconds before polling again.
    """
    # Small delay to ensure the DataTable is fully mounted and rows are registered
    await asyncio.sleep(1.0)

    columns = [col.strip() for col in self.header.split(" | ")]
    if len(columns) < 2:
        # No room for the two trailing resource columns; nothing to refresh.
        return

    # Resource columns are the last two
    idle_col_idx = len(columns) - 2
    gpu_col_idx = len(columns) - 1
    idle_col_key = columns[idle_col_idx]
    gpu_col_key = columns[gpu_col_idx]

    while True:
        # Fetch data from cluster in a separate thread to avoid blocking the event loop
        resource_map = await asyncio.to_thread(
            self.hpc.get_partition_realtime_data, self.host
        )
        if resource_map:
            table = self.query_one(DataTable)
            directives = self.hpc.directive.get(self.host, {})
            for partition_key, model_data in self.model.items():
                # Ensure the row is actually present in the table before updating
                if partition_key not in table.rows:
                    continue

                # Get the actual Slurm partition name from the Hpc instance
                partition_name = directives.get(partition_key, {}).get("partition", "")
                res = resource_map.get(partition_name)
                if not res:
                    continue

                nodes_str = f"{res['nodes']}/{res['total_nodes']}"
                gpus_str = f"{res['gpus']}/{res['used_gpus']}/{res['total_gpus']}"

                # Update View
                try:
                    table.update_cell(partition_key, idle_col_key, nodes_str)
                    table.update_cell(partition_key, gpu_col_key, gpus_str)
                except Exception:
                    # Deliberate best-effort: a cell that isn't ready yet is
                    # refreshed on the next poll.
                    pass

                # Update Model (rows may be shorter than the header)
                row = model_data["row"]
                if gpu_col_idx < len(row):
                    row[idle_col_idx] = nodes_str
                    row[gpu_col_idx] = gpus_str

        # Always wait before the next poll so the cluster is not queried in a tight loop.
        await asyncio.sleep(30)

Vpn

Context class for managing VPN connections using OS-specific strategies.

Source code in cloudmesh/ai/vpn/vpn.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class Vpn:
    """Context class for managing VPN connections using OS-specific strategies.

    Service settings are loaded through ``VpnConfig``. Connect, disconnect,
    route-reset and status operations are delegated to a strategy instance
    built from ``get_vpn_strategy_class(provider)``.
    """

    def __init__(
        self,
        service: Optional[str] = None,
        timeout: Optional[int] = None,
        debug: bool = False,
        provider: Optional[str] = None,
        profile_name: Optional[str] = None,
    ) -> None:
        """Load the VPN configuration and select a connection strategy.

        Args:
            service: Service/organization name passed to ``VpnConfig``.
            timeout: Stored on the instance; any falsy value becomes 60.
            debug: When true, ``_debug`` messages are logged.
            provider: Provider name passed to ``get_vpn_strategy_class``.
            profile_name: Optional profile passed to ``VpnConfig``.
        """
        self.timeout = timeout or 60
        self.debug = debug
        self.profile_name = profile_name

        # Use VpnConfig for flexible configuration loading and merging
        self.vpn_config = VpnConfig(service=service, profile_name=profile_name)
        self.service_key = self.vpn_config.service.lower()
        self.service = self.vpn_config.service
        self.config = self.vpn_config.config

        # Strategy Selection
        strategy_class = get_vpn_strategy_class(provider)
        self.strategy = strategy_class(self)

        if os_is_mac():
            console.msg(f"Selected VPN Strategy: {self.strategy.__class__.__name__}")

    def _debug(self, msg: str) -> None:
        """Log *msg* at debug level when debug mode is enabled."""
        if self.debug:
            logger.debug(msg)

    def is_user_auth(self, org: str) -> bool:
        """Return the ``user`` entry of *org*'s configuration (``False`` when unset).

        NOTE(review): annotated ``bool`` but returns the configured value itself;
        callers appear to rely only on its truthiness — confirm.
        """
        # Use a temporary config for the requested org to check auth
        temp_config = VpnConfig(service=org)
        return temp_config.get("user", False)

    def enabled(self) -> bool:
        """Return whether the strategy reports the VPN as enabled."""
        return self.strategy.is_enabled()

    def warmup_sudo(self) -> bool:
        """Warm up sudo to cache the system password before starting progress UI."""
        if os_is_windows():
            return True

        from cloudmesh.ai.common.sudo import Sudo
        return Sudo.password() == 0

    def connect(self, creds: Optional[Dict[str, Any]] = None, progress_callback: Optional[callable] = None) -> Union[bool, str, None]:
        """Connect through the strategy and report which organization is now active.

        Args:
            creds: Optional settings; ``"nosplit"`` defaults to True and
                ``"service"`` defaults to this instance's ``service_key``.
            progress_callback: Forwarded unchanged to the strategy.

        Returns:
            Whatever the strategy's ``connect`` returns.
        """
        if creds is None:
            creds = {}

        no_split = creds.get("nosplit", True)
        vpn_name = creds.get("service", self.service_key)

        # Capture state before action
        before_org = self.strategy.get_current_org()

        result = self.strategy.connect(creds, vpn_name, no_split, progress_callback=progress_callback)

        if result:
            # Capture state after action
            after_org = self.strategy.get_current_org()
            if before_org and after_org and before_org != after_org:
                console.ok(f"Switched from {before_org} to {after_org}")
            elif after_org:
                console.ok(f"Connected to {after_org}")
            else:
                console.warning(
                    "Connection command succeeded, but could not verify organization via IP."
                )

        return result

    def disconnect(self) -> None:
        """Disconnect through the strategy and report whether it took effect."""
        # Capture state before action
        before_org = self.strategy.get_current_org()

        if not self.enabled():
            console.ok("VPN is already deactivated")
            return

        self.strategy.disconnect()

        if self.enabled():
            console.error("VPN is still enabled. Disconnection may have failed.")
        else:
            if before_org:
                console.ok(f"Disconnected from {before_org}")
            else:
                console.ok("Successfully disconnected from VPN.")

    def get_reset_commands(self, service: Optional[str] = None) -> List[str]:
        """Return the strategy's route-reset commands for *service*."""
        return self.strategy.get_reset_commands(service)

    def reset_routes(self, service: Optional[str] = None) -> bool:
        """Ask the strategy to reset routes for *service*."""
        return self.strategy.reset_routes(service)

    def info(self) -> str:
        """Display current IP information in a rich table using multiple fallback providers."""
        if os.environ.get("VPN_MOCK") == "1":
            logger.debug("[VPN Info] Location: UVA Campus")
            logger.debug("[VPN Info] IP: 128.118.x.x")
            return '{"location": "UVA Campus", "ip": "128.118.x.x"}'

        providers = [
            {"url": "https://ipinfo.io/json", "type": "json"},
            {"url": "https://ifconfig.me/all.json", "type": "json"},
            {"url": "https://api.ipify.org?format=json", "type": "json"},
            {"url": "https://icanhazip.com", "type": "text"},
        ]

        data = {}
        for provider in providers:
            try:
                res = requests.get(provider["url"], timeout=5)
                if res.status_code == 429:
                    console.warning(
                        f"Provider {provider['url']} rate limited (429). Trying next..."
                    )
                    continue
                res.raise_for_status()

                if provider["type"] == "json":
                    data = res.json()
                else:
                    data = {"ip": res.text.strip()}

                # If we got a valid IP, we can stop
                if data.get("ip") or data.get("query"):
                    break
            except Exception as e:
                console.error(
                    f"Provider {provider['url']} failed: {type(e).__name__}: {e}"
                )
                continue

        if not data:
            console.error(
                "All IP information providers failed to return a valid IP address."
            )
            return ""

        try:
            table = Table(
                title="IP Information",
                box=ROUNDED,
                show_header=True,
                header_style="bold magenta",
            )
            table.add_column("Field", style="cyan", width=15)
            table.add_column("Value", style="cyan")

            for key, value in data.items():
                table.add_row(key, str(value))

            console.print(table)
            return json.dumps(data, indent=2)
        except Exception as e:
            console.error(f"Failed to render IP info table: {e}")
            return ""

    def pw_fetcher(self, org: str):
        """Return ``(username, password)`` for a password-authenticated *org*.

        When no password is stored in the keyring, the user is prompted (with
        confirmation) and both values are saved. Returns ``False`` for an
        unknown service or when the service's ``auth`` is not ``"pw"``.
        """
        if os.environ.get("VPN_MOCK") == "1":
            return "mock-user", "mock-password"

        # Use a temporary config for the requested org to check auth
        temp_config = VpnConfig(service=org)
        if not temp_config.config:
            console.error(f"Unknown service {org}")
            return False

        if temp_config.get("auth") == "pw":
            # 1. Determine username: Profile override -> Keyring -> Prompt
            username = self.config.get("user")
            if not isinstance(username, str) or not username:
                username = kr.get_password(org, "cloudmesh-user")

            if isinstance(username, str) and username.upper() == "TBD":
                console.error(
                    f"Username for {org} is set to 'TBD'. Please update your profile or keyring."
                )
                # NOTE(review): os._exit terminates immediately without running
                # atexit handlers or finally blocks.
                os._exit(1)

            stored_pw = kr.get_password(org, "cloudmesh-pw")

            if stored_pw is None:
                import getpass

                if not username:
                    username = input(f"Enter your {org} username: ")

                console.msg(f"Using username: {username}")
                while True:
                    password = getpass.getpass(f"Enter your {org} password: ")
                    confirm_password = getpass.getpass("Confirm your password: ")
                    if password == confirm_password:
                        break
                    console.error("Passwords do not match. Please try again.")

                kr.set_password(org, "cloudmesh-pw", password)
                kr.set_password(org, "cloudmesh-user", username)
                stored_pw = password

            return username, stored_pw
        return False

    def pw_clearer(self, org: str):
        """Delete the stored keyring credentials for *org*.

        Returns:
            True once the credentials are cleared (or in mock mode), False when
            *org* is not a known service.
        """
        if os.environ.get("VPN_MOCK") == "1":
            console.ok(f"Credentials for {org} have been cleared (Mock).")
            return True

        # Use a temporary config for the requested org to verify it exists
        temp_config = VpnConfig(service=org)
        if not temp_config.config:
            console.error(f"Unknown service {org}")
            return False
        kr.delete_password(org, "cloudmesh-pw")
        kr.delete_password(org, "cloudmesh-user")
        console.ok(f"Credentials for {org} have been cleared.")
        # Report success explicitly so callers can test the result, matching the mock path.
        return True

    def watch(self) -> List[str]:
        """Check for evidence that the VPN is active and using split-routing."""
        return self.strategy.watch()

    def validate_keys(self, cert_path: str, key_path: str, ca_path: Optional[str]) -> Dict[str, Any]:
        """Verify VPN certificates and keys using the KeyManager."""
        return KeyManager.validate_keys(cert_path, key_path, ca_path)

    def init_keys(self, p12_path: str, output_dir: str) -> bool:
        """Initialize VPN keys from a .p12 bundle using the KeyManager."""
        return KeyManager.init_keys(p12_path, output_dir)

Functions

info
info()

Display current IP information in a rich table using multiple fallback providers.

Source code in cloudmesh/ai/vpn/vpn.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def info(self) -> str:
    """Display current IP information in a rich table using multiple fallback providers.

    IP-echo services are queried in order; the first payload carrying an
    ``ip`` or ``query`` field ends the search. The collected data is printed
    as a table and returned as indented JSON, or ``""`` on failure. With
    ``VPN_MOCK=1`` a fixed mock payload is returned without network access.
    """
    if os.environ.get("VPN_MOCK") == "1":
        for line in ("[VPN Info] Location: UVA Campus", "[VPN Info] IP: 128.118.x.x"):
            logger.debug(line)
        return '{"location": "UVA Campus", "ip": "128.118.x.x"}'

    endpoints = (
        ("https://ipinfo.io/json", "json"),
        ("https://ifconfig.me/all.json", "json"),
        ("https://api.ipify.org?format=json", "json"),
        ("https://icanhazip.com", "text"),
    )

    data = {}
    for url, payload_kind in endpoints:
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 429:
                console.warning(f"Provider {url} rate limited (429). Trying next...")
                continue
            response.raise_for_status()

            data = response.json() if payload_kind == "json" else {"ip": response.text.strip()}

            # Stop at the first provider that actually reports an address.
            if data.get("ip") or data.get("query"):
                break
        except Exception as e:
            console.error(f"Provider {url} failed: {type(e).__name__}: {e}")

    if not data:
        console.error("All IP information providers failed to return a valid IP address.")
        return ""

    try:
        table = Table(
            title="IP Information",
            box=ROUNDED,
            show_header=True,
            header_style="bold magenta",
        )
        table.add_column("Field", style="cyan", width=15)
        table.add_column("Value", style="cyan")
        for field, value in data.items():
            table.add_row(field, str(value))
        console.print(table)
        return json.dumps(data, indent=2)
    except Exception as e:
        console.error(f"Failed to render IP info table: {e}")
        return ""
init_keys
init_keys(p12_path, output_dir)

Initialize VPN keys from a .p12 bundle using the KeyManager.

Source code in cloudmesh/ai/vpn/vpn.py
263
264
265
def init_keys(self, p12_path: str, output_dir: str) -> bool:
    """Unpack the .p12 bundle at *p12_path* into *output_dir* via ``KeyManager.init_keys``."""
    outcome = KeyManager.init_keys(p12_path, output_dir)
    return outcome
validate_keys
validate_keys(cert_path, key_path, ca_path)

Verify VPN certificates and keys using the KeyManager.

Source code in cloudmesh/ai/vpn/vpn.py
259
260
261
def validate_keys(self, cert_path: str, key_path: str, ca_path: Optional[str]) -> Dict[str, Any]:
    """Check the certificate, key and optional CA files via ``KeyManager.validate_keys``."""
    report = KeyManager.validate_keys(cert_path, key_path, ca_path)
    return report
warmup_sudo
warmup_sudo()

Warm up sudo to cache the system password before starting progress UI.

Source code in cloudmesh/ai/vpn/vpn.py
73
74
75
76
77
78
79
def warmup_sudo(self) -> bool:
    """Warm up sudo to cache the system password before starting progress UI.

    Always succeeds on Windows; elsewhere returns whether ``Sudo.password()``
    returned 0.
    """
    if not os_is_windows():
        from cloudmesh.ai.common.sudo import Sudo
        return Sudo.password() == 0
    return True
watch
watch()

Check for evidence that the VPN is active and using split-routing.

Source code in cloudmesh/ai/vpn/vpn.py
255
256
257
def watch(self) -> List[str]:
    """Return the active strategy's findings about VPN activity and split-routing."""
    strategy = self.strategy
    return strategy.watch()

Functions

check_cmd

check_cmd()

Perform a health check of the HPC environment (VPN, SSH, Quota).

Source code in src/cloudmesh/ai/command/hpc.py
1232
1233
1234
1235
1236
@hpc_group.command(name="check")
def check_cmd() -> None:
    """Perform a health check of the HPC environment (VPN, SSH, Quota)."""
    # The docstring above is click's help text; the work is delegated to Hpc.check().
    Hpc().check()

config_cmd

config_cmd()

Prints the hardware and queue configuration for HPC.

Source code in src/cloudmesh/ai/command/hpc.py
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
@hpc_group.command(name="config")
def config_cmd() -> None:
    """Prints the hardware and queue configuration for HPC."""

    hpc = Hpc()
    config_path = hpc.get_config_path()

    if not os.path.exists(config_path):
        console.error(f"Configuration file not found: {config_path}")
        return

    # Group non-blank lines under their "[Section]" header; lines that appear
    # before the first header are ignored.
    sections = {}
    current_section = None
    with open(config_path, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith("[") and line.endswith("]"):
                current_section = line[1:-1]
                sections[current_section] = []
            elif current_section:
                sections[current_section].append(line)

    def split_row(line):
        # Comma-separated cells with surrounding whitespace removed, so
        # "a, b" and "a,b" render (and match special headers) identically.
        return [cell.strip() for cell in line.split(",")]

    def print_table(title, header, data, link):
        # Use the simplified console.table for the main display
        console.table(header, data, title=title)
        console.print(f"[link={link}]Source: {link}[/link]\n")

    hw_lines = sections.get("Hardware")
    if hw_lines:
        header = []
        for h in split_row(hw_lines[0]):
            # Break header labels at slashes so wide columns wrap
            h = re.sub(r"\s*/\s*", "\n/", h)
            # Specifically split "Specialty Hardware"
            if h == "Specialty Hardware":
                h = "Specialty\nHardware"
            header.append(h)
        data = [split_row(line) for line in hw_lines[1:]]
        console.banner(
            "HPC Hardware Configuration",
            "Detailed hardware specifications for the HPC cluster.",
        )
        print_table(
            "Hardware Configuration",
            header,
            data,
            "https://www.rc.virginia.edu/userinfo/hpc/#hardware-configuration",
        )

    q_lines = sections.get("Queues")
    if q_lines:
        header = [re.sub(r"\s*/\s*", "\n/", h) for h in split_row(q_lines[0])]
        data = [split_row(line) for line in q_lines[1:]]
        console.banner(
            "HPC Queue Configuration",
            "Available Slurm queues and their constraints.",
        )
        print_table(
            "Queues",
            header,
            data,
            "https://www.rc.virginia.edu/userinfo/hpc/#job-queues",
        )

edit_cmd

edit_cmd(filename, editor, debug)

Edits a file on HPC.

Source code in src/cloudmesh/ai/command/hpc.py
1145
1146
1147
1148
1149
1150
1151
1152
@hpc_group.command(name="edit")
@click.argument("filename")
@click.option("--editor", default="emacs", help="Editor to use")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def edit_cmd(filename: str, editor: str, debug: bool) -> None:
    """Edits a file on HPC."""
    # Delegate to Hpc.edit with the chosen editor (the docstring is click's help text).
    Hpc(debug=debug).edit(filename, editor)

hpc_group

hpc_group()

HPC tool for Cloudmesh AI. This command simplifies access to HPC resources.

Source code in src/cloudmesh/ai/command/hpc.py
17
18
19
20
21
22
23
@click.group()
def hpc_group():
    """
    HPC tool for Cloudmesh AI.
    This command simplifies access to HPC resources.
    """
    # Container group only: subcommands attach via @hpc_group.command / .group.

image_build

image_build(deffile)

Builds an image from a definition file.

Source code in src/cloudmesh/ai/command/hpc.py
651
652
653
654
655
656
@image_group.command(name="build")
@click.argument("deffile")
def image_build(deffile: str) -> None:
    """Builds an image from a definition file."""
    # Delegate the Apptainer build to the Hpc helper.
    Hpc().create_apptainer_image(deffile)

image_group

image_group()

Image related commands.

Source code in src/cloudmesh/ai/command/hpc.py
645
646
647
648
@hpc_group.group(name="image")
def image_group():
    """Image related commands."""
    # Container group only: subcommands such as "build" attach to it.

info_cmd

info_cmd(key)

Prints information about the current HPC configuration or a specific partition.

Source code in src/cloudmesh/ai/command/hpc.py
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
@hpc_group.command(name="info")
@click.argument("key", required=False)
def info_cmd(key: Optional[str]) -> None:
    """Prints information about the current HPC configuration or a specific partition."""
    hpc = Hpc()

    def show_host(host: str, title: str, host_prefix: str, spacer: bool) -> None:
        # Summary lines for one host, then its partition table when available.
        default_partition = hpc.get_default_partition(host)
        available_hosts = list(hpc.directive.keys())
        console.print(f"\n[bold]{title}[/bold]")
        console.print(f"{host_prefix}{host}")
        console.print(f"Default Partition : {default_partition or 'Not set'}")
        console.print(f"Available Hosts   : {', '.join(available_hosts)}")
        if spacer:
            console.print("")
        header, data = hpc.get_partition_data(host)
        if header and data:
            console.table(header, data, title="Partitions")

    # 1. No key: describe the default host.
    if not key:
        show_host(hpc.host, "HPC Configuration Info", "Current Host      : ", True)
        return

    # 2. Key names a top-level host.
    if key in hpc.directive:
        show_host(key, "HPC Host Info", "Host              : ", False)
        return

    # 3. Key names a partition of some host: print its directives.
    for host, partitions in hpc.directive.items():
        if key not in partitions:
            continue
        console.print("\n[bold]HPC Partition Info[/bold]")
        console.print(f"Host       : {host}")
        console.print(f"Partition  : {key}")
        for name, value in partitions[key].items():
            console.print(f"{name.ljust(12)} : {value}")
        console.print()
        return

    # 4. Unknown key: suggest the closest host or partition name.
    candidates = list(hpc.directive.keys())
    for partitions in hpc.directive.values():
        candidates.extend(partitions.keys())
    suggestion = hpc._suggest_match(key, candidates)

    msg = f"Key '{key}' not found as a host or partition key."
    if suggestion:
        msg += f"\nDid you mean '{suggestion}'?"
    console.error(msg)

    # Fall back to naming the default host.
    console.print(f"\nDefault Host: {hpc.host}")

jupyter_cmd

jupyter_cmd(port)

Starts a Jupyter notebook.

Source code in src/cloudmesh/ai/command/hpc.py
1137
1138
1139
1140
1141
1142
@hpc_group.command(name="jupyter")
@click.option("--port", default=8000, help="Port for Jupyter")
def jupyter_cmd(port: int) -> None:
    """Starts a Jupyter notebook."""
    # The docstring doubles as the click help text.
    # Delegates to Hpc.jupyter on a default-configured Hpc instance.
    Hpc().jupyter(port)

login_cmd

login_cmd(sbatch, host, key, debug, ui)

Logs into an interactive node on HPC.

Source code in src/cloudmesh/ai/command/hpc.py
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
@hpc_group.command(name="login")
@click.option("--sbatch", help="Sbatch parameter")
@click.option("--host", default="hpc", help="Host to use")
@click.argument("key", required=False)
@click.option("--debug", is_flag=True, help="Enable debug mode")
@click.option("--ui", is_flag=True, help="Use interactive UI to select partition")
def login_cmd(
    sbatch: Optional[str], host: str, key: Optional[str], debug: bool, ui: bool
) -> None:
    """Logs into an interactive node on HPC."""
    # The docstring above is the click help text.
    #
    # sbatch: extra sbatch parameters, parsed by Hpc.parse_sbatch_parameter.
    # host:   "hpc" is a placeholder resolved to the Hpc class default host.
    # key:    partition key; replaced by the selection when --ui is used.
    # ui:     choose the partition (and optionally its GRES) in a Textual app,
    #         preview the resulting ijob command, and confirm before logging in.
    #         The login then uses exactly the previewed parameters.

    # Resolve host: if it's the default "hpc", use the Hpc class default (e.g., "uva")
    actual_host = Hpc().host if host == "hpc" else host
    hpc = Hpc(host=actual_host, debug=debug)

    # Parse --sbatch once, up front, so an invalid value is reported as an
    # error (not a traceback) before any UI is started.
    try:
        sbatch_params = hpc.parse_sbatch_parameter(sbatch) if sbatch else None
    except ValueError as e:
        console.error(str(e))
        return

    if ui:
        try:
            # Fetch static table data (fast)
            header, choices = hpc.get_partition_static_data(actual_host)
            if not header or not choices:
                console.error("Could not fetch partition data.")
                return

            # Use Textual app for partition selection
            app = PartitionSelectorApp(actual_host, hpc, header, choices)
            result = app.run()

            if not result:
                console.warning("No partition selected. Exiting.")
                return

            selected_key, modified_params = result

            # Equivalent non-interactive command, shown as a comment in the banner
            cmd_parts = ["cmc hpc login"]
            if host != "hpc":
                cmd_parts.append(f"--host {host}")
            if sbatch:
                cmd_parts.append(f'--sbatch "{sbatch}"')
            cmd_parts.append(selected_key)
            full_cmd = " ".join(cmd_parts)

            # Check resource availability before proceeding
            console.print("\n[bold]Checking resource availability...[/bold]")
            availability = hpc.check_resource_availability(selected_key)
            if "error" in availability:
                console.error(f"Could not check resources: {availability['error']}")
            else:
                idle_count = availability["idle_nodes"]
                total_count = availability["total_nodes"]
                console.print(f"Partition: {availability['partition']}")
                console.print(f"Nodes: {idle_count}/{total_count} idle")

                if idle_count > 0:
                    console.print("[green]✓ Available resources found.[/green]")
                    # Show the first few idle nodes and their GPUs
                    idle_details = availability["idle_details"]
                    for node in idle_details[:3]:
                        console.print(
                            f"  - {node['node']}: {node['gres']} ({node['state']})"
                        )
                    if len(idle_details) > 3:
                        console.print(f"  ... and {len(idle_details) - 3} more.")
                else:
                    console.print(
                        "[yellow]⚠ No completely idle nodes found. Your job may be queued.[/yellow]"
                    )
                console.print()

            # Layer the UI's per-partition override (stored as {key: "name=value"})
            # on top of the --sbatch parameters. Work on a copy so sbatch_params
            # only changes once the user confirms.
            ui_params = dict(sbatch_params or {})
            if modified_params and selected_key in modified_params:
                override = modified_params[selected_key]
                name, sep, value = override.partition("=")
                if sep:
                    ui_params[name] = value
                else:
                    console.warning(f"Ignoring malformed parameter override: {override}")

            ijob_cmd = hpc.get_login_command(actual_host, selected_key, ui_params)

            # Present the command in a banner
            console.banner("Interactive Job", f"# {full_cmd}\n{ijob_cmd}")

            # Confirmation step using questionary
            confirmed = questionary.confirm(
                "Do you want to start the login process?", default=True
            ).ask()

            if not confirmed:
                console.msg("Login cancelled by user.")
                return

            key = selected_key
            # Log in with the same parameters shown in the banner; keep the
            # original value (possibly None) when nothing was set at all.
            if ui_params:
                sbatch_params = ui_params

        except KeyboardInterrupt:
            console.msg("\nLogin cancelled by user (Ctrl+C).")
            return

    hpc.login(actual_host, key, sbatch_params=sbatch_params)

register

register(cli)
Source code in src/cloudmesh/ai/command/hpc.py
1306
1307
def register(cli: Any) -> None:
    """Attach the ``hpc`` command group to *cli* under the name ``hpc``.

    Args:
        cli: Object exposing ``add_command`` (presumably the top-level click
            group of the ``cmc`` tool — confirm against the caller).
    """
    add_command = cli.add_command
    add_command(hpc_group, name="hpc")

set_default_cmd

set_default_cmd(host, partition)

Set the default host and partition for future commands.

Source code in src/cloudmesh/ai/command/hpc.py
1155
1156
1157
1158
1159
1160
1161
@hpc_group.command(name="set-default")
@click.option("--host", required=True, help="Default host to use")
@click.option("--partition", help="Default partition key to use")
def set_default_cmd(host: str, partition: Optional[str]) -> None:
    """Set the default host and partition for future commands."""
    # How the defaults are stored is entirely up to Hpc.set_default.
    Hpc().set_default(host, partition)

slurm_cancel

slurm_cancel(job_id, debug)

Cancels a Slurm job.

Source code in src/cloudmesh/ai/command/hpc.py
577
578
579
580
581
582
583
@slurm_group.command(name="cancel")
@click.argument("job_id")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_cancel(job_id: str, debug: bool) -> None:
    """Cancels a Slurm job."""
    # Thin wrapper around Hpc.cancel; no output is printed here.
    Hpc(debug=debug).cancel(job_id)

slurm_gpu_usage

slurm_gpu_usage(target, cluster, debug)

Find out how many GPUs are in use on a particular host, on a reservation, or across the whole cluster.

Source code in src/cloudmesh/ai/command/hpc.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
@slurm_group.command(name="gpu-usage")
@click.argument("target", required=False)
@click.option("--cluster", is_flag=True, help="Show GPU usage for all nodes in the cluster (Heat Map)")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_gpu_usage(target: Optional[str], cluster: bool, debug: bool) -> None:
    """Find out how many GPUs on a particular host, reservation, or the whole cluster are used."""
    # With --cluster: print a per-node table from Hpc.get_cluster_gpu_usage().
    # Otherwise TARGET is looked up as a node first, then as a reservation, and
    # its total/used/available GPU counts are printed.
    hpc = Hpc(debug=debug)

    if cluster:
        rows = hpc.get_cluster_gpu_usage()
        if not rows:
            console.error("Could not fetch cluster GPU usage.")
            return

        # Use a simpler printing method to avoid "Can't open" issues in some environments
        console.print("\n[bold]Cluster GPU Heat Map (Sorted by Availability)[/bold]")
        header = ["Node", "Partition", "Total", "Used", "Available", "State"]
        fields = ["node", "partition", "total", "used", "available", "state"]

        def _format_row(cells) -> str:
            # str() every cell so None or numeric values cannot break ljust().
            return " | ".join(str(cell).ljust(12) for cell in cells)

        header_str = _format_row(header)
        console.print(header_str)
        console.print("-" * len(header_str))
        for r in rows:
            console.print(_format_row(r[f] for f in fields))
        console.print()
        return

    if not target:
        console.error("Please provide a target node/reservation or use --cluster.")
        return

    # Try as a node first; if that fails, try a reservation of the same name.
    result = hpc.get_node_gpu_usage(target)
    if "error" in result:
        result = hpc.get_reservation_gpu_usage(target)

    if "error" in result:
        console.error(
            f"Could not find GPU usage for node or reservation '{target}': {result['error']}"
        )
        return

    if "reservation" in result:
        type_label, label = "Reservation", result.get("reservation")
    else:
        type_label, label = "Node", result.get("node", target)

    console.print(f"\n[bold]{type_label} GPU Usage for {label}[/bold]")
    console.print(f"Total     : {result['total']}")
    console.print(f"Used      : {result['used']}")
    console.print(f"Available : {result['available']}")
    console.print()

slurm_group

slurm_group()

Slurm related commands.

Source code in src/cloudmesh/ai/command/hpc.py
86
87
88
89
@hpc_group.group(name="slurm")
def slurm_group():
    """Slurm related commands."""
    # Container group only: subcommands attach via @slurm_group.command and
    # the group callback itself does nothing.

slurm_info

slurm_info(key)

Prints Slurm directive information for a partition key.

Source code in src/cloudmesh/ai/command/hpc.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@slurm_group.command(name="info")
@click.argument("key", required=False)
def slurm_info(key: Optional[str]) -> None:
    """Prints Slurm directive information for a partition key."""
    # Validate the argument before constructing Hpc, so a missing key is
    # reported without doing any Hpc setup work.
    if not key:
        console.error("Please provide a partition key.")
        return
    hpc = Hpc()
    console.print(hpc.create_slurm_directives(hpc.host, key))

slurm_job_info

slurm_job_info(job_id)

Get detailed information about a Slurm job.

Source code in src/cloudmesh/ai/command/hpc.py
104
105
106
107
108
109
110
@slurm_group.command(name="job-info")
@click.argument("job_id")
def slurm_job_info(job_id: str) -> None:
    """Get detailed information about a Slurm job."""
    # Prints whatever Hpc.job_info returns, unformatted.
    console.print(Hpc().job_info(job_id))

slurm_list

slurm_list(debug)

Lists all active Slurm jobs for the current user.

Source code in src/cloudmesh/ai/command/hpc.py
596
597
598
599
600
601
602
@slurm_group.command(name="list")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_list(debug: bool) -> None:
    """Lists all active Slurm jobs for the current user."""
    # Prints whatever Hpc.list_jobs returns, unformatted.
    console.print(Hpc(debug=debug).list_jobs())

slurm_logs

slurm_logs(job_id, tail, follow)

Read the Slurm output logs for a job.

Source code in src/cloudmesh/ai/command/hpc.py
128
129
130
131
132
133
134
135
136
137
@slurm_group.command(name="logs")
@click.argument("job_id")
@click.option("--tail", is_flag=True, help="Show the last few lines of the log files")
@click.option("--follow", is_flag=True, help="Stream the log files in real-time")
def slurm_logs(job_id: str, tail: bool, follow: bool) -> None:
    """Read the Slurm output logs for a job."""
    log_text = Hpc().logs(job_id, tail=tail, follow=follow)
    # Hpc.logs may return nothing (presumably e.g. after streaming with
    # --follow); only print when there is something to show.
    if not log_text:
        return
    console.print(log_text)

slurm_monitor

slurm_monitor(ctx, debug)

Interactively monitor Slurm jobs.

Source code in src/cloudmesh/ai/command/hpc.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
@slurm_group.command(name="monitor")
@click.option("--debug", is_flag=True, help="Enable debug mode")
@click.pass_context
def slurm_monitor(ctx, debug: bool) -> None:
    """Interactively monitor Slurm jobs."""
    # The monitor app's return value selects the follow-up action:
    #   falsy               -> nothing to do
    #   "CANCEL:<job_id>"   -> confirm, then cancel that job
    #   anything else       -> treated as a job id; stream its logs
    hpc = Hpc(debug=debug)
    choice = JobMonitorApp(hpc).run()

    if not choice:
        return

    cancel_prefix = "CANCEL:"
    if not choice.startswith(cancel_prefix):
        ctx.invoke(slurm_logs, job_id=choice, tail=False, follow=True)
        return

    job_id = choice[len(cancel_prefix):]
    if questionary.confirm(f"Are you sure you want to cancel job {job_id}?").ask():
        hpc.cancel(job_id)
        console.msg(f"Job {job_id} cancelled.")

slurm_nodes

slurm_nodes(partition)

Check node status for a partition.

Source code in src/cloudmesh/ai/command/hpc.py
470
471
472
473
474
475
476
@slurm_group.command(name="nodes")
@click.option("--partition", help="Partition to check")
def slurm_nodes(partition: Optional[str]) -> None:
    """Check node status for a partition."""
    # partition=None is forwarded as-is; Hpc.nodes decides what that means.
    console.print(Hpc().nodes(partition=partition))

slurm_quota

slurm_quota(billing)

Check disk quota or billing usage on the HPC.

Source code in src/cloudmesh/ai/command/hpc.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
@slurm_group.command(name="quota")
@click.option("--billing", is_flag=True, help="Show core-hour and billing usage")
def slurm_quota(billing: bool) -> None:
    """Check disk quota or billing usage on the HPC."""
    # Default: print Hpc.quota() as-is. With --billing: render the records
    # from Hpc.get_billing_usage() as a table.
    hpc = Hpc()
    if not billing:
        console.print(hpc.quota())
        return

    records = hpc.get_billing_usage()
    if not records:
        console.error("No billing usage data found.")
        return

    # Column order follows the keys of the first record; missing cells show N/A.
    columns = list(records[0].keys())
    rows = [[record.get(col, "N/A") for col in columns] for record in records]
    console.table(columns, rows, title="Slurm Billing/Usage Report")

slurm_run

slurm_run(sbatch, host, key, debug)

Runs a Slurm command.

Source code in src/cloudmesh/ai/command/hpc.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
@slurm_group.command(name="run")
@click.option("--sbatch", help="Sbatch parameter")
@click.option("--host", default="hpc", help="Host to use")
@click.argument("key", required=False)
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_run(
    sbatch: Optional[str], host: str, key: Optional[str], debug: bool
) -> None:
    """Runs a Slurm command."""
    # Resolve host: "hpc" is the CLI placeholder for the Hpc class default
    # host. Resolve it the same way `hpc login` does instead of passing the
    # placeholder on literally.
    actual_host = Hpc().host if host == "hpc" else host
    hpc = Hpc(host=actual_host, debug=debug)
    try:
        sbatch_params = hpc.parse_sbatch_parameter(sbatch) if sbatch else None
    except ValueError as e:
        console.error(str(e))
        return
    hpc.login(actual_host, key, sbatch_params=sbatch_params)

slurm_search_jobs

slurm_search_jobs(node_regex, debug)

Find jobs running on nodes that match the given regex.

Source code in src/cloudmesh/ai/command/hpc.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
@slurm_group.command(name="search-jobs")
@click.argument("node_regex")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_search_jobs(node_regex: str, debug: bool) -> None:
    """Find jobs running on nodes that match the given regex."""
    matches = Hpc(debug=debug).search_jobs_by_node(node_regex)

    if not matches:
        console.print(f"No jobs found running on nodes matching: {node_regex}")
        return

    # Each match is read via the keys "user", "job_id" and "node", in that order.
    fields = ("user", "job_id", "node")
    rows = [[match[field] for field in fields] for match in matches]
    console.table(
        ["User", "Job ID", "Node"], rows, title=f"Jobs on nodes matching: {node_regex}"
    )

slurm_sinfo

slurm_sinfo(host, query, output, format, partition, search)

Get Slurm node information.

Source code in src/cloudmesh/ai/command/hpc.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
def _sinfo_to_int(value: object) -> int:
    """Convert a numeric sinfo field to int; return 0 for missing or non-numeric values."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def _sinfo_allocated_gpus(node_record: str) -> int:
    """Return the number of allocated GPUs in one ``scontrol show node`` record.

    The count is read from the ``AllocTRES=`` field, e.g.
    ``AllocTRES=cpu=8,mem=64G,gres/gpu=2``. If an untyped ``gres/gpu=N`` entry
    is present it is used. Otherwise any typed ``gres/gpu:<type>=N`` entries are
    summed. Returns 0 when the field or a GPU entry is missing.
    """
    tres = re.search(r"AllocTRES=(\S*)", node_record)
    if not tres:
        return 0
    # findall yields (":<type>" or "", count) for every GPU entry in the list.
    entries = re.findall(r"(?:^|,)gres/gpu(:[^=,]+)?=(\d+)", tres.group(1))
    untyped = [int(count) for gpu_type, count in entries if not gpu_type]
    if untyped:
        return untyped[0]
    return sum(int(count) for _, count in entries)


def _sinfo_mixed_gpu_usage(hpc: "Hpc", nodes: list) -> dict:
    """Map each partially allocated ("mix") node name to its allocated GPU count.

    sinfo only reports total GRES per node, so all mixed nodes are looked up
    with a single ``scontrol show node`` call.
    """
    mix_nodes = []
    for node in nodes:
        name = node.get("HOSTNAMES") or node.get("node")
        state = (node.get("STATE") or node.get("state") or "").lower()
        # Substring test, so "mix", "mixed" and flagged variants all match.
        # Rows without a node name are skipped (joining None would fail).
        if name and "mix" in state:
            mix_nodes.append(name)

    usage = {}
    if not mix_nodes:
        return usage
    output = hpc.run_command(f"scontrol show node {','.join(mix_nodes)}")
    if not output:
        return usage
    for record in output.split("NodeName=")[1:]:
        tokens = record.split()
        if tokens:
            # The node name is the first token after "NodeName=".
            usage[tokens[0]] = _sinfo_allocated_gpus(record)
    return usage


def _sinfo_print_summary(hpc: "Hpc", result: list) -> None:
    """Print cluster-wide node/CPU/GPU totals for sinfo rows (``--output summary``)."""
    states = {}
    total_cpus = idle_cpus = total_gpus = idle_gpus = 0
    feature_gpus = {}

    # Allocated GPUs per mixed node, needed to count idle GPUs accurately.
    gpus_in_use = _sinfo_mixed_gpu_usage(hpc, result)

    for node in result:
        node_name = node.get("HOSTNAMES") or node.get("node")
        state = node.get("STATE") or node.get("state", "Unknown")
        states[state] = states.get(state, 0) + 1

        total_cpus += _sinfo_to_int(node.get("CPUS_TOTAL") or node.get("cpus_total", 0))
        idle_cpus += _sinfo_to_int(node.get("CPUS_IDLE") or node.get("cpus_idle", 0))

        # GPUs: GRES_GPU ends in ":<count>"; rows without a parseable count are skipped.
        gres_gpu = node.get("GRES_GPU") or node.get("gres_gpu", "")
        if not gres_gpu or ":" not in gres_gpu:
            continue
        try:
            gpu_count = int(gres_gpu.split(":")[-1])
        except ValueError:
            continue
        total_gpus += gpu_count

        state_l = str(state).lower()
        if state_l == "idle":
            node_idle = gpu_count
        elif "mix" in state_l:  # same test used to select nodes for scontrol
            node_idle = max(0, gpu_count - gpus_in_use.get(node_name, 0))
        else:
            node_idle = 0
        idle_gpus += node_idle

        # Feature-based GPU counts: the second active feature is used as the
        # bucket when there are at least two, otherwise the first.
        features_str = node.get("ACTIVE_FEATURES") or node.get("active_features", "")
        if features_str:
            parts = [p.strip() for p in features_str.split(",")]
            feature = parts[1] if len(parts) >= 2 else parts[0]
            if feature:
                bucket = feature_gpus.setdefault(feature, {"total": 0, "idle": 0})
                bucket["total"] += gpu_count
                bucket["idle"] += node_idle

    # Prepare state data for table (most common state first)
    state_data = [
        [s, c] for s, c in sorted(states.items(), key=lambda x: x[1], reverse=True)
    ]

    console.print("\n[bold]Cluster Summary[/bold]")
    console.print(f"  Total Nodes    : {len(result)}")
    console.print(f"  Total CPUs     : {total_cpus}")
    console.print(f"  Idle CPUs      : {idle_cpus}")
    console.print(f"  Total GPUs     : {total_gpus}")
    console.print(f"  Idle GPUs      : {idle_gpus}")
    console.print()

    console.table(["State", "Count"], state_data, title="Node State Distribution")
    console.print()

    if feature_gpus:
        feature_data = [
            [f, d["total"], d["idle"]] for f, d in sorted(feature_gpus.items())
        ]
        console.table(
            ["Feature", "Total GPUs", "Idle GPUs"],
            feature_data,
            title="GPU Distribution by Active Feature",
        )
        console.print()


@hpc_group.command(name="sinfo")
@click.argument("host", required=False)
@click.option(
    "--query",
    default="all",
    help="Sinfo query format (all, summarize, long, Node, long_node, list_reasons, long_list_reasons)",
)
@click.option(
    "--output",
    default="attributes",
    type=click.Choice(["json", "yaml", "attributes", "table", "summary"]),
    help="Output format",
)
@click.option(
    "--format",
    help="Shortcut for output format (json, yaml, table, attributes, summary) or query format",
)
@click.option("--partition", help="Partition to check")
@click.option("--search", help="Filter nodes by attribute value using regex")
def slurm_sinfo(
    host: Optional[str],
    query: str,
    output: str,
    format: Optional[str],
    partition: Optional[str],
    search: Optional[str],
) -> None:
    """Get Slurm node information."""
    # Flow: fetch node rows via Hpc.sinfo -> print the partition summary if the
    # rows carry one -> optional regex filter (--search) -> render per --output.

    # Resolve host: if not provided, Hpc() uses default (e.g., "uva")
    hpc = Hpc(host=host) if host else Hpc()

    # Handle --format shortcut: any --output choice selects the output format,
    # anything else is treated as a query format.
    if format:
        if format in ["json", "yaml", "table", "attributes", "summary"]:
            output = format
        else:
            query = format

    # Map "all" to "%all" for the library method
    fmt = "%all" if query == "all" else query

    result = hpc.sinfo(partition=partition, format=fmt)

    if not result:
        console.print("No information found.")
        return

    # Extract summary from the first node (since it's merged into all nodes)
    summary = result[0]
    if "nodes_available" in summary:
        console.print("\n[bold]Partition Summary[/bold]")
        console.print(f"  NODES_AVAILABLE : {summary.get('nodes_available')}")
        console.print(f"  NODES_IDLE      : {summary.get('nodes_idle')}")
        console.print(f"  CPUS_AVAILABLE  : {summary.get('cpus_available')}")
        console.print(f"  CPUS_IDLE       : {summary.get('cpus_idle')}")
        console.print(f"  CPUS_O          : {summary.get('cpus_other')}")
        console.print(f"  CPUS_T          : {summary.get('cpus_total')}")
        console.print()

    if search:
        # Keep a node if any attribute value matches the regex
        try:
            regex = re.compile(search, re.IGNORECASE)
        except re.error as e:
            console.error(f"Invalid search regex: {e}")
            return
        result = [
            node for node in result if any(regex.search(str(val)) for val in node.values())
        ]

    if not result:
        console.print("No nodes matched the search criteria.")
        return

    if output == "json":
        console.print_json(result)
    elif output == "yaml":
        console.print_yaml(result)
    elif output == "summary":
        _sinfo_print_summary(hpc, result)
    elif output == "table":
        if query == "all":
            # Summary table for the detailed query
            header = ["Partition", "Node", "GRES", "State"]
            data = []
            for r in result:
                partition_val = r.get("PARTITION") or r.get("partition", "N/A")
                node_val = r.get("HOSTNAMES") or r.get("node", "N/A")
                gres_val = r.get("GRES") or r.get("gres", "N/A")
                state_val = r.get("STATE") or r.get("state", "N/A")
                data.append([partition_val, node_val, gres_val, state_val])
            console.table(header, data, title="Slurm Node Info Summary")
        elif "raw" in result[0]:
            for row in result:
                console.print(row["raw"])
        else:
            keys = list(result[0].keys())
            data = [[r.get(k, "N/A") for k in keys] for r in result]
            console.table(keys, data, title="Slurm Node Info")
    elif output == "attributes":
        if query == "all":
            # Detailed key-value list
            for i, node_data in enumerate(result, 1):
                node_name = node_data.get("HOSTNAMES") or node_data.get(
                    "node", "Unknown Node"
                )
                console.print(f"\n[bold]Node {i}: {node_name}[/bold]")
                for key, value in node_data.items():
                    # GRES_SOCKET is already provided as a separate attribute by Slurm.sinfo
                    # We just print the attributes as they are.
                    console.print(f"  {key.ljust(20)} : {value}")
        else:
            for row in result:
                if "raw" in row:
                    console.print(row["raw"])
                else:
                    for k, v in row.items():
                        console.print(f"  {k.ljust(20)} : {v}")

slurm_squeue

slurm_squeue(host, search, output, debug)

Get Slurm queue information.

Source code in src/cloudmesh/ai/command/hpc.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@hpc_group.command(name="squeue")
@click.argument("host", required=False)
@click.option("--search", help="Filter jobs by node name using regex")
@click.option(
    "--output",
    default="table",
    type=click.Choice(["table", "attributes"]),
    help="Output format",
)
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_squeue(
    host: Optional[str], search: Optional[str], output: str, debug: bool
) -> None:
    """Get Slurm queue information."""
    # Without --search: print Hpc.list_jobs() as-is.
    # With --search: list jobs on nodes matching the regex, either as a table
    # or as one attribute block per job (--output attributes).
    hpc = Hpc(host=host, debug=debug) if host else Hpc(debug=debug)

    if not search:
        result = hpc.list_jobs()
        if not result:
            console.print("No active jobs found.")
            return
        console.print(result)
        return

    result = hpc.search_jobs_by_node(search)
    if not result:
        console.print(f"No jobs found running on nodes matching: {search}")
        return

    if output == "attributes":
        for i, job in enumerate(result, 1):
            # search_jobs_by_node rows are keyed "user"/"job_id"/"node" (the keys
            # `slurm search-jobs` reads), so look up "job_id" first; "JobID" is
            # kept as a fallback instead of raising KeyError.
            job_id = job.get("job_id", job.get("JobID", "N/A"))
            console.print(f"\n[bold]Job {i}: {job_id}[/bold]")
            for key, value in job.items():
                console.print(f"  {key.ljust(15)} : {value}")
    else:
        header = list(result[0].keys())
        data = [[row.get(k, "N/A") for k in header] for row in result]
        console.table(header, data, title=f"Jobs on nodes matching: {search}")

slurm_sreport

slurm_sreport(username, start, end, stat, debug)

Get Slurm usage reports for users, accounts, partitions, and nodes.

Source code in src/cloudmesh/ai/command/hpc.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@hpc_group.command(name="sreport")
@click.argument("username", required=False)
@click.option("--start", help="Start date (YYYY-MM-DD)")
@click.option("--end", help="End date (YYYY-MM-DD)")
@click.option("--stat", is_flag=True, help="Enable statistics mode")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_sreport(
    username: Optional[str],
    start: Optional[str],
    end: Optional[str],
    stat: bool,
    debug: bool,
) -> None:
    """Get Slurm usage reports for users, accounts, partitions, and nodes."""
    # One table per entity, in a fixed order. USERNAME narrows only the
    # "user" and "account" reports; partition and node reports are unfiltered.
    hpc = Hpc(debug=debug)

    for entity in ("user", "account", "partition", "node"):
        filter_val = username if entity in ("user", "account") else None
        rows = hpc.sreport(
            entity=entity, filter_val=filter_val, start=start, end=end, stat=stat
        )

        if not rows:
            console.print(f"No {entity} usage report found.")
            continue

        # Column order follows the keys of the first row; missing cells show N/A.
        columns = list(rows[0].keys())
        table_rows = [[row.get(col, "N/A") for col in columns] for row in rows]

        title = f"Slurm {entity.capitalize()} Usage Report"
        if filter_val:
            title = f"{title} ({filter_val})"

        console.table(columns, table_rows, title=title)

slurm_status

slurm_status(job_id, debug)

Gets the status of a specific Slurm job.

Source code in src/cloudmesh/ai/command/hpc.py
586
587
588
589
590
591
592
593
@slurm_group.command(name="status")
@click.argument("job_id")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def slurm_status(job_id: str, debug: bool) -> None:
    """Gets the status of a specific Slurm job."""
    # Prints whatever Hpc.get_job_status returns, unformatted.
    console.print(Hpc(debug=debug).get_job_status(job_id))

slurm_submit

slurm_submit(script, key, sbatch)

Upload a script and submit it as a Slurm job.

Source code in src/cloudmesh/ai/command/hpc.py
113
114
115
116
117
118
119
120
121
122
123
124
125
@slurm_group.command(name="submit")
@click.argument("script")
@click.option("--key", help="Partition key")
@click.option("--sbatch", help="Additional sbatch parameters (key:val,key:val)")
def slurm_submit(script: str, key: Optional[str], sbatch: Optional[str]) -> None:
    """Upload a script and submit it as a Slurm job."""
    # Parses --sbatch, submits SCRIPT via Hpc.submit and prints the result.
    hpc = Hpc()
    try:
        sbatch_params = hpc.parse_sbatch_parameter(sbatch) if sbatch else None
        result = hpc.submit(script, key=key, sbatch_params=sbatch_params)
    except Exception as e:  # CLI boundary: report any failure instead of a traceback
        # Pass a string, as the sibling commands do (console.error(str(e))).
        console.error(str(e))
        return
    console.print(result)

slurm_template

slurm_template(key)

Generate a boilerplate .sbatch script.

Source code in src/cloudmesh/ai/command/hpc.py
550
551
552
553
554
555
556
@slurm_group.command(name="template")
@click.option("--key", help="Partition key for the template")
def slurm_template(key: Optional[str]) -> None:
    """Generate a boilerplate .sbatch script."""
    # The script text comes from Hpc.template and is printed unchanged.
    console.print(Hpc().template(key=key))

slurm_wait

slurm_wait(job_id, interval)

Wait for a Slurm job to complete.

Source code in src/cloudmesh/ai/command/hpc.py
541
542
543
544
545
546
547
@slurm_group.command(name="wait")
@click.argument("job_id")
@click.option("--interval", default=30, help="Polling interval in seconds")
def slurm_wait(job_id: str, interval: int) -> None:
    """Wait for a Slurm job to complete."""
    # Polling is handled by Hpc.wait at the given interval.
    Hpc().wait(job_id, interval=interval)

storage_group

storage_group()

Storage related commands.

Source code in src/cloudmesh/ai/command/hpc.py
27
28
29
30
@hpc_group.group(name="storage")
def storage_group():
    """Storage related commands."""
    # Container group only: subcommands attach via @storage_group.command and
    # the group callback itself does nothing.

storage_info

storage_info(directory, info, debug)

Obtains information about the storage associated with a directory on HPC.

Source code in src/cloudmesh/ai/command/hpc.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@storage_group.command(name="info")
@click.argument("directory")
@click.option("--info", is_flag=True, help="Detailed information")
@click.option("--debug", is_flag=True, help="Enable debug mode")
def storage_info(directory: str, info: bool, debug: bool) -> None:
    """Obtains information about the storage associated with a directory on HPC."""
    # NOTE(review): the --info flag is accepted but never used in this body;
    # confirm whether a detailed mode was intended.
    hpc = Hpc(debug=debug)

    # Header block: current host, followed by a blank line.
    for line in (
        "\n[bold]HPC Configuration Info[/bold]",
        f"Current Host      : {hpc.host}",
        "",
    ):
        console.print(line)

    size = hpc.storage(directory)
    console.table(["Directory", "Size"], [[directory, size]], title="Storage Info")

ticket_cmd

ticket_cmd()

Opens the support request form.

Source code in src/cloudmesh/ai/command/hpc.py
1129
1130
1131
1132
1133
1134
@hpc_group.command(name="ticket")
def ticket_cmd() -> None:
    """Opens the support request form."""
    support_form = "https://www.rc.virginia.edu/form/support-request/"
    # Echo the URL first so the user can open it manually if no browser
    # window appears.
    console.msg("Opening support ticket form: " + support_form)
    webbrowser.open(support_form)

tutorial_cmd

tutorial_cmd(keyword)

Shows HPC tutorials based on keyword.

Source code in src/cloudmesh/ai/command/hpc.py
1119
1120
1121
1122
1123
1124
1125
1126
@hpc_group.command(name="tutorial")
@click.argument("keyword", required=False)
def tutorial_cmd(keyword: Optional[str]) -> None:
    """Shows HPC tutorials based on keyword."""
    # keyword may be None (argument is optional); Hpc.get_tutorial_url is
    # responsible for mapping it to a URL.
    tutorial_url = Hpc().get_tutorial_url(keyword)
    # Any falsy keyword (None or "") is reported as "general".
    topic = keyword if keyword else "general"
    console.msg(f"Opening tutorial for {topic}: {tutorial_url}")
    webbrowser.open(tutorial_url)

vpn_group

vpn_group()

VPN related commands.

Source code in src/cloudmesh/ai/command/hpc.py
50
51
52
53
@hpc_group.group(name="vpn")
def vpn_group():
    """VPN related commands."""
    # Container group only: on/off/info/status subcommands register on it.

vpn_info

vpn_info()

Prints information about the current connection to the internet.

Source code in src/cloudmesh/ai/command/hpc.py
70
71
72
73
74
@vpn_group.command(name="info")
def vpn_info() -> None:
    """Prints information about the current connection to the internet."""
    # The report is produced by Vpn.info(); this command only selects the
    # "hpc" service.
    Vpn(service="hpc").info()

vpn_off

vpn_off()

Switches the VPN off.

Source code in src/cloudmesh/ai/command/hpc.py
63
64
65
66
67
@vpn_group.command(name="off")
def vpn_off() -> None:
    """Switches the VPN off."""
    # Disconnection logic lives in Vpn.disconnect for the "hpc" service.
    Vpn(service="hpc").disconnect()

vpn_on

vpn_on()

Switches the VPN on.

Source code in src/cloudmesh/ai/command/hpc.py
56
57
58
59
60
@vpn_group.command(name="on")
def vpn_on() -> None:
    """Switches the VPN on."""
    # Connection logic lives in Vpn.connect for the "hpc" service.
    Vpn(service="hpc").connect()

vpn_status

vpn_status()

Prints True if VPN is enabled, False if not.

Source code in src/cloudmesh/ai/command/hpc.py
77
78
79
80
81
82
@vpn_group.command(name="status")
def vpn_status() -> None:
    """Prints True if VPN is enabled, False if not."""
    # Output is a single "VPN Enabled: <value>" line built from whatever
    # Vpn.enabled() returns for the "hpc" service.
    vpn_enabled = Vpn(service="hpc").enabled()
    console.print(f"VPN Enabled: {vpn_enabled}")