.举例:在并行矩阵乘法中,不同的核心可以同时计算矩阵的不同部分,从而显著缩短总计算时间;科学模拟或图像处理,其效果依赖于多核处理器、GPU或分布式计算系统的硬件支持。
2.2 区别
Petri网:一种图形化工具,用于表示任务依赖、资源共享和并发行为,常用于系统设计和分析。
function process_requests(requests): thread_pool = create_thread_pool(num_cores) # 创建线程池 for request in requests: thread_pool.submit(handle_request, request) # 提交任务 thread_pool.wait_for_completion() # 等待所有任务完成 function handle_request(request): response = process(request) # 处理请求 send_response(response) # 发送响应3.2 任务调度
function scheduler(tasks, time_slice): while tasks not empty: for task in tasks: run_task(task, time_slice) # 执行任务一段时间 if task not finished: requeue(task) # 未完成则重新排队 else: remove(task) # 完成后移除3.3 多线程
mutex = create_mutex() # 创建互斥锁 shared_data = initialize_data() # 初始化共享数据 function thread_function(): lock(mutex) # 加锁 modify(shared_data) # 修改共享数据 unlock(mutex) # 解锁3.4 异步编程
function async_read(file, callback): event_loop.add_task(read_file, file, callback) # 添加异步任务 function read_file(file, callback): data = read_from_disk(file) # 读取文件 callback(data) # 执行回调 # 事件循环 while True: task = event_loop.get_next_task() task.execute()3.5 协程
function coroutine_example(): while True: data = yield # 暂停并接收数据 process(data) # 处理数据 coroutine = coroutine_example() coroutine.send(data) # 发送数据并恢复执行3.6 事件驱动
function event_loop(): while True: event = wait_for_event() # 等待事件 handler = get_handler(event) # 获取处理函数 handler(event) # 处理事件3.7 多进程
function main(): processes = [] for i in range(num_processes): p = create_process(target=worker, args=(i,)) # 创建进程 processes.append(p) p.start() # 启动进程 for p in processes: p.join() # 等待进程结束 function worker(id): result = compute(id) # 执行计算任务 send_result(result) # 发送结果四. 实现并行的技术
// 创建并启动多个线程 threads = [] for i in 1 to N: thread = create_thread(task_function, args) threads.append(thread) start_thread(thread) // 堆代码 duidaima.com // 等待所有线程完成 for thread in threads: join_thread(thread) // 线程执行的函数 function task_function(args): result = perform_task(args) return result4.2 多进程(Multiprocessing)
// 创建并启动多个进程 processes = [] for i in 1 to N: process = create_process(target=worker_function, args=(i,)) processes.append(process) start_process(process) // 等待所有进程完成 for process in processes: join_process(process) // 进程执行的函数 function worker_function(id): result = compute(id) send_result(result)4.3 分布式计算(Distributed Computing)
// MPI 伪代码示例 if rank == 0: // 主节点 data = load_data() for worker in 1 to num_workers: send_data(data_chunk, worker) results = [] for worker in 1 to num_workers: result = receive_result(worker) results.append(result) final_result = aggregate(results) else: // 工作节点 data_chunk = receive_data(0) result = process(data_chunk) send_result(result, 0)4.4 GPU并行计算
// CUDA 伪代码示例 function gpu_kernel(input, output): tid = get_thread_id() if tid < input.size: output[tid] = compute(input[tid]) // 主函数 input = load_input() output = allocate_output() launch_kernel(gpu_kernel, input, output) synchronize() // 等待GPU完成4.5 任务并行(Task Parallelism)
// 使用 OpenMP 实现任务并行 #pragma omp parallel { #pragma omp single { #pragma omp task task1() #pragma omp task task2() #pragma omp task task3() } }4.6 数据并行(Data Parallelism)
// 使用 OpenMP 实现数据并行 #pragma omp parallel for for i in 0 to N-1: output[i] = compute(input[i])4.7 流水线并行(Pipeline Parallelism)
// 流水线并行伪代码 function stage1(input): intermediate1 = process_stage1(input) return intermediate1 function stage2(intermediate1): intermediate2 = process_stage2(intermediate1) return intermediate2 function stage3(intermediate2): output = process_stage3(intermediate2) return output // 在不同线程或处理器上执行各阶段 thread1: stage1(input) thread2: stage2(stage1_output) thread3: stage3(stage2_output)4.8 Actor模型
// 创建两个Actor actor1 = create_actor(1) actor2 = create_actor(2) // Actor1 发送 Ping 消息给 Actor2 send_message(2, Ping, 1) // Actor2 收到 Ping 后会回复 Pong 给 Actor1 // Actor1 收到 Pong 后打印消息 // 停止两个Actor send_message(1, Stop, 0) // 0可以是系统或主线程的ID send_message(2, Stop, 0)五. 实践应用
Actor模型:Erlang和Akka框架通过独立的Actor单元和消息传递实现并发,避免共享内存问题。
测试框架:JUnit或pytest可扩展用于并发测试,模拟多线程场景。
Map-Reduce:将任务映射到数据分片并归约结果,适用于大数据处理。
Java:提供java.util.concurrent包,包括线程池、并发集合等高级工具。
异步编程:避免线程开销,但可能导致回调地狱或复杂逻辑。