Python MySQL
Python MySQL 编程全面指南
MySQL是最流行的开源关系型数据库之一,Python通过多种驱动可以与MySQL交互。以下是Python操作MySQL数据库的详细说明。
1. 环境准备
安装MySQL驱动
# 最常用的MySQL连接器
pip install mysql-connector-python
# 另一个流行选择
pip install pymysql
# ORM工具SQLAlchemy
pip install sqlalchemy
数据库准备
- 安装MySQL服务器
- 创建数据库和用户
- 准备测试数据
2. 使用mysql-connector
基本连接
import mysql.connector
config = {
'user': 'username',
'password': 'password',
'host': 'localhost',
'database': 'testdb',
'raise_on_warnings': True
}
try:
# 建立连接
conn = mysql.connector.connect(**config)
# 创建游标
cursor = conn.cursor()
# 执行简单查询
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()
print(f"MySQL版本: {version[0]}")
except mysql.connector.Error as err:
print(f"数据库错误: {err}")
finally:
# 关闭连接
if 'conn' in locals() and conn.is_connected():
cursor.close()
conn.close()
CRUD操作示例
创建表
create_table_sql = """
CREATE TABLE IF NOT EXISTS employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
department VARCHAR(50),
salary DECIMAL(10,2),
hire_date DATE
)
"""
cursor.execute(create_table_sql)
插入数据
# 单条插入
insert_sql = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
employee = ("张三", "技术部", 8500.00, "2022-01-15")
cursor.execute(insert_sql, employee)
conn.commit() # 重要:提交事务
# 批量插入
employees = [
("李四", "市场部", 7500.00, "2022-03-10"),
("王五", "人事部", 6500.00, "2021-11-22"),
("赵六", "技术部", 9000.00, "2020-09-05")
]
cursor.executemany(insert_sql, employees)
conn.commit()
查询数据
# 查询所有记录
cursor.execute("SELECT * FROM employees")
rows = cursor.fetchall()
print("所有员工:")
for row in rows:
print(row)
# 带条件查询
query = "SELECT name, salary FROM employees WHERE department = %s AND salary > %s"
params = ("技术部", 8000)
cursor.execute(query, params)
print("\n高薪技术部员工:")
for (name, salary) in cursor:
print(f"{name}: {salary}")
更新数据
update_sql = "UPDATE employees SET salary = salary * 1.1 WHERE department = %s"
cursor.execute(update_sql, ("技术部",))
conn.commit()
print(f"更新了{cursor.rowcount}条记录")
删除数据
delete_sql = "DELETE FROM employees WHERE hire_date < %s"
cursor.execute(delete_sql, ("2022-01-01",))
conn.commit()
print(f"删除了{cursor.rowcount}条记录")
事务处理
try:
# 开始事务
conn.start_transaction()
# 执行多个操作
cursor.execute("INSERT INTO employees VALUES (NULL, '钱七', '财务部', 7000, '2023-01-10')")
cursor.execute("UPDATE accounts SET balance = balance - 1000 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 1000 WHERE id = 2")
# 提交事务
conn.commit()
print("事务执行成功")
except mysql.connector.Error as err:
# 回滚事务
conn.rollback()
print(f"事务执行失败: {err}")
3. 使用PyMySQL
PyMySQL是纯Python实现的MySQL客户端。
基本用法
import pymysql
# 建立连接
connection = pymysql.connect(
host='localhost',
user='username',
password='password',
database='testdb',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 返回字典形式结果
)
try:
with connection.cursor() as cursor:
# 执行SQL
sql = "SELECT * FROM employees WHERE salary > %s"
cursor.execute(sql, (8000,))
# 获取结果
results = cursor.fetchall()
for row in results:
print(row) # 每行是一个字典
# 自动提交事务
connection.commit()
finally:
connection.close()
4. 使用SQLAlchemy ORM
SQLAlchemy提供了高级的ORM功能。
定义模型
from sqlalchemy import create_engine, Column, Integer, String, Date, DECIMAL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Employee(Base):
__tablename__ = 'employees'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
department = Column(String(50))
salary = Column(DECIMAL(10,2))
hire_date = Column(Date)
def __repr__(self):
return f"<Employee(name='{self.name}', department='{self.department}')>"
# 创建引擎
engine = create_engine('mysql+pymysql://username:password@localhost/testdb')
Base.metadata.create_all(engine) # 创建表
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
ORM操作示例
# 添加新员工
new_emp = Employee(
name='孙八',
department='销售部',
salary=6800.00,
hire_date='2023-02-20'
)
session.add(new_emp)
session.commit()
# 查询
# 获取所有技术部员工
tech_employees = session.query(Employee).filter_by(department='技术部').all()
for emp in tech_employees:
print(emp.name, emp.salary)
# 更新
emp = session.query(Employee).filter_by(name='张三').first()
if emp:
emp.salary = 9500.00
session.commit()
# 删除
emp = session.query(Employee).filter_by(name='李四').first()
if emp:
session.delete(emp)
session.commit()
5. 高级主题
连接池管理
from mysql.connector import pooling
# 创建连接池
dbconfig = {
"user": "username",
"password": "password",
"host": "localhost",
"database": "testdb"
}
pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
**dbconfig
)
# 从连接池获取连接
conn = pool.get_connection()
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM employees")
for row in cursor:
print(row)
finally:
conn.close() # 连接返回连接池
批量操作优化
# 使用executemany进行批量插入
data = [
('周九', '市场部', 7200.00, '2023-03-15'),
('吴十', '人事部', 6200.00, '2023-04-01')
]
insert_sql = """
INSERT INTO employees (name, department, salary, hire_date)
VALUES (%s, %s, %s, %s)
"""
cursor.executemany(insert_sql, data)
conn.commit()
使用存储过程
# 创建存储过程
create_proc_sql = """
CREATE PROCEDURE increase_salaries(IN dept VARCHAR(50), IN rate DECIMAL(5,2))
BEGIN
UPDATE employees
SET salary = salary * (1 + rate / 100)
WHERE department = dept;
END
"""
cursor.execute(create_proc_sql)
# 调用存储过程
args = ('技术部', 10.0) # 技术部加薪10%
result_args = cursor.callproc('increase_salaries', args)
conn.commit()
6. 安全最佳实践
使用参数化查询
# 不安全的方式 - SQL注入风险
# cursor.execute(f"SELECT * FROM users WHERE username = '{user_input}'")
# 安全的方式
cursor.execute("SELECT * FROM users WHERE username = %s", (user_input,))
最小权限原则
- 为应用创建专用数据库用户
- 只授予必要权限
CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'password'; GRANT SELECT, INSERT, UPDATE ON testdb.* TO 'appuser'@'localhost';
连接安全
# 使用SSL连接
config = {
'user': 'username',
'password': 'password',
'host': 'localhost',
'database': 'testdb',
'ssl_ca': '/path/to/ca.pem',
'ssl_cert': '/path/to/client-cert.pem',
'ssl_key': '/path/to/client-key.pem'
}
conn = mysql.connector.connect(**config)
7. 性能优化
索引优化
# 为常用查询条件创建索引
cursor.execute("CREATE INDEX idx_department ON employees(department)")
cursor.execute("CREATE INDEX idx_salary ON employees(salary)")
查询优化
# 只选择需要的列
cursor.execute("SELECT name, department FROM employees")
# 使用LIMIT分页
page = 1
page_size = 10
offset = (page - 1) * page_size
cursor.execute("SELECT * FROM employees LIMIT %s OFFSET %s", (page_size, offset))
批量操作
# 使用事务批量插入
try:
conn.start_transaction()
for i in range(1000):
cursor.execute("INSERT INTO log (message) VALUES (%s)", (f"Log entry {i}",))
conn.commit()
except:
conn.rollback()
8. 实际应用示例
员工管理系统
class EmployeeManager:
def __init__(self, config):
self.config = config
self.connection = None
def __enter__(self):
self.connection = mysql.connector.connect(**self.config)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.connection and self.connection.is_connected():
self.connection.close()
def add_employee(self, name, department, salary, hire_date):
sql = """
INSERT INTO employees (name, department, salary, hire_date)
VALUES (%s, %s, %s, %s)
"""
with self.connection.cursor() as cursor:
cursor.execute(sql, (name, department, salary, hire_date))
self.connection.commit()
return cursor.lastrowid
def get_employees_by_dept(self, department):
sql = "SELECT * FROM employees WHERE department = %s ORDER BY salary DESC"
with self.connection.cursor(dictionary=True) as cursor:
cursor.execute(sql, (department,))
return cursor.fetchall()
def update_salary(self, emp_id, new_salary):
sql = "UPDATE employees SET salary = %s WHERE id = %s"
with self.connection.cursor() as cursor:
cursor.execute(sql, (new_salary, emp_id))
self.connection.commit()
return cursor.rowcount
# 使用示例
db_config = {
'user': 'username',
'password': 'password',
'host': 'localhost',
'database': 'testdb'
}
with EmployeeManager(db_config) as manager:
# 添加新员工
emp_id = manager.add_employee("郑十一", "研发部", 9200.00, "2023-05-10")
# 查询部门员工
tech_employees = manager.get_employees_by_dept("技术部")
for emp in tech_employees:
print(f"{emp['name']}: {emp['salary']}")
# 加薪
manager.update_salary(emp_id, 9500.00)
9. 常见问题解决
连接问题
try:
conn = mysql.connector.connect(**config)
except mysql.connector.Error as err:
if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR:
print("用户名或密码错误")
elif err.errno == mysql.connector.errorcode.ER_BAD_DB_ERROR:
print("数据库不存在")
else:
print(err)
编码问题
# 确保连接使用正确的字符集
config = {
'user': 'username',
'password': 'password',
'host': 'localhost',
'database': 'testdb',
'charset': 'utf8mb4' # 支持完整Unicode
}
超时设置
config = {
'user': 'username',
'password': 'password',
'host': 'localhost',
'database': 'testdb',
'connection_timeout': 10, # 连接超时(秒)
'pool_timeout': 5 # 连接池获取连接超时
}
10. 总结
Python操作MySQL有多种方式:
方法 | 特点 | 适用场景 |
---|---|---|
mysql-connector | 官方驱动,功能全面 | 需要官方支持的项目 |
PyMySQL | 纯Python实现,兼容性好 | 需要纯Python解决方案 |
SQLAlchemy | ORM,支持多种数据库 | 大型项目,需要数据库抽象 |
ORM框架 | Django ORM等 | 使用对应框架的项目 |
关键要点:
- 始终使用参数化查询防止SQL注入
- 管理好数据库连接和事务
- 根据需求选择合适的驱动或ORM
- 对性能敏感的操作考虑连接池和批量操作
- 生产环境使用SSL加密连接
掌握这些技术可以高效安全地在Python项目中操作MySQL数据库。