summaryrefslogtreecommitdiffstatshomepage
path: root/bridge/github/import_mediator.go
diff options
context:
space:
mode:
authorMichael Muré <batolettre@gmail.com>2022-06-05 15:01:08 +0200
committerMichael Muré <batolettre@gmail.com>2022-06-05 15:13:49 +0200
commit7348fb9edb68ca9142f5d87673da48cef733b3d3 (patch)
tree9d4667ecff1f40491d5825e60293e5c85a725104 /bridge/github/import_mediator.go
parent96327c3371ca762d906209c6114092bbf552c0f4 (diff)
downloadgit-bug-7348fb9edb68ca9142f5d87673da48cef733b3d3.tar.gz
git-bug-7348fb9edb68ca9142f5d87673da48cef733b3d3.zip
github: fix data race when closing event channel
I believe the issue was twofold. First, when done importing, the calling context is likely still valid, so if the output channel is not read often enough and reaches capacity, some event producer down the line can be blocked trying to send on that channel. When the channel is closed, this send is still trying to proceed, which is illegal in Go (sending on a closed channel panics). Second, in rateLimitHandlerClient, there was a need for two different types of output channel: core.ExportResult and ImportEvent. To do so, the previous code used a single channel type, RateLimitingEvent, and a series of goroutines to read/cast/send to the final channel. This could result in more asynchronous goroutines being stuck trying to send on an at-capacity channel. Instead, the code now uses a simple synchronous callback to push directly to the final output channel. No concurrency is needed anymore and the code is simpler. Either of those fixes could have resolved the data race, but applying both is more correct.
Diffstat (limited to 'bridge/github/import_mediator.go')
-rw-r--r--bridge/github/import_mediator.go134
1 files changed, 56 insertions, 78 deletions
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index db9f877c..be4e3880 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -9,6 +9,7 @@ import (
const (
// These values influence how fast the github graphql rate limit is exhausted.
+
NumIssues = 40
NumIssueEdits = 100
NumTimelineItems = 100
@@ -41,43 +42,6 @@ type importMediator struct {
err error
}
-type ImportEvent interface {
- isImportEvent()
-}
-
-func (RateLimitingEvent) isImportEvent() {}
-
-type IssueEvent struct {
- issue
-}
-
-func (IssueEvent) isImportEvent() {}
-
-type IssueEditEvent struct {
- issueId githubv4.ID
- userContentEdit
-}
-
-func (IssueEditEvent) isImportEvent() {}
-
-type TimelineEvent struct {
- issueId githubv4.ID
- timelineItem
-}
-
-func (TimelineEvent) isImportEvent() {}
-
-type CommentEditEvent struct {
- commentId githubv4.ID
- userContentEdit
-}
-
-func (CommentEditEvent) isImportEvent() {}
-
-func (mm *importMediator) NextImportEvent() ImportEvent {
- return <-mm.importEvents
-}
-
func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator {
mm := importMediator{
gh: client,
@@ -87,48 +51,24 @@ func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owne
importEvents: make(chan ImportEvent, ChanCapacity),
err: nil,
}
- go func() {
- mm.fillImportEvents(ctx)
- close(mm.importEvents)
- }()
- return &mm
-}
-type varmap map[string]interface{}
+ go mm.start(ctx)
-func newIssueVars(owner, project string, since time.Time) varmap {
- return varmap{
- "owner": githubv4.String(owner),
- "name": githubv4.String(project),
- "issueSince": githubv4.DateTime{Time: since},
- "issueFirst": githubv4.Int(NumIssues),
- "issueEditLast": githubv4.Int(NumIssueEdits),
- "issueEditBefore": (*githubv4.String)(nil),
- "timelineFirst": githubv4.Int(NumTimelineItems),
- "timelineAfter": (*githubv4.String)(nil),
- "commentEditLast": githubv4.Int(NumCommentEdits),
- "commentEditBefore": (*githubv4.String)(nil),
- }
-}
-
-func newIssueEditVars() varmap {
- return varmap{
- "issueEditLast": githubv4.Int(NumIssueEdits),
- }
+ return &mm
}
-func newTimelineVars() varmap {
- return varmap{
- "timelineFirst": githubv4.Int(NumTimelineItems),
- "commentEditLast": githubv4.Int(NumCommentEdits),
- "commentEditBefore": (*githubv4.String)(nil),
- }
+func (mm *importMediator) start(ctx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ mm.fillImportEvents(ctx)
+ // Make sure we cancel everything when we are done, instead of relying on the parent context
+ // This should unblock pending send to the channel if the capacity was reached and avoid a panic/race when closing.
+ cancel()
+ close(mm.importEvents)
}
-func newCommentEditVars() varmap {
- return varmap{
- "commentEditLast": githubv4.Int(NumCommentEdits),
- }
+// NextImportEvent returns the next ImportEvent, or nil if done.
+func (mm *importMediator) NextImportEvent() ImportEvent {
+ return <-mm.importEvents
}
func (mm *importMediator) Error() error {
@@ -138,7 +78,7 @@ func (mm *importMediator) Error() error {
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
return nil, err
}
return &query.User, nil
@@ -200,7 +140,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
vars["issueEditBefore"] = cursor
}
query := issueEditQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -244,7 +184,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
vars["timelineAfter"] = cursor
}
query := timelineQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -294,7 +234,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -313,7 +253,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = cursor
}
query := issueQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -334,3 +274,41 @@ func reverse(eds []userContentEdit) chan userContentEdit {
}()
return ret
}
+
+// varmap is a container for Github API's pagination variables
+type varmap map[string]interface{}
+
+func newIssueVars(owner, project string, since time.Time) varmap {
+ return varmap{
+ "owner": githubv4.String(owner),
+ "name": githubv4.String(project),
+ "issueSince": githubv4.DateTime{Time: since},
+ "issueFirst": githubv4.Int(NumIssues),
+ "issueEditLast": githubv4.Int(NumIssueEdits),
+ "issueEditBefore": (*githubv4.String)(nil),
+ "timelineFirst": githubv4.Int(NumTimelineItems),
+ "timelineAfter": (*githubv4.String)(nil),
+ "commentEditLast": githubv4.Int(NumCommentEdits),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+}
+
+func newIssueEditVars() varmap {
+ return varmap{
+ "issueEditLast": githubv4.Int(NumIssueEdits),
+ }
+}
+
+func newTimelineVars() varmap {
+ return varmap{
+ "timelineFirst": githubv4.Int(NumTimelineItems),
+ "commentEditLast": githubv4.Int(NumCommentEdits),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+}
+
+func newCommentEditVars() varmap {
+ return varmap{
+ "commentEditLast": githubv4.Int(NumCommentEdits),
+ }
+}