aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/profile/pstats_collector.py
blob: f58da1159c21a7a1c719ea62ea30e3e7ce673682 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import collections
import marshal

from .collector import Collector


class PstatsCollector(Collector):
    def __init__(self, sample_interval_usec):
        self.result = collections.defaultdict(
            lambda: dict(total_rec_calls=0, direct_calls=0, cumulative_calls=0)
        )
        self.stats = {}
        self.sample_interval_usec = sample_interval_usec
        self.callers = collections.defaultdict(
            lambda: collections.defaultdict(int)
        )

    def collect(self, stack_frames):
        for thread_id, frames in stack_frames:
            if not frames:
                continue

            # Process each frame in the stack to track cumulative calls
            for frame in frames:
                location = (frame.filename, frame.lineno, frame.funcname)
                self.result[location]["cumulative_calls"] += 1

            # The top frame gets counted as an inline call (directly executing)
            top_frame = frames[0]
            top_location = (
                top_frame.filename,
                top_frame.lineno,
                top_frame.funcname,
            )

            self.result[top_location]["direct_calls"] += 1

            # Track caller-callee relationships for call graph
            for i in range(1, len(frames)):
                callee_frame = frames[i - 1]
                caller_frame = frames[i]

                callee = (
                    callee_frame.filename,
                    callee_frame.lineno,
                    callee_frame.funcname,
                )
                caller = (
                    caller_frame.filename,
                    caller_frame.lineno,
                    caller_frame.funcname,
                )

                self.callers[callee][caller] += 1

    def export(self, filename):
        self.create_stats()
        self._dump_stats(filename)

    def _dump_stats(self, file):
        stats_with_marker = dict(self.stats)
        stats_with_marker[("__sampled__",)] = True
        with open(file, "wb") as f:
            marshal.dump(stats_with_marker, f)

    # Needed for compatibility with pstats.Stats
    def create_stats(self):
        sample_interval_sec = self.sample_interval_usec / 1_000_000
        callers = {}
        for fname, call_counts in self.result.items():
            total = call_counts["direct_calls"] * sample_interval_sec
            cumulative_calls = call_counts["cumulative_calls"]
            cumulative = cumulative_calls * sample_interval_sec
            callers = dict(self.callers.get(fname, {}))
            self.stats[fname] = (
                call_counts["direct_calls"],  # cc = direct calls for sample percentage
                cumulative_calls,  # nc = cumulative calls for cumulative percentage
                total,
                cumulative,
                callers,
            )