RESEARCH ARTICLE

DRPS: efficient disk-resident parameter servers for distributed machine learning

  • Zhen SONG 1,
  • Yu GU 1,
  • Zhigang WANG 2,
  • Ge YU 1
  • 1. School of Computer Science and Engineering, Northeastern University, Shenyang 110819, China
  • 2. College of Information Science and Engineering, Ocean University of China, Qingdao 266100, China

Received date: 06 Sep 2020

Accepted date: 09 Apr 2021

Published date: 15 Aug 2022

Copyright

2022 Higher Education Press

Abstract

The parameter server (PS), as the state-of-the-art distributed framework for large-scale iterative machine learning tasks, has been extensively studied. However, existing PS-based systems often depend on memory-resident implementations. Under memory constraints, machine learning (ML) developers cannot train large-scale ML models in their rather small local clusters. Moreover, renting large-scale cloud servers is often economically infeasible for research teams and small companies. In this paper, we propose a disk-resident parameter server system named DRPS, which reduces the hardware requirement of large-scale machine learning tasks by storing high-dimensional models on disk. To further improve the performance of DRPS, we build an efficient index structure for parameters to reduce the disk I/O cost. Based on this index structure, we propose a novel multi-objective partitioning algorithm for the parameters. Finally, a flexible worker-selection parallel model of computation (WSP) is proposed to strike the right balance between the problem of inconsistent parameter versions (staleness) and that of inconsistent execution progress (stragglers). Extensive experiments on many typical machine learning applications with real and synthetic datasets validate the effectiveness of DRPS.

Cite this article

Zhen SONG, Yu GU, Zhigang WANG, Ge YU. DRPS: efficient disk-resident parameter servers for distributed machine learning[J]. Frontiers of Computer Science, 2022, 16(4): 164321. DOI: 10.1007/s11704-021-0445-2

1 Introduction

Nowadays, machine learning (ML) often needs to iteratively process large-scale datasets and high-dimensional parameters. In real applications, the dataset scale can range from 1 TB to 1 PB, while the parameter dimension can even reach one trillion [1]. Consider the case where 10 million users rate 10 thousand movies: if we assume r = 1000 (where r is the rank of the factor matrices), the parameter dimension of the matrix factorization (MF) model exceeds 10 billion. As an essential strategy for dealing with datasets and parameters at such a scale, distributed machine learning systems have attracted broad research interest.
The Parameter Server (PS) [1-4], as the state-of-the-art distributed framework for large-scale iterative machine learning, has been widely applied to many real scenarios. However, existing PS-based systems are often implemented as memory-resident schemes, which leads to the following challenges. First, renting or purchasing many machines for scalability is not always economically feasible. Second, a large number of machines may increase the difficulty of parallel management. Last but not least, if researchers intend to train out-of-memory ML models in their local clusters, they must adopt a disk-resident design.
Therefore, we consider extending existing parameter servers to a disk-resident framework (DRPS) to reduce economic costs and enhance system scalability. However, a straightforward disk implementation does not work as well as expected. On the one hand, it inevitably yields a large number of random I/O operations, which dramatically slow down the training process. On the other hand, previous PS-based partitioning techniques work poorly because they do not consider the non-negligible disk I/O cost. Besides, the disk-resident framework aggravates workload skew, which produces severe straggler problems. In summary, our motivation is to process large-scale machine learning tasks as quickly as possible with restricted resources.
To this end, we tailor a series of optimizations for DRPS. DRPS is a general framework whose scope of applications is consistent with that of the traditional parameter server. Our contributions are summarized as follows:
● We design a disk-resident distributed machine learning system named DRPS that stores datasets and parameters on disk to enhance system scalability and thus reduce the economic cost.
● We present a distributed global index structure and optimize the index-building algorithm to improve its efficiency and effectiveness. Based on this, we propose a novel multi-objective parameter partitioning algorithm to balance the skewed workload and relieve network traffic pressure.
● We explore a novel worker-selection parallel model of computation named WSP that achieves adaptive parallelism control by analyzing the real-time state and the benefit of waiting.
The rest of this paper is organized as follows: Section 2 introduces the related work; Section 3 presents the preliminaries and an overview of the proposed system. Section 4 describes the detailed strategies of the distributed global index, and Section 5 introduces the multi-objective parameter partitioning technique. Section 6 illustrates the specific design of the parallel model WSP. We provide experimental results and analysis in Section 7. Section 8 discusses the applicability and limitations of this paper. Finally, Section 9 concludes the paper.

2 Related work

2.1 Machine learning systems

Research on machine learning systems has attracted considerable effort. We separate the existing prevailing ML systems into three categories: stand-alone systems, PS-based systems, and All-Reduce-based systems. Hogwild! [5] is a stand-alone machine learning system with a lock-free parallel model. HELIX [6] optimizes execution across iterations, including intelligently caching and reusing, or recomputing, intermediates as appropriate. Stand-alone ML systems generally show poor scalability and parallelism. The existing representative PS-based systems, such as Petuum [3], Parameter Server [1], MXNet [2], and TensorFlow [4], are all based on memory implementations. FlexPS [7] introduces a novel multi-stage abstraction to support flexible parallelism control, which maps machine learning tasks to a series of stages, where the parallelism for each stage can be set according to its workload. PS2 [8] builds the parameter server on top of Spark [9]. The literature mentioned above is orthogonal to our paper, as it mainly focuses on optimizations of memory-based implementations. All-Reduce is another famous distributed framework for machine learning. BlueConnect [10] designs a parallelizable All-Reduce mode to trade off latency against bandwidth. However, such systems are mostly designed for multi-GPU architectures. Moreover, some works explore adaptive adjustment among these architectures. Husky [11] attempts to strike a better balance between high performance and low development cost. BytePS [12] studies the load allocation problem of distributed machine learning under heterogeneous GPU/CPU resources, but it does not consider the impact of disk I/O on load allocation. HybridGraph [13] is also a disk-resident distributed system; however, it mainly focuses on iterative distributed graph tasks.

2.2 Parameter index and partition

Qin et al. [14] design an efficient stand-alone parameter index, which adopts pruning and sampling strategies to make the index-building algorithm feasible. Motivated by it, we build a global index structure in parameter servers and use a modified distributed scheme to speed it up. Parsa [15] proposes a distributed partitioning algorithm to reduce communication overhead. As it targets memory-resident PS, Parsa does not take the disk I/O cost into account, which cannot be neglected in our DRPS. Recently, some dynamic parameter assignment methods have been proposed. LAPSE [16] supports dynamic parameter allocation and explores the possibility of employing it in PS. PSLD [17] proposes a prediction-guided exploitation-exploration approach for dynamic PS load distribution and supports dynamic parameter reassignment. The idea of dynamic migration could be applied to our work for better performance; however, it would need to be designed carefully for the parameter index blocks we introduce, which is beyond the scope of this paper. HEGJOIN [18] proposes a heterogeneous CPU-GPU distance similarity join algorithm and develops corresponding work partitioning strategies, but it is not aimed at parameter partitioning in ML tasks.

2.3 Parallel models of computation

There are three main parallel computation models supported by the parameter server: BSP [19], ASP [5] and SSP [20,21]. BSP is a synchronous model that blocks parallel execution until all workers complete the current iteration. It ensures the correctness of parallel algorithms, but the expensive synchronization cost is often unacceptable, which is also referred to as the straggler problem. ASP is a lock-free asynchronous model; that is, a worker can directly start the next iteration without waiting for other machines after completing the current one, at the cost of stale worker updates, which is common in DRPS. SSP bounds such staleness, but it needs to tune the delay bound δ to achieve the best effect. In addition, AAP [22] is an adaptive asynchronous parallelization, which focuses on graph algorithms instead of machine learning in PS. Jiang et al. [23] propose two learning rate schedules to enhance robust convergence, which can be adopted in our proposed WSP to further speed up convergence. FSP [24] studies a flexible synchronous parallel framework for the Expectation-Maximization approach; however, it is not suitable for general machine learning tasks.

3 Preliminaries and overview

Machine learning and mini-batch training We focus on a general category of machine learning applications that are trained by the gradient descent algorithm family. The input data consists of $D=\{x_i, y_i\}_{i=1}^{N}$, where $x_i$ and $y_i$ represent the feature vector and the label respectively, and $N$ is the number of training samples. ML algorithms use the input data $D$ and the model $\theta$ to calculate the predicted value $\hat{y}_i$ and construct the loss function: $J(\theta)=\sum_{i=1}^{N} f(\theta, x_i; y_i)$.
The purpose of the optimization algorithm is to find an optimal $\theta \in \mathbb{R}^{D}$ that minimizes the loss function $J(\theta)$. Mini-batch stochastic gradient descent (mini-SGD) chooses a batch of data for calculating the gradient at every iteration: $g_k=\frac{1}{m}\sum_{i \in batch_k} \nabla f(x_i, y_i, \theta)$. After calculating the gradient of a batch, mini-SGD iteratively updates the parameter $\theta$ through the following formula: $\theta = \theta - \eta g_k$, where $\eta$ is the learning rate and $m$ is the size of a single batch.
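As a concrete reference, the following minimal sketch (Python/NumPy, not the authors' Java implementation) shows one mini-SGD step for a linear model with squared loss; the function name and loss choice are illustrative assumptions.

```python
import numpy as np

def mini_sgd_step(theta, X_batch, y_batch, eta):
    """One mini-SGD update on a batch of m samples (linear model, squared loss)."""
    m = X_batch.shape[0]
    y_hat = X_batch @ theta                    # predicted values for the batch
    g_k = X_batch.T @ (y_hat - y_batch) / m    # average gradient over the batch
    return theta - eta * g_k                   # theta = theta - eta * g_k
```

In DRPS this update runs on the servers, while workers compute the per-batch gradients.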
Parameter index and partition An appropriate index structure can significantly decrease the number of disk I/Os, as shown in Fig. 1. Here, u represents a batch of data and v represents the parameters. An edge between u and v indicates that batch u needs parameter v for training. From the left of Fig. 1, we can see that many batches access $v_1$ and $v_7$ at the same time. We therefore build an index for $v_1$ and $v_7$. As shown in the right of Fig. 1, we reduce the number of disk I/Os from 8 to 4. Suppose the disk seek time is $t_s$ and the disk access time per unit parameter is $t_a$, where $t_s \gg t_a$. Before indexing, the total disk I/O time for $v_1$ and $v_7$ is $8t_s + 8t_a$. After indexing, the time is reduced to $4t_s + 8t_a$, a reduction of $4t_s$. We call the parameters that are indexed together a parameter index block (PIB for short).
Fig.1 Model index example


Parameter partition aims to reduce network communication by assigning parameters to different servers. It follows the principle of proximity, i.e., allocating parameters to the machine that uses them most frequently, which turns network communication into local communication. For example, let the time of accessing a single parameter through the network be $t_{net}$. If we partition parameters randomly, as in the left of Fig. 2, the total network communication cost is $6t_{net}$. However, we reduce the network communication cost to $t_{net}$ by partitioning parameters as in the right of Fig. 2.
Fig.2 Parameter partition example


Overview of DRPS The disk-resident parameter server (DRPS) follows the basic idea of PS. Servers and workers logically constitute DRPS. In our DRPS, each machine physically runs one worker and one server, both implemented as threads. A server and a worker on the same machine communicate through local threads; otherwise, they pass messages over the network. Servers aggregate gradients and update parameters, while workers perform the main computation. We adopt this parameter server implementation so that we can make full use of the resources of every machine in the cluster.
DRPS uses the lightweight key-value database LevelDB as its KV-store. The KV-stores running on servers mainly store parameters, while those running on workers manage datasets. During training, when workers request parameters from servers, DRPS reads the parameters from the KV-store; when servers need to update parameters, DRPS reads the parameters before updating and writes them back to the KV-store after updating.
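The server-side read-modify-write path can be sketched as below. This is a simplified Python sketch that uses an in-memory dict in place of LevelDB (the real system issues the same get/put pattern against LevelDB); the class and method names and the pickle serialization are illustrative assumptions.

```python
import pickle

class ParamStore:
    """Stand-in for the server-side KV-store (LevelDB in DRPS), keyed by parameter id."""
    def __init__(self):
        self.db = {}                                        # key -> serialized value

    def pull(self, keys):
        """Workers request parameters: read them from the store."""
        return {k: pickle.loads(self.db[k]) for k in keys if k in self.db}

    def push(self, grads, eta):
        """Servers update parameters: read, apply the gradient, write back."""
        for k, g in grads.items():
            theta = pickle.loads(self.db[k]) if k in self.db else 0.0
            self.db[k] = pickle.dumps(theta - eta * g)
```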

4 Index construction and optimization

In this section, we first introduce the details of our distributed index. To further reduce the runtime cost, we then design a new optimization approach that improves the distributed indexing algorithm.

4.1 Distributed indexing process

We use mini-SGD as the training method. Each worker reads only a batch of m pieces of data for training. We denote a batch as $b_{ij}$, meaning the jth batch on the ith machine. The number of batches on each worker is $N_i = \frac{n_i}{m}$, where $n_i$ is the total number of training records assigned to worker i. Here, we divide the dataset evenly among the workers, so each worker contains the same number of batches.
During the training process, worker i uses $b_{ij}$ for training and needs to access parameters based on $b_{ij}$. We represent the required parameters as $P_{ij}$, a set of key-value parameters. The size of $P_{ij}$ is often related to the sparsity of the training data and the batch size. Machine learning algorithms usually need multiple epochs to converge, where each epoch is one traversal of the entire dataset.
The total number of batches across all workers is $\sum_i N_i = \sum_i \frac{n_i}{m}$. Each batch has its own parameter set needed for training. The parameter sets of different batches can overlap, and the more times an overlapping combination (a parameter pair or PIB pair) occurs, the more times that combination is accessed together. By indexing such combinations, we can greatly reduce the disk I/O cost of parameter reads and writes.
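A worker can gather these co-access statistics with one pass over its local batches. The sketch below (Python, illustrative names) counts how often each PIB, and each PIB pair, is accessed by the local batches; these counts are the access statistics that are later pushed to the servers.

```python
from collections import Counter
from itertools import combinations

def local_access_counts(batch_param_sets):
    """batch_param_sets: one set of PIB ids per local batch (the P_ij of this worker)."""
    single = Counter()   # AccNum(B_k): number of batches that touch PIB k
    pair = Counter()     # co-access count: number of batches that touch both PIBs of a pair
    for pibs in batch_param_sets:
        single.update(pibs)
        pair.update(combinations(sorted(pibs), 2))
    return single, pair  # pushed to the servers and aggregated into the global matrix
```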
In order to make full use of the resources of the distributed cluster to accelerate index establishment, we propose a distributed global index structure and further optimize the index-building algorithm. All workers calculate the time cost reduction of all pairwise combinations of parameters based on their local datasets and push them to the servers, which aggregate the statistics and merge the pair with the maximal time cost reduction. This process is repeated until the time cost reduction of every pair is less than 0 (or below a minimum gain γ). The time cost reduction after merging $B_i$ and $B_j$ is computed as follows:
$$cost(B_k) = AccNum(B_k)\,(t_s + |B_k|\,t_a), \qquad (1)$$
$$cost(B_{ij}) = AccNum(B_{ij})\,(t_s + (|B_i| + |B_j|)\,t_a), \qquad (2)$$
$$\Delta_{ij} = cost(B_i) + cost(B_j) - cost(B_{ij}), \qquad (3)$$
where $B_k$ is the kth parameter index block; $AccNum(B_k)$ is the number of times $B_k$ is accessed by the dataset; $t_s$ is the time of a single disk seek; $t_a$ is the sum of the writing and reading time of a unit parameter; $B_{ij}$ is the parameter index block obtained by merging $B_i$ and $B_j$; $AccNum(B_{ij})$ is the accessing number of $B_{ij}$; $|B_k|$ denotes the number of parameters in $B_k$. Eq. (1) is the access time of $B_k$; Eq. (2) is the access time after merging $B_i$ and $B_j$; Eq. (3) is the time reduction achieved by merging $B_i$ and $B_j$.
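Under this cost model, the merge gain of a PIB pair follows directly from the aggregated access counts. The sketch below (Python, illustrative names, reusing the counters from the previous sketch) is one way to compute it; it assumes that $AccNum(B_{ij})$ counts the batches that access either member of the merged block, which is consistent with the example of Fig. 1.

```python
def merge_gain(i, j, single, pair, sizes, t_s, t_a):
    """Delta_ij of Eqs. (1)-(3); sizes[k] is the number of parameters in PIB k."""
    cost_i = single[i] * (t_s + sizes[i] * t_a)                 # Eq. (1) for B_i
    cost_j = single[j] * (t_s + sizes[j] * t_a)                 # Eq. (1) for B_j
    co_acc = pair[(min(i, j), max(i, j))]                       # batches touching both i and j
    acc_ij = single[i] + single[j] - co_acc                     # batches touching B_i or B_j
    cost_ij = acc_ij * (t_s + (sizes[i] + sizes[j]) * t_a)      # Eq. (2)
    return cost_i + cost_j - cost_ij                            # Eq. (3)
```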

4.2 Index constructing optimization

However, we find that the time complexity of the algorithm is $O(N D_f^2 D^3)$, where N is the number of data pieces, $D_f$ denotes the average number of dimensions per data piece, and D is the total number of dimensions of the ML model. In practice, such an extravagant time cost is not acceptable. To make the algorithm usable in practical scenarios, existing methods apply dimension pruning and data sampling. The former removes the parameters with low accessing frequency at a certain proportion, which then no longer participate in index construction. Here, we give a specific example to show how pruning affects the effectiveness of the index. Suppose a machine learning model has four dimensions (i.e., four parameters) a, b, c and d, whose accessing frequencies are 20, 25, 3 and 4, respectively. If the pruning ratio is set to 25%, c is pruned. After constructing the index, the parameters are divided into two parameter index blocks {a, b} and {d}. The total benefit comes only from the time reduction of indexing a and b; however, had we not pruned c, we might have benefited further from indexing c and d. The latter technique computes statistics only on sampled data instead of the full dataset. Although pruning and sampling can significantly decrease D and N, they have a negative impact on the effectiveness of our index.
In actual operation, we notice that the algorithm runs extremely slowly when more than a thousand dimensions remain after pruning, taking even longer than the total time of online machine learning training. Excessive pruning tremendously weakens the effect of the index, which means that indexing the remaining dimensions still has great potential for reducing disk I/O time. Every merged parameter pair requires one traversal of the dataset, and the resulting disk I/O cost is enormous. Therefore, reducing the number of indexing iterations is an intuitive way to speed up the algorithm, which motivates us to further optimize the efficiency of the distributed variant.
In existing work, a large number of the accessing frequency statistics computed in each iteration are wasted. Our idea is therefore to merge as many parameters as possible in each iteration, instead of merging only one parameter pair. We greedily choose the PIB (parameter index block) pairs with the top-k maximum cost reductions. Firstly, we merge the pair with the largest cost reduction. Next, if the PIBs of the second-largest pair are not contained in pairs already merged in the current iteration, we merge them; otherwise, we must judge whether it is worth merging that pair into an already merged pair. For example, suppose an ML model has six parameters a–f (each parameter initially corresponding to a PIB) satisfying $\Delta_{a,b} > \Delta_{a,c} > \Delta_{c,d} > \Delta_{e,f}$, where $\Delta_{i,j}$ is the time reduction after merging PIBs i and j. Here the second-largest pair overlaps with the pair merged in the current iteration. We cannot merge {a,c} into {a,b}, because we only know $\Delta_{a,c} > \Delta_{c,d}$ and cannot ensure that $\Delta_{\{a,b\},c}$ is larger than $\Delta_{c,d}$. We therefore merge {a,b}, {c,d} and {e,f} in this iteration instead of {a,b,c,d} and {e,f}. In most cases, the values of $\Delta_{i,j}$ within the same iteration differ little, so performance does not drop significantly even if $\Delta_{\{a,b\},c} > \Delta_{c,d}$. When $\Delta_{\{a,b\},c} \gg \Delta_{c,d}$, the PIB pair {c,d} can still be merged into {a,b} in the next iteration. Exceptionally, the effect of our optimization can drop rapidly when the data distribution is such that $\Delta_{\{a,b\},c}$ is quite large while $\Delta_{\{a,b\},\{c,d\}}$ is rather small. From experimental statistics on many datasets, we find that the PIBs appearing in the top-m largest cost reduction pairs of the current iteration hardly appear in the next roughly n merging iterations. Therefore, we choose the top-k cost reduction pairs that do not involve PIBs already merged in the current iteration, and merge each of them if its cost reduction is larger than the minimal gain γ. We fully demonstrate the effectiveness and practicability of this approach in the experimental section.
The overall process is shown in Algorithm 1. Our approach first receives and aggregates the accessing matrices and calculates the time cost reductions $\Delta_{ij}$ (Lines 1−3). It then obtains the top-k time reductions (Lines 4−7) and determines whether the largest Δ exceeds γ; if not, it terminates (Lines 8−10). Otherwise, it merges the PIBs according to the top-k time reductions (Lines 10−14). The time complexity of the algorithm is $O(\frac{1}{k} N D_f^2 D^3)$, so in theory the algorithm achieves a k-fold speedup. The space complexity of the algorithm is $O(B_{opt}^2)$, where $B_{opt}$ is the number of PIBs.
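One iteration of the top-k selection step can be sketched as follows (Python, illustrative names). It greedily takes the k best pairs whose gain exceeds γ and whose PIBs have not yet been merged in this round, mirroring the selection rule described above; it is a sketch of the selection logic, not the full Algorithm 1.

```python
def select_topk_merges(gains, k, gamma):
    """gains: dict (i, j) -> Delta_ij for candidate PIB pairs; returns pairs to merge."""
    plan, used = [], set()
    for (i, j) in sorted(gains, key=gains.get, reverse=True):
        if len(plan) == k or gains[(i, j)] <= gamma:
            break                       # enough merges, or no remaining pair is worth merging
        if i in used or j in used:
            continue                    # defer pairs overlapping this round's merges
        plan.append((i, j))
        used.update((i, j))
    return plan                         # PIB pairs merged in this iteration
```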

5 Multi-objective parameter partition

In this section, we describe in detail the partitioning method based on disk I/O and network communication costs. For better understanding, we first introduce the network communication cost incurred during training, and then propose a multi-objective parameter partition algorithm based on both the disk I/O cost and the communication cost.
The network communication cost mainly includes two aspects: one is workers accessing parameters from parameter servers; the other is parameter servers being accessed by workers. When we train a machine learning model with mini-SGD in the parameter server framework, a worker thread and a server thread run on each machine. When the worker thread uses the local training dataset for computation, it needs to pull parameters from server threads (on both the local machine and remote ones). Because the worker and server on the same machine communicate by threads, whose cost is far less than that of network communication, we ignore the time cost of pulling from the local server. What remains is the local worker thread pulling parameters from parameter servers on other machines. The server thread contributes the other part of the time cost: the parameters assigned to a server are accessed by workers on other machines (the time incurred by the local worker is again ignored). The communication cost is as follows:
$$cost_{accOther}^{i} = \sum_{j=1}^{N_i} |P_{ij} \setminus V_i| \cdot t_{net}, \qquad (4)$$
$$cost_{accessed}^{i} = \sum_{l \neq i} \sum_{j=1}^{N_l} |V_i \cap P_{lj}| \cdot t_{net}, \qquad (5)$$
$$cost_{total}^{i} = cost_{accOther}^{i} + cost_{accessed}^{i}, \qquad (6)$$
where Eq. (4) is the time of accessing parameters from parameter servers on other machines; Eq. (5) is the time of the parameters stored on machine i being accessed by workers on other machines; Eq. (6) is the total network communication time of machine i. $V_i$ is the set of parameters stored on machine i; $P_{lj}$ is the set of parameters needed by $U_{lj}$ for training, where $U_{lj}$ is the jth batch on machine l; $t_{net}$ is the network communication time of a unit parameter.
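A direct reading of Eqs. (4)–(6) as a Python sketch, with illustrative names (sets of parameter keys per batch and per machine); the set-difference and intersection cardinalities follow the reconstruction assumed above.

```python
def machine_comm_cost(i, V, P, t_net):
    """V[i]: parameter keys stored on machine i; P[i][j]: keys needed by batch j of machine i."""
    acc_other = sum(len(p - V[i]) for p in P[i]) * t_net                     # Eq. (4)
    accessed = sum(len(V[i] & p)
                   for l, batches in enumerate(P) if l != i
                   for p in batches) * t_net                                 # Eq. (5)
    return acc_other + accessed                                              # Eq. (6)
```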
After indexing in Section 4, we can get the optimal PIBs for the global training dataset, where the disk I/O time of each PIB is preserved. Then, the multi-objective parameter partition algorithm starts to assign the PIBs to machines over the distributed cluster.
We can compute the access time of each PIB on every machine to build a $W \times B_{opt}$ matrix, where W is the number of machines and $B_{opt}$ is the number of PIBs. We then add the network communication cost to the disk I/O cost of each PIB. Thus, the problem is transformed into an optimal allocation problem. As shown in Algorithm 2, our approach receives pull requests from workers, sends the message $B_{opt}$ to workers, and waits to obtain the network communication cost of each $B_i$ (Lines 1 and 2). It then calculates the total cost (network communication cost plus disk I/O cost) of each PIB (Lines 3−5).
In our method, the parameter servers are equivalent to n buckets, and each PIB has a fixed time cost in each bucket. The problem is therefore transformed into how to place all PIBs into buckets so as to minimize the total time of the bucket with the largest time cost. To solve this optimization problem, we adopt a greedy strategy. First, at each iteration we choose the bucket with the minimal time cost as the insertion bucket. Next, we select the PIB with the minimal time cost in that bucket. Finally, we insert the PIB into the bucket.
For the whole algorithm above, we give a detailed example of its execution. Figure 3 shows the entire process of building the distributed index. First of all, we initialize each parameter as a PIB (parameter index block). Each machine uses its local dataset and the current optimal PIBs (intermediate results of index establishment) to count the accessing numbers of PIBs and their pairwise combinations. Then, each worker pushes its local accessing matrix to the master, which aggregates them into a global accessing matrix. Next, the master computes the disk I/O cost reduction of every pairwise combination of PIBs and selects the pair with the maximum reduction to merge. Then, we consider the pair with the second-largest cost reduction and determine whether it involves PIBs that have already been merged in this iteration. If not, we merge them directly; otherwise, we move on to the third-largest cost reduction. In this way, we select the pairs with the first to kth maximum cost reductions to perform the merging operation, and then start the next iteration.
Fig.3 Distributed index establishment


After establishing the distributed index, we can calculate the total access time of each PIB, as shown in the last step of Fig. 3. Next, we introduce the multi-objective partitioning method based on the disk I/O cost computed during distributed index establishment. We can calculate the network communication cost of each PIB placed on each machine. As shown in Fig. 4, we sum the network communication time of each PIB on each machine with its disk I/O time, obtaining a total cost matrix that combines the network communication cost and the disk I/O cost. In particular, $A_{ij}$ in the accessing matrix is the time cost of placing the ith index block on the jth machine.
Fig.4 Parameter partitioning method based on communication cost and disk I/O cost


By now, we have transformed the multi-objective parameter partition problem into the allocation problem above, which we solve with a greedy strategy. First, we select the bucket with the minimal current cost as the insertion container. As shown in Fig. 4, bucket 1 has the minimal current cost. Second, we choose the node with the minimal cost in the selected bucket as the inserted node. We find that B2 has the minimum cost value in bucket 1, so we insert B2 into bucket 1. Then, we update the cost of the inserted bucket, i.e., update cost1 to 68. This process is performed iteratively and does not stop until all PIBs are inserted into the buckets.
Algorithm 3 shows the process of the multi-objective parameter partitioning approach. First, we traverse the buckets to find the bucket bucketI with the minimal total cost (Line 2). Then, we find the node nodeI with the minimal total cost in that bucket (Line 3) and assign nodeI to bucketI (Line 4). The algorithm does not stop until all PIBs have been assigned. The time and space complexity of the algorithm are both $O(B_{opt} W)$.
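A compact sketch of this greedy assignment (Python, illustrative names): total_cost[b][w] is the combined disk I/O and network cost of placing PIB b on machine w, i.e., the matrix described above.

```python
def greedy_partition(total_cost, num_machines):
    """total_cost: dict pib_id -> list of per-machine costs (disk I/O + network)."""
    load = [0.0] * num_machines
    assignment, remaining = {}, set(total_cost)
    while remaining:
        w = min(range(num_machines), key=lambda m: load[m])        # lightest bucket so far
        b = min(remaining, key=lambda pib: total_cost[pib][w])     # cheapest PIB for that bucket
        assignment[b] = w
        load[w] += total_cost[b][w]
        remaining.remove(b)
    return assignment                                              # PIB id -> machine index
```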

6 Worker-selection parallel model

High contention for I/O degrades system stability and balance, which inspires us to present a real-time state-aware parallel computation model.

6.1 Parallel models in DRPS

When workers request parameters from servers to start the next iteration, the parallel model adopted by the master determines whether or not to respond to the workers' requests. There are three parallel computing models used in the traditional parameter server, namely BSP, ASP and SSP, all of which are supported by our DRPS. BSP and ASP are two extremes, each with evident virtues and limitations. SSP makes a trade-off between them: it uses a delay bound δ to restrict the staleness of parameters while allowing a slight degree of asynchrony. When δ=0, SSP degrades to BSP, and when δ=∞, SSP is equivalent to ASP.
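For reference, the SSP admission rule reduces to a single clock comparison; the sketch below (Python, illustrative names) shows it, with BSP and ASP falling out as the δ=0 and δ=∞ special cases.

```python
import math

def ssp_may_proceed(my_clock, all_clocks, delta):
    """A worker may start its next iteration only if it is at most delta iterations
    ahead of the slowest worker; delta = 0 gives BSP, delta = math.inf gives ASP."""
    return my_clock - min(all_clocks) <= delta

# e.g., ssp_may_proceed(5, [3, 4, 5], delta=2) -> True; delta=1 -> False
```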
SSP controls parameter staleness through the delay bound δ, but it has two disadvantages. First, the parameter δ is critical, yet users must tune it manually based on experience, which is not a trivial task. Second, SSP is controlled by a simple bound: it ignores the common situation where the degree of staleness has not yet reached the bound, but the requesting machine would only need to wait a short time for other machines to complete their updates. For example, as shown in Fig. 5, we set the bound of SSP to 2. At $t_1$, $worker_i$ has finished its computation and requests parameters to start the next iteration. Under SSP, because the bound has not been reached, the master accepts the parameter request. However, $worker_i$ would only need to wait a short time $t_{wait}$ to obtain the update of $worker_j$, which would eliminate this part of the staleness of $worker_i$.
Fig.5 The drawback of SSP


6.2 Core of WSP

WSP does not always adopt the same parallel computing model during execution, but automatically selects the optimal parallel strategy according to the current workers' computing states. We call it the Worker-Selection Parallel model (WSP). SSP cannot anticipate what is going to happen in the future, while WSP has this "predictability".
When the parameter request of a worker reaches the servers, the worker automatically makes a trade-off between staleness and waiting time to decide whether or not to wait for other machines' updates. The execution process is shown in Fig. 5. Let $w_{ij}$ denote the ith machine at the jth iteration. At the time of the dotted line, $w_{12}$ updates its local gradients and requests parameters for the next iteration. The process $w_{12}$ needs to determine whether or not to wait for the arrival of other processes. At this time, WSP calculates the gain value of each strategy according to the waiting time and the staleness degree, and chooses the strategy with the maximal gain for execution. The gain function is calculated as follows:
$$gain_i = -\frac{t_i}{TE_i} - \frac{1}{W-1} \sum_{l \neq i,\ l \notin S_i} (I_i + 1 - I_l), \qquad (7)$$
where $TE_i$ is the execution time of machine i; W is the number of workers; $t_i$ is the time the requesting worker would wait for $worker_i$; $S_i$ is the set of workers waited for if the requesting worker chooses to wait for $worker_i$; $I_i$ and $I_l$ are the current iteration rounds of the requesting worker and $worker_l$, respectively.
The gain function consists of two parts: waiting time and staleness degree. Since both parts are inversely related to the gain, we put a negative sign on each part, and we multiply each part by a coefficient to make the two comparable. The former part is easy to understand, so we focus on the latter. If the requesting worker chooses to wait for $worker_i$, the staleness contributed by the workers that finish the current iteration before $worker_i$ is eliminated. The remaining staleness comes from the other workers, which the requesting worker does not wait for.
However, there is a problem: we cannot know the iteration times of other workers before they arrive; we can only estimate them. When we use a distributed cluster to train the machine learning model, the cluster may be running other tasks, so the resources available for training vary. We assume that computing resources change little between consecutive iterations (except in critical states), and therefore use the last iteration time in place of the current iteration time.
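The gain computation and strategy selection can be sketched as below (Python, illustrative names). The gain follows the reconstruction of Eq. (7) above, with last-iteration times standing in for the unknown current ones; the handling of the no-wait strategy and the construction of the arrival set are assumptions made for the sketch.

```python
def choose_wait_strategy(req_iter, iters, last_iter_time, wait_time, W):
    """iters[l]: current round of the other worker l; last_iter_time[l]: its previous
    iteration time (estimate); wait_time[l]: estimated time until worker l arrives."""
    candidates = {None: (0.0, set())}                              # None = do not wait
    for l in wait_time:                                            # wait until worker l arrives
        arrived = {k for k in wait_time if wait_time[k] <= wait_time[l]}
        candidates[l] = (wait_time[l], arrived)
    best, best_gain = None, float("-inf")
    for l, (t_wait, arrived) in candidates.items():
        staleness = sum(req_iter + 1 - iters[k] for k in iters if k not in arrived)
        norm = last_iter_time[l] if l is not None else 1.0         # coefficient of the wait term
        gain = -t_wait / norm - staleness / (W - 1)                # reconstruction of Eq. (7)
        if gain > best_gain:
            best, best_gain = l, gain
    return best, best_gain                                         # worker to wait for (or None)
```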

6.3 Detailed design and implementation

There are two main data structures in WSP: ITTable and SCTable. The former records the start time, end time, iteration time, and iteration round of each worker, while the latter records the waiting time, staleness, and gain of each strategy.
When the server receives a parameter request from worker $w_i$, it first determines whether $w_i$ is in the waiting set of another worker. If so, i.e., $w_i \in S_k$, the thread of $w_i$ is blocked and the counter $C_k$ of the waiting set is increased by 1. If the counter equals the size of the waiting set, the servers respond to all workers in the waiting set $S_k$; otherwise, they do nothing. If $w_i$ is not in any waiting set, WSP updates SCTable according to ITTable and selects the strategy with the maximum total gain. Finally, WSP updates ITTable according to the selected strategy.
At the beginning of execution, WSP needs one synchronized iteration to initialize ITTable. Before an iteration begins (equivalently, after the parameters are returned), the start time is recorded. When the parameter server receives the parameter request from a worker, the end time of the corresponding worker is recorded. The iteration time of the worker in ITTable is the end time minus the start time, i.e., $T_{iter} = T_e - T_s$, and the current iteration round of $w_i$ is set to the iteration clock plus one.
Here is a specific example to illustrate the implementation of WSP. After initialization, we obtain the first table shown in Fig. 6, where W is the index of the worker, $T_s$ is the start time, $T_e$ is the end time, $T_{iter}$ is the execution time, and I is the iteration round. Based on Eq. (7) and ITTable, WSP calculates the second table in Fig. 6, where S is the index of the strategy, $T_w$ is the waiting time, G is the gain of executing the strategy, and a check mark in Op means adopting that strategy. The values of the gain for s0, s1, s2 are 23, 2313, 53 respectively. Moreover, the servers distribute parameters to worker0, at which point WSP updates $T_s$ to the current time and updates I to the current iteration clock (the iteration round of the fastest worker) plus 1. After worker0 finishes the next iteration and requests parameters, WSP updates $T_e$ to the current time and $T_{iter}$ to $T_e - T_s$.
Fig.6 The execution process of WSP


6.4 Algorithm and complexity analysis

Algorithm 4 shows the detailed worker selection process of WSP. To guarantee convergence, we set a maximum staleness bound δ as in SSP (Lines 1 and 2). WSP does not always execute the asynchronous strategy when the bound is not exceeded, but chooses a strategy according to the gain of waiting. It calculates the waiting time and staleness to obtain the gain value of each waiting strategy (Lines 4−9) and chooses the strategy $s_i$ with the maximal gain (Line 10). Finally, the waiting workers are added to optPlan (Line 11).
The time complexity of the algorithm is $O(tW^2)$, where t is the number of iterations and W is the number of workers. The space complexity is $O(W)$. In practice, the values of t and W are small, especially under our target setting, so the running time of WSP is negligible.

7 Experimental evaluation

We have implemented all of our proposals in the open-source prototype system DRPS. In particular, the configurable optimizations include the improved index construction algorithm, the multi-objective parameter partitioning model, and the novel worker-selection parallel model of computation, while the data storage mode and parameter addressing strategy, both essential to DRPS, are always enabled by default. This section first describes the general experimental setting and then reports the gain brought by each optimization technique as well as the overall performance improvement.

7.1 Experimental setup

Implementation, cluster and baseline We implement a PS-based system in Java by borrowing insights from Parameter Server [1]. We then extend it to a disk-resident one, using LevelDB as the key-value storage database. Emphatically, our ideas and optimizations for disk-resident PS can be utilized in all existing PS-based systems. We experiment on a local server cluster consisting of 13 machine nodes connected by 1 Gbps Ethernet. Each machine is equipped with an Intel(R) Xeon(R) E3-1226 v3 CPU, 16 GB RAM and a 931.5 GB HDD. We compare our proposal with Qin et al. [14] from the efficiency and effectiveness perspectives. We then validate each of our proposed optimizations by comparing them with a baseline that extends existing PS-based systems to a disk-resident form without any optimization. The particular settings include: (1) the disk-resident Parameter Server without any optimization, denoted DRPS; (2) the improved distributed index for DRPS, denoted DRPS+Index; (3) the multi-objective parameter partition for DRPS, denoted DRPS+Partition; (4) all the optimizations integrated into DRPS, denoted DRPS+All. Moreover, we compare WSP with the existing parallel models of computation: ASP, BSP and SSP. Finally, we show a comparison of the overall improvements. We use five machines to test the time and the effect, and extend to 7, 9, 11 and 13 machines for scalability. For each algorithm, we set k=50 for the top-k reduction and the batch size to 0.01%. We randomly select 1% of the samples for index construction. We set the rank of the matrix to r=2000.
Datasets and algorithms We run experiments on four datasets (shown in Table 1). Matrix and Classification are synthetic, while MovieLens and AvazuCTR are real and publicly available. Detailed descriptions are given in Table 1, where MSize is the model's size in the key-value store. The reason why we use synthetic datasets is that industrial datasets at such a large scale are generally not publicly available. Matrix is a uniform dataset, while Classification is skewed. Concerning the real datasets, MovieLens is a small-scale dataset for matrix factorization, and AvazuCTR is used for advertising click prediction. The employed datasets cover various characteristics (e.g., skewness and uniformity, matrix and vector). We have implemented four machine learning algorithms, including linear regression (LiR), logistic regression (LoR), support vector machine (SVM), and low-rank matrix factorization (LMF), all of which are widely used in industry. Due to space limitations, we only provide the experimental results of two algorithms (LMF and LoR) on the four datasets (each run on two datasets). The omitted evaluations of the other algorithms show similar results.
Tab.1 Detailed information of datasets
Dataset #Dims MSize #Examples ESize
Matrix 10M ×10K 298GB 1.3B 36.2GB
MovieLens 0.26M ×0.16M 7.8GB 24M 663MB
Classification 10B 186GB 10M 35GB
Avazu-CTR 9M 85MB 40M 6.3GB

7.2 Evaluation for distributed index and parameter partition

First, we conduct experiments to verify the efficiency of our proposed indexing techniques. We run five epochs and record the average training and indexing time. As shown in Fig. 7, the horizontal axis represents the pruning frequency f, meaning that only dimensions with accessing frequency greater than f are retained. "Train" is the average training time per epoch, while "index" is the average indexing time per epoch.
Fig.7 Indexing and training time. The first bar of each graph represents our indexing approach; the second stands for the basic method; the last is the algorithm with no index. (a) LoR on Avazu-CTR; (b) LoR on Classification; (c) LMF on MovieLens; (d) LMF on Matrix


By running LMF and LoR on real and synthetic datasets, we find that our indexing method is on average 30× faster than the state-of-the-art method.
As mentioned earlier, the improved indexing approach can provide at most a k-fold acceleration. In practice, although the theoretical acceleration ratio cannot be achieved, the approach still significantly reduces the index construction time. As we can see, the total time of the second bar in several diagrams is longer than the no-index setting, which is due to the smaller pruning frequency. Next, we run experiments on the multi-objective parameter partitioning model. We also test the following measurements on the four datasets using two algorithms:
Convergence The curve of the loss function over time reflects the dynamic change of the convergence process, which helps to show the advantages and disadvantages of the four strategies clearly. Five machines are used by default. (Fig. 8 Column 1)
Fig.8 Multiple indicators for comparing strategies. Note that DRPS is an extension based on the existing PS, and the state-of-the-art optimizations on PS can be applied in DRPS for further performance improvement; therefore, we choose DRPS as the benchmark. (a) LMF on Matrix; (b) LMF on MovieLens; (c) LoR on Classification; (d) LoR on Avazu-CTR


Time per epoch This measurement intuitively reflects the influence of each method on training speed. The main time cost of an iteration is spent on gradient computation, network communication, and disk reading and writing. (Fig. 8 Column 2)
Disk I/O Although the number of disk I/Os cannot accurately represent the time of disk reading and writing, it still reflects the efficiency of our proposed index construction technique. (Fig. 8 Column 3)
Network communication We record the number of network communications, that is, the total number of parameters requested by workers. This number is directly proportional to the network cost. (Fig. 8 Column 4)
The iteration time, disk I/O and network communication are the average statistics per epoch. From Fig. 8, we can see the effect of each optimization method on the speed of convergence. For DRPS without any optimization, the loss value decreases very slowly. With the distributed index, the convergence speed increases significantly because of the decrease in disk I/O operations. In particular, as shown in Fig. 8(3), the number of disk I/Os decreases by 13%−23% on average.
The multi-objective parameter partitioning creates another performance gain in convergence speed by reducing the number of network parameter requests. As shown in Fig. 8(4), the reduction is 10%−18% on average. The time per iteration shows the effect of each optimization method on the per-iteration time reduction.

7.3 Evaluation for parallel model of computation

Evaluation of the parallel model of computation mainly considers two metrics: iteration time and convergence rate. The former is determined by the extent of synchronization, and the latter by the degree of consistency. Excessive asynchrony leads to high inconsistency among the parameters used by each worker, while low asynchrony increases the waiting time among workers. Our goal is to reduce the iteration time and improve the convergence rate.
We also use the two algorithms to test the effectiveness of WSP on the four datasets. In addition to WSP proposed in this paper, we implement the three existing parallel computing models in DRPS. For simplicity, we denote DRPS+ASP, DRPS+BSP, DRPS+SSP and DRPS+WSP as ASP, BSP, SSP and WSP, respectively.
In this subsection, we mainly display the curves of the loss function decreasing over time under the four parallel computing models. We manually adjust the delay bound δ of SSP until it reaches the fastest convergence rate; the best delay bound differs across datasets and machine states. Compared with SSP, WSP eliminates this complicated tuning. More importantly, it automatically chooses the best parallel strategy based on the real-time state and the selection gain.
Figure 9 shows the effect of the four models on each dataset. As the figure shows, BSP performs poorly over the entire convergence process because of the long waiting time. ASP also does not work as well as expected: it converges fast at the beginning in some scenarios, but it is very unstable, especially near the optimal point, which finally leads to performance degradation. In contrast, WSP performs noticeably better than the other models. Compared with BSP, WSP relaxes the synchronization constraints for workers without high staleness; in general, WSP has a positive impact on convergence in every iteration. Compared with ASP, WSP enjoys higher stability: although ASP iterates very fast, its per-iteration convergence is not optimistic. From Fig. 9, we can see that WSP outperforms SSP, especially in two scenarios: one is eliminating much staleness by waiting a short time; the other is avoiding a long wait that would eliminate only a little staleness. Furthermore, WSP does not need to tune δ as SSP does.
Fig.9 Convergence comparison of parallel models. (a) LMF on Matrix; (b) LMF on MovieLens; (c) LoR on Classification; (d) LoR on Avazu-CTR


7.4 Evaluation for overall system performance

This subsection combines the optimizations of the above two parts to test the overall improvement of the disk-resident parameter server DRPS. We build an experimental table (Table 2) by adding the optimizations one by one and record the convergence time of each experimental strategy. We carry out the experiments on five machines, since we have already investigated scalability in the previous subsections.
Tab.2 Convergence time of optimization methods for DRPS
DRPS  Indexing  Partitioning  BSP  ASP  SSP  WSP  TMat/min  TMov/min  TClas/min  TAza/min
✓     –         –             ✓    –    –    –    1468      93        260        273
✓     ✓         –             ✓    –    –    –    1268      82        232        238
✓     ✓         ✓             ✓    –    –    –    1125      75        214        227
✓     ✓         ✓             –    ✓    –    –    1135      78        232        236
✓     ✓         ✓             –    –    ✓    –    979       62        212        219
✓     ✓         ✓             –    –    –    ✓    864       50        182        212
The experimental objects consist of the following methods. The first two are DRPS and DRPS+Index, which have the same meanings as before. The third is DRPS+Index+Partition, which builds indices and partitions ML models for DRPS; for simplicity, we denote it as DRPSA. All the above methods adopt the BSP parallel model. The next three strategies use different models: DRPSA+ASP, DRPSA+SSP, and DRPSA+WSP. The first two time columns report the testing time of LMF on the Matrix and MovieLens datasets, and the last two report the testing time of LoR on the Classification and AvazuCTR datasets, respectively. A checkmark means that the corresponding optimization is added to the strategy.
From Table 2, we can see the optimization effects of LMF and LoR on the corresponding datasets. For the performance optimization of distributed machine learning systems, the ultimate goal is to reduce the overall latency until the algorithms converge. In Table 2, we only record the convergence time of each optimization plan. We find that the last line, i.e., DRPSA+WSP, achieves the best convergence time among all plans. The speedup ratios on Matrix, MovieLens, Classification and AvazuCTR are 41%, 46%, 30% and 22%, respectively.

8 Applicability and limitation

This paper is suitable for sparse machine learning tasks, where each piece of data needs only a small part of the parameters for training, which is common in real scenarios. On the contrary, it performs poorly in dense machine learning and deep learning tasks, where almost all parameters need to be accessed in every training iteration.
In the context of statistical heterogeneity, we mainly divide the scenarios into two cases. The first is heterogeneous machine configurations. Due to the diverse computing resources of different machines, some heterogeneity-aware data partitioning algorithms allocate more workload to the more powerful machines; generally, the computing time of each machine is balanced in this scenario. The second is scenarios such as federated learning, where data is not suitable for migration or re-partitioning. In this case, the number of instances is naturally heterogeneous, and it is difficult to balance the computing time of each machine. We analyze the advantages and limitations of this paper in both statistically heterogeneous scenarios with respect to the following indicators.
Time of constructing the index In the first case, some machines need to compute on more data for constructing the parameter index than others. However, the data is allocated according to the computing resources of each machine; as a result, the index construction time of each machine is balanced, thanks to the stronger computing power of the machines with more data. In the second case, it is difficult to balance the index construction time of each machine because of the unpredictable and unchangeable load distribution.
Effect of the index Since we build a global index by aggregating the statistical results of each machine, the effectiveness of the index is not affected in either scenario. Each parameter index block on every machine can maintain the same performance as with homogeneous configurations.
Effect of the partition Our parameter partition approach mainly targets homogeneous machines, so our heuristic strategy is also aimed at a uniform load distribution. We would need to further explore the parameter partition scheme to apply it in a heterogeneous environment. In exploring a heterogeneity-aware parameter partition strategy, it is still an important step to calculate the total access cost (disk and network I/O cost) of each parameter index block on each machine, which this paper has already done.
Effect of WSP WSP considers the real-time state of each machine and chooses the synchronization strategy dynamically. In the context of statistical heterogeneity, the running time of each machine will be even more unbalanced, and WSP can adapt well to such scenarios.

9 Conclusion

As far as we know, this is the first effort to optimize a disk-resident parameter server. Our starting point is to balance processing efficiency against the financial budget. Firstly, we propose and optimize a distributed disk index construction algorithm. Then, considering the costs of disk I/O and network communication, we partition the model to improve the workload balance of distributed machine learning training. Finally, a worker-selection parallel model (WSP) is presented to enhance robust convergence. This paper is suitable for sparse machine learning tasks, where each piece of data needs only a small part of the parameters for training, which is common in real scenarios; on the contrary, it performs poorly in dense machine learning and deep learning tasks. Extensive experiments have demonstrated that the performance of DRPS is much improved by applying these optimizations.

Acknowledgements

This work was supported by the National Key R&D Program of China (2018YFB1003404), the National Natural Science Foundation of China (Grant Nos. 62072083, U1811261, 61902366), Basal Research Fund (N180716010), Liao Ning Revitalization Talents Program (XLYC1807158) and the China Postdoctoral Science Foundation (2020T130623).
1
Li M, Andersen D G, Park J W, Smola A J, Ahmed A, Josifovski V, Long J, Shekita E J, Su B Y. Scaling distributed machine learning with the parameter server. In: Proceedings of USENIX Symposium on Operating Systems Design and Implementation. 2014, 583– 598

2
Chen T Q, Li M, Li Y T, Lin M, Wang N Y, Wang M J, Xiao T J, Xu B, Zhang C Y, Zhang Z. MXNet: a flexible and efficient machine learning library for heterogeneous distributed system. 2015, arXiv preprint arXiv: 1512.01274

3
Xing E P, Ho Q R, Dai W, Kim J K, Wei J L, Lee S H, Zheng X, Xie P T, Kumar A, Yu Y L. Petuum: a new platform for distributed machine learning on big data. In: Proceedings of ACM Conference on Knowledge Discovery and Data Mining. 2015, 1335−1344

4
Abadi M, Barham P, Chen J M, Chen Z F, Davis A, Dean J, Devin M, Ghemawat S, Irving G, Isard M, Kudlur M, Levenberg J, Monga R, Moore S, Murray D G, Steiner B, Tucker P A, Vasudevan V, Warden P, Wicke M, Yu Y, Zheng X Q. TensorFlow: a system for large-scale machine learning. In: Proceedings of USENIX Symposium on Operating Systems Design and Implementation. 2016, 265– 283

5
Recht B, Re C, Wright S J, Niu F. Hogwild: a lock-free approach to parallelizing stochastic gradient descent. In: Proceeding of the 24th International Conference on Neural Information Processing Systems. 2011, 693– 701

6
Xin D, Macke S, Ma L T, Liu J L, Song S C, Parameswaran A G. Helix: holistic optimization for accelerating iterative machine learning. Proceedings of the VLDB Endowment, 2018, 12(4): 446– 460

7
Huang Y Z, Jin T, Wu Y D, Cai Z K, Yan X, Yang F, Li J F, Guo Y Y, Cheng J. FlexPS: flexible parallelism control in parameter server architecture. Proceedings of the VLDB Endowment, 2018, 11(5): 566– 579

8
Zhang Z P, Cui B, Shao Y X, Yu L L, Jiang J W, Miao X P. PS2: parameter server on spark. In: Proceedings of ACM Conference on Management of Data. 2019, 376– 388

9
Zaharia M, Chowdhury M, Franklin M J, Shenker S, Stoica I. Spark: cluster computing with working sets. In: Proceedings of USENIX Workshop on Hot Topics in Cloud Computing. 2010, 1– 7

10
Cho M, Finkler U, Kung D S, Hunter H C. BlueConnect: decomposing all-reduce for deep learning on heterogeneous network hierarchy. In: Proceedings of Conference on Machine Learning and Systems. 2019, 1– 11

11
Yang F, Li J F, Cheng J. Husky: towards a more efficient and expressive distributed computing framework. Proceedings of the VLDB Endowment, 2016, 9(5): 420– 431

12
Jiang Y M, Zhu Y B, Lan C, Yi B, Cui Y, Guo C X. A unified architecture for accelerating distributed DNN training in heterogeneous GPU/CPU clusters. In: Proceedings of USENIX Symposium on Operating Systems Design and Implementation. 2020, 463–479

13
Wang Z G, Gu Y, Bao Y B, Yu G, Yu J X. Hybrid pulling/pushing for I/O-efficient distributed and iterative graph computing. In: Proceedings of ACM Conference on Management of Data. 2016, 479–494

14
Qin C J, Torres M, Rusu F. Scalable asynchronous gradient descent optimization for out-of-core models. Proceedings of the VLDB Endowment, 2017, 10(10): 986– 997

15
Li M, Andersen D G, Smola A J. Graph partitioning via parallel submodular approximation to accelerate distributed machine learning. 2015, arXiv preprint arXiv: 1505.04636

16
Renz-Wieland A, Gemulla R, Zeuch S, Markl V. Dynamic parameter allocation in parameter servers. Proceedings of the VLDB Endowment, 2020, 13(12): 1877−1890

17
Chen Y R, Peng Y H, Bao Y X, Wu C, Zhu Y B, Guo C X. Elastic parameter server load distribution in deep learning clusters. In: Proceedings of ACM Symposium on Cloud Computing. 2020, 507– 521

18
Gallet B, Gowanlock M. Heterogeneous CPU-GPU epsilon grid joins: static and dynamic work partitioning strategies. Data Science and Engineering, 2021, 6(1): 39–62

19
Valiant L G. A bridging model for parallel computation. Communications of the ACM, 1990, 33(8): 103–111

20
Ho Q R, Cipar J, Cui H G, Lee S H, Kim J K, Gibbons P B, Gibson G A, Ganger G R, Xing E P. More effective distributed ML via a stale synchronous parallel parameter server. In: Proceedings of the 26th International Conference on Neural Information Processing Systems. 2013, 1223−1231

21
Li M, Andersen D G, Smola A J, Yu K. Communication efficient distributed machine learning with the parameter server. In: Proceedings of the 27th International Conference on Neural Information Processing Systems. 2014, 19– 27

22
Fan W F, Lu P, Luo X J, Xu J B, Yin Q, Yu W Y, Xu R Q. Adaptive asynchronous parallelization of graph algorithms. In: Proceedings of the International Conference on Management of Data. 2018, 1141−1156

23
Jiang J W, Cui B, Zhang C, Yu L L. Heterogeneity-aware distributed parameter servers. In: Proceedings of the ACM International Conference on Management of Data. 2017, 463– 478

24
Wang Z G, Gao L X, Gu Y, Bao Y B, Yu G. FSP: towards flexible synchronous parallel framework for expectation-maximization based algorithms on cloud. In: Proceedings of the Symposium on Cloud Computing. 2017, 1– 14
