更新时间:2025-11-27 gmt 08:00
函数访问rds for mysql示例代码-j9九游会登录
函数访问rds for mysql示例代码
这段示例代码实现了从rds for mysql实例数据库的user表中查询前10条记录的功能。通过使用数据库连接池和重试机制,代码能够高效且可靠地执行数据库操作。
以下为完整的函数示例代码。其中关于连接池和重试部分的代码解读请参考示例代码解读。
import pymysql
import time
from functools import wraps
from dbutils.pooleddb import pooleddb
db_client = none
pool_config = { # 连接池配置
'max_connections': 5, # 最大连接数
'keepalive_interval': 60, # 连接保活间隔(秒)
'max_retries': 3, # 最大重试次数
'retry_delay': 1 # 重试间隔(秒)
}
def initializer(context):
global db_client
user = context.getuserdata('username')
password = context.getuserdata('password')
host = context.getuserdata('host')
port = int(context.getuserdata('port'))
database = context.getuserdata('database')
dbconfig = { # mysql数据库配置
'host': host,
'port': port,
'user': user,
'password': password,
'database': database,
'charset': 'utf8',
}
db_client = database(context, pool_config, dbconfig)
def handler(event, context): # 执行入口
logger = context.getlogger()
try:
result= db_client.query("select * from user limit 10")
except exception as e:
logger.info("query database error:%s" % e)
return {"code": 400, "errormsg": "internal error %s" % e}
return result
class mysqlconnectionpool:
def __init__(self, context, pool_config, db_config):
"""
初始化数据库连接池
:param db_config: 数据库配置
:param pool_config: 连接池配置
"""
self.context = context
self.logger = context.getlogger();
self.db_config = db_config
self.pool_config = pool_config
self.pool = self._create_pool()
self.last_keepalive_time = 0
def _create_pool(self):
"""
创建数据库连接池
:return: 连接池对象
"""
try:
pool = pooleddb(
creator=pymysql,
maxconnections=self.pool_config['max_connections'],
mincached=1,
**self.db_config
)
return pool
except exception as e:
self.logger.error(f"failed to create connection pool: {e}")
raise
def _get_connection(self):
"""
从连接池获取连接,并确保连接有效
:return: 数据库连接对象
"""
conn = self.pool.connection()
if not self._is_connection_alive(conn):
conn = self.pool.connection()
return conn
def _is_connection_alive(self, conn):
"""
检查连接是否存活
:param conn: 数据库连接对象
:return: bool
"""
try:
with conn.cursor() as cursor:
cursor.execute("select 1")
return true
except exception as e:
self.logger.warning(f"connection is not alive: {e}")
return false
def _close_connection(self, conn):
"""
关闭连接
:param conn: 数据库连接对象
"""
try:
conn.close()
self.logger.info("connection closed")
except exception as e:
self.logger.error(f"failed to close connection: {e}")
def _execute_query(self, conn, sql, params=none):
"""
执行数据库查询
:param conn: 数据库连接对象
:param sql: sql 语句
:param params: sql 参数
:return: 查询结果
"""
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
if sql.strip().lower().startswith('select'):
return cursor.fetchall()
return none
except exception as e:
self.logger.error(f"query failed: {e}")
raise
def _execute_write(self, conn, sql, params=none):
"""
执行写操作(插入、更新、删除)
:param conn: 数据库连接对象
:param sql: sql 语句
:param params: sql 参数
:return: 受影响的行数
"""
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
return cursor.rowcount
except exception as e:
self.logger.error(f"write operation failed: {e}")
conn.rollback()
raise
def retry(max_retries=3, retry_delay=1):
"""
重试装饰器
:param max_retries: 最大重试次数
:param retry_delay: 重试间隔(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries < max_retries:
try:
return func(*args, **kwargs)
except exception as e:
print(f"attempt {retries 1} failed: {e}")
if retries < max_retries - 1:
time.sleep(retry_delay)
retries = 1
print(f"failed after {max_retries} attempts")
raise
return wrapper
return decorator
class database:
def __init__(self, context, pool_config, db_config):
self.pool_config = pool_config
self.db_config = db_config
self.pool = mysqlconnectionpool(context, pool_config, db_config)
@retry(max_retries=pool_config['max_retries'], retry_delay=pool_config['retry_delay'])
def query(self, sql, params=none):
"""
执行查询操作
:param sql: sql 语句
:param params: sql 参数
:return: 查询结果
"""
conn = self.pool._get_connection()
result = self.pool._execute_query(conn, sql, params)
return result
@retry(max_retries=pool_config['max_retries'], retry_delay=pool_config['retry_delay'])
def execute(self, sql, params=none):
"""
执行写操作(插入、更新、删除)
:param sql: sql 语句
:param params: sql 参数
:return: 受影响的行数
"""
conn = self.pool._get_connection()
result = self.pool._execute_write(conn, sql, params)
return result
相关文档
意见反馈
文档内容是否对您有帮助?
提交成功!非常感谢您的反馈,我们会继续努力做到更好!
您可在查看反馈及问题处理状态。
系统繁忙,请稍后重试
如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨