Diffstat (limited to 'Lib/profile/sample.py')
-rw-r--r--  Lib/profile/sample.py | 730
1 file changed, 730 insertions(+), 0 deletions(-)
diff --git a/Lib/profile/sample.py b/Lib/profile/sample.py
new file mode 100644
index 00000000000..97d23611e67
--- /dev/null
+++ b/Lib/profile/sample.py
@@ -0,0 +1,730 @@
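+"""Statistical sampling profiler for running Python processes.
+
+Attaches to a live process by PID via ``_remote_debugging.RemoteUnwinder``,
+periodically captures its stack frames, and aggregates them into either
+pstats-style statistics or collapsed stacks suitable for flamegraphs.
+"""
+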
+import argparse
+import _remote_debugging
+import os
+import pstats
+import statistics
+import sys
+import sysconfig
+import time
+from collections import deque
+from _colorize import ANSIColors
+
+from .pstats_collector import PstatsCollector
+from .stack_collector import CollapsedStackCollector
+
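+# ``Py_GIL_DISABLED`` only appears in the build configuration of
+# free-threaded builds, so this is effectively a build-type check.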
+FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
+
+class SampleProfiler:
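+    """Captures stack traces from a running process at a fixed interval."""
+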
+ def __init__(self, pid, sample_interval_usec, all_threads):
+ self.pid = pid
+ self.sample_interval_usec = sample_interval_usec
+ self.all_threads = all_threads
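+        # The RemoteUnwinder keyword differs by build: free-threaded builds
+        # accept ``all_threads``, GIL builds accept ``only_active_thread``.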
+ if FREE_THREADED_BUILD:
+ self.unwinder = _remote_debugging.RemoteUnwinder(
+ self.pid, all_threads=self.all_threads
+ )
+ else:
+            only_active_thread = bool(self.all_threads)
+            self.unwinder = _remote_debugging.RemoteUnwinder(
+                self.pid, only_active_thread=only_active_thread
+            )
+ # Track sample intervals and total sample count
+ self.sample_intervals = deque(maxlen=100)
+ self.total_samples = 0
+ self.realtime_stats = False
+
+ def sample(self, collector, duration_sec=10):
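+        """Collect stack samples from the target for ``duration_sec`` seconds.
+
+        Each captured set of stack frames is passed to ``collector.collect()``;
+        capture statistics are printed once sampling finishes.
+        """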
+ sample_interval_sec = self.sample_interval_usec / 1_000_000
+ running_time = 0
+ num_samples = 0
+ errors = 0
+ start_time = next_time = time.perf_counter()
+ last_sample_time = start_time
+ realtime_update_interval = 1.0 # Update every second
+ last_realtime_update = start_time
+
+ while running_time < duration_sec:
+ current_time = time.perf_counter()
+ if next_time < current_time:
+ try:
+ stack_frames = self.unwinder.get_stack_trace()
+ collector.collect(stack_frames)
+ except ProcessLookupError:
+ break
+ except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
+ errors += 1
+ except Exception as e:
+ if not self._is_process_running():
+ break
+ raise e from None
+
+ # Track actual sampling intervals for real-time stats
+ if num_samples > 0:
+ actual_interval = current_time - last_sample_time
+ self.sample_intervals.append(
+ 1.0 / actual_interval
+ ) # Convert to Hz
+ self.total_samples += 1
+
+ # Print real-time statistics if enabled
+ if (
+ self.realtime_stats
+ and (current_time - last_realtime_update)
+ >= realtime_update_interval
+ ):
+ self._print_realtime_stats()
+ last_realtime_update = current_time
+
+ last_sample_time = current_time
+ num_samples += 1
+ next_time += sample_interval_sec
+
+ running_time = time.perf_counter() - start_time
+
+ # Clear real-time stats line if it was being displayed
+ if self.realtime_stats and len(self.sample_intervals) > 0:
+ print() # Add newline after real-time stats
+
+        print(f"Captured {num_samples} samples in {running_time:.2f} seconds")
+        # Guard against a zero division when no samples were captured
+        # (e.g. the target process exited immediately).
+        if num_samples > 0:
+            print(f"Sample rate: {num_samples / running_time:.2f} samples/sec")
+            print(f"Error rate: {(errors / num_samples) * 100:.2f}%")
+
+ expected_samples = int(duration_sec / sample_interval_sec)
+ if num_samples < expected_samples:
+ print(
+ f"Warning: missed {expected_samples - num_samples} samples "
+ f"from the expected total of {expected_samples} "
+ f"({(expected_samples - num_samples) / expected_samples * 100:.2f}%)"
+ )
+
+ def _is_process_running(self):
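+        """Return whether the target process still appears to be alive."""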
+ if sys.platform == "linux" or sys.platform == "darwin":
+ try:
+ os.kill(self.pid, 0)
+ return True
+ except ProcessLookupError:
+ return False
+ elif sys.platform == "win32":
+ try:
+ _remote_debugging.RemoteUnwinder(self.pid)
+ except Exception:
+ return False
+ return True
+ else:
+ raise ValueError(f"Unsupported platform: {sys.platform}")
+
+ def _print_realtime_stats(self):
+ """Print real-time sampling statistics."""
+ if len(self.sample_intervals) < 2:
+ return
+
+ # Calculate statistics on the Hz values (deque automatically maintains rolling window)
+ hz_values = list(self.sample_intervals)
+ mean_hz = statistics.mean(hz_values)
+ min_hz = min(hz_values)
+ max_hz = max(hz_values)
+
+ # Calculate microseconds per sample for all metrics (1/Hz * 1,000,000)
+ mean_us_per_sample = (1.0 / mean_hz) * 1_000_000 if mean_hz > 0 else 0
+ min_us_per_sample = (
+ (1.0 / max_hz) * 1_000_000 if max_hz > 0 else 0
+ ) # Min time = Max Hz
+ max_us_per_sample = (
+ (1.0 / min_hz) * 1_000_000 if min_hz > 0 else 0
+ ) # Max time = Min Hz
+
+ # Clear line and print stats
+ print(
+ f"\r\033[K{ANSIColors.BOLD_BLUE}Real-time sampling stats:{ANSIColors.RESET} "
+ f"{ANSIColors.YELLOW}Mean: {mean_hz:.1f}Hz ({mean_us_per_sample:.2f}µs){ANSIColors.RESET} "
+ f"{ANSIColors.GREEN}Min: {min_hz:.1f}Hz ({max_us_per_sample:.2f}µs){ANSIColors.RESET} "
+ f"{ANSIColors.RED}Max: {max_hz:.1f}Hz ({min_us_per_sample:.2f}µs){ANSIColors.RESET} "
+ f"{ANSIColors.CYAN}Samples: {self.total_samples}{ANSIColors.RESET}",
+ end="",
+ flush=True,
+ )
+
+
+def _determine_best_unit(max_value):
+ """Determine the best unit (s, ms, μs) and scale factor for a maximum value."""
+ if max_value >= 1.0:
+ return "s", 1.0
+ elif max_value >= 0.001:
+ return "ms", 1000.0
+ else:
+ return "μs", 1000000.0
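+
+# For example, _determine_best_unit(0.0042) returns ("ms", 1000.0): callers
+# multiply raw seconds by the scale and put the unit in the column header.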
+
+
+def print_sampled_stats(
+ stats, sort=-1, limit=None, show_summary=True, sample_interval_usec=100
+):
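+    """Print a pstats-style report of the sampled profiling data.
+
+    ``sort`` selects the column to order by (-1 sorts by function name),
+    ``limit`` caps the number of rows, and ``show_summary`` controls the
+    trailing "interesting functions" section.
+    """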
+ # Get the stats data
+ stats_list = []
+ for func, (
+ direct_calls,
+ cumulative_calls,
+ total_time,
+ cumulative_time,
+ callers,
+ ) in stats.stats.items():
+ stats_list.append(
+ (
+ func,
+ direct_calls,
+ cumulative_calls,
+ total_time,
+ cumulative_time,
+ callers,
+ )
+ )
+
+ # Calculate total samples for percentage calculations (using direct_calls)
+ total_samples = sum(
+ direct_calls for _, direct_calls, _, _, _, _ in stats_list
+ )
+
+ # Sort based on the requested field
+ sort_field = sort
+ if sort_field == -1: # stdname
+ stats_list.sort(key=lambda x: str(x[0]))
+ elif sort_field == 0: # nsamples (direct samples)
+ stats_list.sort(key=lambda x: x[1], reverse=True) # direct_calls
+ elif sort_field == 1: # tottime
+ stats_list.sort(key=lambda x: x[3], reverse=True) # total_time
+ elif sort_field == 2: # cumtime
+ stats_list.sort(key=lambda x: x[4], reverse=True) # cumulative_time
+ elif sort_field == 3: # sample%
+ stats_list.sort(
+ key=lambda x: (x[1] / total_samples * 100)
+ if total_samples > 0
+ else 0,
+ reverse=True, # direct_calls percentage
+ )
+ elif sort_field == 4: # cumul%
+ stats_list.sort(
+ key=lambda x: (x[2] / total_samples * 100)
+ if total_samples > 0
+ else 0,
+ reverse=True, # cumulative_calls percentage
+ )
+ elif sort_field == 5: # nsamples (cumulative samples)
+ stats_list.sort(key=lambda x: x[2], reverse=True) # cumulative_calls
+
+ # Apply limit if specified
+ if limit is not None:
+ stats_list = stats_list[:limit]
+
+ # Determine the best unit for time columns based on maximum values
+ max_total_time = max(
+ (total_time for _, _, _, total_time, _, _ in stats_list), default=0
+ )
+ max_cumulative_time = max(
+ (cumulative_time for _, _, _, _, cumulative_time, _ in stats_list),
+ default=0,
+ )
+
+ total_time_unit, total_time_scale = _determine_best_unit(max_total_time)
+ cumulative_time_unit, cumulative_time_scale = _determine_best_unit(
+ max_cumulative_time
+ )
+
+ # Define column widths for consistent alignment
+ col_widths = {
+ "nsamples": 15, # "nsamples" column (inline/cumulative format)
+ "sample_pct": 8, # "sample%" column
+ "tottime": max(12, len(f"tottime ({total_time_unit})")),
+ "cum_pct": 8, # "cumul%" column
+ "cumtime": max(12, len(f"cumtime ({cumulative_time_unit})")),
+ }
+
+ # Print header with colors and proper alignment
+ print(f"{ANSIColors.BOLD_BLUE}Profile Stats:{ANSIColors.RESET}")
+
+ header_nsamples = f"{ANSIColors.BOLD_BLUE}{'nsamples':>{col_widths['nsamples']}}{ANSIColors.RESET}"
+ header_sample_pct = f"{ANSIColors.BOLD_BLUE}{'sample%':>{col_widths['sample_pct']}}{ANSIColors.RESET}"
+ header_tottime = f"{ANSIColors.BOLD_BLUE}{f'tottime ({total_time_unit})':>{col_widths['tottime']}}{ANSIColors.RESET}"
+ header_cum_pct = f"{ANSIColors.BOLD_BLUE}{'cumul%':>{col_widths['cum_pct']}}{ANSIColors.RESET}"
+ header_cumtime = f"{ANSIColors.BOLD_BLUE}{f'cumtime ({cumulative_time_unit})':>{col_widths['cumtime']}}{ANSIColors.RESET}"
+ header_filename = (
+ f"{ANSIColors.BOLD_BLUE}filename:lineno(function){ANSIColors.RESET}"
+ )
+
+ print(
+ f"{header_nsamples} {header_sample_pct} {header_tottime} {header_cum_pct} {header_cumtime} {header_filename}"
+ )
+
+ # Print each line with proper alignment
+ for (
+ func,
+ direct_calls,
+ cumulative_calls,
+ total_time,
+ cumulative_time,
+ callers,
+ ) in stats_list:
+ # Calculate percentages
+ sample_pct = (
+ (direct_calls / total_samples * 100) if total_samples > 0 else 0
+ )
+ cum_pct = (
+ (cumulative_calls / total_samples * 100)
+ if total_samples > 0
+ else 0
+ )
+
+ # Format values with proper alignment - always use A/B format
+ nsamples_str = f"{direct_calls}/{cumulative_calls}"
+ nsamples_str = f"{nsamples_str:>{col_widths['nsamples']}}"
+ sample_pct_str = f"{sample_pct:{col_widths['sample_pct']}.1f}"
+ tottime = f"{total_time * total_time_scale:{col_widths['tottime']}.3f}"
+ cum_pct_str = f"{cum_pct:{col_widths['cum_pct']}.1f}"
+ cumtime = f"{cumulative_time * cumulative_time_scale:{col_widths['cumtime']}.3f}"
+
+ # Format the function name with colors
+ func_name = (
+ f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:"
+ f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}("
+ f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})"
+ )
+
+ # Print the formatted line with consistent spacing
+ print(
+ f"{nsamples_str} {sample_pct_str} {tottime} {cum_pct_str} {cumtime} {func_name}"
+ )
+
+ # Print legend
+ print(f"\n{ANSIColors.BOLD_BLUE}Legend:{ANSIColors.RESET}")
+ print(
+ f" {ANSIColors.YELLOW}nsamples{ANSIColors.RESET}: Direct/Cumulative samples (direct executing / on call stack)"
+ )
+ print(
+ f" {ANSIColors.YELLOW}sample%{ANSIColors.RESET}: Percentage of total samples this function was directly executing"
+ )
+ print(
+ f" {ANSIColors.YELLOW}tottime{ANSIColors.RESET}: Estimated total time spent directly in this function"
+ )
+ print(
+ f" {ANSIColors.YELLOW}cumul%{ANSIColors.RESET}: Percentage of total samples when this function was on the call stack"
+ )
+ print(
+ f" {ANSIColors.YELLOW}cumtime{ANSIColors.RESET}: Estimated cumulative time (including time in called functions)"
+ )
+ print(
+ f" {ANSIColors.YELLOW}filename:lineno(function){ANSIColors.RESET}: Function location and name"
+ )
+
+ def _format_func_name(func):
+ """Format function name with colors."""
+ return (
+ f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:"
+ f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}("
+ f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})"
+ )
+
+ def _print_top_functions(stats_list, title, key_func, format_line, n=3):
+ """Print top N functions sorted by key_func with formatted output."""
+ print(f"\n{ANSIColors.BOLD_BLUE}{title}:{ANSIColors.RESET}")
+ sorted_stats = sorted(stats_list, key=key_func, reverse=True)
+ for stat in sorted_stats[:n]:
+ if line := format_line(stat):
+ print(f" {line}")
+
+ # Print summary of interesting functions if enabled
+ if show_summary and stats_list:
+ print(
+ f"\n{ANSIColors.BOLD_BLUE}Summary of Interesting Functions:{ANSIColors.RESET}"
+ )
+
+ # Aggregate stats by fully qualified function name (ignoring line numbers)
+ func_aggregated = {}
+ for (
+ func,
+ direct_calls,
+ cumulative_calls,
+ total_time,
+ cumulative_time,
+ callers,
+ ) in stats_list:
+ # Use filename:function_name as the key to get fully qualified name
+ qualified_name = f"{func[0]}:{func[2]}"
+ if qualified_name not in func_aggregated:
+ func_aggregated[qualified_name] = [
+ 0,
+ 0,
+ 0,
+ 0,
+ ] # direct_calls, cumulative_calls, total_time, cumulative_time
+ func_aggregated[qualified_name][0] += direct_calls
+ func_aggregated[qualified_name][1] += cumulative_calls
+ func_aggregated[qualified_name][2] += total_time
+ func_aggregated[qualified_name][3] += cumulative_time
+
+ # Convert aggregated data back to list format for processing
+ aggregated_stats = []
+ for qualified_name, (
+ prim_calls,
+ total_calls,
+ total_time,
+ cumulative_time,
+ ) in func_aggregated.items():
+ # Parse the qualified name back to filename and function name
+ if ":" in qualified_name:
+ filename, func_name = qualified_name.rsplit(":", 1)
+ else:
+ filename, func_name = "", qualified_name
+ # Create a dummy func tuple with filename and function name for display
+ dummy_func = (filename, "", func_name)
+ aggregated_stats.append(
+ (
+ dummy_func,
+ prim_calls,
+ total_calls,
+ total_time,
+ cumulative_time,
+ {},
+ )
+ )
+
+ # Determine best units for summary metrics
+ max_total_time = max(
+ (total_time for _, _, _, total_time, _, _ in aggregated_stats),
+ default=0,
+ )
+ max_cumulative_time = max(
+ (
+ cumulative_time
+ for _, _, _, _, cumulative_time, _ in aggregated_stats
+ ),
+ default=0,
+ )
+
+ total_unit, total_scale = _determine_best_unit(max_total_time)
+ cumulative_unit, cumulative_scale = _determine_best_unit(
+ max_cumulative_time
+ )
+
+ # Functions with highest direct/cumulative ratio (hot spots)
+ def format_hotspots(stat):
+ func, direct_calls, cumulative_calls, total_time, _, _ = stat
+ if direct_calls > 0 and cumulative_calls > 0:
+ ratio = direct_calls / cumulative_calls
+ direct_pct = (
+ (direct_calls / total_samples * 100)
+ if total_samples > 0
+ else 0
+ )
+ return (
+ f"{ratio:.3f} direct/cumulative ratio, "
+ f"{direct_pct:.1f}% direct samples: {_format_func_name(func)}"
+ )
+ return None
+
+ _print_top_functions(
+ aggregated_stats,
+ "Functions with Highest Direct/Cumulative Ratio (Hot Spots)",
+ key_func=lambda x: (x[1] / x[2]) if x[2] > 0 else 0,
+ format_line=format_hotspots,
+ )
+
+ # Functions with highest call frequency (cumulative/direct difference)
+ def format_call_frequency(stat):
+ func, direct_calls, cumulative_calls, total_time, _, _ = stat
+ if cumulative_calls > direct_calls:
+ call_frequency = cumulative_calls - direct_calls
+ cum_pct = (
+ (cumulative_calls / total_samples * 100)
+ if total_samples > 0
+ else 0
+ )
+ return (
+ f"{call_frequency:d} indirect calls, "
+ f"{cum_pct:.1f}% total stack presence: {_format_func_name(func)}"
+ )
+ return None
+
+ _print_top_functions(
+ aggregated_stats,
+ "Functions with Highest Call Frequency (Indirect Calls)",
+ key_func=lambda x: x[2] - x[1], # Sort by (cumulative - direct)
+ format_line=format_call_frequency,
+ )
+
+ # Functions with highest cumulative-to-direct multiplier (call magnification)
+ def format_call_magnification(stat):
+ func, direct_calls, cumulative_calls, total_time, _, _ = stat
+ if direct_calls > 0 and cumulative_calls > direct_calls:
+ multiplier = cumulative_calls / direct_calls
+ indirect_calls = cumulative_calls - direct_calls
+ return (
+ f"{multiplier:.1f}x call magnification, "
+ f"{indirect_calls:d} indirect calls from {direct_calls:d} direct: {_format_func_name(func)}"
+ )
+ return None
+
+ _print_top_functions(
+ aggregated_stats,
+ "Functions with Highest Call Magnification (Cumulative/Direct)",
+ key_func=lambda x: (x[2] / x[1])
+ if x[1] > 0
+ else 0, # Sort by cumulative/direct ratio
+ format_line=format_call_magnification,
+ )
+
+
+def sample(
+ pid,
+ *,
+ sort=2,
+ sample_interval_usec=100,
+ duration_sec=10,
+ filename=None,
+ all_threads=False,
+ limit=None,
+ show_summary=True,
+ output_format="pstats",
+ realtime_stats=False,
+):
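+    """Profile process ``pid`` and render or export the collected data.
+
+    With ``output_format="pstats"`` and no ``filename``, a report is printed
+    to stdout; otherwise the collector's data is exported to ``filename``.
+    """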
+ profiler = SampleProfiler(
+ pid, sample_interval_usec, all_threads=all_threads
+ )
+ profiler.realtime_stats = realtime_stats
+
+ collector = None
+ match output_format:
+ case "pstats":
+ collector = PstatsCollector(sample_interval_usec)
+ case "collapsed":
+ collector = CollapsedStackCollector()
+ filename = filename or f"collapsed.{pid}.txt"
+ case _:
+ raise ValueError(f"Invalid output format: {output_format}")
+
+ profiler.sample(collector, duration_sec)
+
+ if output_format == "pstats" and not filename:
+ stats = pstats.SampledStats(collector).strip_dirs()
+ print_sampled_stats(
+ stats, sort, limit, show_summary, sample_interval_usec
+ )
+ else:
+ collector.export(filename)
+
+
+def _validate_collapsed_format_args(args, parser):
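+    """Reject pstats-only options when --collapsed is selected and fill in
+    the default collapsed output filename."""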
+ # Check for incompatible pstats options
+ invalid_opts = []
+
+ # Get list of pstats-specific options
+ pstats_options = {"sort": None, "limit": None, "no_summary": False}
+
+ # Find the default values from the argument definitions
+ for action in parser._actions:
+ if action.dest in pstats_options and hasattr(action, "default"):
+ pstats_options[action.dest] = action.default
+
+ # Check if any pstats-specific options were provided by comparing with defaults
+ for opt, default in pstats_options.items():
+ if getattr(args, opt) != default:
+ invalid_opts.append(opt.replace("no_", ""))
+
+ if invalid_opts:
+ parser.error(
+ f"The following options are only valid with --pstats format: {', '.join(invalid_opts)}"
+ )
+
+ # Set default output filename for collapsed format
+ if not args.outfile:
+ args.outfile = f"collapsed.{args.pid}.txt"
+
+
+def main():
+ # Create the main parser
+ parser = argparse.ArgumentParser(
+ description=(
+ "Sample a process's stack frames and generate profiling data.\n"
+ "Supports two output formats:\n"
+ " - pstats: Detailed profiling statistics with sorting options\n"
+ " - collapsed: Stack traces for generating flamegraphs\n"
+ "\n"
+ "Examples:\n"
+ " # Profile process 1234 for 10 seconds with default settings\n"
+ " python -m profile.sample 1234\n"
+ "\n"
+ " # Profile with custom interval and duration, save to file\n"
+ " python -m profile.sample -i 50 -d 30 -o profile.stats 1234\n"
+ "\n"
+ " # Generate collapsed stacks for flamegraph\n"
+ " python -m profile.sample --collapsed 1234\n"
+ "\n"
+ " # Profile all threads, sort by total time\n"
+ " python -m profile.sample -a --sort-tottime 1234\n"
+ "\n"
+ " # Profile for 1 minute with 1ms sampling interval\n"
+ " python -m profile.sample -i 1000 -d 60 1234\n"
+ "\n"
+ " # Show only top 20 functions sorted by direct samples\n"
+ " python -m profile.sample --sort-nsamples -l 20 1234\n"
+ "\n"
+ " # Profile all threads and save collapsed stacks\n"
+ " python -m profile.sample -a --collapsed -o stacks.txt 1234\n"
+ "\n"
+ " # Profile with real-time sampling statistics\n"
+ " python -m profile.sample --realtime-stats 1234\n"
+ "\n"
+ " # Sort by sample percentage to find most sampled functions\n"
+ " python -m profile.sample --sort-sample-pct 1234\n"
+ "\n"
+ " # Sort by cumulative samples to find functions most on call stack\n"
+ " python -m profile.sample --sort-nsamples-cumul 1234"
+ ),
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+
+ # Required arguments
+ parser.add_argument("pid", type=int, help="Process ID to sample")
+
+ # Sampling options
+ sampling_group = parser.add_argument_group("Sampling configuration")
+ sampling_group.add_argument(
+ "-i",
+ "--interval",
+ type=int,
+ default=100,
+ help="Sampling interval in microseconds (default: 100)",
+ )
+ sampling_group.add_argument(
+ "-d",
+ "--duration",
+ type=int,
+ default=10,
+ help="Sampling duration in seconds (default: 10)",
+ )
+ sampling_group.add_argument(
+ "-a",
+ "--all-threads",
+ action="store_true",
+ help="Sample all threads in the process instead of just the main thread",
+ )
+ sampling_group.add_argument(
+ "--realtime-stats",
+ action="store_true",
+ default=False,
+        help="Print real-time sampling statistics (mean/min/max Hz and per-sample time) during profiling",
+ )
+
+ # Output format selection
+ output_group = parser.add_argument_group("Output options")
+ output_format = output_group.add_mutually_exclusive_group()
+ output_format.add_argument(
+ "--pstats",
+ action="store_const",
+ const="pstats",
+ dest="format",
+ default="pstats",
+ help="Generate pstats output (default)",
+ )
+ output_format.add_argument(
+ "--collapsed",
+ action="store_const",
+ const="collapsed",
+ dest="format",
+ help="Generate collapsed stack traces for flamegraphs",
+ )
+
+ output_group.add_argument(
+ "-o",
+ "--outfile",
+ help="Save output to a file (if omitted, prints to stdout for pstats, "
+ "or saves to collapsed.<pid>.txt for collapsed format)",
+ )
+
+ # pstats-specific options
+ pstats_group = parser.add_argument_group("pstats format options")
+ sort_group = pstats_group.add_mutually_exclusive_group()
+ sort_group.add_argument(
+ "--sort-nsamples",
+ action="store_const",
+ const=0,
+ dest="sort",
+ help="Sort by number of direct samples (nsamples column)",
+ )
+ sort_group.add_argument(
+ "--sort-tottime",
+ action="store_const",
+ const=1,
+ dest="sort",
+ help="Sort by total time (tottime column)",
+ )
+ sort_group.add_argument(
+ "--sort-cumtime",
+ action="store_const",
+ const=2,
+ dest="sort",
+ help="Sort by cumulative time (cumtime column, default)",
+ )
+ sort_group.add_argument(
+ "--sort-sample-pct",
+ action="store_const",
+ const=3,
+ dest="sort",
+ help="Sort by sample percentage (sample%% column)",
+ )
+ sort_group.add_argument(
+ "--sort-cumul-pct",
+ action="store_const",
+ const=4,
+ dest="sort",
+ help="Sort by cumulative sample percentage (cumul%% column)",
+ )
+ sort_group.add_argument(
+ "--sort-nsamples-cumul",
+ action="store_const",
+ const=5,
+ dest="sort",
+ help="Sort by cumulative samples (nsamples column, cumulative part)",
+ )
+ sort_group.add_argument(
+ "--sort-name",
+ action="store_const",
+ const=-1,
+ dest="sort",
+ help="Sort by function name",
+ )
+
+ pstats_group.add_argument(
+ "-l",
+ "--limit",
+ type=int,
+ help="Limit the number of rows in the output",
+ default=15,
+ )
+ pstats_group.add_argument(
+ "--no-summary",
+ action="store_true",
+ help="Disable the summary section in the output",
+ )
+
+ args = parser.parse_args()
+
+ # Validate format-specific arguments
+ if args.format == "collapsed":
+ _validate_collapsed_format_args(args, parser)
+
+ sort_value = args.sort if args.sort is not None else 2
+
+ sample(
+ args.pid,
+ sample_interval_usec=args.interval,
+ duration_sec=args.duration,
+ filename=args.outfile,
+ all_threads=args.all_threads,
+ limit=args.limit,
+ sort=sort_value,
+ show_summary=not args.no_summary,
+ output_format=args.format,
+ realtime_stats=args.realtime_stats,
+ )
+
+
+if __name__ == "__main__":
+ main()