百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

30天学会Python编程:18. Python数据库编程入门

off999 2025-07-08 22:07 39 浏览 0 评论

18.1 数据库基础

18.1.1 数据库类型对比

18.1.2 DB-API规范

核心接口

  1. connect() - 建立连接
  2. cursor() - 创建游标
  3. execute() - 执行SQL
  4. fetchone()/fetchall() - 获取结果
  5. commit()/rollback() - 事务控制

18.2 SQLite操作

18.2.1 基本CRUD

import sqlite3

# 创建连接
conn = sqlite3.connect('example.db', check_same_thread=False)
cursor = conn.cursor()

# 建表
cursor.execute('''CREATE TABLE IF NOT EXISTS users
               (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)''')

# 插入数据
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 25))

# 查询数据
cursor.execute("SELECT * FROM users WHERE age > ?", (20,))
print(cursor.fetchall())

# 提交事务
conn.commit()
conn.close()

18.2.2 高级特性

# 使用上下文管理器
with sqlite3.connect('example.db') as conn:
    conn.row_factory = sqlite3.Row  # 字典式访问
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    for row in cursor:
        print(row['name'], row['age'])

# 内存数据库
mem_db = sqlite3.connect(':memory:')

18.3 MySQL/PostgreSQL

18.3.1 PyMySQL示例

import pymysql

# 连接MySQL
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test',
    cursorclass=pymysql.cursors.DictCursor
)

try:
    with conn.cursor() as cursor:
        # 执行查询
        sql = "SELECT * FROM users WHERE email=%s"
        cursor.execute(sql, ('test@example.com',))
        result = cursor.fetchone()
        print(result)
finally:
    conn.close()

18.3.2 psycopg2示例

import psycopg2

# 连接PostgreSQL
conn = psycopg2.connect(
    host="localhost",
    database="test",
    user="postgres",
    password="password"
)

# 使用with自动提交/回滚
with conn:
    with conn.cursor() as cursor:
        cursor.execute("""
            INSERT INTO products (name, price)
            VALUES (%s, %s)
            RETURNING id
        """, ("Laptop", 999.99))
        product_id = cursor.fetchone()[0]
        print(f"插入记录ID: {product_id}")

18.4 ORM框架

18.4.1 SQLAlchemy核心

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 定义模型
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    age = Column(Integer)

# 创建引擎和会话
engine = create_engine('sqlite:///example.db')
Session = sessionmaker(bind=engine)
session = Session()

# 查询操作
users = session.query(User).filter(User.age > 20).all()
for user in users:
    print(user.name, user.age)

# 插入数据
new_user = User(name='Bob', age=30)
session.add(new_user)
session.commit()

18.4.2 Django ORM

# models.py
from django.db import models

class Product(models.Model):
    name = models.CharField(max_length=100)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)

# 查询示例
from app.models import Product

# 创建记录
Product.objects.create(name="Mouse", price=29.99)

# 复杂查询
from django.db.models import Q, F
products = Product.objects.filter(
    Q(price__lt=100) | Q(name__startswith="M"),
    created_at__year=2023
).annotate(
    discounted_price=F('price')*0.9
)

18.5 NoSQL数据库

18.5.1 MongoDB

from pymongo import MongoClient

# 连接MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['mydatabase']
collection = db['users']

# 插入文档
user = {"name": "Alice", "age": 25, "hobbies": ["coding", "reading"]}
inserted_id = collection.insert_one(user).inserted_id

# 聚合查询
pipeline = [
    {"$match": {"age": {"$gt": 20}}},
    {"$group": {"_id": "$name", "count": {"$sum": 1}}}
]
results = collection.aggregate(pipeline)

18.5.2 Redis

import redis

# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 字符串操作
r.set('foo', 'bar')
print(r.get('foo'))  # b'bar'

# 哈希操作
r.hset('user:1000', mapping={
    'name': 'Alice',
    'age': '25'
})
print(r.hgetall('user:1000'))  # {b'name': b'Alice', b'age': b'25'}

18.6 数据库连接池

18.6.1 连接池实现

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# 创建连接池
engine = create_engine(
    'mysql+pymysql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30
)

# 使用连接
with engine.connect() as conn:
    result = conn.execute("SELECT * FROM users")
    print(result.fetchall())

18.6.2 连接池管理

import psycopg2
from psycopg2 import pool

# 创建连接池
connection_pool = psycopg2.pool.SimpleConnectionPool(
    minconn=1,
    maxconn=10,
    host="localhost",
    database="test",
    user="postgres",
    password="password"
)

# 获取连接
conn = connection_pool.getconn()
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM products")
        print(cursor.fetchall())
finally:
    connection_pool.putconn(conn)

18.7 应用举例

案例1:电商订单

from sqlalchemy import create_engine, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    orders = relationship("Order", back_populates="user")

class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    price = Column(Float)
    order_items = relationship("OrderItem", back_populates="product")

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime)
    user = relationship("User", back_populates="orders")
    items = relationship("OrderItem", back_populates="order")

class OrderItem(Base):
    __tablename__ = 'order_items'
    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey('orders.id'))
    product_id = Column(Integer, ForeignKey('products.id'))
    quantity = Column(Integer)
    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")

# 使用示例
engine = create_engine('sqlite:///ecommerce.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

# 创建测试数据
new_user = User(name="Alice")
session.add(new_user)

product1 = Product(name="Laptop", price=999.99)
product2 = Product(name="Mouse", price=29.99)
session.add_all([product1, product2])

order = Order(user=new_user)
order.items = [
    OrderItem(product=product1, quantity=1),
    OrderItem(product=product2, quantity=2)
]
session.add(order)
session.commit()

# 查询用户订单
user = session.query(User).filter_by(name="Alice").first()
for order in user.orders:
    print(f"订单 {order.id}:")
    for item in order.items:
        print(f"  - {item.product.name} x{item.quantity}")

案例2:缓存策略实现

import sqlite3
import redis
import json
from datetime import datetime

class CachedDatabase:
    """带Redis缓存的数据库访问层"""
    
    def __init__(self, db_file=':memory:'):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.db_conn = sqlite3.connect(db_file)
        self._init_db()
    
    def _init_db(self):
        cursor = self.db_conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS articles
                       (id INTEGER PRIMARY KEY, title TEXT, content TEXT, 
                       created_at TIMESTAMP)''')
        self.db_conn.commit()
    
    def get_article(self, article_id):
        """获取文章(优先从缓存读取)"""
        cache_key = f"article:{article_id}"
        cached_data = self.redis.get(cache_key)
        
        if cached_data:
            print("从缓存读取")
            return json.loads(cached_data)
        
        print("从数据库读取")
        cursor = self.db_conn.cursor()
        cursor.execute("SELECT * FROM articles WHERE id=?", (article_id,))
        row = cursor.fetchone()
        
        if row:
            article = {
                'id': row[0],
                'title': row[1],
                'content': row[2],
                'created_at': row[3]
            }
            # 写入缓存(60秒过期)
            self.redis.setex(cache_key, 60, json.dumps(article))
            return article
        return None
    
    def create_article(self, title, content):
        """创建新文章(自动清除相关缓存)"""
        cursor = self.db_conn.cursor()
        created_at = datetime.now().isoformat()
        cursor.execute("INSERT INTO articles (title, content, created_at) VALUES (?, ?, ?)",
                      (title, content, created_at))
        self.db_conn.commit()
        
        # 获取新插入的ID
        article_id = cursor.lastrowid
        
        # 清除可能存在的缓存
        self.redis.delete(f"article:{article_id}")
        
        return article_id

# 使用示例
cache_db = CachedDatabase()

# 创建测试文章
article_id = cache_db.create_article(
    "Python数据库编程",
    "本文介绍Python中的各种数据库操作方法..."
)

# 第一次读取(从数据库)
article = cache_db.get_article(article_id)

# 第二次读取(从缓存)
article = cache_db.get_article(article_id)

18.8 知识图谱


持续更新Python编程学习日志与技巧,敬请关注!


#编程# #学习# #在头条记录我的2025# #python#


相关推荐

大文件传不动?WinRAR/7-Zip 入门到高手,这 5 个技巧让你效率翻倍

“这200张照片怎么传给女儿?微信发不了,邮箱附件又超限……”62岁的张阿姨对着电脑犯愁时,儿子只用了3分钟就把照片压缩成一个文件,还教她:“以后用压缩软件,比打包行李还方便!”职场人更懂这...

电脑解压缩软件推荐——7-Zip:免费、高效、简洁的文件管理神器

在日常工作中,我们经常需要处理压缩文件。无论是下载软件包、接收文件,还是存储大量数据,压缩和解压缩文件都成为了我们日常操作的一部分。而说到压缩解压软件,7-Zip绝对是一个不可忽视的名字。今天,我就来...

设置了加密密码zip文件要如何打开?这几个方法可以试试~

Zip是一种常见的压缩格式文件,文件还可以设置密码保护。那设置了密码的Zip文件要如何打开呢?不清楚的小伙伴一起来看看吧。当我们知道密码想要打开带密码的Zip文件,我们需要用到适用于Zip格式的解压缩...

大文件想要传输成功,怎么把ZIP文件分卷压缩

不知道各位小伙伴有没有这样的烦恼,发送很大很大的压缩包会受到限制,为此,想要在压缩过程中将文件拆分为几个压缩包并且同时为所有压缩包设置加密应该如何设置?方法一:使用7-Zip免费且强大的文件管理工具7...

高效处理 RAR 分卷压缩包:合并解压操作全攻略

在文件传输和存储过程中,当遇到大文件时,我们常常会使用分卷压缩的方式将其拆分成多个较小的压缩包,方便存储和传输。RAR作为一种常见的压缩格式,分卷压缩包的使用频率也很高。但很多人在拿到RAR分卷...

2个方法教你如何删除ZIP压缩包密码

zip压缩包设置了加密密码,每次解压文件都需要输入密码才能够顺利解压出文件,当压缩包文件不再需要加密的时候,大家肯定想删除压缩包密码,或是忘记了压缩包密码,想要通过删除操作将压缩包密码删除,就能够顺利...

速转!漏洞预警丨压缩软件Winrar目录穿越漏洞

WinRAR是一款功能强大的压缩包管理器,它是档案工具RAR在Windows环境下的图形界面。该软件可用于备份数据,缩减电子邮件附件的大小,解压缩从Internet上下载的RAR、ZIP及其它类...

文件解压方法和工具分享_文件解压工具下载

压缩文件减少文件大小,降低文件失效的概率,总得来说好处很多。所以很多文件我们下载下来都是压缩软件,很多小伙伴不知道怎么解压,或者不知道什么工具更好,所以今天做了文件解压方法和工具的分享给大家。一、解压...

[python]《Python编程快速上手:让繁琐工作自动化》学习笔记3

1.组织文件笔记(第9章)(代码下载)1.1文件与文件路径通过importshutil调用shutil模块操作目录,shutil模块能够在Python程序中实现文件复制、移动、改名和删除;同时...

Python内置tarfile模块:读写 tar 归档文件详解

一、学习目标1.1学习目标掌握Python内置模块tarfile的核心功能,包括:理解tar归档文件的原理与常见压缩格式(gzip/bz2/lzma)掌握tar文件的读写操作(创建、解压、查看、过滤...

使用python展开tar包_python拓展

类Unix的系统,打包文件经常使用的就是tar包,结合zip工具,可以方便的打包并解压。在python的标准库里面有tarfile库,可以方便实现生成了展开tar包。使用这个库最大的好处,可能就在于不...

银狐钓鱼再升级:白文件脚本化实现GO语言后门持久驻留

近期,火绒威胁情报中心监测到一批相对更为活跃的“银狐”系列变种木马。火绒安全工程师第一时间获取样本并进行分析。分析发现,该样本通过阿里云存储桶下发恶意文件,采用AppDomainManager进行白利...

ZIP文件怎么打开?2个简单方法教你轻松搞定!

在日常工作和生活中,我们经常会遇到各种压缩文件,其中最常见的格式之一就是ZIP。ZIP文件通过压缩数据来减少文件大小,方便我们进行存储和传输。然而,对于初学者来说,如何打开ZIP文件可能会成为一个小小...

Ubuntu—解压多个zip压缩文件.zip .z01 .z02

方法将所有zip文件放在同一目录中:zip_file.z01,zip_file.z02,zip_file.z03,...,zip_file.zip。在Zip3.0版本及以上,使用下列命令:将所有zi...

如何使用7-Zip对文件进行加密压缩

7-Zip是一款开源的文件归档工具,支持多种压缩格式,并提供了对压缩文件进行加密的功能。使用7-Zip可以轻松创建和解压.7z、.zip等格式的压缩文件,并且可以通过设置密码来保护压缩包中的...

取消回复欢迎 发表评论: