Diffstat (limited to 'cache/dynacache/dynacache.go')
-rw-r--r-- | cache/dynacache/dynacache.go | 550 |
1 files changed, 550 insertions, 0 deletions
diff --git a/cache/dynacache/dynacache.go b/cache/dynacache/dynacache.go
new file mode 100644
index 000000000..bb3f7b098
--- /dev/null
+++ b/cache/dynacache/dynacache.go
@@ -0,0 +1,550 @@
+// Copyright 2024 The Hugo Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dynacache
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"path"
+	"regexp"
+	"runtime"
+	"sync"
+	"time"
+
+	"github.com/bep/lazycache"
+	"github.com/bep/logg"
+	"github.com/gohugoio/hugo/common/herrors"
+	"github.com/gohugoio/hugo/common/loggers"
+	"github.com/gohugoio/hugo/common/paths"
+	"github.com/gohugoio/hugo/common/rungroup"
+	"github.com/gohugoio/hugo/config"
+	"github.com/gohugoio/hugo/helpers"
+	"github.com/gohugoio/hugo/identity"
+	"github.com/gohugoio/hugo/resources/resource"
+)
+
+const minMaxSize = 10
+
+// New creates a new cache.
+func New(opts Options) *Cache {
+	if opts.CheckInterval == 0 {
+		opts.CheckInterval = time.Second * 2
+	}
+
+	if opts.MaxSize == 0 {
+		opts.MaxSize = 100000
+	}
+	if opts.Log == nil {
+		panic("nil Log")
+	}
+
+	if opts.MinMaxSize == 0 {
+		opts.MinMaxSize = 30
+	}
+
+	stats := &stats{
+		opts:             opts,
+		adjustmentFactor: 1.0,
+		currentMaxSize:   opts.MaxSize,
+		availableMemory:  config.GetMemoryLimit(),
+	}
+
+	infol := opts.Log.InfoCommand("dynacache")
+
+	c := &Cache{
+		partitions: make(map[string]PartitionManager),
+		opts:       opts,
+		stats:      stats,
+		infol:      infol,
+	}
+
+	c.stop = c.start()
+
+	return c
+}
+
+// Options for the cache.
+type Options struct {
+	Log           loggers.Logger
+	CheckInterval time.Duration
+	MaxSize       int
+	MinMaxSize    int
+	Running       bool
+}
+
+// Options for a partition.
+type OptionsPartition struct {
+	// When to clear this partition.
+	ClearWhen ClearWhen
+
+	// Weight is a number between 1 and 100 that indicates, in general, how big this partition may get.
+	Weight int
+}
+
+func (o OptionsPartition) WeightFraction() float64 {
+	return float64(o.Weight) / 100
+}
+
+func (o OptionsPartition) CalculateMaxSize(maxSizePerPartition int) int {
+	return int(math.Floor(float64(maxSizePerPartition) * o.WeightFraction()))
+}
+
+// A dynamic partitioned cache.
+type Cache struct {
+	mu sync.RWMutex
+
+	partitions map[string]PartitionManager
+	opts       Options
+	infol      logg.LevelLogger
+
+	stats    *stats
+	stopOnce sync.Once
+	stop     func()
+}
+
+// ClearMatching clears all partitions for which the predicate returns true.
+func (c *Cache) ClearMatching(predicate func(k, v any) bool) {
+	g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
+		NumWorkers: len(c.partitions),
+		Handle: func(ctx context.Context, partition PartitionManager) error {
+			partition.clearMatching(predicate)
+			return nil
+		},
+	})
+
+	for _, p := range c.partitions {
+		g.Enqueue(p)
+	}
+
+	g.Wait()
+}
+
+// ClearOnRebuild prepares the cache for a new rebuild, taking the given changeset into account.
+func (c *Cache) ClearOnRebuild(changeset ...identity.Identity) {
+	g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
+		NumWorkers: len(c.partitions),
+		Handle: func(ctx context.Context, partition PartitionManager) error {
+			partition.clearOnRebuild(changeset...)
+			return nil
+		},
+	})
+
+	for _, p := range c.partitions {
+		g.Enqueue(p)
+	}
+
+	g.Wait()
+
+	// Clear any entries marked as stale above.
+	g = rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
+		NumWorkers: len(c.partitions),
+		Handle: func(ctx context.Context, partition PartitionManager) error {
+			partition.clearStale()
+			return nil
+		},
+	})
+
+	for _, p := range c.partitions {
+		g.Enqueue(p)
+	}
+
+	g.Wait()
+}
+
+type keysProvider interface {
+	Keys() []string
+}
+
+// Keys returns a list of keys in all partitions.
+func (c *Cache) Keys(predicate func(s string) bool) []string {
+	if predicate == nil {
+		predicate = func(s string) bool { return true }
+	}
+	var keys []string
+	for pn, g := range c.partitions {
+		pkeys := g.(keysProvider).Keys()
+		for _, k := range pkeys {
+			p := path.Join(pn, k)
+			if predicate(p) {
+				keys = append(keys, p)
+			}
+		}
+
+	}
+	return keys
+}
+
+func calculateMaxSizePerPartition(maxItemsTotal, totalWeightQuantity, numPartitions int) int {
+	if numPartitions == 0 {
+		panic("numPartitions must be > 0")
+	}
+	if totalWeightQuantity == 0 {
+		panic("totalWeightQuantity must be > 0")
+	}
+
+	avgWeight := float64(totalWeightQuantity) / float64(numPartitions)
+	return int(math.Floor(float64(maxItemsTotal) / float64(numPartitions) * (100.0 / avgWeight)))
+}
+
+// Stop stops the cache.
+func (c *Cache) Stop() {
+	c.stopOnce.Do(func() {
+		c.stop()
+	})
+}
+
+func (c *Cache) adjustCurrentMaxSize() {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	if len(c.partitions) == 0 {
+		return
+	}
+	var m runtime.MemStats
+	runtime.ReadMemStats(&m)
+	s := c.stats
+	s.memstatsCurrent = m
+	// fmt.Printf("\n\nAvailable = %v\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\nMaxSize = %d\nAdjustmentFactor=%f\n\n", helpers.FormatByteCount(s.availableMemory), helpers.FormatByteCount(m.Alloc), helpers.FormatByteCount(m.TotalAlloc), helpers.FormatByteCount(m.Sys), m.NumGC, c.stats.currentMaxSize, s.adjustmentFactor)
+
+	if s.availableMemory >= s.memstatsCurrent.Alloc {
+		if s.adjustmentFactor <= 1.0 {
+			s.adjustmentFactor += 0.2
+		}
+	} else {
+		// We're low on memory.
+		s.adjustmentFactor -= 0.4
+	}
+
+	if s.adjustmentFactor <= 0 {
+		s.adjustmentFactor = 0.05
+	}
+
+	if !s.adjustCurrentMaxSize() {
+		return
+	}
+
+	totalWeight := 0
+	for _, pm := range c.partitions {
+		totalWeight += pm.getOptions().Weight
+	}
+
+	maxSizePerPartition := calculateMaxSizePerPartition(c.stats.currentMaxSize, totalWeight, len(c.partitions))
+
+	evicted := 0
+	for _, p := range c.partitions {
+		evicted += p.adjustMaxSize(p.getOptions().CalculateMaxSize(maxSizePerPartition))
+	}
+
+	if evicted > 0 {
+		c.infol.
+			WithFields(
+				logg.Fields{
+					{Name: "evicted", Value: evicted},
+					{Name: "numGC", Value: m.NumGC},
+					{Name: "limit", Value: helpers.FormatByteCount(c.stats.availableMemory)},
+					{Name: "alloc", Value: helpers.FormatByteCount(m.Alloc)},
+					{Name: "totalAlloc", Value: helpers.FormatByteCount(m.TotalAlloc)},
+				},
+			).Logf("adjusted partitions' max size")
+	}
+}
+
+func (c *Cache) start() func() {
+	ticker := time.NewTicker(c.opts.CheckInterval)
+	quit := make(chan struct{})
+
+	go func() {
+		for {
+			select {
+			case <-ticker.C:
+				c.adjustCurrentMaxSize()
+			case <-quit:
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+
+	return func() {
+		close(quit)
+	}
+}
+
+var partitionNameRe = regexp.MustCompile(`^\/[a-zA-Z0-9]{4}(\/[a-zA-Z0-9]+)?(\/[a-zA-Z0-9]+)?`)
+
+// GetOrCreatePartition gets or creates a partition with the given name.
+func GetOrCreatePartition[K comparable, V any](c *Cache, name string, opts OptionsPartition) *Partition[K, V] {
+	if c == nil {
+		panic("nil Cache")
+	}
+	if opts.Weight < 1 || opts.Weight > 100 {
+		panic("invalid Weight, must be between 1 and 100")
+	}
+
+	if partitionNameRe.FindString(name) != name {
+		panic(fmt.Sprintf("invalid partition name %q", name))
+	}
+
+	c.mu.RLock()
+	p, found := c.partitions[name]
+	c.mu.RUnlock()
+	if found {
+		return p.(*Partition[K, V])
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Double check.
+	p, found = c.partitions[name]
+	if found {
+		return p.(*Partition[K, V])
+	}
+
+	// At this point, we don't know the number of partitions or their configuration, but
+	// this will be re-adjusted later.
+	const numberOfPartitionsEstimate = 10
+	maxSize := opts.CalculateMaxSize(c.opts.MaxSize / numberOfPartitionsEstimate)
+
+	// Create a new partition and cache it.
+	partition := &Partition[K, V]{
+		c:       lazycache.New(lazycache.Options[K, V]{MaxEntries: maxSize}),
+		maxSize: maxSize,
+		trace:   c.opts.Log.Logger().WithLevel(logg.LevelTrace).WithField("partition", name),
+		opts:    opts,
+	}
+	c.partitions[name] = partition
+
+	return partition
+}
+
+// Partition is a partition in the cache.
+type Partition[K comparable, V any] struct {
+	c *lazycache.Cache[K, V]
+
+	zero V
+
+	trace logg.LevelLogger
+	opts  OptionsPartition
+
+	maxSize int
+}
+
+// GetOrCreate gets or creates a value for the given key.
+func (p *Partition[K, V]) GetOrCreate(key K, create func(key K) (V, error)) (V, error) {
+	v, _, err := p.c.GetOrCreate(key, create)
+	return v, err
+}
+
+// GetOrCreateWitTimeout gets or creates a value for the given key and times out if the create function
+// takes too long.
+func (p *Partition[K, V]) GetOrCreateWitTimeout(key K, duration time.Duration, create func(key K) (V, error)) (V, error) {
+	resultch := make(chan V, 1)
+	errch := make(chan error, 1)
+
+	go func() {
+		v, _, err := p.c.GetOrCreate(key, create)
+		if err != nil {
+			errch <- err
+			return
+		}
+		resultch <- v
+	}()
+
+	select {
+	case v := <-resultch:
+		return v, nil
+	case err := <-errch:
+		return p.zero, err
+	case <-time.After(duration):
+		return p.zero, &herrors.TimeoutError{
+			Duration: duration,
+		}
+	}
+}
+
+func (p *Partition[K, V]) clearMatching(predicate func(k, v any) bool) {
+	p.c.DeleteFunc(func(key K, v V) bool {
+		if predicate(key, v) {
+			p.trace.Log(
+				logg.StringFunc(
+					func() string {
+						return fmt.Sprintf("clearing cache key %v", key)
+					},
+				),
+			)
+			return true
+		}
+		return false
+	})
+}
+
+func (p *Partition[K, V]) clearOnRebuild(changeset ...identity.Identity) {
+	opts := p.getOptions()
+	if opts.ClearWhen == ClearNever {
+		return
+	}
+
+	if opts.ClearWhen == ClearOnRebuild {
+		// Clear all.
+		p.Clear()
+		return
+	}
+
+	depsFinder := identity.NewFinder(identity.FinderConfig{})
+
+	shouldDelete := func(key K, v V) bool {
+		// We always clear elements marked as stale.
+		if resource.IsStaleAny(v) {
+			return true
+		}
+
+		// Now check if this entry has changed based on the changeset
+		// of filesystem events.
+		if len(changeset) == 0 {
+			// Nothing changed.
+			return false
+		}
+
+		var probablyDependent bool
+		identity.WalkIdentitiesShallow(v, func(level int, id2 identity.Identity) bool {
+			for _, id := range changeset {
+				if r := depsFinder.Contains(id, id2, -1); r > 0 {
+					// It's probably dependent, evict from cache.
+					probablyDependent = true
+					return true
+				}
+			}
+			return false
+		})
+
+		return probablyDependent
+	}
+
+	// First pass.
+	// Second pass needs to be done in a separate loop to catch any
+	// elements marked as stale in the other partitions.
+	p.c.DeleteFunc(func(key K, v V) bool {
+		if shouldDelete(key, v) {
+			p.trace.Log(
+				logg.StringFunc(
+					func() string {
+						return fmt.Sprintf("first pass: clearing cache key %v", key)
+					},
+				),
+			)
+			resource.MarkStale(v)
+			return true
+		}
+		return false
+	})
+}
+
+func (p *Partition[K, V]) Keys() []K {
+	var keys []K
+	p.c.DeleteFunc(func(key K, v V) bool {
+		keys = append(keys, key)
+		return false
+	})
+	return keys
+}
+
+func (p *Partition[K, V]) clearStale() {
+	p.c.DeleteFunc(func(key K, v V) bool {
+		isStale := resource.IsStaleAny(v)
+		if isStale {
+			p.trace.Log(
+				logg.StringFunc(
+					func() string {
+						return fmt.Sprintf("second pass: clearing cache key %v", key)
+					},
+				),
+			)
+		}
+
+		return isStale
+	})
+}
+
+// adjustMaxSize adjusts the max size of the partition and returns the number of items evicted.
+func (p *Partition[K, V]) adjustMaxSize(newMaxSize int) int {
+	if newMaxSize < minMaxSize {
+		newMaxSize = minMaxSize
+	}
+	p.maxSize = newMaxSize
+	// fmt.Println("Adjusting max size of partition from", oldMaxSize, "to", newMaxSize)
+	return p.c.Resize(newMaxSize)
+}
+
+func (p *Partition[K, V]) getMaxSize() int {
+	return p.maxSize
+}
+
+func (p *Partition[K, V]) getOptions() OptionsPartition {
+	return p.opts
+}
+
+func (p *Partition[K, V]) Clear() {
+	p.c.DeleteFunc(func(key K, v V) bool {
+		return true
+	})
+}
+
+func (p *Partition[K, V]) Get(ctx context.Context, key K) (V, bool) {
+	return p.c.Get(key)
+}
+
+type PartitionManager interface {
+	adjustMaxSize(addend int) int
+	getMaxSize() int
+	getOptions() OptionsPartition
+	clearOnRebuild(changeset ...identity.Identity)
+	clearMatching(predicate func(k, v any) bool)
+	clearStale()
+}
+
+const (
+	ClearOnRebuild ClearWhen = iota + 1
+	ClearOnChange
+	ClearNever
+)
+
+type ClearWhen int
+
+type stats struct {
+	opts            Options
+	memstatsCurrent runtime.MemStats
+	currentMaxSize  int
+	availableMemory uint64
+
+	adjustmentFactor float64
+}
+
+func (s *stats) adjustCurrentMaxSize() bool {
+	newCurrentMaxSize := int(math.Floor(float64(s.opts.MaxSize) * s.adjustmentFactor))
+
+	if newCurrentMaxSize < s.opts.MinMaxSize {
+		newCurrentMaxSize = s.opts.MinMaxSize
+	}
+	changed := newCurrentMaxSize != s.currentMaxSize
+	s.currentMaxSize = newCurrentMaxSize
+	return changed
+}
+
+// CleanKey turns s into a format suitable for a cache key for this package.
+// The key will be a Unix-styled path with a leading slash but no trailing slash.
+func CleanKey(s string) string {
+	return path.Clean(paths.ToSlashPreserveLeading(s))
+}
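For reference, outside the diff above: a minimal, hypothetical usage sketch of the API this file introduces. The partition name, key, weight, and sizes are illustrative only, and loggers.NewDefault() is assumed as a convenient constructor for the loggers.Logger that Options.Log requires.

package main

import (
	"fmt"
	"time"

	"github.com/gohugoio/hugo/cache/dynacache"
	"github.com/gohugoio/hugo/common/loggers"
)

func main() {
	// Zero values fall back to the defaults set in New
	// (MaxSize 100000, CheckInterval 2s, MinMaxSize 30).
	cache := dynacache.New(dynacache.Options{
		Log:           loggers.NewDefault(), // assumed logger constructor
		MaxSize:       10000,
		CheckInterval: 5 * time.Second,
	})
	defer cache.Stop()

	// Partition names must match the /xxxx[/...] pattern enforced by
	// GetOrCreatePartition, and Weight must be between 1 and 100.
	p := dynacache.GetOrCreatePartition[string, int](
		cache,
		"/imgs",
		dynacache.OptionsPartition{Weight: 30, ClearWhen: dynacache.ClearOnChange},
	)

	// The create function is only invoked when the key is not already cached.
	v, err := p.GetOrCreate("thumb-width", func(key string) (int, error) {
		return 640, nil
	})
	fmt.Println(v, err)
}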