• Locust如何实现负载测试?
  • 发布于 1个月前
  • 55 热度
    0 评论

一、场景要求

我们在使用locust时,有时候默认的场景无法满足我们的要求时,这时后我们需要自定义场景。比如我们要设置每一段时间启动10个用户运行,执行60s后再一次启动10个用户,总共运行10分钟,默认的场景是无法满足这样的要求的,我们可以使用LoadTestshape类,LoadTestshape类提供了几种负载测试策略。


二、用法
在脚本文件中定义一个类继承LoadTestshape类,locust在启动时发现文件中有使用这个类会自动启动。在该类中需要定义tick()方法,该方法返回用户数以及产生率的元组(如果没有返回这两个测试将停止),locust启动后每秒调用一次该函数。在LoadTestshape类中可以使用get_run_time()方法来获取测试运行的时间,使用此方法可以用来控制压测的总时间。
 
三、基于时间峰值策略

需求:比如我们要设置每一段时间启动10个用户运行,过一段时间后再一次启动10个用户,持续加压 60s 。


四、代码实现
import os
from locust import *

'''实现目标:每隔一段时间增加十个用户,实现持续加压'''

class CustomTaskSet(LoadTestShape):
    # 设置压测时间60s
    time_limit=60
    #设置启动/停止的用户数
    spawn_rate=10
    def tick(self):
        """
        返回一个元组,包含两值:
            user_count -- 总用户数
            spawn_rate -- 每秒启动/停止用户数
        返回None时,停止负载测试
                """
        #获取压测时间
        run_time=self.get_run_time()
        if run_time<self.time_limit:
            #每隔一段时间启动10个用户;为-1时,表示将个位变为0,逢5进一
            user_count=round(run_time,-1)
            print(f'当前用户数{user_count},当前时间{run_time}')
            return user_count,self.spawn_rate

        return None

class IncrementalPressureMeasurement(HttpUser):
    wait_time =between(1,2)
    host="http://localhost:8080"
    def on_start(self):

        print("负载加压开始")

    def on_stop(self):
        print("负载加压结束")

    @task
    def increment_pressure(self):
        self.client.post('/measurement',data={'measurement':''})


if __name__ == '__main__':
    file_path = os.path.abspath(__file__)
    os.system(f'locust -f {file_path} --web-host=127.0.0.1')
五、实现效果
最后我们欣赏下劳动成果吧,haha!

最后,还请大家可以点个免费的赞,你们的点赞才是我更新的动力!
用户评论