In a traditional distributed machine learning training system, the number of computing tasks of each computing node is statically allocated, that is, divided by equal amounts before node training. Aiming at the bottleneck of static allocation of LSP computing tasks, in this study, we propose a weight-based load-balancing strategy (WLBS). The core idea of the WLBS strategy is to calculate the ratio of the optimal computing task volume to the total computing task volume of each computing node in the cluster according to the performance of the computing nodes. Thereafter, the computing tasks are divided according to the ratio, and finally, the effect of node load balancing in the cluster is achieved. In the WLBS strategy, the number of computing tasks of each computing node can be adjusted in the later stage according to the computing performance; thus, computing nodes with high computing performance can train more data while reducing the amount of training data for nodes with lagging performance to achieve the load of the entire system balanced. This section discusses the three aspects of the WLBS strategy design, algorithm implementation, and related theoretical proof.

## 3.1 Limited Synchronous Parallel Strategy

Conversely, the LSP strategy alleviates the synchronization lag problem of the BSP strategy; however, it ensures the accuracy of model training and improves the training efficiency of distributed machine learning models. This section discusses the existing problems in the static scheduling process of the LSP strategy. LSP is based on the synchronization barrier, and proposes the concept of the limited synchronization barrier (LSB) and a limited threshold l, that is, when the current l computing nodes complete one iteration, the l computing nodes perform limited synchronization. The LSP strategy reduces the asynchronous defect to a certain extent, but it may still write the old local model to the parameter server. Another disadvantage is that the LSP strategy cannot adapt well to the dynamic changes of cluster node performance in the real environment, resulting in its inability to ensure accuracy.

The following combines the analysis of the limited synchronous parallel strategy based on static scheduling, and then provides a formal description. Suppose there are k computing nodes in a distributed cluster, \({u}_{k,t}\) is the model update increment of the k-th node in the t-round synchronization, and the state of the k-th node in the t-round synchronization is recorded as \({\tilde{\omega }}_{k,t}\), that is, the state of the initial model after t-round synchronization. *l* is the difference between the maximum number of iterations between the fast and slow nodes, that is, the limited threshold, where *l*∈[1,K], and the synchronization process that satisfies this condition is called limited synchronization. *l* is the difference in the maximum number of iterations between fast and slow nodes, that is, the limited threshold, where *l* ∈ [1, k]. The synchronization process satisfying this condition is called limited synchronization. When the number of limited synchronizations reaches *m*, all nodes enter the global synchronization process; that is, the number of limited synchronizations between two adjacent global synchronizations shall not be greater than m. The LSP model can then be described as:

$${\tilde{\omega }}_{k,t}={\omega }_{0}+\left[\sum _{{t}^{\text{'}}}^{t-m-1}\sum _{{k}^{\text{'}}}^{l}{u}_{{k}^{\text{'}},{t}^{\text{'}}}\right]+\left[\sum _{({k}^{\text{'}},{t}^{\text{'}})\in {u}_{k,t}}{u}_{{k}^{\text{'}},{t}^{\text{'}}}\right]$$

,

where \({\tilde{\omega }}_{k,t}\) represents the current model parameters,\({u}_{k,t}\) represents the updated subset of all computing nodes during the [t-m, t + m-1] iteration, and \({\tilde{\omega }}_{k,t}\) represents the current model parameters. \(\left[\sum _{{t}^{{\prime }}}^{t-m-1}\sum _{{k}^{{\prime }}}^{l}{u}_{{k}^{{\prime }},{t}^{{\prime }}}\right]\) represents the global update set of the previous local iteration, and \(\left[\sum _{({k}^{{\prime }},{t}^{{\prime }})\in {u}_{k,t}}{u}_{{k}^{{\prime }},{t}^{{\prime }}}\right]\) represents the update of this local iteration.

The limited synchronous parallel strategy has the following characteristics. The proportion of time occupied by the training of computing nodes increases, which avoids frequent synchronization waiting to a certain extent. This supports the composition of computing nodes with performance differences over time. This difference disappears with the recovery of computing node performance within a period of time; c. The delay threshold can be changed, but it cannot fundamentally solve the "delay" problem caused by stale model parameters for training.

In distributed systems, problems, such as network anomalies and hardware failures are frequently encountered. These problems cause differences in the training speeds of the computing nodes. To simplify the description, the differences in the training speeds of these computing nodes are called performance differences. The distributed machine learning system based on the limited synchronous parallel strategy can be applied to scenarios in which the computing nodes have performance differences over time. In a limited time, through the limited threshold l, one node with a faster training speed can perform limited synchronization first. After several limited synchronizations, the global synchronization threshold m can force all computing nodes with faster computing to stop training and wait for computing nodes with a lagging performance to perform global parameter synchronization. In an actual distributed cluster scenario, the time for which the performance difference between computing nodes exists is unpredictable. Moreover, as the scale of the distributed cluster increases, the possibility of a performance failure of one of the computing nodes is higher, and the recovery time of such a failure cannot be determined. In the limited synchronization parallel strategy, it is assumed that the faulty node in the cluster takes a long time to recover performance. Thereafter, when the number of limited synchronizations of the entire system reaches the threshold m, the global synchronization will still be affected by the faulty node, resulting in a distributed machine learning model that depends on the slowest compute node. Under more extreme conditions, the faulty node is down, and the iterative training is stopped. When the global synchronization conditions are met, the entire distributed training will stop running, which seriously affects the efficiency and results of distributed training and wastes a lot of computing resources.

As shown in Fig. 6, assume that in a statically allocated cluster with a total of five computing nodes participating in training, the global synchronization threshold m = 3. When the limited number of synchronizations in the entire system reaches 3, the global synchronization of parameters is forced. The statically allocated cluster means that in a distributed cluster, the number of computing tasks of each computing node is pre-allocated, that is, the amount of training data of each computing node is the same. Assuming an extreme situation, before entering the GSB, computing node 5 goes down and stops computing tasks; then, other computing nodes will be stuck in the synchronization waiting process until the performance of computing node 5 recovers.

## 3.2 Strategy Design

The LSP model for the static allocation of computing tasks cannot solve the long-standing performance difference problem in distributed systems. In this section, the core idea of the weight-based load balancing strategy WLBS is to determine the optimal amount of training data for each computing node and to recalculate and adjust within a certain period to finally achieve the effect of load balancing. The WLBS strategy recalculates and adjusts the optimal training task ratio of each node in a certain period, and adopts the LSP communication strategy for training in a certain period. Therefore, there are certain restrictions on the cluster environment conditions, and it is necessary to assume that the computing nodes in the cluster remain relatively stable during training in this period.

The key to the WLBS strategy is to determine the optimal amount of training data for each computing node and how to set the recalculation-adjustment period. Under the condition that the task training volume in a certain period is maintained unchanged, the training data volume of some computing nodes is dynamically adjusted according to the average time-consuming iterative training of each computing node in the previous cycle, until the optimal training data volume ratio is obtained. The dynamic allocation process was divided into initial and partial adjustments.

The initial division was mainly used before model training. According to the basic performance indicators (such as the number of CPUs, number of cores, and memory size) of the computing nodes in the cluster, the performance indicators of all computing nodes were aggregated into a performance indicator ratio array, and then the array was used in the initial division of the training data. To ensure that the total training data are certain, the load of each computing node is calculated according to the performance index ratio array, and then the computing node enters the next round of iterative training according to the initial divided data volume. Partial adjustment is mainly used in scenarios in which the performance of computing nodes is severely degraded during model training and cannot be recovered in a short time. Part or all of the training data of the node with the highest lagging performance is transferred to the computing node with the best performance. Tuning is performed only between the best-performing compute node and the worst-performing compute node in the previous cycle (the time between two partial tunings is defined as the partial tuning cycle). It underwent multiple partial adjustments in iterations until each node satisfied the optimal amount of training data. The partial adjustment process is divided into large and small adjustments.

(1) Major adjustment: The number of training samples of a node is adjusted, and the corresponding training time is one epoch. After multiple major adjustments, the training time for each computing node to complete all the samples tends to be the same.

(2) Small adjustment: The batch size is adjusted for each round of calculation. The corresponding training time was the time taken for one batch. After several small adjustments, the time required for each round of parameter calculation for each computing node tends to be the same.

After several partial adjustments, because the training time for each computing node to complete all the samples and the time for calculating the parameters in each round tend to be the same, the training time of each computing node tends to be stable. Because the training time for completing a batch between each computing node tends to be the same, the problem of being stuck in synchronization waiting owing to changes in the computing performance of the computing nodes is solved. When the computing node satisfies the condition that the time difference of any computing node to complete a batch is less than or equal to \({\theta }_{b}\), the small adjustment process of partial adjustment will be stopped, and when the computing node satisfies the requirement that the time-consuming difference of any computing node to complete an epoch is less than or equal to \({\theta }_{e}\), the large adjustment process of part of the adjustment will be stopped. The conditional expression is as follows:

$$\left|{bTime}_{i}-{bTime}_{j}\right|\le {\theta }_{b},{\theta }_{e}, i\ne j,1\le i,j\le N$$

Here, \(b{Time}_{i}\) represents the training time of the ith batch (single iteration) or epoch, and N represents the number of computing nodes. \({\theta }_{b}\) represents the time precision coefficient of batch (single iteration) training and \({\theta }_{e}\) represents the time precision coefficient of epoch training.

Considering the LSP communication model as an example, it is assumed that there are five computing nodes in the distributed system participating in task training. As shown in Fig. 2.2, before partial adjustment, the computing performance of nodes 1 and 3 is better, thus resulting in limited synchronization waiting. The computing performance of computing nodes 2 and 5 is relatively lagging, which may be caused by an unstable network, preemption of computing resources by other applications, hardware failure, and others. By reducing the computing task volume of node 5 and increasing the computing task volume of node 1, the difference in time-consuming calculation iterations between the two is gradually reduced. Simultaneously, computing node 2 may not perform training owing to downtime and other reasons. After partial adjustment, its computing tasks are distributed to other nodes such that the robustness of the distributed system is not affected. In addition, after several partial adjustments, except for some faulty nodes, the training speeds of other computing nodes are similar, thereby improving the global consistency of computing to a certain extent; that is, the size of the limited threshold is close to that of the computing nodes in the distributed system. Difference between the total number of nodes that cannot continue to participate in task training owing to failure.

## 3.3 Algorithm Implementation

For the strategy design in Section 3.2, this section elaborates on the WLBS strategy from the perspective of algorithm implementation. The WLBS strategy is designed based on the parameter-server architecture, and the nodes are divided into computing and parameter server nodes. The communication between the computing node and the parameter server node is realized using PYTORCH's distributed communication GLOO back-end framework primitives, the point-to-point TENSOR object communication is realized through the IRECV and ISEND primitives, and the parameter server is used to realize the multi-node or non-TENSOR object communication through the GATHER primitive point-to-point communication, and there is no communication between computing nodes.

First, the pseudo code of the WLBS strategy calculation node is given:

The implementation steps of the above strategy are as follows:

(1) Load the training data of the node.

(2) If the current epoch number is less than or equal to the predetermined epoch number, the epoch training is entered; otherwise, if the epoch number is greater than the predetermined epoch number, the node training ends.

(3) Use the GATHER primitive of PYTORCH's distributed communication GLOO back-end framework to determine whether the current computing task volume signal needs to be updated; in such a case, update the current node workload; otherwise, proceed to step (4).

(4) Obtain and record the current timestamp. If the BATCH number is a multiple of K, communicate with the parameter server and obtain the semaphore whether K needs to be updated through the GATHER primitive. The amount of training data for one iteration of the computing node is equal to the product of K and BATCH_SIZE; therefore, adjusting the size of K is equivalent to adjusting the amount of training data for one iteration. In such a case, receive it from the parameter server node; otherwise, proceed to step (5).

(5) Calculate the local gradient using the back-propagation algorithm and send it to the parameter server through the ISEND primitive.

(6) Receive the new gradient parameters from the parameter server node through the IRECV primitive as model parameters for the next iterative calculation.

(7) Determine whether the iteration is the last in the epoch, send its value to the parameter server, and then skip to step (2) for repeated training.

The pseudocode for the WLBS policy parameter server is given:

The implementation steps of the above strategy are as follows:

(1) Initialize the initial allocation of variables, the large adjustment and small adjustment semaphores of partial adjustment to TRUE, and the semaphore that stops partial adjustment to FALSE.

(2) If the current epoch number is less than or equal to the predetermined epoch number, the epoch training is entered; otherwise, if the epoch number is greater than the predetermined epoch number, the node training ends.

(3) The GATHER primitive is used to distribute the updated calculation task amount signal to all the nodes. If the update node calculation task amount signal is TRUE, it is judged again whether it is the initial training. In such a case, the initial division is adopted; otherwise, partial adjustment is used, and the initial division is used. The result of the partial adjustment is sent to all the nodes through the GATHER primitive.

(4) The GATHER primitive is used to distribute an iteratively adjusted training data volume signal to all the nodes. If the signal is true, the partial division is performed and the result is distributed to the designated computing node.

(5) Collect the gradient of the nodes in the limited list, the time information of this round of iterations, and whether the current iteration is the last iteration signal of this epoch.

(6) The collected gradients are aggregated into a global gradient and distributed to nodes in a limited list.

(7) If the current iteration is the last iteration of the epoch, enter the next epoch, skip to step (3), and repeat the training; otherwise, continue iterative training, skip to step (4), and repeat the training.

## 3.4 Theoretical Proof

In the distributed machine-learning training process, the training dataset was distributed on each computing node. This section analyzes the feasibility of the dynamic optimization strategy for computing tasks by combining it with the parameter update method of the node-training process. The machine-learning training process involves solving the optimal model parameters through repeated iterations, which is:

$${\omega }^{*}=\text{arg}\underset{\omega \in {R}^{P}}{\text{min}}\sum _{i=1}^{d}f\left(\omega ;{x}_{i},{y}_{i}\right)+ \tau R\left(\omega \right)$$

1

In the training that does not use the dynamic strategy of computing task volume, the training dataset D is evenly distributed among K computing nodes, denoted as \({D}_{1}，{D}_{2}，{D}_{3}，\dots ，{D}_{k}\)., t, s, s, umed that each computing node must perform n iterations in one epoch, and each iteration is random. Select a small batch sample set \({S}_{t}\subset \left\{1,\dots ,n\right\}\), then the gradient calculation expression for each time is

$$\nabla {f}_{{s}_{t}}\left({\omega }_{t}\right)=\frac{1}{\left|{S}_{t}\right|}\sum _{i\in {S}_{t}}\nabla {f}_{i}\left({\omega }_{t}\right)$$

2

The single node parameter update method is:

$${\omega }_{t+1}={\omega }_{t}-{\alpha }_{t}\nabla {f}_{{s}_{t}}\left({\omega }_{t}\right)$$

3

1)The parameter update method of the limited synchronous parallel model is:

$${\tilde{\omega }}_{k,t}={\omega }_{0}+\left[\sum _{{t}^{\text{'}}}^{t-m-1}\sum _{{k}^{\text{'}}}^{l}{u}_{{k}^{\text{'}},{t}^{\text{'}}}\right]+\left[\sum _{({k}^{\text{'}},{t}^{\text{'}})\in {u}_{k,t}}{u}_{{k}^{\text{'}},{t}^{\text{'}}}\right]$$

4

,

where \({\tilde{\omega }}_{k,t}\) represents the current model parameters, \({u}_{k,t}\) represents the updated subset of all computing nodes during the [t-m, t + m-1] iteration, and \({\tilde{\omega }}_{k,t}\) represents the current model parameters. \(\left[\sum _{{t}^{\text{'}}}^{t-m-1}\sum _{{k}^{\text{'}}}^{l}{u}_{{k}^{\text{'}},{t}^{\text{'}}}\right]\)represents the global update set of the previous local iteration, and \(\left[\sum _{({k}^{\text{'}},{t}^{\text{'}})\in {u}_{k,t}}{u}_{{k}^{\text{'}},{t}^{\text{'}}}\right]\) represents the update of this local iteration.

For a convex function \(f\left(\omega \right)=\frac{1}{T}\sum _{t=1}^{T}{f}_{t}(\omega )\), assuming that its component \({f}_{t}\left(\omega \right)\) is also a convex function, we determine the model parameter \({\omega }^{*}\) under its optimal model. Suppose there are K computing nodes in the distributed cluster, limited threshold \(l\in [1，K]\), and global synchroniz, tion threshold m > 1. Let \({u}_{t}≔-{\alpha }_{t}\nabla {f}_{t}\left({\tilde{\omega }}_{t}\right)\), where \({\alpha }_{t}=\frac{\sigma }{\sqrt{t}}\), \(\sigma =\frac{F}{L\sqrt{2(m+1)l}}\), and L and F are constants. Assuming that \({f}_{t}\) satisfies the Lipschitz continuity condition, and the parameters satisfy the \(‖\omega -{\omega }^{*}‖\le C,\forall \omega ,{\omega }^{*}\) condition, we obtain from [24]:

$$R\left(\omega \right)≔ \frac{1}{T}\sum _{t=1}^{T}{f}_{t}\left({\tilde{\omega }}_{t}\right)-{f}_{t}\left({\omega }^{*}\right)\le 4FLK\sqrt{2\left(m+1\right)lT}$$

5

In the case where the training data of the computing nodes are different, for a distributed cluster of K nodes, the number of samples that each node iteratively processes is \({size}_{k}\), and the total number of samples for one round of iterative training is \(\text{S}\text{I}\text{Z}\text{E} = {\sum }_{k=1}^{K}{size}_{k}\). Assuming that all samples are trained on one computing node, the learning rate is \(\alpha\), and the parameter update method is

$${\omega }_{i+1}={\omega }_{i}-\frac{\alpha }{SIZE}\sum _{j=1}^{M}\nabla {f}_{j}$$

6

The samples were distributed to K computing nodes and the parameter update method was

Here, \(\widehat{\nabla }{f}_{k}\) represents the cumulative value of the gradient of the loss function of the kth computing node, and the value of \(\frac{1}{K\times {size}_{k}}\) will change with the number of samples iteratively processed by each node, which can be dynamically updated according to the performance of the computing node. This expression can be obtained by reducing the influence of the gradient of the loss function of the computing nodes with higher performance and reducing the loss caused by the local model update on the overall training.

3) It can be observed from (1) and (2) that the limited synchronous parallel model is convergent, and the dynamic adjustment of the calculation amount will not change the convergence property of the model; therefore, the distributed machine learning training using the WLBS strategy is convergent. Therefore, it was theoretically proven that the WLBS strategy is feasible.