In this section, we present the design of the proposed prefetching mechanism by addressing the key issues discussed in the introduction. Firstly, we determine when to launch the prefetching mechanism based on the progress rate of tasks. Secondly, based on the processing capacity of the worker nodes and the available cache space, we calculate the number of data blocks that can be prefetched in each round, K, using the KNN algorithm. Finally, we determine the location for prefetched data based on the data locality rate to reduce execution time by minimizing data transmission time.
4.1 Prefetch time
In theory, the optimal timing for launching prefetching in Hadoop is determined by when the prefetched data can be fully utilized by Map or Reduce tasks, and when it can be made available in the cache before it is needed. This approach can help minimize the impact of data access latencies on Hadoop job performance. One method of determining the best timing is to use the progress rate of Map and Reduce tasks to predict when they will require data. In this case, a threshold value should be set for the progress rate of Map and Reduce tasks. Once the progress rate exceeds the threshold, the prefetching mechanism can be launched to begin fetching data blocks that are likely to be needed soon. By launching prefetching at this time, the data can be made available in the cache before it is needed, reducing data access latencies. To avoid interfering with other tasks or causing excessive network traffic, it is advisable to trigger prefetching when the system is relatively idle, such as during periods of low job activity or off-peak hours. Ultimately, the optimal timing for prefetching in Hadoop depends on various factors, including the job's characteristics, worker node processing capacity, and network resource availability. Monitoring system performance and adjusting the prefetching strategy as needed is critical to achieving favorable results.
To determine the best time to launch prefetching, we calculate the progress rate of tasks and determine a suitable threshold value. To calculate this threshold, we first introduce some notation: a job consists of T tasks (either Map tasks or Reduce tasks); each task processes N <key, value> pairs, of which M have already been processed, and has completed L stages (a Reduce task has three stages: the copy phase, the sort phase, and the reduce phase). The progress score of the ith task, PSi, is estimated as the fraction of the task's <key, value> pairs that have been processed, as shown in Eq. 1 [11]. The average progress score of a job, PSavg, is then calculated using Eq. 2. Furthermore, the progress rate of the ith task, PRi, can be computed as the progress made per second, given that the task has run for Tr seconds, as shown in Eq. 3 [12]. By setting a suitable threshold value for the progress rate of Map and Reduce tasks, the prefetch mechanism can be triggered when the progress rate exceeds the threshold value, indicating that the task will soon require data blocks that can be prefetched.
\(PS_i = \dfrac{M}{N}\) for Map tasks;  \(PS_i = \dfrac{1}{3}\left(L + \dfrac{M}{N}\right)\) for Reduce tasks (1)
\(PS_{avg} = \dfrac{1}{T}\sum_{i=1}^{T} PS_i\) (2)
\(PR_i = \dfrac{PS_i}{T_r}\) (3)
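As a concrete illustration of Eqs. 1–3, the following Python sketch computes the progress score, the average progress score of a job, and the per-task progress rate. The TaskProgress container and its field names are our own illustrative assumptions and are not part of the proposed system.

from dataclasses import dataclass

@dataclass
class TaskProgress:
    is_map: bool          # True for a Map task, False for a Reduce task
    processed_pairs: int  # M: <key, value> pairs processed so far
    total_pairs: int      # N: total <key, value> pairs assigned to the task
    stages_done: int      # L: completed stages (Reduce only: copy, sort, reduce)
    runtime_s: float      # Tr: seconds the task has been running

def progress_score(t):
    # Eq. 1: fraction of the task's work that is finished.
    if t.is_map:
        return t.processed_pairs / t.total_pairs
    return (t.stages_done + t.processed_pairs / t.total_pairs) / 3.0

def average_progress_score(tasks):
    # Eq. 2: mean progress score over the T tasks of a job.
    return sum(progress_score(t) for t in tasks) / len(tasks)

def progress_rate(t):
    # Eq. 3: progress score gained per second of runtime.
    return progress_score(t) / t.runtime_s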
To determine the threshold value for launching the prefetch mechanism, we need to consider the workload and system characteristics. The choice of threshold should balance the risk of triggering prefetching too early (leading to wasted network bandwidth and cache space) and the risk of triggering prefetching too late (resulting in longer processing times due to data access latency).
Our suggested formula for determining the threshold value is expressed as Eq. 4, where α and β are scaling factors that can be adjusted based on the characteristics of the workload and cluster. To determine α and β, one could consider workload features such as cache affinity and job type (I/O-bound or CPU-bound), while the cluster specifications could include the available cache space and worker node processing capacity. The standard deviation of the progress scores of the Map and Reduce tasks across the entire job is calculated using Eq. 5, where T is the total number of tasks. By taking these factors into account, we can determine an appropriate threshold value for launching the prefetch mechanism that minimizes the impact of data access latencies on the performance of Hadoop jobs.
\(\text{Threshold} = \alpha \left(PS_{avg} + \beta \cdot PS_{std}\right)\) (4)
\(PS_{std} = \sqrt{\dfrac{\sum_{i=1}^{T}\left(PS_i - PS_{avg}\right)^2}{T-1}}\) (5)
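Building on the quantities above, a minimal Python sketch of Eqs. 4 and 5 is given below; the function name and the default values of α and β are illustrative assumptions, and in practice these scaling factors would be tuned to the workload and cluster characteristics described above.

import math

def prefetch_threshold(progress_scores, alpha=1.0, beta=0.5):
    # Eq. 5: sample standard deviation of the per-task progress scores.
    t = len(progress_scores)
    ps_avg = sum(progress_scores) / t
    ps_std = math.sqrt(sum((ps - ps_avg) ** 2 for ps in progress_scores) / (t - 1))
    # Eq. 4: threshold above which the prefetch mechanism is launched.
    return alpha * (ps_avg + beta * ps_std)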
4.2 Prefetched data size
The size of prefetched data is a crucial factor affecting performance. Insufficient prefetched data can force the system to retrieve demanded data from disk or over the network, increasing data access time or the frequency of prefetch operations. On the other hand, excessive prefetched data can overload a worker node and increase resource contention, negatively affecting performance. The number of data blocks that can be prefetched in each round depends on various factors, such as the processing capacity of the node, the job running time, and the degree of parallelism. In a heterogeneous Hadoop environment, where the cluster consists of worker nodes with varying processing capacities and cache spaces, the number of blocks to prefetch, K, is calculated using Eq. 6. Here, W is the number of worker nodes, Pj denotes the processing capacity of the jth worker node, and Cj represents the available cache space on the jth worker node. The equation takes into account the parallelism degree Nc, which is the number of tasks that can run concurrently, the estimated average processing time of a data block on the jth worker node, ETj, and the data block size, BS.
\(K = \dfrac{\sum_{j=1}^{W} P_j \cdot ET_j \cdot C_j}{BS \cdot N_c}\) (6)
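A minimal sketch of Eq. 6 is shown below, assuming a simple per-node record of processing capacity, estimated block processing time, and available cache space; the names are ours, and the result is truncated to a whole number of blocks.

from dataclasses import dataclass

@dataclass
class WorkerNode:
    capacity: float      # Pj: processing capacity of the jth worker node
    block_time_s: float  # ETj: estimated average processing time of one block
    cache_space: float   # Cj: available cache space on the node

def prefetch_count(workers, block_size, concurrency):
    # Eq. 6: number of data blocks K that can be prefetched in each round.
    total = sum(w.capacity * w.block_time_s * w.cache_space for w in workers)
    return int(total // (block_size * concurrency))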
4.3 Prefetched data location
Selecting the location of prefetched data with data locality in mind can noticeably reduce job execution time. Assume that, for the set of data nodes holding replicas of the required data blocks, Dd is the distance between data node d and the processing node, Cd is the processing capability of node d (a combination of CPU and memory capacity), and Ld is the current workload of data node d. An appropriate data node should have a low load, be close to the processing node to reduce data transmission time, and have enough processing capacity to balance the load across the cluster, thereby increasing resource utilization. A high-level conceptual formula that incorporates these parameters is given in Eq. 7, where f is a function that takes into account the specific algorithms and policies implemented in Hadoop to calculate a suitable location for prefetched data. Data locality and node processing capacity are directly related to this choice: Hadoop prioritizes nodes that hold local copies of the required data and are close to the node where the task is scheduled, and nodes with higher processing capacity are preferred for executing tasks so that the computational load can be handled effectively. When a node is under heavy load or is already processing a significant amount of data, it is preferable to avoid placing additional prefetched data on that node and instead distribute the work among less loaded nodes, ensuring a balanced workload distribution. In summary, the selected node for the prefetched data, S, is given by:
\(S = \arg\min_{d} \; f\!\left(\dfrac{(D_d,\, C_d)}{L_d}\right)\) (7)
A similar formula is used in the dynamic replica selection algorithm proposed in [9], where identical parameters are taken into account to determine which data blocks should be replicated, thereby mitigating the occurrence of hot DataNode phenomena. We do emphasize that the details of this choice (in particular the choice of the function f) may require some application-dependent customization.
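As an illustration only, the sketch below instantiates Eq. 7 with one plausible choice of f that prefers nearby, lightly loaded data nodes with spare capacity; as emphasized above, the actual choice of f is application-dependent and may differ in a real deployment.

from dataclasses import dataclass

@dataclass
class DataNode:
    name: str
    distance: float  # Dd: distance between data node d and the processing node
    capacity: float  # Cd: combined CPU and memory capacity of node d
    load: float      # Ld: current workload of node d

def locality_cost(node):
    # One hypothetical instantiation of f: lower is better, i.e. close,
    # lightly loaded nodes with high processing capacity are preferred.
    return node.distance * node.load / node.capacity

def select_prefetch_node(candidates):
    # Eq. 7: choose the replica holder S that minimizes the cost.
    return min(candidates, key=locality_cost)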
4.4 Proposed prefetch algorithm
This section presents our proposed smart prefetching algorithm in detail. The input to the KNN prefetch algorithm includes the list of worker nodes in the cluster along with their processing capabilities, the list of tasks to be performed, and the size of the input data. First, we calculate the total number of data blocks from the Hadoop Distributed File System (HDFS) data block size. Next, we compute each task's progress score, which depends on the task type (Eq. 1), and estimate its remaining time from its progress rate. If the estimated remaining time of a task reaches the predefined threshold value, the prefetching process is initiated. The number of data blocks that can be prefetched, K, is determined using Eq. 6. We then use K-nearest neighbor (KNN) grouping to cluster the data blocks, with each cluster containing the K nearest neighbor data blocks that can be prefetched together during each prefetch phase; the K data blocks nearest to the demanded data blocks are identified by their Euclidean distance. Finally, we place the prefetched data blocks according to the data locality factor, as described by Eq. 7, which reduces data transmission time through the network and thus positively impacts the job execution time.
KNN prefetch algorithm
Input: List of heterogeneous worker nodes, input data size (IDS), list of tasks
1- TDB = IDS / DBS // Calculate the total number of data blocks (DBS: HDFS data block size)
2- If task Ti is a Map task then // Calculate the progress score of the task
3-   PSi = M / N
4- Else PSi = 1/3 * (L + M/N)
5- End if
6- ProgressRatei = PSi / ExecutionTimei
7- TimetoEndi = (1 − PSi) / ProgressRatei
8- PSavg = (1/T) * sum(PSi for i = 1..T) // Eq. 2
9- If TimetoEndi >= Threshold * PSi then launch prefetch
10- K = sum(Pj * ETj * Cj for j = 1..W) / (BS * Nc) // Eq. 6: number of data blocks that can be prefetched
11- For j = 1; j < IDS; j++
12-   distance = distance + (data[j] − data[j+1])^2 // Accumulate squared differences for the Euclidean distance (KNN)
13- List = SQRT(distance)
14- SortedList = sort and group the distances in ascending order
15- Choose the top K entries from the sorted list
16- Look up the block metadata to find the locations of these K data blocks
17- If they are not located in the DataNode then
18-   DataNodes = arg min{ f((Dd, Cd)/Ld) } // Eq. 7: use data locality to place the prefetched data
19- Send a request to the DataNodes to cache them
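For clarity, the KNN step of the algorithm (steps 11–15) can be sketched in Python as follows; the feature vector used to describe a data block is an assumption on our part, since the pseudocode specifies only the Euclidean distance computation and the top-K selection.

import math

def k_nearest_blocks(demand_features, candidate_blocks, k):
    # candidate_blocks maps a block ID to a (hypothetical) numeric feature
    # vector, e.g. block offset or access-history statistics.
    def euclidean(features):
        return math.sqrt(sum((a - b) ** 2 for a, b in zip(demand_features, features)))
    # Rank candidates by distance to the demanded block and keep the K closest.
    ranked = sorted(candidate_blocks.items(), key=lambda item: euclidean(item[1]))
    return [block_id for block_id, _ in ranked[:k]]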