更新时间:2025-08-12 gmt 08:00

示例代码解读-j9九游会登录

连接池

在示例代码中基于dbutils.pooleddb构建mysql连接池(mysqlconnectionpool),并提供内置的连接探活机制,配置了最大连接数(max_connections)和连接探活间隔(keepalive_interval),代码片段如下:

pool_config = { # 连接池配置
        'max_connections': 5,           # 最大连接数
        'keepalive_interval': 60,       # 连接保活间隔(秒)
        'max_retries': 3,               # 最大重试次数
        'retry_delay': 1                # 重试间隔(秒)
}
class mysqlconnectionpool:
    def __init__(self, context, pool_config, db_config):
        """
        初始化数据库连接池
        :param db_config: 数据库配置
        :param pool_config: 连接池配置
        """
        self.context = context
        self.logger = context.getlogger();
        self.db_config = db_config
        self.pool_config = pool_config
        self.pool = self._create_pool()
        self.last_keepalive_time = 0
    def _create_pool(self):
        """
        创建数据库连接池
        :return: 连接池对象
        """
        try:
            pool = pooleddb(
                creator=pymysql,
                maxconnections=self.pool_config['max_connections'],
                mincached=1,
                **self.db_config
            )
            return pool
        except exception as e:
            self.logger.error(f"failed to create connection pool: {e}")
            raise
    def _get_connection(self):
        """
        从连接池获取连接,并确保连接有效
        :return: 数据库连接对象
        """
        conn = self.pool.connection()
        if not self._is_connection_alive(conn):
            conn = self.pool.connection()
        return conn
    def _is_connection_alive(self, conn):
        """
        检查连接是否存活
        :param conn: 数据库连接对象
        :return: bool
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute("select 1")
                return true
        except exception as e:
            self.logger.warning(f"connection is not alive: {e}")
            return false
    def _close_connection(self, conn):
        """
        关闭连接
        :param conn: 数据库连接对象
        """
        try:
            conn.close()
            self.logger.info("connection closed")
        except exception as e:
            self.logger.error(f"failed to close connection: {e}")
    def _execute_query(self, conn, sql, params=none):
        """
        执行数据库查询
        :param conn: 数据库连接对象
        :param sql: sql 语句
        :param params: sql 参数
        :return: 查询结果
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                if sql.strip().lower().startswith('select'):
                    return cursor.fetchall()
                return none
        except exception as e:
            self.logger.error(f"query failed: {e}")
            raise
    def _execute_write(self, conn, sql, params=none):
        """
        执行写操作(插入、更新、删除)
        :param conn: 数据库连接对象
        :param sql: sql 语句
        :param params: sql 参数
        :return: 受影响的行数
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return cursor.rowcount
        except exception as e:
            self.logger.error(f"write operation failed: {e}")
            conn.rollback()
            raise

使用mysql连接池复用已创建的连接,能有效提升程序性能。最大连接数配置确保连接资源的使用保持在可控范围内,同时确保线程安全。

相关概念说明:

表1 连接相关概念说明

概念

说明

最大连接数配置区间

在functiongraph函数配置mysql最大连接数建议在如下区间选取一个值:

  • 最大连接数下限 =(函数单实例并发度)*(函数单次执行访问mysql并发度)
  • 最大连接数上限 =(mysql实例连接数上限)/(函数最大实例数)

例如:某个访问mysql函数单实例并发度配置为5,每次执行函数访问mysql并发度为2,函数最大实例数默认400,访问的mysql实例连接数上限为30000,则计算如下:

  • 最大连接数下限 = 5*2 = 10
  • 最大连接数上限 = 30000/400 = 75

按上述结果,建议将最大连接数配置为50。

连接探活间隔

不要超过函数执行超时时间,避免因连接断开导致的问题。

重试

通过装饰器构建自动重试机制,确保在执行mysql操作失败后重试特定次数,这样可以显著降低暂时性故障的影响。例如,在瞬时网络抖动、磁盘问题导致服务暂时不可用或调用超时的情况下,提高mysql操作的成功率。

代码片段如下:

pool_config = { # 连接池配置
        'max_connections': 5,           # 最大连接数
        'keepalive_interval': 60,       # 连接保活间隔(秒)
        'max_retries': 3,               # 最大重试次数
        'retry_delay': 1                # 重试间隔(秒)
}
class database:
    def __init__(self, context, pool_config, db_config):
        self.pool_config = pool_config
        self.db_config = db_config
        self.pool = mysqlconnectionpool(context, pool_config, db_config)
    @retry(max_retries=pool_config['max_retries'], retry_delay=pool_config['retry_delay'])
    def query(self, sql, params=none):
        """
        执行查询操作
        :param sql: sql 语句
        :param params: sql 参数
        :return: 查询结果
        """
        conn = self.pool._get_connection()
        result = self.pool._execute_query(conn, sql, params)
        return result
    @retry(max_retries=pool_config['max_retries'], retry_delay=pool_config['retry_delay'])
    def execute(self, sql, params=none):
        """
        执行写操作(插入、更新、删除)
        :param sql: sql 语句
        :param params: sql 参数
        :return: 受影响的行数
        """
        conn = self.pool._get_connection()
        result = self.pool._execute_write(conn, sql, params)
        return result
def retry(max_retries=3, retry_delay=1):
    """
    重试装饰器
    :param max_retries: 最大重试次数
    :param retry_delay: 重试间隔(秒)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except exception as e:
                    print(f"attempt {retries   1} failed: {e}")
                    if retries < max_retries - 1:
                        time.sleep(retry_delay)
                    retries  = 1
            print(f"failed after {max_retries} attempts")
            raise
        return wrapper
    return decorator

相关文档

网站地图