一、场景要求
我们在使用locust时,有时候默认的场景无法满足我们的要求时,这时后我们需要自定义场景。比如我们要设置每一段时间启动10个用户运行,执行60s后再一次启动10个用户,总共运行10分钟,默认的场景是无法满足这样的要求的,我们可以使用LoadTestshape类,LoadTestshape类提供了几种负载测试策略。
需求:比如我们要设置每一段时间启动10个用户运行,过一段时间后再一次启动10个用户,持续加压 60s 。
import os from locust import * '''实现目标:每隔一段时间增加十个用户,实现持续加压''' class CustomTaskSet(LoadTestShape): # 设置压测时间60s time_limit=60 #设置启动/停止的用户数 spawn_rate=10 def tick(self): """ 返回一个元组,包含两值: user_count -- 总用户数 spawn_rate -- 每秒启动/停止用户数 返回None时,停止负载测试 """ #获取压测时间 run_time=self.get_run_time() if run_time<self.time_limit: #每隔一段时间启动10个用户;为-1时,表示将个位变为0,逢5进一 user_count=round(run_time,-1) print(f'当前用户数{user_count},当前时间{run_time}') return user_count,self.spawn_rate return None class IncrementalPressureMeasurement(HttpUser): wait_time =between(1,2) host="http://localhost:8080" def on_start(self): print("负载加压开始") def on_stop(self): print("负载加压结束") @task def increment_pressure(self): self.client.post('/measurement',data={'measurement':''}) if __name__ == '__main__': file_path = os.path.abspath(__file__) os.system(f'locust -f {file_path} --web-host=127.0.0.1')五、实现效果
最后,还请大家可以点个免费的赞,你们的点赞才是我更新的动力!