On this page

Python MySQL

Python MySQL 编程全面指南

MySQL是最流行的开源关系型数据库之一,Python通过多种驱动可以与MySQL交互。以下是Python操作MySQL数据库的详细说明。

1. 环境准备

安装MySQL驱动

# 最常用的MySQL连接器
pip install mysql-connector-python

# 另一个流行选择
pip install pymysql

# ORM工具SQLAlchemy
pip install sqlalchemy

数据库准备

  1. 安装MySQL服务器
  2. 创建数据库和用户
  3. 准备测试数据

2. 使用mysql-connector

基本连接

import mysql.connector

config = {
    'user': 'username',
    'password': 'password',
    'host': 'localhost',
    'database': 'testdb',
    'raise_on_warnings': True
}

try:
    # 建立连接
    conn = mysql.connector.connect(**config)
    
    # 创建游标
    cursor = conn.cursor()
    
    # 执行简单查询
    cursor.execute("SELECT VERSION()")
    version = cursor.fetchone()
    print(f"MySQL版本: {version[0]}")
    
except mysql.connector.Error as err:
    print(f"数据库错误: {err}")
    
finally:
    # 关闭连接
    if 'conn' in locals() and conn.is_connected():
        cursor.close()
        conn.close()

CRUD操作示例

创建表

create_table_sql = """
CREATE TABLE IF NOT EXISTS employees (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    department VARCHAR(50),
    salary DECIMAL(10,2),
    hire_date DATE
)
"""
cursor.execute(create_table_sql)

插入数据

# 单条插入
insert_sql = "INSERT INTO employees (name, department, salary, hire_date) VALUES (%s, %s, %s, %s)"
employee = ("张三", "技术部", 8500.00, "2022-01-15")
cursor.execute(insert_sql, employee)
conn.commit()  # 重要:提交事务

# 批量插入
employees = [
    ("李四", "市场部", 7500.00, "2022-03-10"),
    ("王五", "人事部", 6500.00, "2021-11-22"),
    ("赵六", "技术部", 9000.00, "2020-09-05")
]
cursor.executemany(insert_sql, employees)
conn.commit()

查询数据

# 查询所有记录
cursor.execute("SELECT * FROM employees")
rows = cursor.fetchall()

print("所有员工:")
for row in rows:
    print(row)

# 带条件查询
query = "SELECT name, salary FROM employees WHERE department = %s AND salary > %s"
params = ("技术部", 8000)
cursor.execute(query, params)

print("\n高薪技术部员工:")
for (name, salary) in cursor:
    print(f"{name}: {salary}")

更新数据

update_sql = "UPDATE employees SET salary = salary * 1.1 WHERE department = %s"
cursor.execute(update_sql, ("技术部",))
conn.commit()
print(f"更新了{cursor.rowcount}条记录")

删除数据

delete_sql = "DELETE FROM employees WHERE hire_date < %s"
cursor.execute(delete_sql, ("2022-01-01",))
conn.commit()
print(f"删除了{cursor.rowcount}条记录")

事务处理

try:
    # 开始事务
    conn.start_transaction()
    
    # 执行多个操作
    cursor.execute("INSERT INTO employees VALUES (NULL, '钱七', '财务部', 7000, '2023-01-10')")
    cursor.execute("UPDATE accounts SET balance = balance - 1000 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 1000 WHERE id = 2")
    
    # 提交事务
    conn.commit()
    print("事务执行成功")
    
except mysql.connector.Error as err:
    # 回滚事务
    conn.rollback()
    print(f"事务执行失败: {err}")

3. 使用PyMySQL

PyMySQL是纯Python实现的MySQL客户端。

基本用法

import pymysql

# 建立连接
connection = pymysql.connect(
    host='localhost',
    user='username',
    password='password',
    database='testdb',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor  # 返回字典形式结果
)

try:
    with connection.cursor() as cursor:
        # 执行SQL
        sql = "SELECT * FROM employees WHERE salary > %s"
        cursor.execute(sql, (8000,))
        
        # 获取结果
        results = cursor.fetchall()
        for row in results:
            print(row)  # 每行是一个字典
            
    # 自动提交事务
    connection.commit()
    
finally:
    connection.close()

4. 使用SQLAlchemy ORM

SQLAlchemy提供了高级的ORM功能。

定义模型

from sqlalchemy import create_engine, Column, Integer, String, Date, DECIMAL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Employee(Base):
    __tablename__ = 'employees'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    department = Column(String(50))
    salary = Column(DECIMAL(10,2))
    hire_date = Column(Date)
    
    def __repr__(self):
        return f"<Employee(name='{self.name}', department='{self.department}')>"

# 创建引擎
engine = create_engine('mysql+pymysql://username:password@localhost/testdb')
Base.metadata.create_all(engine)  # 创建表

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

ORM操作示例

# 添加新员工
new_emp = Employee(
    name='孙八',
    department='销售部',
    salary=6800.00,
    hire_date='2023-02-20'
)
session.add(new_emp)
session.commit()

# 查询
# 获取所有技术部员工
tech_employees = session.query(Employee).filter_by(department='技术部').all()
for emp in tech_employees:
    print(emp.name, emp.salary)

# 更新
emp = session.query(Employee).filter_by(name='张三').first()
if emp:
    emp.salary = 9500.00
    session.commit()

# 删除
emp = session.query(Employee).filter_by(name='李四').first()
if emp:
    session.delete(emp)
    session.commit()

5. 高级主题

连接池管理

from mysql.connector import pooling

# 创建连接池
dbconfig = {
    "user": "username",
    "password": "password",
    "host": "localhost",
    "database": "testdb"
}

pool = pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=5,
    **dbconfig
)

# 从连接池获取连接
conn = pool.get_connection()

try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM employees")
    for row in cursor:
        print(row)
finally:
    conn.close()  # 连接返回连接池

批量操作优化

# 使用executemany进行批量插入
data = [
    ('周九', '市场部', 7200.00, '2023-03-15'),
    ('吴十', '人事部', 6200.00, '2023-04-01')
]

insert_sql = """
INSERT INTO employees (name, department, salary, hire_date)
VALUES (%s, %s, %s, %s)
"""

cursor.executemany(insert_sql, data)
conn.commit()

使用存储过程

# 创建存储过程
create_proc_sql = """
CREATE PROCEDURE increase_salaries(IN dept VARCHAR(50), IN rate DECIMAL(5,2))
BEGIN
    UPDATE employees 
    SET salary = salary * (1 + rate / 100)
    WHERE department = dept;
END
"""
cursor.execute(create_proc_sql)

# 调用存储过程
args = ('技术部', 10.0)  # 技术部加薪10%
result_args = cursor.callproc('increase_salaries', args)
conn.commit()

6. 安全最佳实践

使用参数化查询

# 不安全的方式 - SQL注入风险
# cursor.execute(f"SELECT * FROM users WHERE username = '{user_input}'")

# 安全的方式
cursor.execute("SELECT * FROM users WHERE username = %s", (user_input,))

最小权限原则

  • 为应用创建专用数据库用户
  • 只授予必要权限
    CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'password';
    GRANT SELECT, INSERT, UPDATE ON testdb.* TO 'appuser'@'localhost';
    

连接安全

# 使用SSL连接
config = {
    'user': 'username',
    'password': 'password',
    'host': 'localhost',
    'database': 'testdb',
    'ssl_ca': '/path/to/ca.pem',
    'ssl_cert': '/path/to/client-cert.pem',
    'ssl_key': '/path/to/client-key.pem'
}
conn = mysql.connector.connect(**config)

7. 性能优化

索引优化

# 为常用查询条件创建索引
cursor.execute("CREATE INDEX idx_department ON employees(department)")
cursor.execute("CREATE INDEX idx_salary ON employees(salary)")

查询优化

# 只选择需要的列
cursor.execute("SELECT name, department FROM employees")

# 使用LIMIT分页
page = 1
page_size = 10
offset = (page - 1) * page_size
cursor.execute("SELECT * FROM employees LIMIT %s OFFSET %s", (page_size, offset))

批量操作

# 使用事务批量插入
try:
    conn.start_transaction()
    for i in range(1000):
        cursor.execute("INSERT INTO log (message) VALUES (%s)", (f"Log entry {i}",))
    conn.commit()
except:
    conn.rollback()

8. 实际应用示例

员工管理系统

class EmployeeManager:
    def __init__(self, config):
        self.config = config
        self.connection = None
    
    def __enter__(self):
        self.connection = mysql.connector.connect(**self.config)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.connection and self.connection.is_connected():
            self.connection.close()
    
    def add_employee(self, name, department, salary, hire_date):
        sql = """
        INSERT INTO employees (name, department, salary, hire_date)
        VALUES (%s, %s, %s, %s)
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql, (name, department, salary, hire_date))
            self.connection.commit()
            return cursor.lastrowid
    
    def get_employees_by_dept(self, department):
        sql = "SELECT * FROM employees WHERE department = %s ORDER BY salary DESC"
        with self.connection.cursor(dictionary=True) as cursor:
            cursor.execute(sql, (department,))
            return cursor.fetchall()
    
    def update_salary(self, emp_id, new_salary):
        sql = "UPDATE employees SET salary = %s WHERE id = %s"
        with self.connection.cursor() as cursor:
            cursor.execute(sql, (new_salary, emp_id))
            self.connection.commit()
            return cursor.rowcount

# 使用示例
db_config = {
    'user': 'username',
    'password': 'password',
    'host': 'localhost',
    'database': 'testdb'
}

with EmployeeManager(db_config) as manager:
    # 添加新员工
    emp_id = manager.add_employee("郑十一", "研发部", 9200.00, "2023-05-10")
    
    # 查询部门员工
    tech_employees = manager.get_employees_by_dept("技术部")
    for emp in tech_employees:
        print(f"{emp['name']}: {emp['salary']}")
    
    # 加薪
    manager.update_salary(emp_id, 9500.00)

9. 常见问题解决

连接问题

try:
    conn = mysql.connector.connect(**config)
except mysql.connector.Error as err:
    if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR:
        print("用户名或密码错误")
    elif err.errno == mysql.connector.errorcode.ER_BAD_DB_ERROR:
        print("数据库不存在")
    else:
        print(err)

编码问题

# 确保连接使用正确的字符集
config = {
    'user': 'username',
    'password': 'password',
    'host': 'localhost',
    'database': 'testdb',
    'charset': 'utf8mb4'  # 支持完整Unicode
}

超时设置

config = {
    'user': 'username',
    'password': 'password',
    'host': 'localhost',
    'database': 'testdb',
    'connection_timeout': 10,  # 连接超时(秒)
    'pool_timeout': 5         # 连接池获取连接超时
}

10. 总结

Python操作MySQL有多种方式:

方法特点适用场景
mysql-connector官方驱动,功能全面需要官方支持的项目
PyMySQL纯Python实现,兼容性好需要纯Python解决方案
SQLAlchemyORM,支持多种数据库大型项目,需要数据库抽象
ORM框架Django ORM等使用对应框架的项目

关键要点:

  1. 始终使用参数化查询防止SQL注入
  2. 管理好数据库连接和事务
  3. 根据需求选择合适的驱动或ORM
  4. 对性能敏感的操作考虑连接池和批量操作
  5. 生产环境使用SSL加密连接

掌握这些技术可以高效安全地在Python项目中操作MySQL数据库。