线程:如何以性能为目标来定义和限制执行?

线程:如何以性能为目标来定义和限制执行?

线程,一种有助于开发现代高性能解决方案并成为不可或缺的工具。无论使用哪种语言,并行执行任务的能力都具有很大的吸引力。但显然有本叔叔的名言:“能力越大,责任越大。”如何以最佳方式使用该解决方案,以实现性能、更好地利用资源和应用程序健康?首先,有必要了解本主题的基本概念。

什么是“线程”?

线程是c++olor:#f60; text-decoration:underline;' href="https://www.php.cn/zt/16016.html" target="_blank">操作系统中进程执行的基本单位。它们允许程序在同一进程中同时执行多个操作。每个线程与主进程共享相同的内存空间,但可以独立执行,这对于可以并行执行的任务非常有用,例如输入/输出(i/o)操作、复杂计算或数据用户界面。 .

在许多系统上,线程由操作系统管理,操作系统为每个线程分配 cpu 时间并管理它们之间的上下文切换。在java、python、c++等编程语言中,都有方便创建和管理线程的库和框架。

为什么要使用线程?

线程主要用于提高程序的效率和响应能力。使用线程,尤其是后端的原因是:

  • 并行:线程允许您同时执行多个操作,从而更好地利用可用的 cpu 资源,特别是在具有多个内核的系统上。

  • 性能:在 i/o 操作中,例如读写文件或网络通信,线程可以通过允许程序在等待这些任务完成的同时继续执行其他任务来帮助提高性能操作。

  • 模块化:线程可用于将程序划分为更小、更易于管理的部分,每个部分执行特定的任务。

但是,仔细管理线程非常重要,因为不正确的使用可能会导致竞争条件、死锁和调试困难等问题。为了更好地管理它们,使用了线程池解决方案。

什么是线程池以及为什么需要它?

线程池是一种软件设计模式,涉及创建和管理可重复使用来执行任务的线程池。线程池不会为每个任务重复创建和销毁线程,而是维护固定数量的线程,准备根据需要执行任务。这可以显着提高需要处理许多并发任务的应用程序的性能。使用线程池的优点是:

  • 提高性能:创建和销毁线程在资源方面是一项昂贵的操作。线程池通过重用现有线程来最小化此成本。

  • 资源管理:控制运行的线程数量,避免过多的线程创建导致系统过载。

  • 易于使用:简化线程管理,使开发人员能够专注于应用程序逻辑而不是线程管理。

  • 可扩展性:帮助扩展应用程序以有效地处理大量并发任务。

如何设置池中的线程限制

好吧,我当然必须创建一个线程池才能更好地利用此功能,但很快出现的一个问题是:“池应该包含多少个线程?”。按照基本逻辑,越多越好,对吗?如果所有事情都可以并行完成,那么很快就会完成,因为速度会更快。因此,最好不要限制线程数量,或者设置一个较高的数字,这样就不会出现问题。正确吗?
这是一个公平的说法,所以让我们测试一下。此测试的代码是用 kotlin 编写的,只是为了熟悉并易于编写示例。这一点与语言无关。
通过 4 个示例来探索不同的系统性质。示例 1 和示例 2 是为了使用 cpu 进行大量数学运算,即进行大量处理。示例3重点关注i/o,示例是读取文件,最后,示例4是并行api调用的情况,同样关注i/o。它们都使用不同大小的池,分别具有 1、2、4、8、16、32、50、100 和 500 个线程。所有过程发生超过 500 次。

示例 1 - 计算 1 到 100000 之间有多少个质数的代码

import kotlinx.coroutines.*
import kotlin.math.sqrt
import kotlin.system.measuretimemillis

fun isprime(number: int): boolean {
    if (number <= 1) return false
    for (i in 2..sqrt(number.todouble()).toint()) {
        if (number % i == 0) return false
    }
    return true
}

fun countprimesinrange(start: int, end: int): int {
    var count = 0
    for (i in start..end) {
        if (isprime(i)) {
            count++
        }
    }
    return count
}

@optin(delicatecoroutinesapi::class)
fun main() = runblocking {
    val rangestart = 1
    val rangeend = 100_000
    val numberofthreadslist = listof(1, 2, 4, 8, 16, 32, 50, 100, 500)

    for (numberofthreads in numberofthreadslist) {
        val customdispatcher = newfixedthreadpoolcontext(numberofthreads, "custompool")
        val chunksize = (rangeend - rangestart + 1) / numberofthreads
        val timetaken = measuretimemillis {
            val jobs = mutablelistof<deferred<int>>()
            for (i in 0 until numberofthreads) {
                val start = rangestart + i * chunksize
                val end = if (i == numberofthreads - 1) rangeend else start + chunksize - 1
                jobs.add(async(customdispatcher) { countprimesinrange(start, end) })
            }
            val totalprimes = jobs.awaitall().sum()
            println("total de números primos encontrados com $numberofthreads threads: $totalprimes")
        }
        println("tempo levado com $numberofthreads threads: $timetaken ms")
        customdispatcher.close()
    }
}
示例1 控制台输出
total de números primos encontrados com 1 threads: 9592
tempo levado com 1 threads: 42 ms
total de números primos encontrados com 2 threads: 9592
tempo levado com 2 threads: 17 ms
total de números primos encontrados com 4 threads: 9592
tempo levado com 4 threads: 8 ms
total de números primos encontrados com 8 threads: 9592
tempo levado com 8 threads: 8 ms
total de números primos encontrados com 16 threads: 9592
tempo levado com 16 threads: 16 ms
total de números primos encontrados com 32 threads: 9592
tempo levado com 32 threads: 12 ms
total de números primos encontrados com 50 threads: 9592
tempo levado com 50 threads: 19 ms
total de números primos encontrados com 100 threads: 9592
tempo levado com 100 threads: 36 ms
total de números primos encontrados com 500 threads: 9592
tempo levado com 500 threads: 148 ms

示例 2 - 计算斐波那契数列上第 30 个数字的代码

import kotlinx.coroutines.delicatecoroutinesapi
import kotlinx.coroutines.launch
import kotlinx.coroutines.newfixedthreadpoolcontext
import kotlinx.coroutines.runblocking
import kotlin.system.measuretimemillis

fun fibonacci(n: int): long {
    return if (n <= 1) n.tolong() else fibonacci(n - 1) + fibonacci(n - 2)
}

@optin(delicatecoroutinesapi::class)
fun main() = runblocking {
    val numberofthreadslist = listof(1, 2, 4, 8, 16, 32, 50, 100, 500)

    for (numberofthreads in numberofthreadslist) {
        val customdispatcher = newfixedthreadpoolcontext(numberofthreads, "custompool")
        val numberstocalculate = mutablelistof<int>()
        for (i in 1..1000) {
            numberstocalculate.add(30)
        }
        val timetaken = measuretimemillis {
            val jobs = numberstocalculate.map { number ->
                launch(customdispatcher) {
                    fibonacci(number)
                }
            }
            jobs.foreach { it.join() }
        }
        println("tempo levado com $numberofthreads threads: $timetaken ms")
        customdispatcher.close()
    }
}
示例2 控制台输出
tempo levado com 1 threads: 4884 ms
tempo levado com 2 threads: 2910 ms
tempo levado com 4 threads: 1660 ms
tempo levado com 8 threads: 1204 ms
tempo levado com 16 threads: 1279 ms
tempo levado com 32 threads: 1260 ms
tempo levado com 50 threads: 1364 ms
tempo levado com 100 threads: 1400 ms
tempo levado com 500 threads: 1475 ms

示例 3 - 读取随机数文件并在读取结束时添加它们的代码

import kotlinx.coroutines.*
import java.io.file
import kotlin.system.measuretimemillis

@optin(delicatecoroutinesapi::class)
fun main() = runblocking {
    val file = file("numeros_aleatorios.txt")

    if (!file.exists()) {
        println("arquivo não encontrado!")
        return@runblocking
    }
    val numberofthreadslist = listof(1, 2, 4, 8, 16, 32, 50, 100, 500)
    for (numberofthreads in numberofthreadslist) {
        val customdispatcher = newfixedthreadpoolcontext(numberofthreads, "custompool")
        val timetaken = measuretimemillis {
            val jobs = mutablelistof<deferred<int>>()
            file.uselines { lines ->
                lines.foreach { line ->
                    jobs.add(async(customdispatcher) {
                        processline(line)
                    })
                }
            }
            val totalsum = jobs.awaitall().sum()
            println("total da soma com $numberofthreads threads: $totalsum")
        }
        println("tempo levado com $numberofthreads threads: $timetaken ms")
        customdispatcher.close()
    }

}

fun processline(line: string): int {
    return line.toint() + 10
}
示例3 控制台输出
total da soma de 1201 linhas com 1 threads: 60192
tempo levado com 1 threads: 97 ms
total da soma de 1201 linhas com 2 threads: 60192
tempo levado com 2 threads: 28 ms
total da soma de 1201 linhas com 4 threads: 60192
tempo levado com 4 threads: 30 ms
total da soma de 1201 linhas com 8 threads: 60192
tempo levado com 8 threads: 26 ms
total da soma de 1201 linhas com 16 threads: 60192
tempo levado com 16 threads: 33 ms
total da soma de 1201 linhas com 32 threads: 60192
tempo levado com 32 threads: 35 ms
total da soma de 1201 linhas com 50 threads: 60192
tempo levado com 50 threads: 44 ms
total da soma de 1201 linhas com 100 threads: 60192
tempo levado com 100 threads: 66 ms
total da soma de 1201 linhas com 500 threads: 60192
tempo levado com 500 threads: 297 ms

示例 4 - 调用 api 500 次的代码

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import kotlinx.coroutines.delicatecoroutinesapi
import kotlinx.coroutines.launch
import kotlinx.coroutines.newfixedthreadpoolcontext
import kotlinx.coroutines.runblocking
import kotlin.system.measuretimemillis

@optin(delicatecoroutinesapi::class)
fun main() = runblocking {
    val client = httpclient(cio)

    try {
        val numberofthreadslist = listof(1, 2, 4, 8, 16, 32, 50, 100, 500)
        for (numberofthreads in numberofthreadslist) {
            val customdispatcher = newfixedthreadpoolcontext(numberofthreads, "custompool")
            val timetaken = measuretimemillis {
                repeat(500) {
                    val jobs = launch(customdispatcher) { client.get("http://127.0.0.1:5000/example") }
                    jobs.join()
                }
            }
            println("tempo levado com $numberofthreads threads: $timetaken ms")
            customdispatcher.close()
        }
    } catch (e: exception) {
        println("erro ao conectar à api: ${e.message}")
    } finally {
        client.close()
    }
}
示例4 控制台输出
Tempo levado com 1 threads: 7104 ms
Tempo levado com 2 threads: 4793 ms
Tempo levado com 4 threads: 4170 ms
Tempo levado com 8 threads: 4310 ms
Tempo levado com 16 threads: 4028 ms
Tempo levado com 32 threads: 4089 ms
Tempo levado com 50 threads: 4066 ms
Tempo levado com 100 threads: 3978 ms
Tempo levado com 500 threads: 3777 ms

示例 1 到示例 3 有一个共同的行为,它们在达到 8 个线程时性能都变得更高,然后处理时间再次增加,但示例 4 却没有,这说明了什么?总是使用尽可能多的线程不是很有趣吗?

简单而快速的答案是

我的机器的处理器有 8 个核心,也就是说,它可以同时执行 8 个任务,超过这个时间会随着管理每个线程状态的时间而增加,最终导致性能下降。

好的,这回答了示例 1 到 3,但是示例 4 呢?为什么启动的线程越多性能就会提高?

简单,因为它是一个集成,机器没有处理,它基本上等待响应,它保持“睡眠”直到响应到达,所以是的,这里线程的数量可以更大。但要小心,并不意味着可以有尽可能多的线程,线程会导致资源耗尽,不加区别地使用它们会产生相反的效果,会影响服务的整体健康状况。
因此,要定义池将拥有的线程数,最简单、最安全的方法是分离将要执行的任务的性质。它们分为两部分:

  • 不需要处理的任务:
    当任务类型不需要处理时,可以创建比机器上的处理器核心更多的线程。发生这种情况是因为不需要处理信息来完成线程,基本上这种性质的线程在大多数情况下都期望来自集成的响应,例如写入数据库或来自 api 的响应。

  • 需要处理的任务:
    当解决方案有处理时,即机器实际在做工作时,最大线程数必须是机器处理器的核心数。这是因为处理器核心无法同时执行多于一件事。例如,如果运行该解决方案的处理器有 4 个核心,那么它的线程池必须是您的处理器核心的大小,即 4 线程池。

结论

在考虑线程池时,首先要定义的一点不一定是限制其大小的数量,而是所执行任务的性质。线程对服务的性能有很大帮助,但必须以最佳方式使用它们,以免产生相反的效果并降低性能,甚至更糟糕的是,导致整个服务的健康状况受到影响。很明显,较小的池最终更倾向于处理大量使用的任务,换句话说,即 cpu 受限的任务。如果您不确定使用线程的解决方案是否具有大量使用处理的行为,请谨慎起见,将您的池限制为机器上的处理器数量,相信我,这会节省让你很头疼。

以上就是线程:如何以性能为目标来定义和限制执行?的详细内容,更多请关注硕下网其它相关文章!