Sync Load and Async Load
In my previous article, I introduced the statistics in TiDB and how it initializes them. However, there is an issue: even with comprehensive initialization, statistics may still be missing for columns that are not indexed. Additionally, after initialization, statistics may be evicted from memory due to memory pressure. In this article, I will introduce two methods to load statistics in TiDB on the fly: Sync Load and Async Load.
Sync Load
Sync Load is a method to load statistics synchronously. When a query is executed, if the statistics of the table are missing, the query will wait for the statistics to be loaded before continuing.
For example, consider the following query:
SELECT * FROM t WHERE a = 1;
If the statistics of column a in table t are missing, the query will wait for the statistics to be loaded before continuing. This is called Sync Load.
The basic idea of Sync Load
Although it might seem straightforward to implement Sync Load by loading the statistics before executing the query, the actual implementation is more complex. TiDB uses a sync load handler to manage this process. The sync load handler is a singleton in the TiDB server, responsible for loading statistics synchronously. When a query is executed, it submits a load request to the sync load handler. The handler then loads the statistics, and the optimizer checks if the statistics are available before proceeding with the query execution.
In TiDB, the query execution process is divided into two stages: optimization and execution. During optimization, the optimizer checks if the necessary statistics are available. If not, it submits a load request to the sync load handler. This process occurs during the logical optimization phase, specifically in the CollectPredicateColumnsPoint and SyncWaitStatsLoadPoint steps.
CollectPredicateColumnsPoint collects columns used in the query that require statistics and sends a request to the sync load handler. SyncWaitStatsLoadPoint waits for the statistics to be loaded, ensuring that all necessary statistics are available before the query execution proceeds.
The handler implementation
To implement the sync load handler, the first challenge is ensuring that statistics are not loaded multiple times, as many queries might require the same statistics. To address this, TiDB introduces a singleflight mechanism. This mechanism ensures that only one loading operation occurs for a given statistic at any point in time, preventing multiple queries from triggering the same loading requests simultaneously.
TiDB uses the open-source golang.org/x/sync/singleflight package to implement the singleflight mechanism. It lets multiple concurrent callers share the result of a single function call, so the function runs only once at a time for a given key. Here’s how it works:
var g singleflight.Group

func loadStatistics(ctx context.Context, tableID int64) (*statistics.Table, error) {
    // singleflight keys are strings, so convert the table ID first.
    key := strconv.FormatInt(tableID, 10)
    v, err, _ := g.Do(key, func() (interface{}, error) {
        return loadStatisticsFromStore(ctx, tableID)
    })
    if err != nil {
        return nil, err
    }
    return v.(*statistics.Table), nil
}
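In this example, concurrent calls to loadStatistics with the same tableID share a single in-flight call to loadStatisticsFromStore: while one load is running, other goroutines asking for the same tableID wait for its result instead of starting their own. Once that call has returned, a later request triggers a new load. This prevents redundant loading operations for the same statistics.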
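To make the sharing behaviour concrete, here is a minimal sketch of the idea using only the standard library. The group type, its do and waiting methods, and the demo helper are hypothetical stand-ins written for this illustration; they are not the golang.org/x/sync/singleflight implementation and not TiDB code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// call records one in-flight load and how many callers joined it.
type call struct {
	done chan struct{} // closed once val is set
	val  string
	dups int
}

// group is a simplified stand-in for singleflight.Group (illustration only).
type group struct {
	mu sync.Mutex
	m  map[string]*call
}

// do runs fn unless a call for key is already in flight, in which case it
// waits for that call and returns its value with shared == true.
func (g *group) do(key string, fn func() string) (val string, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		<-c.done
		return c.val, true
	}
	c := &call{done: make(chan struct{})}
	g.m[key] = c
	g.mu.Unlock()

	c.val = fn()

	g.mu.Lock()
	delete(g.m, key) // the key is forgotten, so a later call loads again
	g.mu.Unlock()
	close(c.done)
	return c.val, false
}

// waiting reports how many callers have joined the in-flight call for key.
func (g *group) waiting(key string) int {
	g.mu.Lock()
	defer g.mu.Unlock()
	if c, ok := g.m[key]; ok {
		return c.dups
	}
	return 0
}

// demo starts one leader and several followers for the same key, then issues
// one more call after they finish. It returns the loader run counts.
func demo(followers int) (concurrentRuns, totalRuns int) {
	var g group
	var wg sync.WaitGroup
	runs := 0
	started, release := make(chan struct{}), make(chan struct{})

	wg.Add(1)
	go func() { // leader: its loader blocks until released
		defer wg.Done()
		g.do("t.a", func() string {
			runs++
			close(started)
			<-release
			return "stats"
		})
	}()
	<-started
	for i := 0; i < followers; i++ {
		wg.Add(1)
		go func() { // followers: their loader is never invoked
			defer wg.Done()
			g.do("t.a", func() string { runs++; return "stats" })
		}()
	}
	for g.waiting("t.a") < followers {
		time.Sleep(time.Millisecond)
	}
	close(release)
	wg.Wait()
	concurrentRuns = runs

	g.do("t.a", func() string { runs++; return "stats" }) // nothing in flight: loads again
	return concurrentRuns, runs
}

func main() {
	concurrent, total := demo(3)
	fmt.Println(concurrent, total) // prints "1 2"
}
```

The loader runs once while the four callers overlap, and a second time for the call issued after they have all returned. Deduplication therefore covers in-flight requests only; it is not a cache.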
The handler is implemented as follows:
type statsSyncLoad struct {
    statsHandle statstypes.StatsHandle
    is          infoschema.InfoSchema
    StatsLoad   statstypes.StatsLoad
}

type StatsLoad struct {
    NeededItemsCh  chan *NeededItemTask
    TimeoutItemsCh chan *NeededItemTask
    sync.Mutex
}
The core of the sync load handler is the StatsLoad structure, which contains two channels: NeededItemsCh and TimeoutItemsCh. The NeededItemsCh channel is used to submit load requests, while the TimeoutItemsCh channel is used to handle timeout events.
The handler implementation can be divided into three parts:
- Send requests to the handler
- Load statistics concurrently
- Wait for the statistics to be loaded
statsSyncLoad provides the SendLoadRequests method to allow the optimizer to send load requests to the handler.
func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error {
    remainedItems := s.removeHistLoadedColumns(neededHistItems)
    ...
    sc.StatsLoad.Timeout = timeout
    sc.StatsLoad.NeededItems = remainedItems
    sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems))
    for _, item := range remainedItems {
        localItem := item
        resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) {
            timer := time.NewTimer(timeout)
            defer timer.Stop()
            task := &statstypes.NeededItemTask{
                Item:      localItem,
                ToTimeout: time.Now().Local().Add(timeout),
                ResultCh:  make(chan stmtctx.StatsLoadResult, 1),
            }
            select {
            case s.StatsLoad.NeededItemsCh <- task:
                metrics.SyncLoadDedupCounter.Inc()
                select {
                case <-timer.C:
                    return nil, errors.New("sync load took too long to return")
                case result, ok := <-task.ResultCh:
                    intest.Assert(ok, "task.ResultCh cannot be closed")
                    return result, nil
                }
            case <-timer.C:
                return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
            }
        })
        sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh)
    }
    sc.StatsLoad.LoadStartTime = time.Now()
    return nil
}
The SendLoadRequests method first filters out columns that have already been loaded or are unnecessary. For the remaining columns, it:
- Creates a NeededItemTask for each column/index requiring statistics
- Sends these tasks to the NeededItemsCh channel
- Includes in each task:
  - Column/index information
  - A ResultCh channel for receiving loading results
  - Timeout settings
This design ensures efficient handling of statistics loading requests while preventing duplicate loads when queries from different sessions need the same statistics.
Note that ResultCh and NeededItems are kept in the stmtctx.StatementContext, so the loading status is tracked separately for each statement (query) in each session.
After sending the load tasks, the optimizer waits for the statistics to be loaded. This process is implemented in the SyncWaitStatsLoad method.
func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
    ...
    var errorMsgs []string
    defer func() {
        if len(errorMsgs) > 0 {
            logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
                zap.Strings("errors", errorMsgs))
        }
        sc.StatsLoad.NeededItems = nil
    }()
    resultCheckMap := map[model.TableItemID]struct{}{}
    for _, col := range sc.StatsLoad.NeededItems {
        resultCheckMap[col.TableItemID] = struct{}{}
    }
    timer := time.NewTimer(sc.StatsLoad.Timeout)
    defer timer.Stop()
    for _, resultCh := range sc.StatsLoad.ResultCh {
        select {
        case result, ok := <-resultCh:
            ...
            if !ok {
                return errors.New("sync load stats channel closed unexpectedly")
            }
            // this error is from statsSyncLoad.SendLoadRequests which start to task and send task into worker,
            // not the stats loading error
            if result.Err != nil {
                errorMsgs = append(errorMsgs, result.Err.Error())
            } else {
                val := result.Val.(stmtctx.StatsLoadResult)
                // this error is from the stats loading error
                if val.HasError() {
                    errorMsgs = append(errorMsgs, val.ErrorMsg())
                }
                delete(resultCheckMap, val.Item)
            }
        case <-timer.C:
            metrics.SyncLoadCounter.Inc()
            metrics.SyncLoadTimeoutCounter.Inc()
            return errors.New("sync load stats timeout")
        }
    }
    if len(resultCheckMap) == 0 {
        metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
        return nil
    }
    return nil
}
The SyncWaitStatsLoad method monitors the loading progress of statistics. It processes results from the ResultCh channels and handles any potential errors that occur during loading. The method operates under the fundamental premise that each result channel will receive exactly one result, and it will continue waiting until either:
- All results are successfully received
- A timeout occurs
To handle the tasks, statsSyncLoad utilizes multiple sub-workers to load statistics concurrently. This design lets independent load requests proceed in parallel instead of queuing behind one another.
func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
    defer func() {
        exitWg.Done()
        logutil.BgLogger().Info("SubLoadWorker exited.")
    }()
    var lastTask *statstypes.NeededItemTask
    for {
        task, err := s.HandleOneTask(sctx, lastTask, exit)
        lastTask = task
        if err != nil {
            switch err {
            case errExit:
                return
            default:
                ...
                r := rand.Intn(500)
                time.Sleep(s.statsHandle.Lease()/10 + time.Duration(r)*time.Microsecond)
                continue
            }
        }
    }
}
From the code snippet above, we can see that each SubLoadWorker runs a loop that loads statistics, and several such workers run concurrently. A worker processes tasks from the NeededItemsCh channel, loading statistics for each task, and keeps running until it receives the exit signal (errExit). When a task fails, the worker sleeps for a short randomized interval and then retries the same task.
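The overall shape of this worker pool, with several goroutines draining one shared channel until an exit signal arrives, can be sketched as follows. This is a generic illustration, not TiDB code: runWorkers and loadAll are hypothetical names, the tasks are plain strings, and the retry and backoff behaviour is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines that take items from tasks and pass them to
// load. Every worker returns once exit is closed.
func runWorkers(n int, tasks <-chan string, exit <-chan struct{}, load func(string)) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-exit:
					return
				case item := <-tasks:
					load(item)
				}
			}
		}()
	}
	return wg
}

// loadAll pushes items through a pool of n workers (n must be at least 1)
// and returns the set of items that were loaded.
func loadAll(n int, items []string) map[string]bool {
	var mu sync.Mutex
	loaded := make(map[string]bool)
	var pending sync.WaitGroup
	pending.Add(len(items))

	tasks := make(chan string)
	exit := make(chan struct{})
	workers := runWorkers(n, tasks, exit, func(item string) {
		mu.Lock()
		loaded[item] = true
		mu.Unlock()
		pending.Done()
	})
	for _, item := range items {
		tasks <- item
	}
	pending.Wait() // every item has been handled
	close(exit)    // ask the workers to stop
	workers.Wait()
	return loaded
}

func main() {
	loaded := loadAll(3, []string{"t.a", "t.b", "t.c", "t.d"})
	fmt.Println(len(loaded)) // prints "4"
}
```

Closing the exit channel reaches every worker at once, which is why a closed channel rather than a single sent value is the usual way to broadcast shutdown.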
func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
    ...
    if lastTask == nil {
        task, err = s.drainColTask(sctx, exit)
        if err != nil {
            if err != errExit {
                logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
            }
            return task, err
        }
    } else {
        task = lastTask
    }
    result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
    err = s.handleOneItemTask(task)
    if err == nil {
        task.ResultCh <- result
        return nil, nil
    }
    if !isVaildForRetry(task) {
        result.Error = err
        task.ResultCh <- result
        return nil, nil
    }
    return task, err
}
The HandleOneTask function processes a single task from either the NeededItemsCh or TimeoutItemsCh channel. It loads statistics for the task and sends the result back to the ResultCh channel. If an error occurs during the loading process, the function checks if the task is eligible for a retry. If so, the task is returned for retry; otherwise, the error is sent back to the ResultCh channel. The default retry limit is set to 2 attempts.