国产无遮挡裸体免费直播视频,久久精品国产蜜臀av,动漫在线视频一区二区,欧亚日韩一区二区三区,久艹在线 免费视频,国产精品美女网站免费,正在播放 97超级视频在线观看,斗破苍穹年番在线观看免费,51最新乱码中文字幕

PostgreSQL高級特性與性能優(yōu)化的實(shí)戰(zhàn)指南

 更新時間:2026年02月09日 08:54:04   作者:閑人編程  
本文將深入探討PostgreSQL的高級特性與性能優(yōu)化技術(shù),結(jié)合Python實(shí)踐,幫助開發(fā)者充分發(fā)揮PostgreSQL的潛力,文中的示例代碼講解詳細(xì),需要的小伙伴可以了解下

1. 引言:PostgreSQL的技術(shù)演進(jìn)與現(xiàn)狀

PostgreSQL作為全球最先進(jìn)的開源關(guān)系型數(shù)據(jù)庫,自1986年誕生以來,歷經(jīng)30多年的持續(xù)發(fā)展,已成為企業(yè)級應(yīng)用的首選數(shù)據(jù)庫之一。根據(jù)2023年DB-Engines排名數(shù)據(jù)顯示,PostgreSQL在"流行度"和"功能完備性"兩方面均位列開源數(shù)據(jù)庫第一。其強(qiáng)大的擴(kuò)展性、嚴(yán)格的數(shù)據(jù)完整性和豐富的功能特性,使其在處理復(fù)雜查詢、海量數(shù)據(jù)和高并發(fā)場景時表現(xiàn)出色。

本文將深入探討PostgreSQL的高級特性與性能優(yōu)化技術(shù),結(jié)合Python實(shí)踐,幫助開發(fā)者充分發(fā)揮PostgreSQL的潛力。根據(jù)國際數(shù)據(jù)公司(IDC)的報告,使用PostgreSQL的企業(yè)在數(shù)據(jù)庫運(yùn)營成本上平均降低65%,同時查詢性能提升300% 以上。

2. PostgreSQL高級特性詳解

2.1 JSONB與半結(jié)構(gòu)化數(shù)據(jù)處理

PostgreSQL的JSONB類型提供了對JSON數(shù)據(jù)的二進(jìn)制存儲,支持索引、查詢和修改操作,實(shí)現(xiàn)了關(guān)系型數(shù)據(jù)庫與文檔數(shù)據(jù)庫的完美結(jié)合。

2.1.1 JSONB性能對比分析

操作類型JSONB性能JSON性能性能提升
數(shù)據(jù)插入O(log n)O(n)5-10倍
路徑查詢O(log n)O(n)20-100倍
索引構(gòu)建O(n log n)不支持無限
數(shù)據(jù)更新O(log n)O(n)10-50倍

JSONB的存儲格式優(yōu)勢體現(xiàn)在:

2.2 全文搜索與文本分析

PostgreSQL內(nèi)置了強(qiáng)大的全文搜索功能,支持多語言、詞干提取、相關(guān)性排序等高級特性。

"""
PostgreSQL全文搜索高級應(yīng)用示例
"""
import psycopg2
from psycopg2.extras import Json, DictCursor
import json
from typing import List, Dict, Any, Optional
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLFullTextSearch:
    """PostgreSQL全文搜索高級功能封裝"""
    
    def __init__(self, dsn: str):
        """
        初始化數(shù)據(jù)庫連接
        
        Args:
            dsn: 數(shù)據(jù)庫連接字符串
        """
        self.conn = psycopg2.connect(dsn)
        self.conn.autocommit = False
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
        
    def create_fulltext_configuration(self, lang: str = 'english'):
        """
        創(chuàng)建自定義全文搜索配置
        
        Args:
            lang: 語言配置
        """
        config_sql = f"""
        -- 創(chuàng)建自定義文本搜索配置
        CREATE TEXT SEARCH CONFIGURATION {lang}_custom (
            COPY = {lang}
        );
        
        -- 添加同義詞字典
        CREATE TEXT SEARCH DICTIONARY synonym_dict (
            TEMPLATE = synonym,
            SYNONYMS = synonym_sample
        );
        
        -- 添加自定義字典到配置
        ALTER TEXT SEARCH CONFIGURATION {lang}_custom
            ALTER MAPPING FOR asciiword, asciihword, hword_asciipart
            WITH synonym_dict, english_stem;
        """
        
        try:
            self.cursor.execute(config_sql)
            self.conn.commit()
            logger.info(f"全文搜索配置創(chuàng)建成功: {lang}_custom")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建配置失敗: {e}")
            
    def create_searchable_table(self):
        """
        創(chuàng)建支持全文搜索的表
        """
        create_table_sql = """
        -- 創(chuàng)建文檔表
        CREATE TABLE IF NOT EXISTS documents (
            id SERIAL PRIMARY KEY,
            title VARCHAR(500) NOT NULL,
            content TEXT NOT NULL,
            author VARCHAR(200),
            category VARCHAR(100),
            tags JSONB DEFAULT '[]',
            publication_date DATE DEFAULT CURRENT_DATE,
            
            -- 生成列:用于全文搜索
            content_search_vector TSVECTOR GENERATED ALWAYS AS (
                setweight(to_tsvector('english_custom', coalesce(title, '')), 'A') ||
                setweight(to_tsvector('english_custom', coalesce(content, '')), 'B')
            ) STORED,
            
            -- 生成列:用于元數(shù)據(jù)搜索
            metadata_search_vector TSVECTOR GENERATED ALWAYS AS (
                to_tsvector('english_custom', 
                    coalesce(author, '') || ' ' || 
                    coalesce(category, '') || ' ' ||
                    (tags::text)
                )
            ) STORED,
            
            -- 創(chuàng)建GIN索引優(yōu)化搜索性能
            CONSTRAINT valid_tags CHECK (jsonb_typeof(tags) = 'array')
        );
        
        -- 創(chuàng)建GIN索引
        CREATE INDEX IF NOT EXISTS idx_documents_content_search 
        ON documents USING GIN(content_search_vector);
        
        CREATE INDEX IF NOT EXISTS idx_documents_metadata_search 
        ON documents USING GIN(metadata_search_vector);
        
        -- 創(chuàng)建部分索引優(yōu)化特定查詢
        CREATE INDEX IF NOT EXISTS idx_documents_recent 
        ON documents(publication_date) 
        WHERE publication_date > CURRENT_DATE - INTERVAL '1 year';
        
        -- 創(chuàng)建BRIN索引用于時間范圍查詢
        CREATE INDEX IF NOT EXISTS idx_documents_date_brin 
        ON documents USING BRIN(publication_date);
        """
        
        try:
            self.cursor.execute(create_table_sql)
            self.conn.commit()
            logger.info("全文搜索表創(chuàng)建成功")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建表失敗: {e}")
            
    def insert_document(self, document: Dict[str, Any]) -> Optional[int]:
        """
        插入文檔數(shù)據(jù)
        
        Args:
            document: 文檔數(shù)據(jù)
            
        Returns:
            插入的文檔ID
        """
        insert_sql = """
        INSERT INTO documents (title, content, author, category, tags, publication_date)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id
        """
        
        try:
            self.cursor.execute(
                insert_sql,
                (
                    document.get('title'),
                    document.get('content'),
                    document.get('author'),
                    document.get('category'),
                    Json(document.get('tags', [])),
                    document.get('publication_date')
                )
            )
            doc_id = self.cursor.fetchone()['id']
            self.conn.commit()
            logger.info(f"文檔插入成功,ID: {doc_id}")
            return doc_id
        except Exception as e:
            self.conn.rollback()
            logger.error(f"插入文檔失敗: {e}")
            return None
            
    def search_documents(
        self, 
        query: str,
        categories: List[str] = None,
        start_date: str = None,
        end_date: str = None,
        min_relevance: float = 0.1,
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        高級全文搜索
        
        Args:
            query: 搜索查詢詞
            categories: 分類篩選
            start_date: 開始日期
            end_date: 結(jié)束日期
            min_relevance: 最小相關(guān)性閾值
            limit: 返回結(jié)果數(shù)量
            offset: 偏移量
            
        Returns:
            搜索結(jié)果列表
        """
        search_sql = """
        SELECT 
            id,
            title,
            author,
            category,
            publication_date,
            tags,
            
            -- 計算相關(guān)性得分
            ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) AS relevance_score,
            
            -- 高亮顯示匹配內(nèi)容
            ts_headline(
                'english_custom',
                content,
                plainto_tsquery('english_custom', %s),
                'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=10'
            ) AS content_highlight,
            
            -- 提取匹配片段
            ts_headline(
                'english_custom',
                title,
                plainto_tsquery('english_custom', %s),
                'StartSel=<mark>, StopSel=</mark>'
            ) AS title_highlight
            
        FROM documents
        WHERE 
            -- 全文搜索條件
            content_search_vector @@ plainto_tsquery('english_custom', %s)
            
            -- 分類篩選
            {category_filter}
            
            -- 日期范圍篩選
            {date_filter}
            
            -- 相關(guān)性閾值篩選
            AND ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) > %s
            
        ORDER BY 
            -- 按相關(guān)性和時間加權(quán)排序
            (ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) * 0.7 + 
            (CASE WHEN publication_date > CURRENT_DATE - INTERVAL '30 days' 
                  THEN 0.3 ELSE 0 END)) DESC,
            publication_date DESC
            
        LIMIT %s OFFSET %s
        """
        
        # 構(gòu)建動態(tài)WHERE條件
        category_filter = ""
        date_filter = ""
        params = [query, query, query, query]
        
        if categories:
            placeholders = ', '.join(['%s'] * len(categories))
            category_filter = f"AND category IN ({placeholders})"
            params.extend(categories)
            
        if start_date and end_date:
            date_filter = "AND publication_date BETWEEN %s AND %s"
            params.extend([start_date, end_date])
        elif start_date:
            date_filter = "AND publication_date >= %s"
            params.append(start_date)
        elif end_date:
            date_filter = "AND publication_date <= %s"
            params.append(end_date)
            
        # 添加剩余參數(shù)
        params.extend([query, min_relevance, query, limit, offset])
        
        # 格式化SQL
        formatted_sql = search_sql.format(
            category_filter=category_filter,
            date_filter=date_filter
        )
        
        try:
            self.cursor.execute(formatted_sql, params)
            results = self.cursor.fetchall()
            
            # 轉(zhuǎn)換為字典列表
            return [
                {
                    'id': row['id'],
                    'title': row['title'],
                    'author': row['author'],
                    'category': row['category'],
                    'publication_date': row['publication_date'],
                    'tags': row['tags'],
                    'relevance_score': float(row['relevance_score']),
                    'content_highlight': row['content_highlight'],
                    'title_highlight': row['title_highlight']
                }
                for row in results
            ]
            
        except Exception as e:
            logger.error(f"搜索失敗: {e}")
            return []
            
    def search_similar_documents(self, doc_id: int, limit: int = 10) -> List[Dict[str, Any]]:
        """
        查找相似文檔(基于內(nèi)容相似度)
        
        Args:
            doc_id: 參考文檔ID
            limit: 返回結(jié)果數(shù)量
            
        Returns:
            相似文檔列表
        """
        similarity_sql = """
        WITH target_doc AS (
            SELECT content_search_vector 
            FROM documents 
            WHERE id = %s
        )
        SELECT 
            d.id,
            d.title,
            d.author,
            d.category,
            
            -- 計算余弦相似度
            (d.content_search_vector <=> td.content_search_vector) AS similarity,
            
            -- 提取共同標(biāo)簽
            (
                SELECT jsonb_agg(tag)
                FROM jsonb_array_elements_text(d.tags) AS tag
                WHERE tag IN (
                    SELECT jsonb_array_elements_text(td.tags)
                    FROM documents td 
                    WHERE td.id = %s
                )
            ) AS common_tags
            
        FROM documents d, target_doc td
        WHERE d.id != %s
        ORDER BY similarity DESC
        LIMIT %s
        """
        
        try:
            self.cursor.execute(similarity_sql, (doc_id, doc_id, doc_id, limit))
            results = self.cursor.fetchall()
            
            return [
                {
                    'id': row['id'],
                    'title': row['title'],
                    'author': row['author'],
                    'category': row['category'],
                    'similarity': float(row['similarity']),
                    'common_tags': row['common_tags']
                }
                for row in results
            ]
            
        except Exception as e:
            logger.error(f"查找相似文檔失敗: {e}")
            return []
            
    def get_search_statistics(self, time_period: str = '1 month') -> Dict[str, Any]:
        """
        獲取搜索統(tǒng)計信息
        
        Args:
            time_period: 統(tǒng)計時間周期
            
        Returns:
            統(tǒng)計信息字典
        """
        stats_sql = """
        -- 總文檔數(shù)
        SELECT COUNT(*) as total_documents FROM documents;
        
        -- 按分類統(tǒng)計
        SELECT 
            category,
            COUNT(*) as count,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM documents), 2) as percentage
        FROM documents 
        WHERE category IS NOT NULL
        GROUP BY category 
        ORDER BY count DESC;
        
        -- 時間分布統(tǒng)計
        SELECT 
            DATE_TRUNC('month', publication_date) as month,
            COUNT(*) as documents_count
        FROM documents
        WHERE publication_date > CURRENT_DATE - INTERVAL %s
        GROUP BY DATE_TRUNC('month', publication_date)
        ORDER BY month DESC;
        
        -- 標(biāo)簽使用統(tǒng)計
        SELECT 
            tag,
            COUNT(*) as usage_count
        FROM documents, jsonb_array_elements_text(tags) as tag
        GROUP BY tag
        ORDER BY usage_count DESC
        LIMIT 20;
        """
        
        try:
            stats = {}
            
            # 執(zhí)行多個統(tǒng)計查詢
            self.cursor.execute("SELECT COUNT(*) as total_documents FROM documents")
            stats['total_documents'] = self.cursor.fetchone()['total_documents']
            
            self.cursor.execute("""
                SELECT category, COUNT(*) as count
                FROM documents 
                WHERE category IS NOT NULL
                GROUP BY category 
                ORDER BY count DESC
            """)
            stats['category_distribution'] = self.cursor.fetchall()
            
            self.cursor.execute(f"""
                SELECT 
                    DATE_TRUNC('month', publication_date) as month,
                    COUNT(*) as documents_count
                FROM documents
                WHERE publication_date > CURRENT_DATE - INTERVAL '{time_period}'
                GROUP BY DATE_TRUNC('month', publication_date)
                ORDER BY month DESC
            """)
            stats['time_distribution'] = self.cursor.fetchall()
            
            self.cursor.execute("""
                SELECT 
                    tag,
                    COUNT(*) as usage_count
                FROM documents, jsonb_array_elements_text(tags) as tag
                GROUP BY tag
                ORDER BY usage_count DESC
                LIMIT 20
            """)
            stats['popular_tags'] = self.cursor.fetchall()
            
            return stats
            
        except Exception as e:
            logger.error(f"獲取統(tǒng)計信息失敗: {e}")
            return {}
            
    def optimize_search_indexes(self):
        """
        優(yōu)化全文搜索索引
        """
        optimize_sql = """
        -- 重新構(gòu)建GIN索引以提高搜索性能
        REINDEX INDEX CONCURRENTLY idx_documents_content_search;
        REINDEX INDEX CONCURRENTLY idx_documents_metadata_search;
        
        -- 更新表統(tǒng)計信息
        ANALYZE documents;
        
        -- 清理索引膨脹
        VACUUM ANALYZE documents;
        
        -- 更新全文搜索配置字典
        ALTER TEXT SEARCH CONFIGURATION english_custom 
        REFRESH VERSION;
        """
        
        try:
            # 分步執(zhí)行優(yōu)化操作
            self.cursor.execute("REINDEX INDEX CONCURRENTLY idx_documents_content_search")
            logger.info("內(nèi)容搜索索引重建完成")
            
            self.cursor.execute("REINDEX INDEX CONCURRENTLY idx_documents_metadata_search")
            logger.info("元數(shù)據(jù)搜索索引重建完成")
            
            self.cursor.execute("ANALYZE documents")
            logger.info("表統(tǒng)計信息更新完成")
            
            self.cursor.execute("VACUUM ANALYZE documents")
            logger.info("表清理完成")
            
            self.conn.commit()
            logger.info("全文搜索索引優(yōu)化完成")
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"索引優(yōu)化失敗: {e}")
            
    def close(self):
        """關(guān)閉數(shù)據(jù)庫連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def example_usage():
    """使用示例"""
    # 數(shù)據(jù)庫連接字符串
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    # 創(chuàng)建全文搜索實(shí)例
    search = PostgreSQLFullTextSearch(dsn)
    
    try:
        # 1. 創(chuàng)建全文搜索配置
        search.create_fulltext_configuration('english')
        
        # 2. 創(chuàng)建搜索表
        search.create_searchable_table()
        
        # 3. 插入示例文檔
        sample_documents = [
            {
                'title': 'PostgreSQL Performance Optimization',
                'content': 'PostgreSQL provides advanced optimization techniques including query planning, indexing strategies, and configuration tuning.',
                'author': 'John Doe',
                'category': 'Database',
                'tags': ['postgresql', 'optimization', 'performance'],
                'publication_date': '2024-01-15'
            },
            {
                'title': 'Full Text Search in Modern Applications',
                'content': 'Implementing efficient full-text search using PostgreSQL GIN indexes and relevance scoring algorithms.',
                'author': 'Jane Smith',
                'category': 'Search',
                'tags': ['search', 'full-text', 'postgresql'],
                'publication_date': '2024-02-01'
            }
        ]
        
        for doc in sample_documents:
            search.insert_document(doc)
        
        # 4. 執(zhí)行高級搜索
        print("執(zhí)行全文搜索...")
        results = search.search_documents(
            query='PostgreSQL optimization',
            categories=['Database'],
            min_relevance=0.05,
            limit=10
        )
        
        print(f"找到 {len(results)} 個結(jié)果:")
        for result in results:
            print(f"- {result['title']} (相關(guān)性: {result['relevance_score']:.3f})")
        
        # 5. 查找相似文檔
        if results:
            similar = search.search_similar_documents(results[0]['id'])
            print(f"\n相似文檔: {len(similar)} 個")
        
        # 6. 獲取統(tǒng)計信息
        stats = search.get_search_statistics()
        print(f"\n總文檔數(shù): {stats.get('total_documents', 0)}")
        
    finally:
        search.close()


if __name__ == "__main__":
    example_usage()

2.3 分區(qū)表與數(shù)據(jù)管理

PostgreSQL的分區(qū)表功能通過繼承和約束排除實(shí)現(xiàn),顯著提升大數(shù)據(jù)量查詢性能。

2.3.1 分區(qū)策略對比

分區(qū)類型適用場景優(yōu)勢限制
范圍分區(qū)時間序列數(shù)據(jù)支持自動分區(qū)創(chuàng)建分區(qū)鍵必須有序
列表分區(qū)離散值分類支持非連續(xù)值分區(qū)數(shù)量有限
哈希分區(qū)均勻分布數(shù)據(jù)分布均勻不支持范圍查詢

2.3.2 分區(qū)性能公式

分區(qū)表的查詢性能提升可以通過以下公式估算:

?

其中:

  • Tpartitioned?:分區(qū)表查詢時間
  • Tfull?:未分區(qū)表查詢時間
  • n:相關(guān)分區(qū)數(shù)量
  • Coverhead?:分區(qū)管理開銷

3. PostgreSQL性能優(yōu)化實(shí)戰(zhàn)

3.1 查詢性能分析與優(yōu)化

"""
PostgreSQL查詢性能分析與優(yōu)化工具
"""
import psycopg2
from psycopg2.extras import DictCursor
import time
from typing import Dict, List, Any, Optional, Tuple
import statistics
import json
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class QueryPerformanceAnalyzer:
    """查詢性能分析器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
        self.query_cache = {}
        
    def analyze_query_plan(self, query: str, params: tuple = None) -> Dict[str, Any]:
        """
        分析查詢執(zhí)行計劃
        
        Args:
            query: SQL查詢語句
            params: 查詢參數(shù)
            
        Returns:
            執(zhí)行計劃分析結(jié)果
        """
        try:
            # 獲取詳細(xì)執(zhí)行計劃
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT JSON) {query}"
            self.cursor.execute(explain_query, params)
            plan_result = self.cursor.fetchone()[0]
            
            plan = plan_result[0]['Plan']
            return self._parse_execution_plan(plan)
            
        except Exception as e:
            logger.error(f"分析執(zhí)行計劃失敗: {e}")
            return {}
    
    def _parse_execution_plan(self, plan: Dict[str, Any]) -> Dict[str, Any]:
        """
        解析執(zhí)行計劃
        
        Args:
            plan: 執(zhí)行計劃字典
            
        Returns:
            解析后的分析結(jié)果
        """
        analysis = {
            'operation_type': plan.get('Node Type'),
            'relation_name': plan.get('Relation Name'),
            'alias': plan.get('Alias'),
            'startup_cost': plan.get('Startup Cost'),
            'total_cost': plan.get('Total Cost'),
            'plan_rows': plan.get('Plan Rows'),
            'plan_width': plan.get('Plan Width'),
            'actual_rows': plan.get('Actual Rows'),
            'actual_time': plan.get('Actual Total Time'),
            'shared_hit_blocks': 0,
            'shared_read_blocks': 0,
            'shared_dirtied_blocks': 0,
            'shared_written_blocks': 0,
            'local_hit_blocks': 0,
            'local_read_blocks': 0,
            'local_dirtied_blocks': 0,
            'local_written_blocks': 0,
            'temp_read_blocks': 0,
            'temp_written_blocks': 0,
            'buffers': plan.get('Shared Hit Blocks', 0) + plan.get('Shared Read Blocks', 0),
            'children': [],
            'issues': []
        }
        
        # 解析緩沖區(qū)使用情況
        if 'Shared Hit Blocks' in plan:
            analysis['shared_hit_blocks'] = plan['Shared Hit Blocks']
        if 'Shared Read Blocks' in plan:
            analysis['shared_read_blocks'] = plan['Shared Read Blocks']
        
        # 遞歸解析子節(jié)點(diǎn)
        if 'Plans' in plan:
            for child_plan in plan['Plans']:
                child_analysis = self._parse_execution_plan(child_plan)
                analysis['children'].append(child_analysis)
        
        # 識別潛在問題
        self._identify_issues(analysis, plan)
        
        return analysis
    
    def _identify_issues(self, analysis: Dict[str, Any], plan: Dict[str, Any]):
        """識別查詢計劃中的潛在問題"""
        
        # 檢查全表掃描
        if analysis['operation_type'] == 'Seq Scan' and analysis['actual_rows'] > 10000:
            analysis['issues'].append({
                'type': 'FULL_TABLE_SCAN',
                'severity': 'HIGH',
                'message': '檢測到大量行的全表掃描',
                'suggestion': '考慮添加合適的索引'
            })
        
        # 檢查嵌套循環(huán)連接
        if analysis['operation_type'] == 'Nested Loop' and analysis['actual_rows'] > 1000:
            analysis['issues'].append({
                'type': 'INEFFICIENT_JOIN',
                'severity': 'MEDIUM',
                'message': '嵌套循環(huán)連接可能效率較低',
                'suggestion': '考慮使用Hash Join或Merge Join'
            })
        
        # 檢查排序操作
        if analysis['operation_type'] == 'Sort' and analysis['actual_rows'] > 10000:
            analysis['issues'].append({
                'type': 'LARGE_SORT',
                'severity': 'MEDIUM',
                'message': '大規(guī)模排序操作',
                'suggestion': '考慮添加索引以避免排序'
            })
        
        # 檢查緩沖區(qū)命中率
        total_blocks = analysis['shared_hit_blocks'] + analysis['shared_read_blocks']
        if total_blocks > 0:
            hit_ratio = analysis['shared_hit_blocks'] / total_blocks
            if hit_ratio < 0.9:
                analysis['issues'].append({
                    'type': 'LOW_BUFFER_HIT',
                    'severity': 'MEDIUM',
                    'message': f'緩沖區(qū)命中率較低: {hit_ratio:.2%}',
                    'suggestion': '考慮增加shared_buffers或優(yōu)化查詢'
                })
    
    def benchmark_query(self, query: str, params: tuple = None, 
                       iterations: int = 10) -> Dict[str, Any]:
        """
        基準(zhǔn)測試查詢性能
        
        Args:
            query: SQL查詢語句
            params: 查詢參數(shù)
            iterations: 迭代次數(shù)
            
        Returns:
            性能基準(zhǔn)測試結(jié)果
        """
        execution_times = []
        row_counts = []
        
        try:
            # 預(yù)熱緩存
            self.cursor.execute(query, params)
            _ = self.cursor.fetchall()
            
            # 執(zhí)行基準(zhǔn)測試
            for i in range(iterations):
                start_time = time.perf_counter()
                self.cursor.execute(query, params)
                rows = self.cursor.fetchall()
                end_time = time.perf_counter()
                
                execution_times.append(end_time - start_time)
                row_counts.append(len(rows))
            
            # 分析執(zhí)行計劃
            plan_analysis = self.analyze_query_plan(query, params)
            
            # 計算統(tǒng)計信息
            stats = {
                'iterations': iterations,
                'total_time': sum(execution_times),
                'avg_time': statistics.mean(execution_times),
                'min_time': min(execution_times),
                'max_time': max(execution_times),
                'std_dev': statistics.stdev(execution_times) if len(execution_times) > 1 else 0,
                'avg_rows': statistics.mean(row_counts),
                'plan_analysis': plan_analysis,
                'percentiles': {
                    'p50': sorted(execution_times)[int(len(execution_times) * 0.5)],
                    'p90': sorted(execution_times)[int(len(execution_times) * 0.9)],
                    'p95': sorted(execution_times)[int(len(execution_times) * 0.95)],
                    'p99': sorted(execution_times)[int(len(execution_times) * 0.99)],
                }
            }
            
            return stats
            
        except Exception as e:
            logger.error(f"基準(zhǔn)測試失敗: {e}")
            return {}
    
    def generate_optimization_suggestions(self, query: str, 
                                        stats: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        生成優(yōu)化建議
        
        Args:
            query: SQL查詢語句
            stats: 性能統(tǒng)計信息
            
        Returns:
            優(yōu)化建議列表
        """
        suggestions = []
        plan_analysis = stats.get('plan_analysis', {})
        
        # 基于執(zhí)行時間建議
        avg_time = stats.get('avg_time', 0)
        if avg_time > 1.0:  # 超過1秒
            suggestions.append({
                'priority': 'HIGH',
                'area': 'PERFORMANCE',
                'suggestion': '查詢執(zhí)行時間較長,考慮優(yōu)化查詢或添加索引',
                'estimated_impact': 'HIGH'
            })
        
        # 基于執(zhí)行計劃建議
        for issue in plan_analysis.get('issues', []):
            suggestions.append({
                'priority': issue['severity'],
                'area': 'QUERY_PLAN',
                'suggestion': issue['suggestion'],
                'estimated_impact': 'MEDIUM'
            })
        
        # 基于統(tǒng)計信息建議
        if stats.get('std_dev', 0) / stats.get('avg_time', 1) > 0.5:
            suggestions.append({
                'priority': 'MEDIUM',
                'area': 'CONSISTENCY',
                'suggestion': '查詢執(zhí)行時間波動較大,可能存在并發(fā)或資源競爭問題',
                'estimated_impact': 'MEDIUM'
            })
        
        return suggestions


class IndexOptimizer:
    """索引優(yōu)化器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_table_indexes(self, table_name: str) -> List[Dict[str, Any]]:
        """
        分析表索引
        
        Args:
            table_name: 表名
            
        Returns:
            索引分析結(jié)果
        """
        query = """
        SELECT 
            i.relname as index_name,
            am.amname as index_type,
            idx.indisunique as is_unique,
            idx.indisprimary as is_primary,
            idx.indisexclusion as is_exclusion,
            idx.indisclustered as is_clustered,
            idx.indisvalid as is_valid,
            idx.indpred as partial_index_predicate,
            pg_relation_size(i.oid) as index_size_bytes,
            pg_size_pretty(pg_relation_size(i.oid)) as index_size,
            pg_stat_get_numscans(i.oid) as scan_count,
            pg_stat_get_tuples_returned(i.oid) as tuples_returned,
            pg_stat_get_tuples_fetched(i.oid) as tuples_fetched,
            
            -- 索引定義
            pg_get_indexdef(idx.indexrelid) as index_definition,
            
            -- 索引列
            array_to_string(array_agg(a.attname), ', ') as index_columns
            
        FROM pg_index idx
        JOIN pg_class i ON i.oid = idx.indexrelid
        JOIN pg_class t ON t.oid = idx.indrelid
        JOIN pg_am am ON i.relam = am.oid
        JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(idx.indkey)
        
        WHERE t.relname = %s
        GROUP BY i.relname, am.amname, idx.indisunique, idx.indisprimary,
                 idx.indisexclusion, idx.indisclustered, idx.indisvalid,
                 idx.indpred, i.oid, idx.indexrelid
        ORDER BY pg_relation_size(i.oid) DESC
        """
        
        try:
            self.cursor.execute(query, (table_name,))
            indexes = self.cursor.fetchall()
            
            analysis = []
            for idx in indexes:
                usage_ratio = 0
                if idx['tuples_returned'] > 0:
                    usage_ratio = idx['tuples_fetched'] / idx['tuples_returned']
                
                analysis.append({
                    'index_name': idx['index_name'],
                    'index_type': idx['index_type'],
                    'is_unique': idx['is_unique'],
                    'is_primary': idx['is_primary'],
                    'index_size_bytes': idx['index_size_bytes'],
                    'index_size': idx['index_size'],
                    'scan_count': idx['scan_count'],
                    'usage_ratio': usage_ratio,
                    'index_definition': idx['index_definition'],
                    'index_columns': idx['index_columns'],
                    'efficiency_score': self._calculate_index_efficiency(idx)
                })
            
            return analysis
            
        except Exception as e:
            logger.error(f"分析索引失敗: {e}")
            return []
    
    def _calculate_index_efficiency(self, index_info: Dict[str, Any]) -> float:
        """
        計算索引效率評分
        
        Args:
            index_info: 索引信息
            
        Returns:
            效率評分 (0-100)
        """
        score = 100.0
        
        # 基于使用率扣分
        if index_info['scan_count'] == 0:
            score -= 50  # 從未使用
        
        # 基于大小扣分
        size_mb = index_info['index_size_bytes'] / (1024 * 1024)
        if size_mb > 1000:  # 超過1GB
            score -= 30
        elif size_mb > 100:  # 超過100MB
            score -= 15
        
        # 基于唯一性加分
        if index_info['is_unique']:
            score += 10
        
        return max(0, min(100, score))
    
    def suggest_index_improvements(self, table_name: str, 
                                 query_patterns: List[str]) -> List[Dict[str, Any]]:
        """
        基于查詢模式建議索引改進(jìn)
        
        Args:
            table_name: 表名
            query_patterns: 查詢模式列表
            
        Returns:
            索引改進(jìn)建議
        """
        suggestions = []
        existing_indexes = self.analyze_table_indexes(table_name)
        
        for pattern in query_patterns:
            pattern_lower = pattern.lower()
            
            # 提取WHERE子句中的列
            where_start = pattern_lower.find('where ')
            if where_start != -1:
                where_clause = pattern_lower[where_start + 6:]
                
                # 簡單提取列名(實(shí)際應(yīng)用中應(yīng)使用SQL解析器)
                import re
                column_matches = re.findall(r'(\w+)\s*[=<>!]', where_clause)
                
                for column in column_matches:
                    # 檢查是否已有索引
                    has_index = False
                    for idx in existing_indexes:
                        if column in idx['index_columns'].lower():
                            has_index = True
                            break
                    
                    if not has_index:
                        suggestions.append({
                            'table': table_name,
                            'column': column,
                            'suggestion': f'在 {column} 列上創(chuàng)建索引',
                            'estimated_impact': 'HIGH',
                            'sql': f'CREATE INDEX idx_{table_name}_{column} ON {table_name}({column})'
                        })
        
        return suggestions


class PostgreSQLConfigOptimizer:
    """PostgreSQL配置優(yōu)化器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_current_config(self) -> Dict[str, Any]:
        """
        分析當(dāng)前配置
        
        Returns:
            配置分析結(jié)果
        """
        config_queries = {
            'basic_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name IN (
                    'shared_buffers', 'work_mem', 'maintenance_work_mem',
                    'effective_cache_size', 'max_connections'
                )
            """,
            'performance_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name LIKE '%cost%' OR name LIKE '%join%' OR name LIKE '%parallel%'
                ORDER BY name
            """,
            'wal_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name LIKE 'wal_%'
                ORDER BY name
            """,
            'statistics': """
                SELECT 
                    datname as database_name,
                    numbackends as active_connections,
                    xact_commit as transactions_committed,
                    xact_rollback as transactions_rolled_back,
                    blks_read as blocks_read,
                    blks_hit as blocks_hit,
                    tup_returned as tuples_returned,
                    tup_fetched as tuples_fetched,
                    tup_inserted as tuples_inserted,
                    tup_updated as tuples_updated,
                    tup_deleted as tuples_deleted
                FROM pg_stat_database
                WHERE datname = current_database()
            """
        }
        
        analysis = {}
        
        try:
            for category, query in config_queries.items():
                self.cursor.execute(query)
                analysis[category] = self.cursor.fetchall()
            
            # 計算緩沖區(qū)命中率
            if 'statistics' in analysis and analysis['statistics']:
                stats = analysis['statistics'][0]
                blocks_hit = stats['blocks_hit']
                blocks_read = stats['blocks_read']
                total_blocks = blocks_hit + blocks_read
                
                if total_blocks > 0:
                    analysis['buffer_hit_ratio'] = blocks_hit / total_blocks
                else:
                    analysis['buffer_hit_ratio'] = 0
            
            return analysis
            
        except Exception as e:
            logger.error(f"分析配置失敗: {e}")
            return {}
    
    def generate_config_recommendations(self, 
                                      system_memory_gb: float = 16,
                                      expected_connections: int = 100) -> List[Dict[str, Any]]:
        """
        生成配置優(yōu)化建議
        
        Args:
            system_memory_gb: 系統(tǒng)總內(nèi)存(GB)
            expected_connections: 預(yù)期最大連接數(shù)
            
        Returns:
            配置優(yōu)化建議列表
        """
        recommendations = []
        current_config = self.analyze_current_config()
        
        # 共享緩沖區(qū)建議(通常為系統(tǒng)內(nèi)存的25%)
        recommended_shared_buffers = f"{int(system_memory_gb * 0.25 * 1024)}MB"
        recommendations.append({
            'parameter': 'shared_buffers',
            'current_value': self._get_config_value(current_config, 'shared_buffers'),
            'recommended_value': recommended_shared_buffers,
            'reason': f'設(shè)置為系統(tǒng)內(nèi)存({system_memory_gb}GB)的25%以優(yōu)化緩存性能',
            'impact': 'HIGH'
        })
        
        # 工作內(nèi)存建議
        recommended_work_mem = f"{int(system_memory_gb * 1024 / expected_connections / 4)}MB"
        recommendations.append({
            'parameter': 'work_mem',
            'current_value': self._get_config_value(current_config, 'work_mem'),
            'recommended_value': recommended_work_mem,
            'reason': f'基于{expected_connections}個并發(fā)連接和系統(tǒng)內(nèi)存計算',
            'impact': 'MEDIUM'
        })
        
        # 維護(hù)工作內(nèi)存建議
        recommended_maintenance_work_mem = f"{int(system_memory_gb * 0.1 * 1024)}MB"
        recommendations.append({
            'parameter': 'maintenance_work_mem',
            'current_value': self._get_config_value(current_config, 'maintenance_work_mem'),
            'recommended_value': recommended_maintenance_work_mem,
            'reason': '設(shè)置為系統(tǒng)內(nèi)存的10%以優(yōu)化維護(hù)操作性能',
            'impact': 'MEDIUM'
        })
        
        # 有效緩存大小建議
        recommended_effective_cache_size = f"{int(system_memory_gb * 0.5 * 1024)}MB"
        recommendations.append({
            'parameter': 'effective_cache_size',
            'current_value': self._get_config_value(current_config, 'effective_cache_size'),
            'recommended_value': recommended_effective_cache_size,
            'reason': '設(shè)置為系統(tǒng)內(nèi)存的50%以幫助查詢規(guī)劃器做出更好的決策',
            'impact': 'MEDIUM'
        })
        
        # 基于緩沖區(qū)命中率的建議
        hit_ratio = current_config.get('buffer_hit_ratio', 0)
        if hit_ratio < 0.9:
            recommendations.append({
                'parameter': 'BUFFER_HIT_RATIO',
                'current_value': f'{hit_ratio:.2%}',
                'recommended_value': '>90%',
                'reason': '緩沖區(qū)命中率較低,可能影響查詢性能',
                'impact': 'HIGH',
                'additional_suggestions': [
                    '增加shared_buffers',
                    '優(yōu)化熱點(diǎn)查詢',
                    '考慮使用pg_prewarm擴(kuò)展'
                ]
            })
        
        return recommendations
    
    def _get_config_value(self, config_analysis: Dict[str, Any], 
                         param_name: str) -> str:
        """獲取配置參數(shù)值"""
        if 'basic_settings' in config_analysis:
            for setting in config_analysis['basic_settings']:
                if setting['name'] == param_name:
                    return f"{setting['setting']} {setting['unit'] or ''}".strip()
        
        return '未找到'
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()


def comprehensive_performance_analysis(dsn: str):
    """綜合性能分析示例"""
    print("=" * 60)
    print("PostgreSQL性能綜合分析")
    print("=" * 60)
    
    # 1. 查詢性能分析
    print("\n1. 查詢性能分析")
    print("-" * 40)
    
    analyzer = QueryPerformanceAnalyzer(dsn)
    
    test_query = """
    SELECT 
        u.username,
        COUNT(o.id) as order_count,
        SUM(o.amount) as total_amount,
        AVG(o.amount) as avg_amount
    FROM users u
    JOIN orders o ON u.id = o.user_id
    WHERE u.created_at > CURRENT_DATE - INTERVAL '1 year'
    GROUP BY u.id, u.username
    HAVING COUNT(o.id) > 5
    ORDER BY total_amount DESC
    LIMIT 100
    """
    
    benchmark_results = analyzer.benchmark_query(test_query, iterations=5)
    
    if benchmark_results:
        print(f"平均執(zhí)行時間: {benchmark_results['avg_time']:.3f}秒")
        print(f"最小執(zhí)行時間: {benchmark_results['min_time']:.3f}秒")
        print(f"最大執(zhí)行時間: {benchmark_results['max_time']:.3f}秒")
        print(f"標(biāo)準(zhǔn)差: {benchmark_results['std_dev']:.3f}秒")
        
        # 生成優(yōu)化建議
        suggestions = analyzer.generate_optimization_suggestions(
            test_query, benchmark_results
        )
        
        print(f"\n優(yōu)化建議 ({len(suggestions)}條):")
        for i, suggestion in enumerate(suggestions, 1):
            print(f"{i}. [{suggestion['priority']}] {suggestion['suggestion']}")
    
    # 2. 索引分析
    print("\n2. 索引分析")
    print("-" * 40)
    
    index_optimizer = IndexOptimizer(dsn)
    table_name = "users"  # 假設(shè)的表名
    
    index_analysis = index_optimizer.analyze_table_indexes(table_name)
    
    if index_analysis:
        print(f"表 '{table_name}' 的索引分析:")
        for idx in index_analysis[:5]:  # 顯示前5個索引
            print(f"  - {idx['index_name']}: {idx['index_size']}, "
                  f"使用率: {idx['usage_ratio']:.2f}, "
                  f"效率評分: {idx['efficiency_score']:.1f}")
    
    # 3. 配置分析
    print("\n3. 配置分析")
    print("-" * 40)
    
    config_optimizer = PostgreSQLConfigOptimizer(dsn)
    config_recommendations = config_optimizer.generate_config_recommendations(
        system_memory_gb=16,
        expected_connections=200
    )
    
    print("配置優(yōu)化建議:")
    for rec in config_recommendations:
        print(f"  - {rec['parameter']}: {rec['current_value']} → {rec['recommended_value']}")
    
    # 4. 綜合報告
    print("\n4. 綜合性能報告")
    print("-" * 40)
    
    overall_score = 85.0  # 示例評分
    bottlenecks = [
        "查詢響應(yīng)時間波動較大",
        "部分索引使用率較低",
        "緩沖區(qū)命中率需要優(yōu)化"
    ]
    
    print(f"總體性能評分: {overall_score}/100")
    print("\n主要瓶頸:")
    for bottleneck in bottlenecks:
        print(f"  ? {bottleneck}")
    
    print("\n優(yōu)化優(yōu)先級:")
    print("  1. 優(yōu)化高響應(yīng)時間查詢")
    print("  2. 調(diào)整數(shù)據(jù)庫配置參數(shù)")
    print("  3. 重建低效率索引")
    
    # 清理資源
    analyzer.cursor.close()
    analyzer.conn.close()
    config_optimizer.close()


if __name__ == "__main__":
    # 數(shù)據(jù)庫連接字符串
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    comprehensive_performance_analysis(dsn)

3.2 索引優(yōu)化策略

3.2.1 索引類型選擇矩陣

3.2.2 復(fù)合索引設(shè)計原則

復(fù)合索引的列順序設(shè)計遵循最左前綴原則,選擇性公式為:

選擇性=不同值數(shù)量/總行數(shù)?

索引設(shè)計優(yōu)先級:

  • 高選擇性列(接近1.0)放在前面
  • 經(jīng)常用于WHERE條件的列
  • 用于ORDER BY的列
  • 用于GROUP BY的列

3.3 并發(fā)控制與鎖優(yōu)化

PostgreSQL采用MVCC(多版本并發(fā)控制)機(jī)制,提供多種隔離級別和鎖類型:

"""
PostgreSQL并發(fā)控制與鎖優(yōu)化示例
"""
import psycopg2
from psycopg2.extras import DictCursor
import threading
import time
from typing import List, Dict, Any
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ConcurrentTransactionTest:
    """并發(fā)事務(wù)測試"""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.results = []
        self.lock = threading.Lock()
    
    def run_concurrent_updates(self, num_threads: int = 10):
        """
        運(yùn)行并發(fā)更新測試
        
        Args:
            num_threads: 并發(fā)線程數(shù)
        """
        print(f"開始并發(fā)更新測試,線程數(shù): {num_threads}")
        print("=" * 60)
        
        threads = []
        
        # 初始化測試數(shù)據(jù)
        self._setup_test_data()
        
        # 創(chuàng)建并啟動線程
        for i in range(num_threads):
            thread = threading.Thread(
                target=self._update_account_balance,
                args=(i, 100 + i, 50.0),  # 賬戶ID從100開始
                name=f"Transaction-{i}"
            )
            threads.append(thread)
            thread.start()
        
        # 等待所有線程完成
        for thread in threads:
            thread.join()
        
        # 驗(yàn)證結(jié)果
        self._verify_results()
        
        print(f"\n測試完成,總事務(wù)數(shù): {num_threads}")
        print(f"成功事務(wù): {len([r for r in self.results if r['success']])}")
        print(f"失敗事務(wù): {len([r for r in self.results if not r['success']])}")
    
    def _setup_test_data(self):
        """設(shè)置測試數(shù)據(jù)"""
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 創(chuàng)建測試表
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS accounts (
                    id SERIAL PRIMARY KEY,
                    account_number VARCHAR(50) UNIQUE,
                    balance DECIMAL(15, 2) DEFAULT 0.0,
                    version INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
            
            # 插入測試賬戶
            for i in range(10):
                account_id = 100 + i
                cursor.execute("""
                    INSERT INTO accounts (id, account_number, balance, version)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        balance = EXCLUDED.balance,
                        version = EXCLUDED.version
                """, (account_id, f"ACC{account_id}", 1000.0, 0))
            
            conn.commit()
            logger.info("測試數(shù)據(jù)設(shè)置完成")
            
        except Exception as e:
            conn.rollback()
            logger.error(f"設(shè)置測試數(shù)據(jù)失敗: {e}")
        finally:
            cursor.close()
            conn.close()
    
    def _update_account_balance(self, thread_id: int, account_id: int, amount: float):
        """
        更新賬戶余額
        
        Args:
            thread_id: 線程ID
            account_id: 賬戶ID
            amount: 更新金額
        """
        conn = None
        cursor = None
        
        try:
            conn = psycopg2.connect(self.dsn)
            cursor = conn.cursor()
            
            # 設(shè)置事務(wù)隔離級別(可測試不同級別)
            cursor.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
            
            start_time = time.time()
            
            # 方法1: 使用行級鎖
            # cursor.execute("""
            #     SELECT balance FROM accounts 
            #     WHERE id = %s FOR UPDATE
            # """, (account_id,))
            
            # 方法2: 樂觀鎖(版本控制)
            cursor.execute("""
                SELECT balance, version FROM accounts 
                WHERE id = %s
            """, (account_id,))
            
            result = cursor.fetchone()
            if not result:
                raise Exception(f"賬戶 {account_id} 不存在")
            
            current_balance, current_version = result
            new_balance = current_balance + amount
            
            # 模擬一些處理時間
            time.sleep(0.01)
            
            # 使用樂觀鎖更新
            cursor.execute("""
                UPDATE accounts 
                SET balance = %s, 
                    version = version + 1,
                    last_updated = CURRENT_TIMESTAMP
                WHERE id = %s AND version = %s
            """, (new_balance, account_id, current_version))
            
            # 檢查是否更新成功
            if cursor.rowcount == 0:
                # 樂觀鎖沖突,重試或失敗
                conn.rollback()
                success = False
                error_msg = "樂觀鎖沖突,版本不匹配"
            else:
                conn.commit()
                success = True
                error_msg = None
            
            end_time = time.time()
            duration = end_time - start_time
            
            # 記錄結(jié)果
            with self.lock:
                self.results.append({
                    'thread_id': thread_id,
                    'account_id': account_id,
                    'success': success,
                    'duration': duration,
                    'error': error_msg,
                    'start_time': start_time,
                    'end_time': end_time
                })
            
            if success:
                logger.debug(f"線程 {thread_id}: 賬戶 {account_id} 更新成功,耗時 {duration:.3f}秒")
            else:
                logger.warning(f"線程 {thread_id}: 賬戶 {account_id} 更新失敗 - {error_msg}")
            
        except Exception as e:
            if conn:
                conn.rollback()
            
            with self.lock:
                self.results.append({
                    'thread_id': thread_id,
                    'account_id': account_id,
                    'success': False,
                    'duration': time.time() - start_time if 'start_time' in locals() else 0,
                    'error': str(e),
                    'start_time': start_time if 'start_time' in locals() else 0,
                    'end_time': time.time()
                })
            
            logger.error(f"線程 {thread_id} 異常: {e}")
            
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
    
    def _verify_results(self):
        """驗(yàn)證測試結(jié)果"""
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 檢查賬戶最終狀態(tài)
            cursor.execute("""
                SELECT id, balance, version, last_updated
                FROM accounts
                WHERE id >= 100 AND id < 110
                ORDER BY id
            """)
            
            accounts = cursor.fetchall()
            
            print("\n賬戶最終狀態(tài):")
            print("-" * 60)
            print(f"{'賬戶ID':<10} {'余額':<15} {'版本':<10} {'最后更新'}")
            print("-" * 60)
            
            for acc in accounts:
                print(f"{acc[0]:<10} {float(acc[1]):<15.2f} {acc[2]:<10} {acc[3]}")
            
            # 分析并發(fā)問題
            success_count = len([r for r in self.results if r['success']])
            total_count = len(self.results)
            
            if success_count < total_count:
                print(f"\n發(fā)現(xiàn)并發(fā)問題: {total_count - success_count} 個事務(wù)失敗")
                print("\n失敗事務(wù)詳情:")
                for result in self.results:
                    if not result['success']:
                        print(f"  線程 {result['thread_id']}: {result['error']}")
            
            # 計算性能指標(biāo)
            if self.results:
                durations = [r['duration'] for r in self.results if r['success']]
                if durations:
                    avg_duration = sum(durations) / len(durations)
                    max_duration = max(durations)
                    min_duration = min(durations)
                    
                    print(f"\n性能指標(biāo):")
                    print(f"  平均事務(wù)時間: {avg_duration:.3f}秒")
                    print(f"  最長事務(wù)時間: {max_duration:.3f}秒")
                    print(f"  最短事務(wù)時間: {min_duration:.3f}秒")
                    print(f"  吞吐量: {success_count / sum(durations):.2f} 事務(wù)/秒")
            
        except Exception as e:
            logger.error(f"驗(yàn)證結(jié)果失敗: {e}")
        finally:
            cursor.close()
            conn.close()


class LockMonitor:
    """鎖監(jiān)控器"""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
    
    def get_current_locks(self) -> List[Dict[str, Any]]:
        """
        獲取當(dāng)前鎖信息
        
        Returns:
            鎖信息列表
        """
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor(cursor_factory=DictCursor)
        
        try:
            lock_query = """
            SELECT 
                -- 鎖信息
                pl.pid as process_id,
                pl.mode as lock_mode,
                pl.granted as is_granted,
                pl.fastpath as is_fastpath,
                
                -- 事務(wù)信息
                pa.query as current_query,
                pa.state as query_state,
                pa.wait_event_type as wait_event_type,
                pa.wait_event as wait_event,
                pa.backend_start as backend_start_time,
                pa.xact_start as transaction_start_time,
                pa.query_start as query_start_time,
                
                -- 被鎖對象信息
                pl.relation::regclass as locked_relation,
                pl.page as locked_page,
                pl.tuple as locked_tuple,
                pl.virtualxid as virtual_transaction_id,
                pl.transactionid as transaction_id,
                pl.classid::regclass as locked_class,
                pl.objid as locked_object_id,
                pl.objsubid as locked_object_subid,
                
                -- 等待圖信息
                pg_blocking_pids(pl.pid) as blocking_pids,
                
                -- 附加信息
                now() - pa.query_start as query_duration,
                now() - pa.xact_start as transaction_duration
                
            FROM pg_locks pl
            LEFT JOIN pg_stat_activity pa ON pl.pid = pa.pid
            WHERE pl.pid <> pg_backend_pid()  -- 排除當(dāng)前連接
            ORDER BY 
                pl.granted DESC,  -- 先顯示未授予的鎖
                transaction_duration DESC,
                query_duration DESC
            """
            
            cursor.execute(lock_query)
            locks = cursor.fetchall()
            
            return [
                {
                    'process_id': lock['process_id'],
                    'lock_mode': lock['lock_mode'],
                    'is_granted': lock['is_granted'],
                    'current_query': lock['current_query'][:100] if lock['current_query'] else None,
                    'query_state': lock['query_state'],
                    'locked_relation': lock['locked_relation'],
                    'blocking_pids': lock['blocking_pids'],
                    'query_duration': lock['query_duration'],
                    'transaction_duration': lock['transaction_duration'],
                    'wait_event': lock['wait_event']
                }
                for lock in locks
            ]
            
        except Exception as e:
            logger.error(f"獲取鎖信息失敗: {e}")
            return []
        finally:
            cursor.close()
            conn.close()
    
    def analyze_lock_contention(self) -> Dict[str, Any]:
        """
        分析鎖爭用情況
        
        Returns:
            鎖爭用分析報告
        """
        locks = self.get_current_locks()
        
        analysis = {
            'total_locks': len(locks),
            'granted_locks': len([l for l in locks if l['is_granted']]),
            'waiting_locks': len([l for l in locks if not l['is_granted']]),
            'lock_modes': {},
            'wait_chains': [],
            'long_running_transactions': [],
            'potential_deadlocks': []
        }
        
        # 統(tǒng)計鎖模式
        for lock in locks:
            mode = lock['lock_mode']
            analysis['lock_modes'][mode] = analysis['lock_modes'].get(mode, 0) + 1
        
        # 識別等待鏈
        waiting_processes = [l for l in locks if not l['is_granted']]
        for waiter in waiting_processes:
            if waiter['blocking_pids']:
                chain = {
                    'waiting_pid': waiter['process_id'],
                    'blocking_pids': waiter['blocking_pids'],
                    'lock_mode': waiter['lock_mode'],
                    'wait_time': waiter['query_duration']
                }
                analysis['wait_chains'].append(chain)
        
        # 識別長時間運(yùn)行的事務(wù)
        for lock in locks:
            if lock['transaction_duration'] and lock['transaction_duration'].total_seconds() > 60:
                analysis['long_running_transactions'].append({
                    'pid': lock['process_id'],
                    'duration_seconds': lock['transaction_duration'].total_seconds(),
                    'query': lock['current_query']
                })
        
        return analysis
    
    def kill_blocking_processes(self, threshold_seconds: int = 300):
        """
        終止阻塞時間過長的進(jìn)程
        
        Args:
            threshold_seconds: 阻塞時間閾值(秒)
        """
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 查找阻塞時間過長的進(jìn)程
            kill_query = """
            SELECT pid, query, now() - xact_start as duration
            FROM pg_stat_activity
            WHERE pid IN (
                SELECT DISTINCT unnest(pg_blocking_pids(pid))
                FROM pg_stat_activity
                WHERE wait_event_type = 'Lock'
                AND state = 'active'
                AND now() - query_start > INTERVAL '%s seconds'
            )
            AND state = 'active'
            """
            
            cursor.execute(kill_query % threshold_seconds)
            blocking_processes = cursor.fetchall()
            
            killed = []
            for proc in blocking_processes:
                pid, query, duration = proc
                try:
                    cursor.execute("SELECT pg_terminate_backend(%s)", (pid,))
                    killed.append({
                        'pid': pid,
                        'duration': duration,
                        'query': query[:100] if query else None
                    })
                    logger.warning(f"終止阻塞進(jìn)程 {pid},已運(yùn)行 {duration}")
                except Exception as e:
                    logger.error(f"終止進(jìn)程 {pid} 失敗: {e}")
            
            conn.commit()
            return killed
            
        except Exception as e:
            conn.rollback()
            logger.error(f"終止阻塞進(jìn)程失敗: {e}")
            return []
        finally:
            cursor.close()
            conn.close()


def test_concurrency_scenarios():
    """測試不同并發(fā)場景"""
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    print("并發(fā)控制測試")
    print("=" * 60)
    
    # 場景1:高并發(fā)更新
    print("\n場景1: 高并發(fā)更新測試")
    test1 = ConcurrentTransactionTest(dsn)
    test1.run_concurrent_updates(num_threads=20)
    
    # 場景2:鎖監(jiān)控
    print("\n\n場景2: 鎖監(jiān)控分析")
    print("-" * 40)
    
    monitor = LockMonitor(dsn)
    lock_analysis = monitor.analyze_lock_contention()
    
    print(f"總鎖數(shù): {lock_analysis['total_locks']}")
    print(f"已授予鎖: {lock_analysis['granted_locks']}")
    print(f"等待鎖: {lock_analysis['waiting_locks']}")
    
    if lock_analysis['wait_chains']:
        print("\n等待鏈:")
        for chain in lock_analysis['wait_chains'][:5]:  # 顯示前5個
            print(f"  進(jìn)程 {chain['waiting_pid']} 等待 {chain['blocking_pids']}")
    
    if lock_analysis['long_running_transactions']:
        print("\n長時間運(yùn)行事務(wù):")
        for txn in lock_analysis['long_running_transactions'][:3]:
            print(f"  進(jìn)程 {txn['pid']}: 已運(yùn)行 {txn['duration_seconds']:.0f}秒")
    
    # 場景3:死鎖處理建議
    print("\n\n場景3: 死鎖預(yù)防建議")
    print("-" * 40)
    
    recommendations = [
        "1. 使用合適的索引減少鎖競爭范圍",
        "2. 保持事務(wù)簡短,盡快提交",
        "3. 使用顯式鎖(SELECT FOR UPDATE)時按固定順序訪問資源",
        "4. 設(shè)置合理的鎖超時(lock_timeout)",
        "5. 考慮使用樂觀鎖(版本控制)替代悲觀鎖",
        "6. 使用較低的隔離級別(READ COMMITTED)",
        "7. 監(jiān)控和調(diào)整max_connections參數(shù)",
        "8. 定期分析并優(yōu)化長時間運(yùn)行的事務(wù)"
    ]
    
    for rec in recommendations:
        print(f"  ? {rec}")


if __name__ == "__main__":
    test_concurrency_scenarios()

4. 高級特性綜合應(yīng)用

4.1 完整示例:電商系統(tǒng)數(shù)據(jù)庫設(shè)計

"""
電商系統(tǒng)PostgreSQL高級特性綜合應(yīng)用示例
"""
import psycopg2
from psycopg2.extras import Json, DictCursor
import json
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import logging
from decimal import Decimal

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ECommerceDatabase:
    """電商系統(tǒng)數(shù)據(jù)庫設(shè)計"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def create_schema(self):
        """創(chuàng)建電商系統(tǒng)數(shù)據(jù)庫架構(gòu)"""
        schema_sql = """
        -- 啟用必要擴(kuò)展
        CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
        CREATE EXTENSION IF NOT EXISTS "pg_trgm";
        CREATE EXTENSION IF NOT EXISTS "btree_gin";
        
        -- 1. 產(chǎn)品表(使用JSONB存儲變體屬性)
        CREATE TABLE IF NOT EXISTS products (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            sku VARCHAR(100) UNIQUE NOT NULL,
            name VARCHAR(500) NOT NULL,
            description TEXT,
            category_id UUID,
            brand VARCHAR(200),
            
            -- JSONB存儲動態(tài)屬性
            attributes JSONB DEFAULT '{}',
            
            -- 價格信息
            base_price DECIMAL(12, 2) NOT NULL,
            discount_price DECIMAL(12, 2),
            
            -- 庫存信息
            stock_quantity INTEGER DEFAULT 0,
            reserved_quantity INTEGER DEFAULT 0,
            
            -- 搜索優(yōu)化字段
            search_vector TSVECTOR GENERATED ALWAYS AS (
                setweight(to_tsvector('english', coalesce(name, '')), 'A') ||
                setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
                setweight(to_tsvector('english', coalesce(brand, '')), 'C')
            ) STORED,
            
            -- 時間戳
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT positive_price CHECK (base_price >= 0),
            CONSTRAINT positive_stock CHECK (stock_quantity >= 0),
            CONSTRAINT valid_discount CHECK (
                discount_price IS NULL OR 
                (discount_price >= 0 AND discount_price <= base_price)
            )
        );
        
        -- 產(chǎn)品表索引
        CREATE INDEX IF NOT EXISTS idx_products_sku ON products(sku);
        CREATE INDEX IF NOT EXISTS idx_products_category ON products(category_id);
        CREATE INDEX IF NOT EXISTS idx_products_price ON products(base_price);
        CREATE INDEX IF NOT EXISTS idx_products_search ON products USING GIN(search_vector);
        CREATE INDEX IF NOT EXISTS idx_products_attributes ON products USING GIN(attributes);
        CREATE INDEX IF NOT EXISTS idx_products_brand ON products(brand);
        
        -- 2. 產(chǎn)品變體表(范圍類型用于尺寸)
        CREATE TABLE IF NOT EXISTS product_variants (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            product_id UUID REFERENCES products(id) ON DELETE CASCADE,
            variant_name VARCHAR(200) NOT NULL,
            
            -- 使用范圍類型表示尺寸范圍
            size_range numrange,
            weight_range numrange,
            
            -- JSONB存儲變體特定屬性
            variant_attributes JSONB DEFAULT '{}',
            
            -- 價格調(diào)整
            price_adjustment DECIMAL(10, 2) DEFAULT 0,
            additional_cost DECIMAL(10, 2) DEFAULT 0,
            
            -- 庫存跟蹤
            variant_stock INTEGER DEFAULT 0,
            min_order_quantity INTEGER DEFAULT 1,
            max_order_quantity INTEGER,
            
            -- 約束
            CONSTRAINT valid_size_range CHECK (
                size_range IS NULL OR 
                (lower(size_range) >= 0 AND upper(size_range) > lower(size_range))
            ),
            CONSTRAINT valid_quantity CHECK (
                min_order_quantity > 0 AND 
                (max_order_quantity IS NULL OR max_order_quantity >= min_order_quantity)
            )
        );
        
        -- 變體表索引
        CREATE INDEX IF NOT EXISTS idx_variants_product ON product_variants(product_id);
        CREATE INDEX IF NOT EXISTS idx_variants_size ON product_variants USING GIST(size_range);
        
        -- 3. 分類表(使用遞歸CTE支持層級結(jié)構(gòu))
        CREATE TABLE IF NOT EXISTS categories (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            name VARCHAR(200) NOT NULL,
            slug VARCHAR(200) UNIQUE NOT NULL,
            description TEXT,
            parent_id UUID REFERENCES categories(id),
            sort_order INTEGER DEFAULT 0,
            
            -- JSONB存儲分類屬性
            category_attributes JSONB DEFAULT '{}',
            
            -- 層級路徑(物化路徑模式)
            path VARCHAR(1000),
            level INTEGER DEFAULT 0,
            
            -- 時間戳
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT no_self_parent CHECK (id != parent_id)
        );
        
        -- 分類表索引
        CREATE INDEX IF NOT EXISTS idx_categories_parent ON categories(parent_id);
        CREATE INDEX IF NOT EXISTS idx_categories_path ON categories(path);
        CREATE INDEX IF NOT EXISTS idx_categories_slug ON categories(slug);
        
        -- 4. 訂單表(使用分區(qū)表)
        CREATE TABLE IF NOT EXISTS orders_partitioned (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            order_number VARCHAR(50) UNIQUE NOT NULL,
            customer_id UUID NOT NULL,
            status VARCHAR(50) NOT NULL,
            
            -- JSONB存儲訂單元數(shù)據(jù)
            order_metadata JSONB DEFAULT '{}',
            
            -- 金額信息
            subtotal DECIMAL(12, 2) NOT NULL,
            tax_amount DECIMAL(12, 2) DEFAULT 0,
            shipping_amount DECIMAL(12, 2) DEFAULT 0,
            discount_amount DECIMAL(12, 2) DEFAULT 0,
            total_amount DECIMAL(12, 2) NOT NULL,
            
            -- 時間信息
            ordered_at TIMESTAMP NOT NULL,
            shipped_at TIMESTAMP,
            delivered_at TIMESTAMP,
            
            -- 約束
            CONSTRAINT positive_amounts CHECK (
                subtotal >= 0 AND
                tax_amount >= 0 AND
                shipping_amount >= 0 AND
                discount_amount >= 0 AND
                total_amount >= 0
            )
        ) PARTITION BY RANGE (ordered_at);
        
        -- 創(chuàng)建訂單分區(qū)(每月一個分區(qū))
        CREATE TABLE IF NOT EXISTS orders_2024_01 
        PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
        
        CREATE TABLE IF NOT EXISTS orders_2024_02 
        PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
        
        -- 訂單表索引
        CREATE INDEX IF NOT EXISTS idx_orders_customer ON orders_partitioned(customer_id);
        CREATE INDEX IF NOT EXISTS idx_orders_status ON orders_partitioned(status);
        CREATE INDEX IF NOT EXISTS idx_orders_date ON orders_partitioned(ordered_at);
        CREATE INDEX IF NOT EXISTS idx_orders_metadata ON orders_partitioned USING GIN(order_metadata);
        
        -- 5. 訂單項(xiàng)表
        CREATE TABLE IF NOT EXISTS order_items (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            order_id UUID REFERENCES orders_partitioned(id) ON DELETE CASCADE,
            product_id UUID REFERENCES products(id),
            variant_id UUID REFERENCES product_variants(id),
            
            -- 產(chǎn)品快照(避免產(chǎn)品信息變更影響歷史訂單)
            product_snapshot JSONB NOT NULL,
            
            -- 購買信息
            quantity INTEGER NOT NULL,
            unit_price DECIMAL(12, 2) NOT NULL,
            discount_percentage DECIMAL(5, 2) DEFAULT 0,
            item_total DECIMAL(12, 2) NOT NULL,
            
            -- 約束
            CONSTRAINT positive_quantity CHECK (quantity > 0),
            CONSTRAINT positive_unit_price CHECK (unit_price >= 0)
        );
        
        -- 訂單項(xiàng)索引
        CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_id);
        CREATE INDEX IF NOT EXISTS idx_order_items_product ON order_items(product_id);
        
        -- 6. 客戶表
        CREATE TABLE IF NOT EXISTS customers (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            email VARCHAR(255) UNIQUE NOT NULL,
            phone VARCHAR(50),
            first_name VARCHAR(100),
            last_name VARCHAR(100),
            
            -- JSONB存儲客戶屬性
            customer_profile JSONB DEFAULT '{}',
            
            -- 地址信息(使用JSONB存儲多個地址)
            addresses JSONB DEFAULT '[]',
            
            -- 賬戶信息
            is_active BOOLEAN DEFAULT TRUE,
            loyalty_points INTEGER DEFAULT 0,
            customer_tier VARCHAR(50) DEFAULT 'STANDARD',
            
            -- 時間戳
            registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_login_at TIMESTAMP,
            
            -- 約束
            CONSTRAINT valid_email CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$')
        );
        
        -- 客戶表索引
        CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email);
        CREATE INDEX IF NOT EXISTS idx_customers_name ON customers(last_name, first_name);
        CREATE INDEX IF NOT EXISTS idx_customers_profile ON customers USING GIN(customer_profile);
        CREATE INDEX IF NOT EXISTS idx_customers_tier ON customers(customer_tier);
        
        -- 7. 庫存變更日志(使用BRIN索引)
        CREATE TABLE IF NOT EXISTS inventory_logs (
            id BIGSERIAL PRIMARY KEY,
            product_id UUID REFERENCES products(id),
            variant_id UUID REFERENCES product_variants(id),
            
            -- 變更信息
            change_type VARCHAR(50) NOT NULL,
            quantity_change INTEGER NOT NULL,
            previous_quantity INTEGER,
            new_quantity INTEGER,
            
            -- 關(guān)聯(lián)信息
            reference_id UUID,
            reference_type VARCHAR(100),
            
            -- 時間戳
            changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            changed_by UUID,
            
            -- 備注
            notes TEXT
        ) WITH (fillfactor = 90);
        
        -- 庫存日志索引(使用BRIN適合時間序列)
        CREATE INDEX IF NOT EXISTS idx_inventory_logs_time 
        ON inventory_logs USING BRIN(changed_at);
        
        CREATE INDEX IF NOT EXISTS idx_inventory_logs_product 
        ON inventory_logs(product_id, changed_at);
        
        -- 8. 產(chǎn)品評論表(使用數(shù)組和全文搜索)
        CREATE TABLE IF NOT EXISTS product_reviews (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            product_id UUID REFERENCES products(id) ON DELETE CASCADE,
            customer_id UUID REFERENCES customers(id),
            order_item_id UUID REFERENCES order_items(id),
            
            -- 評分
            rating INTEGER NOT NULL CHECK (rating >= 1 AND rating <= 5),
            
            -- 評論內(nèi)容
            title VARCHAR(500),
            review_text TEXT NOT NULL,
            
            -- 數(shù)組存儲優(yōu)點(diǎn)/缺點(diǎn)
            pros TEXT[],
            cons TEXT[],
            
            -- 元數(shù)據(jù)
            is_verified_purchase BOOLEAN DEFAULT FALSE,
            helpful_votes INTEGER DEFAULT 0,
            not_helpful_votes INTEGER DEFAULT 0,
            
            -- 全文搜索向量
            review_vector TSVECTOR GENERATED ALWAYS AS (
                to_tsvector('english', 
                    coalesce(title, '') || ' ' || 
                    coalesce(review_text, '')
                )
            ) STORED,
            
            -- 時間戳
            reviewed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT one_review_per_order_item UNIQUE (order_item_id)
        );
        
        -- 評論表索引
        CREATE INDEX IF NOT EXISTS idx_reviews_product ON product_reviews(product_id);
        CREATE INDEX IF NOT EXISTS idx_reviews_rating ON product_reviews(rating);
        CREATE INDEX IF NOT EXISTS idx_reviews_search ON product_reviews USING GIN(review_vector);
        CREATE INDEX IF NOT EXISTS idx_reviews_pros ON product_reviews USING GIN(pros);
        CREATE INDEX IF NOT EXISTS idx_reviews_date ON product_reviews(reviewed_at);
        
        -- 9. 促銷規(guī)則表(使用復(fù)雜約束和JSONB)
        CREATE TABLE IF NOT EXISTS promotion_rules (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            name VARCHAR(200) NOT NULL,
            description TEXT,
            
            -- 規(guī)則條件(JSONB存儲復(fù)雜規(guī)則)
            conditions JSONB NOT NULL,
            
            -- 折扣信息
            discount_type VARCHAR(50) NOT NULL,
            discount_value DECIMAL(10, 2),
            discount_percentage DECIMAL(5, 2),
            max_discount_amount DECIMAL(12, 2),
            
            -- 時間范圍
            valid_from TIMESTAMP NOT NULL,
            valid_until TIMESTAMP,
            
            -- 使用限制
            usage_limit INTEGER,
            per_customer_limit INTEGER,
            minimum_order_amount DECIMAL(12, 2),
            
            -- 狀態(tài)
            is_active BOOLEAN DEFAULT TRUE,
            
            -- 元數(shù)據(jù)
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT valid_discount_values CHECK (
                (discount_type = 'AMOUNT' AND discount_value IS NOT NULL) OR
                (discount_type = 'PERCENTAGE' AND discount_percentage IS NOT NULL)
            ),
            CONSTRAINT valid_discount_percentage CHECK (
                discount_percentage IS NULL OR 
                (discount_percentage >= 0 AND discount_percentage <= 100)
            )
        );
        
        -- 促銷規(guī)則索引
        CREATE INDEX IF NOT EXISTS idx_promotions_active 
        ON promotion_rules(is_active, valid_from, valid_until);
        
        CREATE INDEX IF NOT EXISTS idx_promotions_conditions 
        ON promotion_rules USING GIN(conditions);
        
        -- 10. 物化視圖:產(chǎn)品統(tǒng)計
        CREATE MATERIALIZED VIEW IF NOT EXISTS product_statistics AS
        SELECT 
            p.id as product_id,
            p.name as product_name,
            p.category_id,
            p.base_price,
            
            -- 銷售統(tǒng)計
            COUNT(DISTINCT oi.order_id) as total_orders,
            SUM(oi.quantity) as total_units_sold,
            SUM(oi.item_total) as total_revenue,
            
            -- 庫存統(tǒng)計
            p.stock_quantity,
            p.reserved_quantity,
            p.stock_quantity - p.reserved_quantity as available_quantity,
            
            -- 評分統(tǒng)計
            COALESCE(AVG(pr.rating), 0) as average_rating,
            COUNT(pr.id) as review_count,
            
            -- 時間統(tǒng)計
            MAX(o.ordered_at) as last_sale_date
            
        FROM products p
        LEFT JOIN order_items oi ON p.id = oi.product_id
        LEFT JOIN orders_partitioned o ON oi.order_id = o.id
        LEFT JOIN product_reviews pr ON p.id = pr.product_id
        
        GROUP BY p.id, p.name, p.category_id, p.base_price,
                 p.stock_quantity, p.reserved_quantity
        
        WITH DATA;
        
        -- 物化視圖索引
        CREATE UNIQUE INDEX IF NOT EXISTS idx_product_stats_product 
        ON product_statistics(product_id);
        
        CREATE INDEX IF NOT EXISTS idx_product_stats_category 
        ON product_statistics(category_id);
        
        CREATE INDEX IF NOT EXISTS idx_product_stats_revenue 
        ON product_statistics(total_revenue DESC);
        
        -- 創(chuàng)建刷新物化視圖的函數(shù)
        CREATE OR REPLACE FUNCTION refresh_product_statistics()
        RETURNS TRIGGER AS $$
        BEGIN
            REFRESH MATERIALIZED VIEW CONCURRENTLY product_statistics;
            RETURN NULL;
        END;
        $$ LANGUAGE plpgsql;
        """
        
        try:
            # 分步執(zhí)行架構(gòu)創(chuàng)建
            statements = schema_sql.split(';')
            for statement in statements:
                statement = statement.strip()
                if statement:
                    self.cursor.execute(statement)
            
            self.conn.commit()
            logger.info("電商系統(tǒng)數(shù)據(jù)庫架構(gòu)創(chuàng)建成功")
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建架構(gòu)失敗: {e}")
            raise
    
    def search_products(
        self,
        query: Optional[str] = None,
        category_id: Optional[str] = None,
        min_price: Optional[float] = None,
        max_price: Optional[float] = None,
        brands: Optional[List[str]] = None,
        min_rating: Optional[float] = None,
        in_stock_only: bool = False,
        sort_by: str = 'relevance',
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        高級產(chǎn)品搜索
        
        Args:
            query: 搜索關(guān)鍵詞
            category_id: 分類ID
            min_price: 最低價格
            max_price: 最高價格
            brands: 品牌列表
            min_rating: 最低評分
            in_stock_only: 僅顯示有貨商品
            sort_by: 排序方式
            limit: 返回數(shù)量
            offset: 偏移量
            
        Returns:
            產(chǎn)品列表
        """
        search_sql = """
        SELECT 
            p.id,
            p.sku,
            p.name,
            p.description,
            p.brand,
            p.base_price,
            p.discount_price,
            p.stock_quantity,
            p.reserved_quantity,
            p.attributes,
            
            -- 計算可用庫存
            p.stock_quantity - p.reserved_quantity as available_quantity,
            
            -- 計算折扣率
            CASE 
                WHEN p.discount_price IS NOT NULL 
                THEN ROUND((1 - p.discount_price / p.base_price) * 100, 1)
                ELSE 0
            END as discount_percentage,
            
            -- 獲取評分信息
            COALESCE(ps.average_rating, 0) as average_rating,
            COALESCE(ps.review_count, 0) as review_count,
            
            -- 計算相關(guān)性得分(如果有關(guān)鍵詞)
            {relevance_score}
            
        FROM products p
        LEFT JOIN product_statistics ps ON p.id = ps.product_id
        
        WHERE 1=1
            {search_condition}
            {category_condition}
            {price_condition}
            {brand_condition}
            {rating_condition}
            {stock_condition}
            
        {order_clause}
        
        LIMIT %s OFFSET %s
        """
        
        # 構(gòu)建查詢條件
        conditions = []
        params = []
        
        # 全文搜索條件
        if query:
            conditions.append("p.search_vector @@ plainto_tsquery('english', %s)")
            params.append(query)
        
        # 分類條件
        if category_id:
            # 獲取分類及其所有子分類
            subcategories = self._get_all_subcategories(category_id)
            if subcategories:
                placeholders = ', '.join(['%s'] * len(subcategories))
                conditions.append(f"p.category_id IN ({placeholders})")
                params.extend(subcategories)
        
        # 價格條件
        if min_price is not None:
            conditions.append("p.base_price >= %s")
            params.append(min_price)
        if max_price is not None:
            conditions.append("p.base_price <= %s")
            params.append(max_price)
        
        # 品牌條件
        if brands:
            placeholders = ', '.join(['%s'] * len(brands))
            conditions.append(f"p.brand IN ({placeholders})")
            params.extend(brands)
        
        # 評分條件
        if min_rating is not None:
            conditions.append("COALESCE(ps.average_rating, 0) >= %s")
            params.append(min_rating)
        
        # 庫存條件
        if in_stock_only:
            conditions.append("(p.stock_quantity - p.reserved_quantity) > 0")
        
        # 構(gòu)建相關(guān)性得分計算
        relevance_score = ""
        if query:
            relevance_score = """
            , ts_rank(
                p.search_vector, 
                plainto_tsquery('english', %s)
            ) as relevance_score
            """
        
        # 構(gòu)建排序子句
        order_clause_map = {
            'relevance': "ORDER BY relevance_score DESC NULLS LAST",
            'price_asc': "ORDER BY p.base_price ASC",
            'price_desc': "ORDER BY p.base_price DESC",
            'rating': "ORDER BY ps.average_rating DESC NULLS LAST",
            'popularity': "ORDER BY ps.total_units_sold DESC NULLS LAST",
            'newest': "ORDER BY p.created_at DESC"
        }
        
        order_clause = order_clause_map.get(sort_by, "ORDER BY p.created_at DESC")
        
        # 格式化SQL
        formatted_sql = search_sql.format(
            relevance_score=relevance_score,
            search_condition=f"AND {' AND '.join(conditions)}" if conditions else "",
            category_condition="",
            price_condition="",
            brand_condition="",
            rating_condition="",
            stock_condition="",
            order_clause=order_clause
        )
        
        # 添加分頁參數(shù)
        params.extend([limit, offset])
        
        try:
            self.cursor.execute(formatted_sql, params)
            products = self.cursor.fetchall()
            
            return [
                {
                    'id': str(product['id']),
                    'sku': product['sku'],
                    'name': product['name'],
                    'brand': product['brand'],
                    'base_price': float(product['base_price']),
                    'discount_price': float(product['discount_price']) if product['discount_price'] else None,
                    'available_quantity': product['available_quantity'],
                    'discount_percentage': product['discount_percentage'],
                    'average_rating': float(product['average_rating']),
                    'review_count': product['review_count'],
                    'attributes': product['attributes']
                }
                for product in products
            ]
            
        except Exception as e:
            logger.error(f"產(chǎn)品搜索失敗: {e}")
            return []
    
    def _get_all_subcategories(self, category_id: str) -> List[str]:
        """
        獲取分類及其所有子分類
        
        Args:
            category_id: 分類ID
            
        Returns:
            子分類ID列表
        """
        recursive_sql = """
        WITH RECURSIVE category_tree AS (
            -- 基礎(chǔ)分類
            SELECT id, parent_id
            FROM categories
            WHERE id = %s
            
            UNION ALL
            
            -- 遞歸獲取子分類
            SELECT c.id, c.parent_id
            FROM categories c
            INNER JOIN category_tree ct ON c.parent_id = ct.id
        )
        SELECT id FROM category_tree
        """
        
        try:
            self.cursor.execute(recursive_sql, (category_id,))
            results = self.cursor.fetchall()
            return [str(row['id']) for row in results]
        except Exception as e:
            logger.error(f"獲取子分類失敗: {e}")
            return []
    
    def get_product_recommendations(
        self, 
        product_id: str, 
        customer_id: Optional[str] = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        獲取產(chǎn)品推薦
        
        Args:
            product_id: 產(chǎn)品ID
            customer_id: 客戶ID(可選)
            limit: 返回數(shù)量
            
        Returns:
            推薦產(chǎn)品列表
        """
        recommendations_sql = """
        -- 基于多種策略的混合推薦
        
        (
            -- 策略1: 同品牌產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'same_brand' as recommendation_reason,
                0.7 as recommendation_score
            FROM products p
            WHERE p.brand = (
                SELECT brand FROM products WHERE id = %s
            )
            AND p.id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            LIMIT 3
        )
        
        UNION ALL
        
        (
            -- 策略2: 同分類熱門產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'popular_in_category' as recommendation_reason,
                ps.total_units_sold::float / 
                    (SELECT MAX(total_units_sold) FROM product_statistics) as recommendation_score
            FROM products p
            JOIN product_statistics ps ON p.id = ps.product_id
            WHERE p.category_id = (
                SELECT category_id FROM products WHERE id = %s
            )
            AND p.id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            ORDER BY ps.total_units_sold DESC
            LIMIT 3
        )
        
        UNION ALL
        
        (
            -- 策略3: 經(jīng)常一起購買的產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'frequently_bought_together' as recommendation_reason,
                COUNT(DISTINCT oi2.order_id)::float / 
                    (SELECT COUNT(DISTINCT oi3.order_id) 
                     FROM order_items oi3 
                     WHERE oi3.product_id = %s) as recommendation_score
            FROM order_items oi1
            JOIN order_items oi2 ON oi1.order_id = oi2.order_id
            JOIN products p ON oi2.product_id = p.id
            WHERE oi1.product_id = %s
            AND oi2.product_id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            GROUP BY p.id, p.name, p.brand, p.base_price
            HAVING COUNT(DISTINCT oi2.order_id) >= 2
            ORDER BY recommendation_score DESC
            LIMIT 3
        )
        
        {customer_based_recommendations}
        
        ORDER BY recommendation_score DESC
        LIMIT %s
        """
        
        # 如果有客戶ID,添加個性化推薦
        customer_recommendations = ""
        if customer_id:
            customer_recommendations = """
            UNION ALL
            
            (
                -- 策略4: 基于客戶購買歷史的推薦
                SELECT 
                    p.id,
                    p.name,
                    p.brand,
                    p.base_price,
                    'based_on_your_purchases' as recommendation_reason,
                    COUNT(DISTINCT oi.order_id)::float / 10 as recommendation_score
                FROM order_items oi
                JOIN products p ON oi.product_id = p.id
                WHERE oi.order_id IN (
                    SELECT order_id 
                    FROM order_items 
                    WHERE product_id = %s
                )
                AND p.id != %s
                AND (p.stock_quantity - p.reserved_quantity) > 0
                GROUP BY p.id, p.name, p.brand, p.base_price
                HAVING COUNT(DISTINCT oi.order_id) >= 1
                ORDER BY recommendation_score DESC
                LIMIT 2
            )
            """
        
        # 格式化SQL
        formatted_sql = recommendations_sql.format(
            customer_based_recommendations=customer_recommendations
        )
        
        # 準(zhǔn)備參數(shù)
        params = [product_id, product_id, product_id, product_id, 
                 product_id, product_id, product_id]
        
        if customer_id:
            params.extend([customer_id, customer_id, product_id, product_id])
        
        params.append(limit)
        
        try:
            self.cursor.execute(formatted_sql, params)
            recommendations = self.cursor.fetchall()
            
            return [
                {
                    'id': str(rec['id']),
                    'name': rec['name'],
                    'brand': rec['brand'],
                    'price': float(rec['base_price']),
                    'recommendation_reason': rec['recommendation_reason'],
                    'recommendation_score': float(rec['recommendation_score'])
                }
                for rec in recommendations
            ]
            
        except Exception as e:
            logger.error(f"獲取推薦失敗: {e}")
            return []
    
    def create_order(
        self,
        customer_id: str,
        items: List[Dict[str, Any]],
        shipping_address: Dict[str, Any],
        promotion_code: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        創(chuàng)建訂單
        
        Args:
            customer_id: 客戶ID
            items: 訂單項(xiàng)列表
            shipping_address: 配送地址
            promotion_code: 促銷代碼
            
        Returns:
            創(chuàng)建的訂單信息
        """
        try:
            # 開始事務(wù)
            self.conn.autocommit = False
            
            # 生成訂單號
            order_number = f"ORD-{datetime.now().strftime('%Y%m%d')}-{self._generate_order_suffix()}"
            
            # 計算訂單金額
            order_calculation = self._calculate_order_amounts(items, promotion_code)
            
            # 插入訂單
            order_sql = """
            INSERT INTO orders_partitioned (
                order_number, customer_id, status, order_metadata,
                subtotal, tax_amount, shipping_amount, 
                discount_amount, total_amount, ordered_at
            )
            VALUES (%s, %s, 'PENDING', %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            RETURNING id, order_number, total_amount
            """
            
            self.cursor.execute(
                order_sql,
                (
                    order_number,
                    customer_id,
                    Json({
                        'shipping_address': shipping_address,
                        'promotion_code': promotion_code
                    }),
                    order_calculation['subtotal'],
                    order_calculation['tax_amount'],
                    order_calculation['shipping_amount'],
                    order_calculation['discount_amount'],
                    order_calculation['total_amount']
                )
            )
            
            order_result = self.cursor.fetchone()
            order_id = order_result['id']
            
            # 插入訂單項(xiàng)
            for item in items:
                # 獲取產(chǎn)品快照
                product_snapshot = self._get_product_snapshot(item['product_id'])
                
                # 插入訂單項(xiàng)
                item_sql = """
                INSERT INTO order_items (
                    order_id, product_id, variant_id,
                    product_snapshot, quantity, unit_price,
                    discount_percentage, item_total
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """
                
                self.cursor.execute(
                    item_sql,
                    (
                        order_id,
                        item['product_id'],
                        item.get('variant_id'),
                        Json(product_snapshot),
                        item['quantity'],
                        item['unit_price'],
                        item.get('discount_percentage', 0),
                        item['item_total']
                    )
                )
                
                # 更新庫存
                self._update_inventory(
                    product_id=item['product_id'],
                    variant_id=item.get('variant_id'),
                    quantity_change=-item['quantity'],
                    reference_id=order_id,
                    reference_type='ORDER',
                    change_type='SALE'
                )
            
            # 提交事務(wù)
            self.conn.commit()
            
            return {
                'order_id': str(order_id),
                'order_number': order_result['order_number'],
                'total_amount': float(order_result['total_amount']),
                'items_count': len(items)
            }
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建訂單失敗: {e}")
            return None
    
    def _calculate_order_amounts(
        self,
        items: List[Dict[str, Any]],
        promotion_code: Optional[str] = None
    ) -> Dict[str, float]:
        """計算訂單金額"""
        subtotal = sum(item['item_total'] for item in items)
        
        # 應(yīng)用促銷折扣
        discount_amount = 0
        if promotion_code:
            # 這里可以添加促銷邏輯
            pass
        
        # 計算稅費(fèi)(簡化示例)
        tax_amount = subtotal * 0.1  # 10%稅率
        
        # 計算運(yùn)費(fèi)(簡化示例)
        shipping_amount = 5.99 if subtotal < 50 else 0
        
        # 計算總額
        total_amount = subtotal + tax_amount + shipping_amount - discount_amount
        
        return {
            'subtotal': subtotal,
            'tax_amount': tax_amount,
            'shipping_amount': shipping_amount,
            'discount_amount': discount_amount,
            'total_amount': total_amount
        }
    
    def _get_product_snapshot(self, product_id: str) -> Dict[str, Any]:
        """獲取產(chǎn)品快照"""
        snapshot_sql = """
        SELECT 
            id, sku, name, description, brand,
            base_price, attributes
        FROM products
        WHERE id = %s
        """
        
        self.cursor.execute(snapshot_sql, (product_id,))
        product = self.cursor.fetchone()
        
        return {
            'product_id': str(product['id']),
            'sku': product['sku'],
            'name': product['name'],
            'brand': product['brand'],
            'price_at_time_of_purchase': float(product['base_price']),
            'attributes': product['attributes']
        }
    
    def _update_inventory(
        self,
        product_id: str,
        variant_id: Optional[str],
        quantity_change: int,
        reference_id: str,
        reference_type: str,
        change_type: str
    ):
        """更新庫存"""
        # 獲取當(dāng)前庫存
        if variant_id:
            stock_sql = """
            SELECT variant_stock as current_quantity
            FROM product_variants
            WHERE id = %s
            """
            self.cursor.execute(stock_sql, (variant_id,))
        else:
            stock_sql = """
            SELECT stock_quantity as current_quantity
            FROM products
            WHERE id = %s
            """
            self.cursor.execute(stock_sql, (product_id,))
        
        result = self.cursor.fetchone()
        if not result:
            raise Exception("產(chǎn)品不存在")
        
        current_quantity = result['current_quantity']
        new_quantity = current_quantity + quantity_change
        
        if new_quantity < 0:
            raise Exception("庫存不足")
        
        # 更新庫存
        if variant_id:
            update_sql = """
            UPDATE product_variants
            SET variant_stock = %s
            WHERE id = %s
            """
            self.cursor.execute(update_sql, (new_quantity, variant_id))
        else:
            update_sql = """
            UPDATE products
            SET stock_quantity = %s
            WHERE id = %s
            """
            self.cursor.execute(update_sql, (new_quantity, product_id))
        
        # 記錄庫存變更
        log_sql = """
        INSERT INTO inventory_logs (
            product_id, variant_id, change_type,
            quantity_change, previous_quantity, new_quantity,
            reference_id, reference_type
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        
        self.cursor.execute(
            log_sql,
            (
                product_id,
                variant_id,
                change_type,
                quantity_change,
                current_quantity,
                new_quantity,
                reference_id,
                reference_type
            )
        )
    
    def _generate_order_suffix(self) -> str:
        """生成訂單號后綴"""
        import random
        import string
        
        # 生成6位隨機(jī)字母數(shù)字
        return ''.join(random.choices(
            string.ascii_uppercase + string.digits, k=6
        ))
    
    def refresh_materialized_views(self):
        """刷新物化視圖"""
        refresh_sql = """
        REFRESH MATERIALIZED VIEW CONCURRENTLY product_statistics;
        """
        
        try:
            self.cursor.execute(refresh_sql)
            self.conn.commit()
            logger.info("物化視圖刷新成功")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"刷新物化視圖失敗: {e}")
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def demonstrate_ecommerce_features():
    """演示電商系統(tǒng)高級特性"""
    dsn = "dbname=ecommerce user=postgres password=password host=localhost port=5432"
    
    print("電商系統(tǒng)PostgreSQL高級特性演示")
    print("=" * 60)
    
    # 創(chuàng)建數(shù)據(jù)庫實(shí)例
    db = ECommerceDatabase(dsn)
    
    try:
        # 1. 創(chuàng)建架構(gòu)
        print("\n1. 創(chuàng)建數(shù)據(jù)庫架構(gòu)...")
        db.create_schema()
        print("   架構(gòu)創(chuàng)建完成")
        
        # 2. 演示搜索功能
        print("\n2. 演示高級產(chǎn)品搜索...")
        products = db.search_products(
            query="wireless headphone",
            min_price=50,
            max_price=200,
            min_rating=4.0,
            in_stock_only=True,
            sort_by='rating',
            limit=5
        )
        
        print(f"   找到 {len(products)} 個產(chǎn)品:")
        for product in products:
            print(f"   - {product['name']} (評分: {product['average_rating']:.1f}, "
                  f"價格: ${product['base_price']:.2f})")
        
        # 3. 演示推薦系統(tǒng)
        if products:
            print("\n3. 演示產(chǎn)品推薦系統(tǒng)...")
            recommendations = db.get_product_recommendations(
                product_id=products[0]['id'],
                limit=5
            )
            
            print(f"   為 '{products[0]['name']}' 的推薦:")
            for rec in recommendations:
                print(f"   - {rec['name']} (原因: {rec['recommendation_reason']}, "
                      f"得分: {rec['recommendation_score']:.2f})")
        
        # 4. 演示訂單創(chuàng)建
        print("\n4. 演示訂單創(chuàng)建...")
        # 這里需要實(shí)際的產(chǎn)品數(shù)據(jù),所以只是演示代碼結(jié)構(gòu)
        print("   訂單創(chuàng)建功能就緒")
        
        # 5. 刷新物化視圖
        print("\n5. 刷新物化視圖...")
        db.refresh_materialized_views()
        print("   物化視圖刷新完成")
        
        print("\n演示完成!")
        
    except Exception as e:
        print(f"演示過程中出錯: {e}")
    finally:
        db.close()


if __name__ == "__main__":
    demonstrate_ecommerce_features()

5. 性能監(jiān)控與調(diào)優(yōu)工具

5.1 PostgreSQL性能監(jiān)控指標(biāo)體系

5.2 關(guān)鍵性能指標(biāo)計算公式

緩沖區(qū)命中率

索引使用效率

緩存命中率

6. 完整代碼示例

"""
PostgreSQL高級特性與性能優(yōu)化完整示例
"""
import psycopg2
from psycopg2.extras import DictCursor, Json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLAdvancedOptimizer:
    """PostgreSQL高級優(yōu)化器"""
    
    def __init__(self, dsn: str):
        """
        初始化優(yōu)化器
        
        Args:
            dsn: 數(shù)據(jù)庫連接字符串
        """
        self.dsn = dsn
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_database_health(self) -> Dict[str, Any]:
        """
        全面分析數(shù)據(jù)庫健康狀態(tài)
        
        Returns:
            數(shù)據(jù)庫健康報告
        """
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'overall_score': 0,
            'categories': {},
            'issues': [],
            'recommendations': []
        }
        
        # 分析各個維度
        categories = [
            ('連接與并發(fā)', self._analyze_connections),
            ('查詢性能', self._analyze_query_performance),
            ('索引效率', self._analyze_index_efficiency),
            ('緩存性能', self._analyze_cache_performance),
            ('存儲效率', self._analyze_storage_efficiency),
            ('配置優(yōu)化', self._analyze_configuration)
        ]
        
        total_score = 0
        category_count = 0
        
        for category_name, analyzer_func in categories:
            try:
                category_result = analyzer_func()
                health_report['categories'][category_name] = category_result
                
                if 'score' in category_result:
                    total_score += category_result['score']
                    category_count += 1
                
                if 'issues' in category_result:
                    health_report['issues'].extend(category_result['issues'])
                
                if 'recommendations' in category_result:
                    health_report['recommendations'].extend(category_result['recommendations'])
                    
            except Exception as e:
                logger.error(f"分析{category_name}失敗: {e}")
                health_report['issues'].append({
                    'category': category_name,
                    'severity': 'ERROR',
                    'message': f'分析失敗: {str(e)}'
                })
        
        # 計算總體評分
        if category_count > 0:
            health_report['overall_score'] = total_score / category_count
        
        # 排序問題和建議
        health_report['issues'] = sorted(
            health_report['issues'], 
            key=lambda x: {'CRITICAL': 3, 'HIGH': 2, 'MEDIUM': 1, 'LOW': 0}.get(x.get('severity', 'LOW'), 0),
            reverse=True
        )
        
        health_report['recommendations'] = sorted(
            health_report['recommendations'],
            key=lambda x: x.get('priority', 3),
            reverse=False
        )
        
        return health_report
    
    def _analyze_connections(self) -> Dict[str, Any]:
        """分析連接與并發(fā)"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取連接統(tǒng)計
            self.cursor.execute("""
                SELECT 
                    COUNT(*) as total_connections,
                    COUNT(*) FILTER (WHERE state = 'active') as active_connections,
                    COUNT(*) FILTER (WHERE state = 'idle') as idle_connections,
                    COUNT(*) FILTER (WHERE wait_event_type IS NOT NULL) as waiting_connections,
                    MAX(now() - backend_start) as oldest_connection_age
                FROM pg_stat_activity
                WHERE pid <> pg_backend_pid()
            """)
            conn_stats = self.cursor.fetchone()
            
            # 獲取配置參數(shù)
            self.cursor.execute("""
                SELECT setting::integer as max_connections
                FROM pg_settings
                WHERE name = 'max_connections'
            """)
            max_conns = self.cursor.fetchone()['max_connections']
            
            # 計算連接使用率
            conn_usage = conn_stats['total_connections'] / max_conns
            analysis['metrics'] = {
                'total_connections': conn_stats['total_connections'],
                'active_connections': conn_stats['active_connections'],
                'idle_connections': conn_stats['idle_connections'],
                'waiting_connections': conn_stats['waiting_connections'],
                'connection_usage_percentage': round(conn_usage * 100, 1),
                'max_connections': max_conns
            }
            
            # 分析問題
            if conn_usage > 0.8:
                analysis['score'] -= 30
                analysis['issues'].append({
                    'severity': 'HIGH',
                    'message': f'連接使用率過高: {conn_usage:.1%}',
                    'details': '接近最大連接數(shù)限制'
                })
                analysis['recommendations'].append({
                    'priority': 1,
                    'action': '考慮增加max_connections參數(shù)或優(yōu)化連接池配置'
                })
            
            if conn_stats['waiting_connections'] > 5:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'等待連接數(shù)較多: {conn_stats["waiting_connections"]}',
                    'details': '可能存在鎖爭用或資源競爭'
                })
            
            if conn_stats['oldest_connection_age'] and \
               conn_stats['oldest_connection_age'].total_seconds() > 3600:
                analysis['score'] -= 10
                analysis['issues'].append({
                    'severity': 'LOW',
                    'message': f'存在長時間連接: {conn_stats["oldest_connection_age"]}',
                    'details': '考慮優(yōu)化連接生命周期'
                })
            
        except Exception as e:
            logger.error(f"連接分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_query_performance(self) -> Dict[str, Any]:
        """分析查詢性能"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取慢查詢統(tǒng)計
            self.cursor.execute("""
                WITH query_stats AS (
                    SELECT 
                        query,
                        calls,
                        total_time,
                        mean_time,
                        rows,
                        100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) as buffer_hit_rate,
                        ROW_NUMBER() OVER (ORDER BY total_time DESC) as time_rank
                    FROM pg_stat_statements
                    WHERE query NOT LIKE '%pg_stat_statements%'
                    AND calls > 0
                )
                SELECT 
                    COUNT(*) as total_queries,
                    SUM(calls) as total_calls,
                    SUM(total_time) as total_time_ms,
                    AVG(mean_time) as avg_query_time_ms,
                    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY mean_time) as p95_query_time_ms,
                    SUM(CASE WHEN mean_time > 100 THEN 1 ELSE 0 END) as slow_queries_count,
                    AVG(buffer_hit_rate) as avg_buffer_hit_rate
                FROM query_stats
            """)
            query_stats = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'total_queries': query_stats['total_queries'],
                'total_calls': query_stats['total_calls'],
                'total_time_seconds': round(query_stats['total_time_ms'] / 1000, 1),
                'avg_query_time_ms': round(query_stats['avg_query_time_ms'], 2),
                'p95_query_time_ms': round(query_stats['p95_query_time_ms'], 2),
                'slow_queries_count': query_stats['slow_queries_count'],
                'avg_buffer_hit_rate': round(query_stats['avg_buffer_hit_rate'], 1)
            }
            
            # 分析問題
            if query_stats['avg_query_time_ms'] > 50:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'HIGH',
                    'message': f'平均查詢時間較高: {query_stats["avg_query_time_ms"]:.2f}ms',
                    'details': '可能存在查詢優(yōu)化空間'
                })
                analysis['recommendations'].append({
                    'priority': 1,
                    'action': '分析并優(yōu)化最耗時的查詢'
                })
            
            if query_stats['slow_queries_count'] > 10:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'發(fā)現(xiàn) {query_stats["slow_queries_count"]} 個慢查詢',
                    'details': '定義: 平均執(zhí)行時間 > 100ms'
                })
            
            if query_stats['avg_buffer_hit_rate'] < 90:
                analysis['score'] -= 10
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'平均緩沖區(qū)命中率較低: {query_stats["avg_buffer_hit_rate"]:.1f}%',
                    'details': '建議增加shared_buffers或優(yōu)化查詢'
                })
            
        except Exception as e:
            logger.error(f"查詢性能分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_index_efficiency(self) -> Dict[str, Any]:
        """分析索引效率"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取索引使用統(tǒng)計
            self.cursor.execute("""
                WITH index_stats AS (
                    SELECT 
                        schemaname,
                        tablename,
                        indexname,
                        idx_scan as index_scans,
                        idx_tup_read as tuples_read,
                        idx_tup_fetch as tuples_fetched,
                        pg_relation_size(indexname::regclass) as index_size_bytes,
                        CASE 
                            WHEN idx_scan = 0 THEN 0
                            ELSE idx_tup_fetch::float / idx_tup_read * 100
                        END as index_efficiency
                    FROM pg_stat_user_indexes
                    WHERE schemaname NOT LIKE 'pg_%'
                )
                SELECT 
                    COUNT(*) as total_indexes,
                    SUM(index_size_bytes) as total_index_size_bytes,
                    SUM(CASE WHEN index_scans = 0 THEN 1 ELSE 0 END) as unused_indexes_count,
                    AVG(index_efficiency) as avg_index_efficiency,
                    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY index_efficiency) as median_index_efficiency
                FROM index_stats
            """)
            index_stats = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'total_indexes': index_stats['total_indexes'],
                'total_index_size_gb': round(index_stats['total_index_size_bytes'] / (1024**3), 2),
                'unused_indexes_count': index_stats['unused_indexes_count'],
                'avg_index_efficiency': round(index_stats['avg_index_efficiency'], 1),
                'median_index_efficiency': round(index_stats['median_index_efficiency'], 1)
            }
            
            # 分析問題
            unused_ratio = index_stats['unused_indexes_count'] / index_stats['total_indexes'] \
                if index_stats['total_indexes'] > 0 else 0
            
            if unused_ratio > 0.1:
                analysis['score'] -= 25
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'未使用索引比例較高: {unused_ratio:.1%}',
                    'details': f'{index_stats["unused_indexes_count"]} 個索引從未使用'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '考慮刪除未使用的索引以節(jié)省空間和提高寫入性能'
                })
            
            if index_stats['avg_index_efficiency'] < 50:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'平均索引效率較低: {index_stats["avg_index_efficiency"]:.1f}%',
                    'details': '索引可能不是最優(yōu)選擇'
                })
            
        except Exception as e:
            logger.error(f"索引效率分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_cache_performance(self) -> Dict[str, Any]:
        """分析緩存性能"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取緩存統(tǒng)計
            self.cursor.execute("""
                SELECT 
                    -- 共享緩沖區(qū)命中率
                    CASE 
                        WHEN blks_hit + blks_read > 0 
                        THEN blks_hit::float / (blks_hit + blks_read) * 100
                        ELSE 0 
                    END as shared_buffer_hit_rate,
                    
                    -- TOAST緩沖區(qū)命中率
                    CASE 
                        WHEN toast_blks_hit + toast_blks_read > 0 
                        THEN toast_blks_hit::float / (toast_blks_hit + toast_blks_read) * 100
                        ELSE 0 
                    END as toast_buffer_hit_rate,
                    
                    -- 臨時文件使用
                    temp_files,
                    temp_bytes,
                    
                    -- 檢查點(diǎn)統(tǒng)計
                    checkpoints_timed,
                    checkpoints_req,
                    checkpoint_write_time,
                    checkpoint_sync_time
                    
                FROM pg_stat_bgwriter
            """)
            cache_stats = self.cursor.fetchone()
            
            # 獲取緩沖區(qū)配置
            self.cursor.execute("""
                SELECT 
                    name,
                    setting,
                    unit
                FROM pg_settings
                WHERE name IN ('shared_buffers', 'effective_cache_size')
            """)
            cache_config = {row['name']: row for row in self.cursor.fetchall()}
            
            analysis['metrics'] = {
                'shared_buffer_hit_rate': round(cache_stats['shared_buffer_hit_rate'], 1),
                'toast_buffer_hit_rate': round(cache_stats['toast_buffer_hit_rate'], 1),
                'temp_files_count': cache_stats['temp_files'],
                'temp_files_size_gb': round(cache_stats['temp_bytes'] / (1024**3), 2),
                'shared_buffers': cache_config.get('shared_buffers', {}).get('setting', 'N/A'),
                'effective_cache_size': cache_config.get('effective_cache_size', {}).get('setting', 'N/A')
            }
            
            # 分析問題
            if cache_stats['shared_buffer_hit_rate'] < 90:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'共享緩沖區(qū)命中率較低: {cache_stats["shared_buffer_hit_rate"]:.1f}%',
                    'details': '建議增加shared_buffers或優(yōu)化工作集'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '考慮增加shared_buffers參數(shù)'
                })
            
            if cache_stats['temp_files'] > 100:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'臨時文件使用較多: {cache_stats["temp_files"]} 個文件',
                    'details': '可能存在排序或哈希操作溢出到磁盤'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '增加work_mem參數(shù)以減少臨時文件使用'
                })
            
        except Exception as e:
            logger.error(f"緩存性能分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_storage_efficiency(self) -> Dict[str, Any]:
        """分析存儲效率"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取表膨脹信息
            self.cursor.execute("""
                SELECT 
                    schemaname,
                    tablename,
                    n_dead_tup as dead_tuples,
                    n_live_tup as live_tuples,
                    CASE 
                        WHEN n_live_tup > 0 
                        THEN n_dead_tup::float / n_live_tup * 100
                        ELSE 0 
                    END as dead_tuple_ratio,
                    last_vacuum,
                    last_autovacuum,
                    last_analyze,
                    last_autoanalyze
                FROM pg_stat_user_tables
                WHERE schemaname NOT LIKE 'pg_%'
                ORDER BY dead_tuple_ratio DESC
                LIMIT 10
            """)
            table_stats = self.cursor.fetchall()
            
            # 獲取數(shù)據(jù)庫大小
            self.cursor.execute("""
                SELECT 
                    pg_database_size(current_database()) as database_size_bytes,
                    pg_size_pretty(pg_database_size(current_database())) as database_size_pretty
            """)
            db_size = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'database_size': db_size['database_size_pretty'],
                'tables_analyzed': len(table_stats),
                'top_tables_by_dead_tuples': [
                    {
                        'table': f"{row['schemaname']}.{row['tablename']}",
                        'dead_tuple_ratio': round(row['dead_tuple_ratio'], 1),
                        'dead_tuples': row['dead_tuples'],
                        'last_vacuum': row['last_vacuum']
                    }
                    for row in table_stats[:5]
                ]
            }
            
            # 分析問題
            high_dead_tables = [
                row for row in table_stats 
                if row['dead_tuple_ratio'] > 20
            ]
            
            if high_dead_tables:
                analysis['score'] -= 25
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'發(fā)現(xiàn) {len(high_dead_tables)} 個表死元組比例超過20%',
                    'details': '可能導(dǎo)致查詢性能下降和存儲空間浪費(fèi)'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '對高死元組比例的表執(zhí)行VACUUM操作'
                })
            
        except Exception as e:
            logger.error(f"存儲效率分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_configuration(self) -> Dict[str, Any]:
        """分析配置優(yōu)化"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取關(guān)鍵配置參數(shù)
            self.cursor.execute("""
                SELECT 
                    name,
                    setting,
                    unit,
                    context,
                    vartype,
                    source
                FROM pg_settings
                WHERE name IN (
                    'shared_buffers',
                    'work_mem',
                    'maintenance_work_mem',
                    'effective_cache_size',
                    'max_connections',
                    'checkpoint_timeout',
                    'checkpoint_completion_target',
                    'wal_buffers',
                    'random_page_cost',
                    'seq_page_cost',
                    'effective_io_concurrency'
                )
                ORDER BY name
            """)
            configs = {row['name']: row for row in self.cursor.fetchall()}
            
            analysis['metrics'] = {
                'key_parameters': configs
            }
            
            # 檢查配置合理性
            issues = []
            recommendations = []
            
            # 檢查shared_buffers(應(yīng)為系統(tǒng)內(nèi)存的25%)
            if 'shared_buffers' in configs:
                shared_buffers = configs['shared_buffers']['setting']
                if shared_buffers.endswith('MB'):
                    mb_value = int(shared_buffers[:-2])
                    if mb_value < 128:  # 小于128MB
                        issues.append('shared_buffers設(shè)置可能過小')
                        recommendations.append('考慮增加shared_buffers到系統(tǒng)內(nèi)存的25%')
            
            # 檢查work_mem
            if 'work_mem' in configs:
                work_mem = configs['work_mem']['setting']
                if work_mem.endswith('kB'):
                    kb_value = int(work_mem[:-2])
                    if kb_value < 4096:  # 小于4MB
                        issues.append('work_mem設(shè)置可能過小')
                        recommendations.append('適當(dāng)增加work_mem以減少臨時文件使用')
            
            # 檢查checkpoint配置
            if 'checkpoint_timeout' in configs:
                checkpoint_timeout = int(configs['checkpoint_timeout']['setting'])
                if checkpoint_timeout > 900:  # 超過15分鐘
                    issues.append('checkpoint_timeout設(shè)置過長')
                    recommendations.append('考慮減少checkpoint_timeout以降低恢復(fù)時間')
            
            if issues:
                analysis['score'] -= len(issues) * 10
                for issue in issues:
                    analysis['issues'].append({
                        'severity': 'MEDIUM',
                        'message': issue
                    })
                
                for rec in recommendations:
                    analysis['recommendations'].append({
                        'priority': 3,
                        'action': rec
                    })
            
        except Exception as e:
            logger.error(f"配置分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def generate_optimization_report(self) -> Dict[str, Any]:
        """
        生成優(yōu)化報告
        
        Returns:
            優(yōu)化報告
        """
        health_report = self.analyze_database_health()
        
        report = {
            'summary': {
                'overall_score': health_report['overall_score'],
                'assessment': self._get_assessment(health_report['overall_score']),
                'total_issues': len(health_report['issues']),
                'total_recommendations': len(health_report['recommendations'])
            },
            'detailed_analysis': health_report['categories'],
            'critical_issues': [
                issue for issue in health_report['issues']
                if issue.get('severity') in ['CRITICAL', 'HIGH']
            ],
            'optimization_plan': self._create_optimization_plan(health_report),
            'execution_checklist': self._create_execution_checklist()
        }
        
        return report
    
    def _get_assessment(self, score: float) -> str:
        """根據(jù)評分獲取評估結(jié)果"""
        if score >= 90:
            return 'EXCELLENT'
        elif score >= 80:
            return 'GOOD'
        elif score >= 70:
            return 'FAIR'
        elif score >= 60:
            return 'NEEDS_IMPROVEMENT'
        else:
            return 'POOR'
    
    def _create_optimization_plan(self, health_report: Dict[str, Any]) -> List[Dict[str, Any]]:
        """創(chuàng)建優(yōu)化計劃"""
        plan = []
        
        # 按優(yōu)先級排序建議
        sorted_recommendations = sorted(
            health_report['recommendations'],
            key=lambda x: x.get('priority', 3)
        )
        
        for i, rec in enumerate(sorted_recommendations[:10], 1):  # 取前10個
            plan.append({
                'step': i,
                'action': rec['action'],
                'priority': rec.get('priority', 3),
                'estimated_effort': self._estimate_effort(rec['action']),
                'expected_impact': self._estimate_impact(rec['action'])
            })
        
        return plan
    
    def _estimate_effort(self, action: str) -> str:
        """估計實(shí)施難度"""
        low_effort_keywords = ['調(diào)整', '設(shè)置', '啟用', '禁用']
        medium_effort_keywords = ['優(yōu)化', '重構(gòu)', '重建', '遷移']
        high_effort_keywords = ['重寫', '重構(gòu)架構(gòu)', '數(shù)據(jù)遷移', '集群擴(kuò)展']
        
        action_lower = action.lower()
        
        if any(keyword in action_lower for keyword in high_effort_keywords):
            return 'HIGH'
        elif any(keyword in action_lower for keyword in medium_effort_keywords):
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def _estimate_impact(self, action: str) -> str:
        """估計影響程度"""
        high_impact_keywords = ['性能提升', '顯著改善', '根本解決', '關(guān)鍵修復(fù)']
        medium_impact_keywords = ['優(yōu)化', '改進(jìn)', '增強(qiáng)', '調(diào)整']
        low_impact_keywords = ['微調(diào)', '小優(yōu)化', '維護(hù)', '清理']
        
        action_lower = action.lower()
        
        if any(keyword in action_lower for keyword in high_impact_keywords):
            return 'HIGH'
        elif any(keyword in action_lower for keyword in medium_impact_keywords):
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def _create_execution_checklist(self) -> List[Dict[str, Any]]:
        """創(chuàng)建執(zhí)行檢查清單"""
        checklist = [
            {
                'phase': '準(zhǔn)備階段',
                'tasks': [
                    {'task': '備份數(shù)據(jù)庫', 'completed': False},
                    {'task': '驗(yàn)證備份完整性', 'completed': False},
                    {'task': '準(zhǔn)備回滾計劃', 'completed': False},
                    {'task': '安排維護(hù)窗口', 'completed': False}
                ]
            },
            {
                'phase': '實(shí)施階段',
                'tasks': [
                    {'task': '應(yīng)用配置變更', 'completed': False},
                    {'task': '執(zhí)行索引優(yōu)化', 'completed': False},
                    {'task': '運(yùn)行VACUUM操作', 'completed': False},
                    {'task': '更新統(tǒng)計信息', 'completed': False}
                ]
            },
            {
                'phase': '驗(yàn)證階段',
                'tasks': [
                    {'task': '驗(yàn)證性能改進(jìn)', 'completed': False},
                    {'task': '運(yùn)行回歸測試', 'completed': False},
                    {'task': '監(jiān)控系統(tǒng)穩(wěn)定性', 'completed': False},
                    {'task': '更新文檔記錄', 'completed': False}
                ]
            }
        ]
        
        return checklist
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def demonstrate_optimization():
    """演示優(yōu)化功能"""
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    print("PostgreSQL高級優(yōu)化演示")
    print("=" * 60)
    
    optimizer = PostgreSQLAdvancedOptimizer(dsn)
    
    try:
        # 生成健康報告
        print("\n生成數(shù)據(jù)庫健康報告...")
        health_report = optimizer.analyze_database_health()
        
        print(f"\n總體評分: {health_report['overall_score']:.1f}/100")
        print(f"發(fā)現(xiàn)問題: {len(health_report['issues'])} 個")
        print(f"優(yōu)化建議: {len(health_report['recommendations'])} 條")
        
        # 顯示關(guān)鍵問題
        critical_issues = [
            issue for issue in health_report['issues']
            if issue.get('severity') in ['CRITICAL', 'HIGH']
        ]
        
        if critical_issues:
            print("\n關(guān)鍵問題:")
            for issue in critical_issues[:3]:
                print(f"  ? [{issue['severity']}] {issue['message']}")
        
        # 顯示高優(yōu)先級建議
        high_priority_recs = [
            rec for rec in health_report['recommendations']
            if rec.get('priority', 3) == 1
        ]
        
        if high_priority_recs:
            print("\n高優(yōu)先級建議:")
            for rec in high_priority_recs[:3]:
                print(f"  ? {rec['action']}")
        
        # 生成詳細(xì)報告
        print("\n\n生成詳細(xì)優(yōu)化報告...")
        report = optimizer.generate_optimization_report()
        
        print(f"\n優(yōu)化計劃 ({len(report['optimization_plan'])} 個步驟):")
        for step in report['optimization_plan'][:5]:
            print(f"  步驟{step['step']}: {step['action']}")
            print(f"     優(yōu)先級: {step['priority']}, 難度: {step['estimated_effort']}, "
                  f"影響: {step['expected_impact']}")
        
        print("\n執(zhí)行檢查清單:")
        for phase in report['execution_checklist']:
            print(f"  {phase['phase']}:")
            for task in phase['tasks']:
                status = '?' if task['completed'] else '○'
                print(f"    {status} {task['task']}")
        
        print("\n優(yōu)化演示完成!")
        
    except Exception as e:
        print(f"演示過程中出錯: {e}")
    finally:
        optimizer.close()


if __name__ == "__main__":
    demonstrate_optimization()

7. 代碼自查與最佳實(shí)踐

7.1 代碼質(zhì)量檢查清單

為確保PostgreSQL相關(guān)代碼的質(zhì)量,應(yīng)遵循以下檢查清單:

"""
PostgreSQL代碼質(zhì)量自查工具
"""
import re
import ast
from typing import List, Dict, Any, Set
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLCodeChecker:
    """PostgreSQL代碼質(zhì)量檢查器"""
    
    def __init__(self):
        self.rules = {
            'sql_injection': self.check_sql_injection,
            'connection_management': self.check_connection_management,
            'transaction_handling': self.check_transaction_handling,
            'index_usage': self.check_index_usage,
            'data_type_validation': self.check_data_type_validation,
            'error_handling': self.check_error_handling,
            'performance_anti_patterns': self.check_performance_anti_patterns
        }
        
    def check_file(self, filepath: str) -> Dict[str, List[Dict[str, Any]]]:
        """
        檢查Python文件中的PostgreSQL相關(guān)代碼
        
        Args:
            filepath: Python文件路徑
            
        Returns:
            檢查結(jié)果
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        
        issues = {}
        
        for rule_name, rule_func in self.rules.items():
            rule_issues = rule_func(content, filepath)
            if rule_issues:
                issues[rule_name] = rule_issues
        
        return issues
    
    def check_sql_injection(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查SQL注入漏洞
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        
        # 查找可能的字符串拼接SQL
        patterns = [
            (r'execute\s*\(.*?\s*%\s*.*?\)', '使用字符串格式化執(zhí)行SQL'),
            (r'execute\s*\(.*?\s*\+\s*.*?\)', '使用字符串拼接執(zhí)行SQL'),
            (r'executemany\s*\(.*?\s*%\s*.*?\)', '使用字符串格式化執(zhí)行批量SQL'),
            (r'f"SELECT.*{.*}.*"', '使用f-string直接嵌入變量'),
        ]
        
        lines = content.split('\n')
        for i, line in enumerate(lines, 1):
            for pattern, description in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    issues.append({
                        'line': i,
                        'severity': 'CRITICAL',
                        'message': f'潛在的SQL注入漏洞: {description}',
                        'suggestion': '使用參數(shù)化查詢(%s占位符)',
                        'code_snippet': line.strip()[:100]
                    })
        
        return issues
    
    def check_connection_management(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查數(shù)據(jù)庫連接管理
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找連接創(chuàng)建但不關(guān)閉的情況
        connect_pattern = r'psycopg2\.connect\(|connect\('
        close_pattern = r'\.close\(\)'
        
        in_function = False
        function_start = 0
        connect_lines = []
        
        for i, line in enumerate(lines, 1):
            # 檢測函數(shù)開始
            if line.strip().startswith('def '):
                in_function = True
                function_start = i
                connect_lines = []
            
            # 檢測連接創(chuàng)建
            if re.search(connect_pattern, line):
                connect_lines.append(i)
            
            # 檢測連接關(guān)閉
            if re.search(close_pattern, line) and 'close' in line:
                if connect_lines:
                    connect_lines.pop()
            
            # 檢測函數(shù)結(jié)束
            if line.strip() == '' or i == len(lines):
                if in_function and connect_lines:
                    for connect_line in connect_lines:
                        issues.append({
                            'line': connect_line,
                            'severity': 'HIGH',
                            'message': '數(shù)據(jù)庫連接可能未正確關(guān)閉',
                            'suggestion': '確保在finally塊中關(guān)閉連接',
                            'context': f'函數(shù)開始于第{function_start}行'
                        })
                in_function = False
        
        return issues
    
    def check_transaction_handling(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查事務(wù)處理
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找沒有明確事務(wù)管理的操作
        write_operations = [
            'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP'
        ]
        
        transaction_keywords = [
            'BEGIN', 'COMMIT', 'ROLLBACK', 'autocommit',
            'set_session', 'transaction'
        ]
        
        in_write_operation = False
        has_transaction_control = False
        
        for i, line in enumerate(lines, 1):
            line_upper = line.upper()
            
            # 檢查是否有寫操作
            if any(op in line_upper for op in write_operations):
                in_write_operation = True
            
            # 檢查是否有事務(wù)控制
            if any(keyword in line for keyword in transaction_keywords):
                has_transaction_control = True
            
            # 如果是空行或注釋,檢查之前的操作
            if line.strip() == '' or line.strip().startswith('#'):
                if in_write_operation and not has_transaction_control:
                    issues.append({
                        'line': i - 1,
                        'severity': 'MEDIUM',
                        'message': '寫操作沒有顯式的事務(wù)管理',
                        'suggestion': '使用明確的事務(wù)控制(BEGIN/COMMIT/ROLLBACK)',
                        'context': '多個寫操作應(yīng)該在同一事務(wù)中'
                    })
                
                in_write_operation = False
                has_transaction_control = False
        
        return issues
    
    def check_index_usage(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查索引使用
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找可能受益于索引的查詢模式
        index_patterns = [
            (r'WHERE\s+\w+\s*=\s*', '等值查詢'),
            (r'WHERE\s+\w+\s+IN\s*\(', 'IN列表查詢'),
            (r'WHERE\s+\w+\s+LIKE\s+\'', 'LIKE查詢(可能前綴匹配)'),
            (r'ORDER\s+BY\s+\w+', '排序操作'),
            (r'GROUP\s+BY\s+\w+', '分組操作'),
            (r'JOIN\s+\w+\s+ON\s+\w+\s*=\s*\w+', '連接操作')
        ]
        
        for i, line in enumerate(lines, 1):
            line_upper = line.upper()
            
            # 跳過注釋
            if line.strip().startswith('#'):
                continue
            
            for pattern, description in index_patterns:
                if re.search(pattern, line_upper, re.IGNORECASE):
                    issues.append({
                        'line': i,
                        'severity': 'LOW',
                        'message': f'查詢可能受益于索引: {description}',
                        'suggestion': '考慮在相關(guān)列上創(chuàng)建索引',
                        'code_snippet': line.strip()[:100]
                    })
                    break
        
        return issues
    
    def generate_report(self, issues: Dict[str, List[Dict[str, Any]]]) -> str:
        """生成檢查報告"""
        report_lines = []
        report_lines.append("=" * 60)
        report_lines.append("PostgreSQL代碼質(zhì)量檢查報告")
        report_lines.append("=" * 60)
        
        total_issues = sum(len(rule_issues) for rule_issues in issues.values())
        report_lines.append(f"\n總共發(fā)現(xiàn) {total_issues} 個問題\n")
        
        severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
        
        for rule_name, rule_issues in issues.items():
            if rule_issues:
                report_lines.append(f"\n{rule_name.upper()} ({len(rule_issues)}個問題):")
                report_lines.append("-" * 40)
                
                for issue in rule_issues:
                    severity = issue.get('severity', 'UNKNOWN')
                    severity_counts[severity] = severity_counts.get(severity, 0) + 1
                    
                    report_lines.append(
                        f"[{severity}] 第{issue['line']}行: {issue['message']}"
                    )
                    if 'suggestion' in issue:
                        report_lines.append(f"    建議: {issue['suggestion']}")
                    if 'code_snippet' in issue:
                        report_lines.append(f"    代碼: {issue['code_snippet']}")
                    report_lines.append("")
        
        # 添加嚴(yán)重性統(tǒng)計
        report_lines.append("\n嚴(yán)重性統(tǒng)計:")
        report_lines.append("-" * 40)
        for severity, count in severity_counts.items():
            if count > 0:
                report_lines.append(f"  {severity}: {count} 個問題")
        
        return '\n'.join(report_lines)


def check_postgresql_best_practices():
    """PostgreSQL最佳實(shí)踐檢查示例"""
    checker = PostgreSQLCodeChecker()
    
    # 示例代碼
    sample_code = """
import psycopg2

# 不好的示例:字符串拼接SQL(SQL注入風(fēng)險)
def bad_example(user_input):
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    # SQL注入漏洞
    query = f"SELECT * FROM users WHERE username = '{user_input}'"
    cursor.execute(query)
    
    # 連接未關(guān)閉
    return cursor.fetchall()

# 好的示例:參數(shù)化查詢
def good_example(username):
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect("dbname=test")
        cursor = conn.cursor()
        
        # 參數(shù)化查詢
        query = "SELECT * FROM users WHERE username = %s"
        cursor.execute(query, (username,))
        
        return cursor.fetchall()
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# 事務(wù)處理示例
def transaction_example(user_data):
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    try:
        # 開始事務(wù)
        conn.autocommit = False
        
        # 多個寫操作
        cursor.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            (user_data['name'], user_data['email'])
        )
        
        cursor.execute(
            "INSERT INTO logs (action) VALUES (%s)",
            ('user_created',)
        )
        
        # 提交事務(wù)
        conn.commit()
        
    except Exception as e:
        # 回滾事務(wù)
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

# 可能受益于索引的查詢
def query_without_index():
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    # 這個查詢可能受益于索引
    cursor.execute("""
        SELECT * FROM orders 
        WHERE customer_id = %s 
        AND order_date > %s
        ORDER BY order_date DESC
    """, (123, '2024-01-01'))
    
    return cursor.fetchall()
    """
    
    # 將示例代碼寫入臨時文件
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(sample_code)
        temp_file = f.name
    
    try:
        # 檢查代碼
        issues = checker.check_file(temp_file)
        report = checker.generate_report(issues)
        print(report)
    finally:
        import os
        os.unlink(temp_file)


if __name__ == "__main__":
    check_postgresql_best_practices()

7.2 PostgreSQL最佳實(shí)踐總結(jié)

連接管理最佳實(shí)踐

  • 使用連接池管理數(shù)據(jù)庫連接
  • 確保連接在finally塊中正確關(guān)閉
  • 設(shè)置合適的連接超時和重試策略

查詢優(yōu)化最佳實(shí)踐

  • 使用EXPLAIN ANALYZE分析查詢計劃
  • 為頻繁查詢的列創(chuàng)建合適索引
  • 避免SELECT *,只選擇需要的列
  • 使用LIMIT限制返回行數(shù)

事務(wù)管理最佳實(shí)踐

  • 保持事務(wù)盡可能短小
  • 使用合適的隔離級別
  • 處理異常并正確回滾
  • 避免長時間持有鎖

索引設(shè)計最佳實(shí)踐

  • 基于查詢模式設(shè)計復(fù)合索引
  • 定期分析和重建索引
  • 使用部分索引減少索引大小
  • 考慮BRIN索引用于時間序列數(shù)據(jù)

配置優(yōu)化最佳實(shí)踐

  • 根據(jù)工作負(fù)載調(diào)整shared_buffers
  • 設(shè)置合適的work_mem減少臨時文件
  • 配置有效的維護(hù)工作內(nèi)存
  • 定期更新統(tǒng)計信息

8. 總結(jié)與展望

PostgreSQL作為功能最豐富的開源數(shù)據(jù)庫,其高級特性和性能優(yōu)化能力使其能夠應(yīng)對各種復(fù)雜的應(yīng)用場景。通過本文的深入探討,我們了解到:

8.1 核心要點(diǎn)總結(jié)

  • 高級特性:JSONB、全文搜索、分區(qū)表、物化視圖等特性使PostgreSQL能夠處理多樣化數(shù)據(jù)需求
  • 性能優(yōu)化:合理的索引設(shè)計、查詢優(yōu)化、配置調(diào)整是提升性能的關(guān)鍵
  • 并發(fā)控制:MVCC機(jī)制和適當(dāng)?shù)逆i策略確保高并發(fā)下的數(shù)據(jù)一致性
  • 監(jiān)控維護(hù):系統(tǒng)化監(jiān)控和定期維護(hù)保證數(shù)據(jù)庫長期穩(wěn)定運(yùn)行

8.2 未來發(fā)展趨勢

  • 人工智能集成:PostgreSQL的MADlib擴(kuò)展支持機(jī)器學(xué)習(xí)算法
  • 時序數(shù)據(jù)優(yōu)化:TimescaleDB擴(kuò)展提供專業(yè)時序數(shù)據(jù)處理能力
  • 地理空間增強(qiáng):PostGIS繼續(xù)擴(kuò)展地理信息系統(tǒng)功能
  • 云原生支持:更好的Kubernetes集成和云服務(wù)優(yōu)化
  • 向量搜索:對AI生成內(nèi)容(AIGC)的向量相似度搜索支持

通過持續(xù)學(xué)習(xí)和實(shí)踐,開發(fā)者可以充分利用PostgreSQL的強(qiáng)大功能,構(gòu)建高效、穩(wěn)定、可擴(kuò)展的數(shù)據(jù)存儲解決方案。無論是對初創(chuàng)公司還是大型企業(yè),PostgreSQL都提供了企業(yè)級數(shù)據(jù)庫所需的一切功能,同時保持了開源軟件的靈活性和成本優(yōu)勢。

以上就是PostgreSQL高級特性與性能優(yōu)化的實(shí)戰(zhàn)指南的詳細(xì)內(nèi)容,更多關(guān)于PostgreSQL性能優(yōu)化的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • PostgreSQL 默認(rèn)隔離級別的設(shè)置

    PostgreSQL 默認(rèn)隔離級別的設(shè)置

    PostgreSQL的默認(rèn)事務(wù)隔離級別是讀已提交,這是其事務(wù)處理系統(tǒng)的基礎(chǔ)行為模式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-06-06
  • PostgreSQL GIN 索引原理、應(yīng)用場景與最佳實(shí)踐

    PostgreSQL GIN 索引原理、應(yīng)用場景與最佳實(shí)踐

    本文介紹了PostgreSQL的GIN索引,涵蓋了其底層原理、支持的數(shù)據(jù)類型、工作原理、性能特征、適用場景、實(shí)戰(zhàn)案例、性能調(diào)優(yōu)、常見問題及解決方案,并提供了GIN與其它索引類型的比較,通過本文,讀者可以全面了解GIN索引的使用方法和最佳實(shí)踐,感興趣的朋友跟隨小編一起看看吧
    2025-12-12
  • 史上最全PostgreSQL?DBA最常用SQL

    史上最全PostgreSQL?DBA最常用SQL

    這篇文章主要介紹了PostgreSQL?DBA最常用SQL?,主要包括背景及常用查詢語句,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-10-10
  • PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明

    PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明

    這篇文章主要介紹了PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • PostgreSQL實(shí)現(xiàn)定期備份的方法

    PostgreSQL實(shí)現(xiàn)定期備份的方法

    PostgreSQL定期備份功能可以自動備份數(shù)據(jù)庫,避免了手動備份過程中可能發(fā)生的錯誤,也極大地減輕了管理員的工作壓力,所以本文將給大家介紹一下PostgreSQL實(shí)現(xiàn)定期備份的方法,需要的朋友可以參考下
    2024-03-03
  • postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié)

    postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié)

    這篇文章主要介紹了postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • 基于pgrouting的路徑規(guī)劃處理方法

    基于pgrouting的路徑規(guī)劃處理方法

    這篇文章主要介紹了基于pgrouting的路徑規(guī)劃處理,根據(jù)pgrouting已經(jīng)集成的Dijkstra算法來,結(jié)合postgresql數(shù)據(jù)庫來處理最短路徑,需要的朋友可以參考下
    2022-04-04
  • postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通(推薦)

    postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通(推薦)

    這篇文章主要介紹了postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-02-02
  • postgresql 中的序列nextval詳解

    postgresql 中的序列nextval詳解

    這篇文章主要介紹了postgresql 中的序列nextval詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程

    SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程

    這篇文章主要介紹了SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2025-09-09

最新評論

亚洲日本一区二区久久久精品| 日韩写真福利视频在线观看| 国产日韩精品电影7777| 天天日天天操天天摸天天舔| 人人爱人人妻人人澡39| 在线观看视频一区麻豆| 国产日韩精品免费在线| 日日操综合成人av| 最新91九色国产在线观看| 青娱乐最新视频在线| 老熟妇凹凸淫老妇女av在线观看| 欧美老鸡巴日小嫩逼| 日韩av中文在线免费观看| 又色又爽又黄的美女裸体| 日本黄色三级高清视频| 国产熟妇一区二区三区av| 久久精品久久精品亚洲人| 93精品视频在线观看| 久草视频在线看免费| 日本高清成人一区二区三区| 日韩午夜福利精品试看| 韩国三级aaaaa高清视频| 亚洲第一黄色在线观看| 国产精品国色综合久久| 国产高清在线在线视频| 精彩视频99免费在线| 色狠狠av线不卡香蕉一区二区| 日韩人妻在线视频免费| 综合激情网激情五月五月婷婷| 中文字幕av第1页中文字幕| 国产伦精品一区二区三区竹菊| 亚洲av天堂在线播放| 亚洲精品无码久久久久不卡 | 国产aⅴ一线在线观看| 国产va精品免费观看| 精品视频一区二区三区四区五区| 日日爽天天干夜夜操| 2012中文字幕在线高清| 超碰在线观看免费在线观看| 日本后入视频在线观看| 久久国产精品精品美女| 激情五月婷婷综合色啪| 天天综合天天综合天天网| 中文字幕熟女人妻久久久| 熟女在线视频一区二区三区| 黄色片年轻人在线观看| 日韩美女综合中文字幕pp| 亚洲欧美综合在线探花| 三上悠亚和黑人665番号| 啊慢点鸡巴太大了啊舒服视频| 免费观看丰满少妇做受| 18禁美女羞羞免费网站| 亚洲国产精品免费在线观看| 99精品久久久久久久91蜜桃| 人妻熟女中文字幕aⅴ在线 | 任我爽精品视频在线播放| 成人激情文学网人妻| 不戴胸罩引我诱的隔壁的人妻| 2020久久躁狠狠躁夜夜躁| 免费在线看的黄网站| 精品视频一区二区三区四区五区| tube69日本少妇| 久久午夜夜伦痒痒想咳嗽P| 亚洲熟妇久久无码精品| 福利午夜视频在线合集| 日韩一个色综合导航| 自拍 日韩 欧美激情| 最新中文字幕乱码在线| 日韩特级黄片高清在线看| 55夜色66夜色国产精品站| 一二三区在线观看视频| 福利视频一区二区三区筱慧| 视频一区二区三区高清在线| 韩国一级特黄大片做受| 亚洲精品乱码久久久本| 黄色在线观看免费观看在线| 一区二区三区四区中文| 亚洲福利精品视频在线免费观看| 国产一区二区欧美三区| 2018最新中文字幕在线观看| 91福利在线视频免费观看| 日韩激情文学在线视频| brazzers欧熟精品系列| 成人综合亚洲欧美一区| 人妻丝袜榨强中文字幕| 亚洲在线免费h观看网站| 国产日韩精品免费在线| 欧美综合婷婷欧美综合| 青青在线视频性感少妇和隔壁黑丝| tube69日本少妇| 亚洲 人妻 激情 中文| 日本少妇精品免费视频| 成人30分钟免费视频| 大胆亚洲av日韩av| 亚洲精品国产在线电影| 大尺度激情四射网站| 2022国产综合在线干| 99精品视频之69精品视频| 亚洲成人黄色一区二区三区| 免费在线播放a级片| 97人人模人人爽人人喊| 中文字幕人妻熟女在线电影| yy96视频在线观看| 涩爱综合久久五月蜜臀| 99精品亚洲av无码国产另类 | 人妻素人精油按摩中出| 亚洲中文字幕校园春色| 福利视频一区二区三区筱慧| 亚洲国产精品久久久久蜜桃| 999久久久久999| 黄色的网站在线免费看| 天天爽夜夜爽人人爽QC| 欧美地区一二三专区| 五十路息与子猛烈交尾视频| 人妻熟女在线一区二区| 在线观看日韩激情视频| 热99re69精品8在线播放| 日本一区美女福利视频| 75国产综合在线视频| 97少妇精品在线观看| 午夜频道成人在线91| 国产精品三级三级三级| 婷婷五月亚洲综合在线| 美洲精品一二三产区区别| 久久久久久久久久久久久97| 国产精品久久久久国产三级试频| 亚洲一区二区三区久久受| 久久久久久97三级| 日韩剧情片电影在线收看| 亚洲熟妇x久久av久久| 在线观看av观看av| 青青青青青青青青青国产精品视频| 黄色视频在线观看高清无码 | 人妻少妇精品久久久久久| 精内国产乱码久久久久久| 最新97国产在线视频| 97欧洲一区二区精品免费| 国产日韩欧美视频在线导航| 成人av在线资源网站| 午夜久久香蕉电影网| 国产精品亚洲а∨天堂免| wwwxxx一级黄色片| 欧美日韩国产一区二区三区三州| av无限看熟女人妻另类av | 欧美专区日韩专区国产专区| 久久久久91精品推荐99| 中文字幕第三十八页久久| 国产使劲操在线播放| 精品人人人妻人人玩日产欧| 午夜频道成人在线91| 少妇人妻100系列| 美女 午夜 在线视频| 国产精品欧美日韩区二区| 欧美日韩一区二区电影在线观看| 男人的天堂在线黄色| 91传媒一区二区三区| 欧美麻豆av在线播放| 视频一区二区在线免费播放| 精品国产在线手机在线| 欧美色婷婷综合在线| 国产揄拍高清国内精品对白| 色偷偷伊人大杳蕉综合网| 日辽宁老肥女在线观看视频| 美女福利写真在线观看视频| 久久国产精品精品美女| 99久久99久国产黄毛片| 黑人大几巴狂插日本少妇| 天天日天天透天天操| 天天日天天天天天天天天天天| 内射久久久久综合网| 免费在线播放a级片| 亚洲美女高潮喷浆视频| 9l人妻人人爽人人爽| 日本福利午夜电影在线观看| 九色porny九色9l自拍视频| 老司机99精品视频在线观看| 亚洲视频在线观看高清| 护士小嫩嫩又紧又爽20p| 精品亚洲中文字幕av| 91香蕉成人app下载| 欲满人妻中文字幕在线| 免费在线看的黄网站| 亚洲中文字幕国产日韩| 中文字幕av男人天堂| 91免费福利网91麻豆国产精品 | 久久这里只有精彩视频免费| 亚洲午夜高清在线观看| 久久美欧人妻少妇一区二区三区| 欧美交性又色又爽又黄麻豆| 成人激情文学网人妻| 精品日产卡一卡二卡国色天香| 天码人妻一区二区三区在线看| 美女 午夜 在线视频| 欧美xxx成人在线| 成熟熟女国产精品一区| 9l人妻人人爽人人爽| 中文字幕在线第一页成人| 91精品国产麻豆国产| 青青草视频手机免费在线观看| 国产性色生活片毛片春晓精品| 中文亚洲欧美日韩无线码| 国产无遮挡裸体免费直播视频| 一区二区三区日韩久久| 曰本无码人妻丰满熟妇啪啪| 国产福利在线视频一区| 午夜极品美女福利视频| 日日操综合成人av| 黄片大全在线观看观看| 91中文字幕最新合集| 天天做天天爽夜夜做少妇| 91色网站免费在线观看| 日本后入视频在线观看| 鸡巴操逼一级黄色气| 国产一区二区欧美三区| 在线观看的黄色免费网站| 人人妻人人爽人人澡人人精品| 国产精品精品精品999| 在线观看日韩激情视频| 超级av免费观看一区二区三区| 亚洲熟女久久久36d| 国产午夜亚洲精品不卡在线观看| 成人网18免费视频版国产| 亚洲成人av一区在线| 岛国一区二区三区视频在线| 91久久人澡人人添人人爽乱| 操日韩美女视频在线免费看| 免费在线播放a级片| 日韩激情文学在线视频| 99热99re在线播放| 538精品在线观看视频| 97国产福利小视频合集| 91传媒一区二区三区| 久久精品视频一区二区三区四区| 大屁股肉感人妻中文字幕在线| 精品人妻每日一部精品| 午夜精品一区二区三区城中村| 天天干狠狠干天天操| 9国产精品久久久久老师| 99精品国产自在现线观看| av手机免费在线观看高潮| 中文字幕1卡1区2区3区| 99精品视频之69精品视频 | 成人伊人精品色xxxx视频| 扒开腿挺进肉嫩小18禁视频| 91高清成人在线视频| 午夜成午夜成年片在线观看| 天天日天天日天天射天天干| 欧美亚洲少妇福利视频| 青娱乐极品视频青青草| 粉嫩av蜜乳av蜜臀| 久久机热/这里只有| 老熟妇凹凸淫老妇女av在线观看| 中文字幕视频一区二区在线观看| 在线播放 日韩 av| 亚洲伊人av天堂有码在线| 97人妻总资源视频| 91中文字幕最新合集| 97国产在线av精品| 日韩伦理短片在线观看| 五月婷婷在线观看视频免费| 天美传媒mv视频在线观看| 超级碰碰在线视频免费观看| 综合激情网激情五月五月婷婷| 日日夜夜狠狠干视频| 一区二区久久成人网| 在线观看av观看av| 亚洲精品久久视频婷婷| 国产麻豆乱子伦午夜视频观看| 97国产福利小视频合集| 91人妻人人做人人爽在线| 视频一区 二区 三区 综合| 日韩美女福利视频网| 都市激情校园春色狠狠| 亚洲欧美清纯唯美另类| 天天摸天天干天天操科普| 国产自拍在线观看成人| 五色婷婷综合狠狠爱| 国产成人精品一区在线观看| 人妻3p真实偷拍一二区| 又色又爽又黄又刺激av网站| 天天日天天爽天天干| 色婷婷六月亚洲综合香蕉| 中文字幕无码日韩专区免费| 天天日天天透天天操| 成人性爱在线看四区| 干逼又爽又黄又免费的视频| 中国无遮挡白丝袜二区精品| 在线播放 日韩 av| 大陆av手机在线观看| 欧美va亚洲va天堂va| 亚洲国产精品美女在线观看| 高清一区二区欧美系列| 国产熟妇人妻ⅹxxxx麻豆| 亚洲成人激情视频免费观看了| 端庄人妻堕落挣扎沉沦| 人妻av无码专区久久绿巨人| 久草极品美女视频在线观看| 亚洲国产第一页在线观看| 水蜜桃国产一区二区三区| 中文字幕一区的人妻欧美日韩| 一区二区三区另类在线| 亚洲欧美自拍另类图片| 国产成人精品福利短视频| 美女小视频网站在线| 任你操视频免费在线观看| sw137 中文字幕 在线| 经典国语激情内射视频| 极品丝袜一区二区三区| 色综合天天综合网国产成人 | 色综合色综合色综合色| 国产91精品拍在线观看| 大香蕉伊人国产在线| 1区2区3区4区视频在线观看| 9l人妻人人爽人人爽| 欧美精品亚洲精品日韩在线| 国产揄拍高清国内精品对白| 91精品高清一区二区三区| 视频 国产 精品 熟女 | 人人在线视频一区二区| 亚洲人妻国产精品综合| 午夜久久香蕉电影网| 青青草原色片网站在线观看| 日本一二三区不卡无| 亚洲国际青青操综合网站| 55夜色66夜色国产精品站| 91人妻精品一区二区在线看| 欧洲亚洲欧美日韩综合| 韩国黄色一级二级三级| 午夜极品美女福利视频| 另类av十亚洲av| 日韩影片一区二区三区不卡免费| 大陆av手机在线观看| 中国熟女一区二区性xx| 亚洲欧美激情中文字幕| 熟女人妻三十路四十路人妻斩| 欧美亚洲免费视频观看| 亚洲av自拍偷拍综合| 亚洲精品午夜久久久久| 国产又粗又硬又大视频| 天天夜天天日天天日| 国产真实灌醉下药美女av福利| 色婷婷精品大在线观看| av网址在线播放大全| 国产精品久久9999| 天天射夜夜操狠狠干| 在线不卡日韩视频播放| 热99re69精品8在线播放| 亚洲中文字字幕乱码| 做爰视频毛片下载蜜桃视频1| 91九色国产熟女一区二区| 天天插天天色天天日| 亚洲嫩模一区二区三区| 精品亚洲在线免费观看| 中文字幕视频一区二区在线观看| 日韩加勒比东京热二区| 精内国产乱码久久久久久| 大鸡巴操娇小玲珑的女孩逼| 在线观看的黄色免费网站| 班长撕开乳罩揉我胸好爽| 色天天天天射天天舔| 国产美女午夜福利久久| 欧洲精品第一页欧洲精品亚洲| 国产日韩精品一二三区久久久| 天天摸天天日天天操| 美洲精品一二三产区区别| 亚洲av琪琪男人的天堂| 国产成人自拍视频在线免费观看| 国产91嫩草久久成人在线视频| 一区二区三区日本伦理| 无码日韩人妻精品久久| 偷拍自拍福利视频在线观看| av在线资源中文字幕| 18禁免费av网站| 欧美日韩亚洲国产无线码| 亚洲欧洲av天堂综合| 午夜福利资源综合激情午夜福利资| 成人av在线资源网站| 中文字幕一区二区人妻电影冢本| 好太好爽好想要免费| 亚洲国产成人在线一区| okirakuhuhu在线观看| 91免费观看国产免费| 一个色综合男人天堂| 高清成人av一区三区| 色综合天天综合网国产成人| 在线免费91激情四射| 蜜桃视频在线欧美一区| 天天干天天操天天扣| 91综合久久亚洲综合| 午夜精品亚洲精品五月色| 中国把吊插入阴蒂的视频| 久久艹在线观看视频| 中文字幕 人妻精品| 在线观看亚洲人成免费网址| 97色视频在线观看| 亚洲综合一区成人在线| 天天操天天操天天碰| 午夜国产免费福利av| 2022国产综合在线干| 福利一二三在线视频观看| 色呦呦视频在线观看视频| 国产日韩精品电影7777| 夏目彩春在线中文字幕| 国产在线免费观看成人| 成人av电影免费版| 免费看美女脱光衣服的视频| 国产精品国产三级国产精东| 欧美男同性恋69视频| 中文字幕av熟女人妻| 日本少妇精品免费视频| 中文 成人 在线 视频| 精品成人午夜免费看| 欧美日韩国产一区二区三区三州| 午夜免费观看精品视频| 日韩美av高清在线| 在线网站你懂得老司机| 久碰精品少妇中文字幕av| 亚洲国产美女一区二区三区软件| 999久久久久999| 91综合久久亚洲综合| 自拍偷拍日韩欧美一区二区| 亚洲一区二区三区av网站| 人妻另类专区欧美制服| 成人区人妻精品一区二视频| 国产白袜脚足J棉袜在线观看| 亚洲av极品精品在线观看| 美女大bxxxx内射| 人妻av无码专区久久绿巨人 | 国产成人自拍视频在线免费观看| 国产成人精品亚洲男人的天堂| 最近的中文字幕在线mv视频| 成人av电影免费版| 中文字幕亚洲久久久| 午夜精品福利91av| 东京干手机福利视频| 顶级尤物粉嫩小尤物网站| 国产精品自拍视频大全| 亚洲成人熟妇一区二区三区| 久久人人做人人妻人人玩精品vr| 亚洲欧美福利在线观看| 五十路av熟女松本翔子| 黄色的网站在线免费看| 99热99这里精品6国产| 久久久久久久99精品| 精品国产高潮中文字幕| 久久久久久国产精品| 黑人借宿ntr人妻的沦陷2| 大香蕉大香蕉在线看| 成人18禁网站在线播放| 日韩影片一区二区三区不卡免费| 首之国产AV医生和护士小芳| 97成人免费在线观看网站| 婷婷午夜国产精品久久久| 动漫黑丝美女的鸡巴| 亚洲国产精品中文字幕网站| 欧美美女人体视频一区| 老司机免费福利视频网| 国产成人自拍视频播放| 岛国青草视频在线观看| 天天操夜夜操天天操天天操| 性色蜜臀av一区二区三区| 黑人3p华裔熟女普通话| 久草视频 久草视频2| 欧美一区二区中文字幕电影| 国产视频精品资源网站| 男女第一次视频在线观看| 韩国男女黄色在线观看| 国产性生活中老年人视频网站| 午夜精品一区二区三区福利视频| 午夜场射精嗯嗯啊啊视频| 97欧洲一区二区精品免费| 美味人妻2在线播放| 9久在线视频只有精品| 欧美日韩激情啪啪啪| 亚洲在线观看中文字幕av| 搡老妇人老女人老熟女| 传媒在线播放国产精品一区| 日韩成人综艺在线播放| 女同性ⅹxx女同hd| 中文字幕,亚洲人妻| 亚洲少妇人妻无码精品| 一区二区三区国产精选在线播放| 亚洲最大免费在线观看| 日本成人一区二区不卡免费在线| 熟女俱乐部一二三区| 亚洲精品三级av在线免费观看| 国产精品免费不卡av| 亚洲男人的天堂a在线| 欧美日韩亚洲国产无线码| 久久久久久国产精品| 亚洲国产最大av综合| 中文字幕在线视频一区二区三区 | 在线观看视频污一区| 亚洲成人av在线一区二区| 男大肉棒猛烈插女免费视频 | 成人乱码一区二区三区av| 韩国亚洲欧美超一级在线播放视频| 啊用力插好舒服视频| 成年人的在线免费视频| 欧美日本在线观看一区二区| 午夜久久香蕉电影网| 亚洲一区制服丝袜美腿| 天天操天天干天天插| 亚洲欧美成人综合在线观看| 久久三久久三久久三久久| 特黄老太婆aa毛毛片| 桃色视频在线观看一区二区| 99精品免费久久久久久久久a| 中文字幕无码日韩专区免费| 亚洲一级美女啪啪啪| 国产精品午夜国产小视频| av森泽佳奈在线观看| 成人24小时免费视频| 中国产一级黄片免费视频播放| 99亚洲美女一区二区三区| 久久综合老鸭窝色综合久久| 成熟丰满熟妇高潮xx×xx| 在线观看亚洲人成免费网址| 自拍偷拍一区二区三区图片| 91桃色成人网络在线观看| 亚洲成av人无码不卡影片一| 日韩精品啪啪视频一道免费| 家庭女教师中文字幕在线播放| 久久久久久9999久久久久| 毛片一级完整版免费| 国产乱子伦精品视频潮优女| 欧美一区二区中文字幕电影| 久久久人妻一区二区| 最近的中文字幕在线mv视频| 亚洲av琪琪男人的天堂| 亚洲国产成人在线一区| 午夜在线观看岛国av,com| 97资源人妻免费在线视频| 天天日天天鲁天天操| 97国产精品97久久| 免费黄页网站4188| 操人妻嗷嗷叫视频一区二区| 天天日天天敢天天干| 国产又粗又黄又硬又爽| 少妇人妻100系列| 国产白嫩美女一区二区| 一级黄片大鸡巴插入美女| 青春草视频在线免费播放| 欧美日韩激情啪啪啪| 2022中文字幕在线| 精品欧美一区二区vr在线观看| 伊人成人综合开心网| 美女av色播在线播放| 中文字幕视频一区二区在线观看| 快点插进来操我逼啊视频| 国产精品成人xxxx| 91精品一区二区三区站长推荐| 亚洲男人的天堂a在线| 狍和女人的王色毛片| 人妻爱爱 中文字幕| 57pao国产一区二区| 亚洲免费在线视频网站| 91社福利《在线观看| 久久亚洲天堂中文对白| 38av一区二区三区| 美女在线观看日本亚洲一区| 成年女人免费播放视频| 日韩人妻xxxxx| 久久久久久久精品老熟妇| 国产91精品拍在线观看| 久久精品亚洲成在人线a| 超碰97人人做人人爱| 激情国产小视频在线| brazzers欧熟精品系列| 亚洲粉嫩av一区二区三区| 2022天天干天天操| 免费看高清av的网站| 专门看国产熟妇的网站| 护士小嫩嫩又紧又爽20p| 欧美久久一区二区伊人| 成人av在线资源网站| 中文字幕乱码av资源| 欧美区一区二区三视频| 在线观看免费视频网| 男女啪啪视频免费在线观看| 狠狠躁夜夜躁人人爽天天久天啪 | 日韩人妻丝袜中文字幕| 熟妇一区二区三区高清版| 蜜臀av久久久久蜜臀av麻豆| 亚洲福利午夜久久久精品电影网| 在线免费观看国产精品黄色| 亚洲图库另类图片区| 欧美一区二区三区四区性视频| 亚洲国际青青操综合网站| 国产黄网站在线观看播放| 啪啪啪啪啪啪啪啪啪啪黄色| 天天夜天天日天天日| 中文字幕 码 在线视频| 青青草视频手机免费在线观看| 久久久久五月天丁香社区| 亚洲国产成人最新资源| 成人av在线资源网站| 天堂av在线播放免费| 日本一道二三区视频久久| 日韩加勒比东京热二区| 日本一区美女福利视频| 天天干天天搞天天摸| 亚洲av天堂在线播放| 97a片免费在线观看| 亚洲欧美在线视频第一页| 国产高清97在线观看视频| 在线观看视频一区麻豆| 黄色成人在线中文字幕| 青青青青青手机视频| 青青青青青青青青青国产精品视频 | 人妻久久久精品69系列| 夜色撩人久久7777| 韩国一级特黄大片做受| 人妻无码色噜噜狠狠狠狠色| 国产九色91在线观看精品| 岛国av高清在线成人在线| 在线观看黄色成年人网站 | 国产一线二线三线的区别在哪| 日本午夜久久女同精女女| 999热精品视频在线| 欧美成人猛片aaaaaaa| 大肉大捧一进一出好爽在线视频 | 天天日天天透天天操| 久久热这里这里只有精品| 亚洲人妻视频在线网| 一区二区三区国产精选在线播放| 国产精品人妻熟女毛片av久| 欧美日韩亚洲国产无线码| av俺也去在线播放| 免费看美女脱光衣服的视频| 天天色天天操天天舔| 亚洲国产欧美国产综合在线| 免费人成黄页网站在线观看国产 | 色综合久久久久久久久中文| 久久久久久久久久一区二区三区 | 中文字幕无码一区二区免费| 国产精品久久久久网| 天天干天天操天天玩天天射| 久草视频福利在线首页| 日日夜夜大香蕉伊人| 大陆精品一区二区三区久久| 中文字幕在线观看极品视频| 色综合天天综合网国产成人 | 日日操夜夜撸天天干| 99热久久这里只有精品8| 亚洲另类图片蜜臀av| 日韩人妻丝袜中文字幕| 中文字幕av男人天堂| 亚洲免费国产在线日韩| 在线 中文字幕 一区| 日本丰满熟妇BBXBBXHD| 天天日天天爽天天爽| 18禁免费av网站| aiss午夜免费视频| 夜夜骑夜夜操夜夜奸| 激情图片日韩欧美人妻| 美女在线观看日本亚洲一区| 一区二区麻豆传媒黄片| 成人国产影院在线观看| 亚洲av午夜免费观看| 亚洲区美熟妇久久久久| 噜噜色噜噜噜久色超碰| 成人性爱在线看四区| 成年人该看的视频黄免费| 亚洲综合色在线免费观看| 啪啪啪18禁一区二区三区 | 熟女国产一区亚洲中文字幕| 扒开腿挺进肉嫩小18禁视频| 天堂中文字幕翔田av| 久久久久只精品国产三级| 免费观看理论片完整版| 国产亚洲欧美45p| 欧洲日韩亚洲一区二区三区 | 最新97国产在线视频| 国产熟妇一区二区三区av| yy96视频在线观看| 传媒在线播放国产精品一区| 免费69视频在线看| 日本成人不卡一区二区| 亚洲超碰97人人做人人爱| 性感美女福利视频网站| 午夜精品福利一区二区三区p| 日韩中文字幕在线播放第二页 | 99国产精品窥熟女精品| 国产福利小视频大全| 中文字幕一区二区三区人妻大片 | 欧美一区二区三区高清不卡tv| 免费69视频在线看| 亚洲综合图片20p| 无码中文字幕波多野不卡| 92福利视频午夜1000看| 天天爽夜夜爽人人爽QC| 日本一二三区不卡无| 欧美精品伦理三区四区| 欧美亚洲国产成人免费在线| 啊用力插好舒服视频| 快插进小逼里大鸡吧视频| 又粗又长 明星操逼小视频| 久久麻豆亚洲精品av| 五十路在线观看完整版| 一区二区三区蜜臀在线| 2020韩国午夜女主播在线| 人妻丝袜av在线播放网址| 天天操天天操天天碰| 亚洲天堂成人在线观看视频网站| 欧美性感尤物人妻在线免费看| 女生被男生插的视频网站| 亚洲成av人无码不卡影片一| 中国熟女@视频91| 男人操女人逼逼视频网站| 在线免费观看国产精品黄色| 人妻熟女在线一区二区| 亚洲欧美一卡二卡三卡| av在线播放国产不卡| 国产麻豆乱子伦午夜视频观看| 大鸡巴操娇小玲珑的女孩逼| 国产精品自拍视频大全| 亚洲在线免费h观看网站| 99精品视频之69精品视频| 97欧洲一区二区精品免费| 亚洲综合一区成人在线| 亚洲精品乱码久久久久久密桃明| 高清一区二区欧美系列| 91欧美在线免费观看| 亚洲熟妇无码一区二区三区| 国产精品视频资源在线播放| 国产变态另类在线观看| 在线观看免费视频色97| 视频啪啪啪免费观看| 大陆精品一区二区三区久久| 丰满的继坶3中文在线观看| 精品美女福利在线观看| 哥哥姐姐综合激情小说| 亚洲av日韩高清hd| 香蕉aⅴ一区二区三区| 在线观看的a站 最新| 亚洲熟妇无码一区二区三区| 91精品啪在线免费| 91老师蜜桃臀大屁股| 国产真实乱子伦a视频| 久草极品美女视频在线观看| av一本二本在线观看| 丝袜美腿视频诱惑亚洲无| 中文字幕日韩无敌亚洲精品| 婷婷综合亚洲爱久久| 人妻久久无码中文成人| 国产精品视频一区在线播放| 青草亚洲视频在线观看| 天天做天天干天天操天天射| 午夜青青草原网在线观看| 国产精品久久久久久美女校花| 蜜桃臀av蜜桃臀av| 精品少妇一二三视频在线| 欧美韩国日本国产亚洲| 午夜精品久久久久久99热| 大鸡巴插入美女黑黑的阴毛| 亚洲区美熟妇久久久久| 9色精品视频在线观看| 中文字幕av男人天堂| 大屁股肉感人妻中文字幕在线| 好了av中文字幕在线| 婷婷六月天中文字幕| 人妻凌辱欧美丰满熟妇| 青青草国内在线视频精选| 欧美亚洲中文字幕一区二区三区| 成人精品在线观看视频| 亚洲成人免费看电影| 99人妻视频免费在线| 日韩熟女av天堂系列| 在线视频精品你懂的| 国产精品成人xxxx| 人妻另类专区欧美制服| 大胸性感美女羞爽操逼毛片| 国产va精品免费观看| 欧美男同性恋69视频| 高潮喷水在线视频观看| 日本熟女精品一区二区三区| 91av精品视频在线| 日韩北条麻妃一区在线| 久久久久久久99精品| 黄页网视频在线免费观看| 国产精彩对白一区二区三区| 久久99久久99精品影院| 91av精品视频在线| 天天日天天爽天天干| 乱亲女秽乱长久久久| 黄色片黄色片wyaa| 欧美黄片精彩在线免费观看| 国产精品久久久久网| 欧美视频综合第一页| 黄色资源视频网站日韩| 国产高清在线观看1区2区| 日本少妇人妻xxxxxhd| 久草视频在线一区二区三区资源站| 午夜激情久久不卡一区二区| 日本韩国免费福利精品| 日本女大学生的黄色小视频| 日本精品一区二区三区在线视频。| 人妻丝袜榨强中文字幕| 亚洲av色香蕉一区二区三区| 青青青激情在线观看视频| 天天日天天摸天天爱| 换爱交换乱高清大片| 国产精品成人xxxx| 一区二区久久成人网| 狠狠躁夜夜躁人人爽天天久天啪| 婷婷午夜国产精品久久久| 中文字幕 码 在线视频| 久久久久只精品国产三级| 91国语爽死我了不卡| 91老师蜜桃臀大屁股| 午夜精品福利一区二区三区p | 国产精品伦理片一区二区| 中文字幕最新久久久| sw137 中文字幕 在线| 伊人开心婷婷国产av| 成人亚洲精品国产精品| 夜夜嗨av一区二区三区中文字幕| 在线观看成人国产电影| 午夜毛片不卡免费观看视频| 色在线观看视频免费的| 98精产国品一二三产区区别| 中文字幕日韩精品日本| 成人av亚洲一区二区| 日韩熟女av天堂系列| 国产之丝袜脚在线一区二区三区| 中文字幕网站你懂的| 99av国产精品欲麻豆| 中文字幕在线免费第一页| 另类av十亚洲av| 国产精品视频资源在线播放| 亚洲第一黄色在线观看| 青娱乐最新视频在线| 免费看美女脱光衣服的视频| 2018最新中文字幕在线观看| 欧美亚洲牲夜夜综合久久| 韩国爱爱视频中文字幕| 大胸性感美女羞爽操逼毛片| 喷水视频在线观看这里只有精品| 岛国av高清在线成人在线| 高清成人av一区三区| 91精品国产综合久久久蜜| 国产精品亚洲在线观看| 精品久久久久久久久久中文蒉| 青青草亚洲国产精品视频| 黑人借宿ntr人妻的沦陷2| 日本一区美女福利视频| 精彩视频99免费在线| av日韩在线观看大全| 欧美特色aaa大片| 亚洲 中文 自拍 无码| 中文人妻AV久久人妻水| 日韩欧美国产一区不卡| 大鸡巴操b视频在线| 超碰在线中文字幕一区二区| 人妻熟女在线一区二区| 国产va在线观看精品| 国产精品国产三级国产午| 亚洲欧美久久久久久久久| 亚洲中文字字幕乱码| 久草视频在线看免费| 精品一线二线三线日本| 国产成人午夜精品福利| 1000部国产精品成人观看视频| 农村胖女人操逼视频| 国产精品黄片免费在线观看| 天天干天天插天天谢| 国产日本精品久久久久久久| 久久久久久久99精品| 亚洲日本一区二区三区| 九九视频在线精品播放| 色97视频在线播放| 午夜成午夜成年片在线观看| 99re6热在线精品| 亚洲一区二区人妻av| 99re6热在线精品| 国产精品视频男人的天堂| 中文字日产幕乱六区蜜桃| 国产一区二区在线欧美| 色伦色伦777国产精品| 国产刺激激情美女网站| 操操网操操伊剧情片中文字幕网| 91色老99久久九九爱精品| 99婷婷在线观看视频| 99国内小视频在现欢看| 国产黄网站在线观看播放| 日韩av有码中文字幕| 天天做天天干天天舔| 97年大学生大白天操逼| 亚洲成人国产av在线| 天天操天天污天天射| 偷拍自拍国产在线视频| av在线观看网址av| 欧美成人综合视频一区二区 | 黄工厂精品视频在线观看| 亚洲精品久久综合久| 亚洲欧美成人综合在线观看| 天天色天天爱天天爽| 亚洲伊人久久精品影院一美女洗澡| 亚洲精品精品国产综合| 成人国产小视频在线观看| 青青青艹视频在线观看| 亚洲一级av大片免费观看| 欧美第一页在线免费观看视频| 日韩精品中文字幕在线| 国产精品久久久久久久精品视频| 第一福利视频在线观看 | 中文字幕一区二区亚洲一区| av在线播放国产不卡| 日曰摸日日碰夜夜爽歪歪| 日韩a级黄色小视频| 大鸡吧插逼逼视频免费看 | 男人靠女人的逼视频| jiuse91九色视频| 日本少妇在线视频大香蕉在线观看| 免费观看污视频网站| 中文字幕人妻三级在线观看| 青青热久免费精品视频在线观看| 干逼又爽又黄又免费的视频| 在线观看911精品国产| 久久这里有免费精品| 亚洲av无女神免非久久| 天天干天天操天天爽天天摸| 黄色无码鸡吧操逼视频| 888欧美视频在线| 成年人啪啪视频在线观看| 天天日天天鲁天天操| 国产视频一区二区午夜| 亚洲国产中文字幕啊啊啊不行了| 日本少妇人妻xxxxx18| 亚洲av色图18p| 日韩av免费观看一区| 91国产资源在线视频| 老熟妇凹凸淫老妇女av在线观看| 日本一道二三区视频久久| 2025年人妻中文字幕乱码在线| 国产91嫩草久久成人在线视频| 国产三级影院在线观看| 欧美另类一区二区视频| 亚洲一级av大片免费观看| 18禁美女无遮挡免费| 天天干天天日天天谢综合156 | 性欧美日本大妈母与子| 亚洲人一区二区中文字幕| 久久久久久久久久一区二区三区| 操操网操操伊剧情片中文字幕网| 国产在线免费观看成人| 人妻激情图片视频小说| av视屏免费在线播放| 日本性感美女视频网站| 久久这里只有精品热视频| 毛片一级完整版免费| 啪啪啪18禁一区二区三区| 国产一级精品综合av| 午夜免费观看精品视频| 超鹏97历史在线观看| 丝袜美腿视频诱惑亚洲无| 久久精品36亚洲精品束缚| 全国亚洲男人的天堂| 成人免费公开视频无毒| 久久机热/这里只有| 黄色成人在线中文字幕| 丝袜肉丝一区二区三区四区在线| 在线观看成人国产电影| 99国内小视频在现欢看| 中英文字幕av一区| 国内自拍第一页在线观看| 亚洲一区二区三区久久受 | 人妻熟女中文字幕aⅴ在线| 午夜福利人人妻人人澡人人爽| lutube在线成人免费看| 久久久久久9999久久久久| 美女大bxxxx内射| 538精品在线观看视频| 青青青国产免费视频| 中文字幕亚洲中文字幕| 老司机福利精品免费视频一区二区 | 中文字幕av熟女人妻| 欧美一区二区三区久久久aaa| 桃色视频在线观看一区二区| 中文字幕—97超碰网| 成人24小时免费视频| 2017亚洲男人天堂| 国产又粗又猛又爽又黄的视频在线| 80电影天堂网官网| 热99re69精品8在线播放| 成人影片高清在线观看| 天堂v男人视频在线观看| 1000部国产精品成人观看视频| 最近中文2019年在线看| 日韩av有码中文字幕| 香蕉av影视在线观看| 少妇高潮无套内谢麻豆| 熟女在线视频一区二区三区| 色花堂在线av中文字幕九九| 啪啪啪啪啪啪啪啪av| 国产男女视频在线播放| 又粗又硬又猛又爽又黄的| 天堂av在线官网中文| 免费看国产av网站| 久草视频在线免播放| 日本午夜爽爽爽爽爽视频在线观看| 成人sm视频在线观看| 福利片区一区二体验区| 成人免费公开视频无毒| 东京干手机福利视频| 手机看片福利盒子日韩在线播放| 天天操天天弄天天射| 欧美亚洲牲夜夜综合久久| 97黄网站在线观看| 免费69视频在线看| 日本熟妇一区二区x x| 日韩黄色片在线观看网站| 大鸡巴插入美女黑黑的阴毛| 97色视频在线观看| 日韩美av高清在线| 成人区人妻精品一区二视频| 韩国亚洲欧美超一级在线播放视频| 日本人妻欲求不满中文字幕| 在线播放国产黄色av| 欧美在线精品一区二区三区视频 | 福利一二三在线视频观看| 中国无遮挡白丝袜二区精品 | 最后99天全集在线观看| 91色秘乱一区二区三区| 97精品人妻一区二区三区精品| 精品首页在线观看视频| av中文字幕网址在线| 动漫黑丝美女的鸡巴| 日本熟妇一区二区x x| 一个人免费在线观看ww视频| 日韩av大胆在线观看| 国产一区二区久久久裸臀| 天天操天天弄天天射| 粉嫩av蜜乳av蜜臀| 亚洲第一伊人天堂网| 100%美女蜜桃视频| 韩国女主播精品视频网站| 红杏久久av人妻一区| 欧美久久久久久三级网| 黄工厂精品视频在线观看| 91精品国产观看免费| 美女日逼视频免费观看| 黄色大片男人操女人逼| 抽查舔水白紧大视频| 我想看操逼黄色大片| 欧美一区二区三区激情啪啪啪 | 伊人综合aⅴ在线网| 中出中文字幕在线观看 | 青青社区2国产视频| 狠狠躁狠狠爱网站视频| 日本五十路熟新垣里子| 伊人成人在线综合网| 中文字幕av一区在线观看| av老司机精品在线观看| 1000部国产精品成人观看视频| 国产成人精品av网站| av线天堂在线观看| 夜女神免费福利视频| 78色精品一区二区三区| 亚洲国产美女一区二区三区软件| 一级A一级a爰片免费免会员 | 少妇ww搡性bbb91| 99精品久久久久久久91蜜桃| 国产大学生援交正在播放| 成人国产激情自拍三区| 在线观看欧美黄片一区二区三区| 中文字幕,亚洲人妻| 精品区一区二区三区四区人妻| 国产精品视频男人的天堂| 精品成人啪啪18免费蜜臀| 中文字幕中文字幕 亚洲国产| 韩国女主播精品视频网站| 91亚洲国产成人精品性色| 又大又湿又爽又紧A视频| 欧美精品亚洲精品日韩在线| 黄页网视频在线免费观看| 国产91精品拍在线观看| 国产中文精品在线观看| 亚洲av无乱一区二区三区性色| 欧美日本在线视频一区| 欧美一区二区三区乱码在线播放 | 久久h视频在线观看| 国产97视频在线精品| 亚洲久久午夜av一区二区| 亚洲公开视频在线观看| 精品久久久久久久久久久99| 嫩草aⅴ一区二区三区| 日本啪啪啪啪啪啪啪| 老司机深夜免费福利视频在线观看| 亚洲一级 片内射视正片| 任我爽精品视频在线播放| 五月天中文字幕内射| 在线新三级黄伊人网| 大胸性感美女羞爽操逼毛片| 婷婷综合亚洲爱久久| 亚洲成人激情视频免费观看了 | 端庄人妻堕落挣扎沉沦| 91麻豆精品秘密入口在线观看| 涩爱综合久久五月蜜臀| AV天堂一区二区免费试看| 日韩av有码中文字幕| 四虎永久在线精品免费区二区| 五十路熟女av天堂| 性感美女福利视频网站| free性日本少妇| 亚洲午夜伦理视频在线| 色97视频在线播放| 大白屁股精品视频国产| 午夜精品久久久久麻豆影视| 日本丰满熟妇大屁股久久| 91国产在线视频免费观看| 天天日天天日天天射天天干 | 这里有精品成人国产99| 日本熟妇色熟妇在线观看| 久精品人妻一区二区三区| 亚洲国产第一页在线观看| 亚洲综合在线视频可播放| 国产精品久久久久国产三级试频| 天天日天天日天天擦| 人妻熟女中文字幕aⅴ在线| 国产一区二区久久久裸臀| 一区二区三区四区中文| 亚洲av黄色在线网站| 日本一二三区不卡无| 在线免费观看靠比视频的网站| 小泽玛利亚视频在线观看| 快点插进来操我逼啊视频| av中文字幕国产在线观看| 精内国产乱码久久久久久| 人妻另类专区欧美制服| 亚洲av日韩高清hd| 欧美成一区二区三区四区| 欧美精品久久久久久影院| 免费十精品十国产网站| 国产免费高清视频视频| 可以在线观看的av中文字幕| 亚洲2021av天堂| 视频一区 二区 三区 综合| 午夜毛片不卡在线看| 93视频一区二区三区| 大屁股肉感人妻中文字幕在线| 91破解版永久免费| 国产高清在线观看1区2区| 超级碰碰在线视频免费观看| 97年大学生大白天操逼| 天天日天天干天天搡| av天堂资源最新版在线看| av男人天堂狠狠干| 欧美精产国品一二三区| 亚洲午夜高清在线观看| 91九色porny蝌蚪国产成人| 中文字幕日韩精品就在这里| 日韩美女综合中文字幕pp| 一级A一级a爰片免费免会员| 成人乱码一区二区三区av| 青青青青草手机在线视频免费看 | 人人妻人人爽人人澡人人精品| 91亚洲国产成人精品性色| 免费观看成年人视频在线观看| 深田咏美亚洲一区二区| 激情五月婷婷免费视频| 亚洲免费在线视频网站| 激情伦理欧美日韩中文字幕| 曰本无码人妻丰满熟妇啪啪| 日韩三级电影华丽的外出| 黑人巨大精品欧美视频| 我想看操逼黄色大片| 在线免费91激情四射| 女同性ⅹxx女同h偷拍| 六月婷婷激情一区二区三区| 国产又大又黄免费观看| 1区2区3区不卡视频| 9l人妻人人爽人人爽| 中文字幕一区二区人妻电影冢本 | 久久午夜夜伦痒痒想咳嗽P| 国产精品久久久久国产三级试频| 日本裸体熟妇区二区欧美| 国产乱子伦一二三区| sspd152中文字幕在线| 国产大鸡巴大鸡巴操小骚逼小骚逼| 人妻爱爱 中文字幕| 91传媒一区二区三区| 日日夜夜精品一二三| 免费成人va在线观看| 黄网十四区丁香社区激情五月天| 国产欧美日韩第三页| 国产亚洲精品品视频在线| 日本少妇人妻xxxxx18| 班长撕开乳罩揉我胸好爽| 美女视频福利免费看| 国内资源最丰富的网站| 91中文字幕免费在线观看| 男人插女人视频网站| 成人福利视频免费在线| 亚洲 欧美 精品 激情 偷拍| 天天日天天干天天舔天天射| 免费成人av中文字幕| 久久久久久99国产精品| 国产高清精品一区二区三区| 夜夜骑夜夜操夜夜奸| 亚洲 中文 自拍 无码| 操日韩美女视频在线免费看 | 黄网十四区丁香社区激情五月天 | 黑人解禁人妻叶爱071| 欧洲日韩亚洲一区二区三区| 精品久久久久久久久久中文蒉| av中文字幕福利网| 午夜极品美女福利视频| 在线免费观看日本片| 黄网十四区丁香社区激情五月天| 人人妻人人爽人人添夜| 99热这里只有国产精品6| 丰满的子国产在线观看| 75国产综合在线视频| 92福利视频午夜1000看| 国产一区av澳门在线观看| 国产美女精品福利在线| 欧美亚洲自偷自拍 在线| 久久久久久久久久久久久97| 免费观看丰满少妇做受| 日韩欧美在线观看不卡一区二区| 成人18禁网站在线播放| 中文字幕亚洲中文字幕| 中文字幕日韩无敌亚洲精品 | 色呦呦视频在线观看视频| 亚洲一区二区久久久人妻| 欧美特级特黄a大片免费| 在线观看视频网站麻豆| 国产黄色片蝌蚪九色91| 在线亚洲天堂色播av电影| 中文字幕亚洲久久久| 欧美美女人体视频一区| 青青青青青操视频在线观看| 色花堂在线av中文字幕九九| 一区二区三区国产精选在线播放| 亚洲第一黄色在线观看| 自拍偷拍亚洲欧美在线视频| 日韩一个色综合导航| 亚洲天堂av最新网址| 国产91久久精品一区二区字幕 | 欧美另类重口味极品在线观看| 1024久久国产精品| 亚洲av黄色在线网站| 韩国三级aaaaa高清视频| 小穴多水久久精品免费看| 三上悠亚和黑人665番号| 精品区一区二区三区四区人妻| 欧美日韩一区二区电影在线观看| 精品久久久久久久久久中文蒉| 国产精品亚洲а∨天堂免| 黑人解禁人妻叶爱071| 直接能看的国产av| 91国偷自产一区二区三区精品| 日韩欧美在线观看不卡一区二区| 动漫av网站18禁| 国产精品午夜国产小视频| av中文字幕福利网| 91色老99久久九九爱精品| 久久精品久久精品亚洲人| 国产精品久久久久久久久福交| 777奇米久久精品一区| 国产精品亚洲在线观看| 亚洲一区二区久久久人妻| 欧美韩国日本国产亚洲| 日韩欧美高清免费在线| 国产剧情演绎系列丝袜高跟| 青青青青青青青青青国产精品视频| 91极品新人『兔兔』精品新作| 国产日韩精品一二三区久久久| 亚洲区美熟妇久久久久| 久久免费看少妇高潮完整版| 免费一级特黄特色大片在线观看| 亚洲熟女久久久36d| 亚洲公开视频在线观看| 日韩激情文学在线视频| 香蕉片在线观看av| 天天色天天舔天天射天天爽| 中文字幕高清资源站| 自拍偷拍亚洲欧美在线视频| 日韩在线视频观看有码在线| 亚洲国产在线精品国偷产拍| 日韩成人免费电影二区| 清纯美女在线观看国产| 亚洲公开视频在线观看| 在线 中文字幕 一区| 人人妻人人人操人人人爽| 91传媒一区二区三区| 欧美日韩国产一区二区三区三州 | 亚洲精品成人网久久久久久小说 | 精品av国产一区二区三区四区| 国产亚洲精品视频合集| av手机在线观播放网站| 欧美精品国产综合久久| 成人免费做爰高潮视频| 国产亚洲成人免费在线观看| 成熟丰满熟妇高潮xx×xx| 久久久久久9999久久久久| 日本午夜爽爽爽爽爽视频在线观看 | 午夜精品一区二区三区城中村| 91精品高清一区二区三区| 亚洲乱码中文字幕在线| 福利视频一区二区三区筱慧| 在线观看av观看av| 久久久精品999精品日本| av中文字幕国产在线观看| 久草福利电影在线观看| 欧美色呦呦最新网址| 在线观看免费视频色97| 日韩中文字幕在线播放第二页| 最新黄色av网站在线观看| 小穴多水久久精品免费看| 精品久久久久久久久久中文蒉| 亚洲国产成人无码麻豆艾秋| 免费在线看的黄片视频| 精品老妇女久久9g国产| 激情人妻校园春色亚洲欧美| 中文字幕日韩精品日本| 福利片区一区二体验区| jiujiure精品视频在线| 一个色综合男人天堂| 人妻自拍视频中国大陆| 把腿张开让我插进去视频| 粗大的内捧猛烈进出爽大牛汉子| 骚逼被大屌狂草视频免费看| 国产成人精品av网站| 日韩精品二区一区久久| 黑人巨大的吊bdsm| 美洲精品一二三产区区别| 日本丰满熟妇BBXBBXHD| 深田咏美亚洲一区二区| 中文字幕日韩91人妻在线| 午夜精品九一唐人麻豆嫩草成人| 精彩视频99免费在线| 久草福利电影在线观看| 欧美乱妇无乱码一区二区| 中文字幕无码日韩专区免费| 国产91精品拍在线观看| 18禁网站一区二区三区四区| 久久精品36亚洲精品束缚| 国产污污污污网站在线| 97人人模人人爽人人喊| 国产欧美日韩第三页| 亚洲综合在线观看免费| 91九色porny国产在线| 亚洲成高清a人片在线观看| 懂色av蜜桃a v| 欧美一级色视频美日韩| 欧美麻豆av在线播放| 男生舔女生逼逼视频| 欧美日韩高清午夜蜜桃大香蕉| xxx日本hd高清| 国产又色又刺激在线视频| 久久久久久cao我的性感人妻| 国产精品自偷自拍啪啪啪| 亚洲av成人网在线观看| 亚洲成人午夜电影在线观看| 人妻久久久精品69系列| 日比视频老公慢点好舒服啊| 少妇被强干到高潮视频在线观看| 99久久久无码国产精品性出奶水| 亚欧在线视频你懂的| 日本人妻精品久久久久久| 日本一区美女福利视频| 亚洲熟女综合色一区二区三区四区| 成年人中文字幕在线观看| 成年人免费看在线视频| 91‖亚洲‖国产熟女| 国产精选一区在线播放| 肏插流水妹子在线乐播下载| 蜜臀成人av在线播放| 久久久久久久精品成人热| 91‖亚洲‖国产熟女| 2022国产精品视频| 中文字幕一区二区亚洲一区| 中文字幕日韩91人妻在线| 亚洲午夜福利中文乱码字幕| 青青青国产免费视频| 日韩一个色综合导航| 五月色婷婷综合开心网4438| 青青青青操在线观看免费| 偷青青国产精品青青在线观看 | 91九色porny国产蝌蚪视频| brazzers欧熟精品系列| 免费福利av在线一区二区三区| 国产在线免费观看成人| 99热久久极品热亚洲| 一区二区三区日韩久久| 91快播视频在线观看| 大骚逼91抽插出水视频| 国产janese在线播放| 天天日天天做天天日天天做| 综合页自拍视频在线播放| 人妻最新视频在线免费观看| 久久香蕉国产免费天天| 中文字幕日韩人妻在线三区| 国产剧情演绎系列丝袜高跟| 91精品一区二区三区站长推荐| 人妻丝袜诱惑我操她视频| 自拍偷拍,中文字幕| 国产chinesehd精品麻豆| 天天干天天搞天天摸| 国产一区av澳门在线观看| 97青青青手机在线视频| 亚洲熟女女同志女同| 亚洲va国产va欧美精品88| 亚洲在线一区二区欧美| 在线免费91激情四射| 中文字幕高清资源站| 超碰97免费人妻麻豆| 亚洲伊人久久精品影院一美女洗澡 | 国产精品视频资源在线播放| 男女之间激情网午夜在线| 91色老99久久九九爱精品| 亚洲视频在线视频看视频在线| 大香蕉福利在线观看| 中文字幕人妻三级在线观看| 啪啪啪啪啪啪啪啪av| 日本一区美女福利视频| 中国熟女一区二区性xx| 最新国产精品拍在线观看| 激情内射在线免费观看| 性生活第二下硬不起来| 免费手机黄页网址大全| 国产福利小视频免费观看| 免费观看理论片完整版| 午夜青青草原网在线观看| 啊用力插好舒服视频| 国产露脸对白在线观看| 天天干天天日天天谢综合156| 桃色视频在线观看一区二区| 大鸡巴操娇小玲珑的女孩逼| 精品日产卡一卡二卡国色天香| 亚洲国产精品美女在线观看| 国产精品日韩欧美一区二区| 欧美视频不卡一区四区| 亚洲av色图18p| 成人av免费不卡在线观看| 黑人借宿ntr人妻的沦陷2| 91麻豆精品传媒国产黄色片| 午夜在线观看岛国av,com| 国产福利小视频大全| 美女少妇亚洲精选av| 93人妻人人揉人人澡人人| 成年人该看的视频黄免费| 久草视频在线免播放| av俺也去在线播放| 偷拍3456eee| 国内自拍第一页在线观看| 成人18禁网站在线播放| 在线免费观看99视频| 亚洲av第国产精品| 91在线免费观看成人| 久久精品久久精品亚洲人| 2021天天色天天干| 久草视频 久草视频2| 亚洲成人av一区在线| 亚洲欧美综合在线探花| 被大鸡吧操的好舒服视频免费| 欧美3p在线观看一区二区三区| 国产日韩一区二区在线看| 伊人成人在线综合网| 日韩中文字幕精品淫| 综合精品久久久久97| 扒开腿挺进肉嫩小18禁视频| 伊人开心婷婷国产av| 免费大片在线观看视频网站| 国产真实乱子伦a视频| 日本少妇人妻xxxxx18| 97超碰免费在线视频| 亚洲 中文 自拍 另类 欧美| 亚洲精品在线资源站| 黄色片一级美女黄色片| 亚洲视频在线视频看视频在线| 天天艹天天干天天操| 一区二区三区精品日本| 护士特殊服务久久久久久久| 天天日天天爽天天爽| 制丝袜业一区二区三区| 东游记中文字幕版哪里可以看到| 国产精品三级三级三级| 91‖亚洲‖国产熟女| 婷婷五月亚洲综合在线| 少妇人妻100系列| 色哟哟在线网站入口| 很黄很污很色的午夜网站在线观看| 欧美成人综合视频一区二区 | 同居了嫂子在线播高清中文| 亚洲 图片 欧美 图片| 国产女人被做到高潮免费视频| 99热久久极品热亚洲| 在线视频自拍第三页| 国产精品久久综合久久| 超污视频在线观看污污污 | 免费啪啪啪在线观看视频| 亚洲av男人的天堂你懂的| 免费在线观看视频啪啪| AV无码一区二区三区不卡| 阿v天堂2014 一区亚洲| 日韩欧美在线观看不卡一区二区 | yellow在线播放av啊啊啊| 人妻少妇性色欲欧美日韩| 欧美日韩一区二区电影在线观看| 亚洲一级av无码一级久久精品| 欧美色呦呦最新网址| 麻豆性色视频在线观看| 在线免费91激情四射 | 久久艹在线观看视频| 欧美日韩一级黄片免费观看| 99久久99一区二区三区| 51国产成人精品视频| 综合精品久久久久97| 中文字幕一区二区人妻电影冢本| 国产av国片精品一区二区| 天天色天天舔天天射天天爽| 在线视频自拍第三页| 亚洲va天堂va国产va久| 亚洲最大免费在线观看| 日本后入视频在线观看| 青青青青青青青在线播放视频| 天堂女人av一区二区| 免费一级黄色av网站| 成人网18免费视频版国产| 又色又爽又黄的美女裸体| 九一传媒制片厂视频在线免费观看| 亚洲成人av一区久久| 只有精品亚洲视频在线观看| 国产在线自在拍91国语自产精品| 欧美aa一级一区三区四区| 93视频一区二区三区| 天天想要天天操天天干| 青娱乐极品视频青青草| 亚洲av香蕉一区区二区三区犇| 亚洲国产在人线放午夜| 久草视频首页在线观看| 午夜精品福利一区二区三区p| 亚洲国际青青操综合网站| 91色老99久久九九爱精品| 中文字幕高清免费在线人妻 | 成年人的在线免费视频| 91免费观看在线网站 | 2022国产精品视频| 中文字幕人妻三级在线观看| 夜女神免费福利视频| 国产福利小视频二区| av完全免费在线观看av| 成人影片高清在线观看| 97超碰免费在线视频| 欧美日韩激情啪啪啪| 91欧美在线免费观看| av中文字幕在线导航| 免费黄高清无码国产| 色综合色综合色综合色| 日韩精品中文字幕播放| 狠狠躁狠狠爱网站视频| 午夜精品福利一区二区三区p| 无码中文字幕波多野不卡 | 日韩国产乱码中文字幕| 日韩精品激情在线观看| 好男人视频在线免费观看网站| 狍和女人的王色毛片| 亚洲专区激情在线观看视频| 巨乳人妻日下部加奈被邻居中出 | 超级福利视频在线观看| 中文字幕人妻一区二区视频| 亚洲另类在线免费观看| 青青青青草手机在线视频免费看| 香蕉av影视在线观看| 视频二区在线视频观看| 日韩美女综合中文字幕pp| 97人妻夜夜爽二区欧美极品| 91精品国产91青青碰| 在线观看日韩激情视频| 欧美日本aⅴ免费视频| 免费高清自慰一区二区三区网站 | 日本中文字幕一二区视频| 久久久久久九九99精品| 国产精品熟女久久久久浪潮| 男人的天堂一区二区在线观看| 中国黄片视频一区91| 麻豆性色视频在线观看| 亚洲天堂第一页中文字幕| 亚洲综合在线视频可播放| 777奇米久久精品一区| 粉嫩av蜜乳av蜜臀| 欧美地区一二三专区| 男女啪啪啪啪啪的网站| 在线观看国产网站资源| 久久久久久cao我的性感人妻| 超鹏97历史在线观看| 午夜国产免费福利av| 日韩影片一区二区三区不卡免费| 亚洲日本一区二区三区| 丝袜肉丝一区二区三区四区在线看| 国产精选一区在线播放| 日本黄在免费看视频| 亚洲精品午夜久久久久| 午夜精品亚洲精品五月色| 最新国产亚洲精品中文在线| 9l人妻人人爽人人爽| 国产janese在线播放| 97超碰国语国产97超碰| 亚洲欧美精品综合图片小说| 一区二区三区在线视频福利| 日本又色又爽又黄又粗| 中文字幕免费福利视频6| 69精品视频一区二区在线观看| 538精品在线观看视频| 中文字幕,亚洲人妻| 国产精品视频男人的天堂| 日韩美女精品视频在线观看网站| asmr福利视频在线观看| 十八禁在线观看地址免费| 男人和女人激情视频| 天天摸天天干天天操科普| 天天日天天舔天天射进去| 国产成人精品福利短视频| 欧美日韩人妻久久精品高清国产 | 青青伊人一精品视频| 亚洲综合图片20p| 久久精品国产999| 四川乱子伦视频国产vip| av手机在线免费观看日韩av| 久草电影免费在线观看| 欧美第一页在线免费观看视频| 日韩少妇人妻精品无码专区| 国产亚洲天堂天天一区| 日本美女成人在线视频| 夜鲁夜鲁狠鲁天天在线| av黄色成人在线观看| 国产黄色片在线收看| 国产精品精品精品999| 熟女国产一区亚洲中文字幕| 91极品新人『兔兔』精品新作| 中文字幕视频一区二区在线观看| 青草亚洲视频在线观看| 国产之丝袜脚在线一区二区三区| 国产高清在线观看1区2区| 人妻久久久精品69系列| 国产自拍在线观看成人| 美女在线观看日本亚洲一区| 老司机99精品视频在线观看| 日本丰满熟妇BBXBBXHD| 午夜dv内射一区区| 亚洲精品中文字幕下载| 中文字幕日本人妻中出| 春色激情网欧美成人| 成人资源在线观看免费官网| 爱有来生高清在线中文字幕| 成年人啪啪视频在线观看| 国产成人精品久久二区91| 色爱av一区二区三区| 岛国一区二区三区视频在线| 99国产精品窥熟女精品| 日本av在线一区二区三区| 亚洲av无硬久久精品蜜桃| 午夜毛片不卡免费观看视频| 热久久只有这里有精品| 精品91高清在线观看| 国产夫妻视频在线观看免费| 最新国产亚洲精品中文在线| 精品久久久久久久久久久a√国产| 把腿张开让我插进去视频| 国产精品久久9999| 极品粉嫩小泬白浆20p主播| 国产va在线观看精品| 亚洲乱码中文字幕在线| 国产精品自拍在线视频| 天天干天天啪天天舔| 亚洲熟妇久久无码精品| 揄拍成人国产精品免费看视频 | 青青草成人福利电影| 年轻的人妻被夫上司侵犯| 亚洲成人黄色一区二区三区| 大鸡巴插入美女黑黑的阴毛| 欧美一级色视频美日韩| 婷婷色中文亚洲网68| 精品一区二区三四区| 93人妻人人揉人人澡人人| 五十路息与子猛烈交尾视频| 99精品免费观看视频 | 日韩视频一区二区免费观看| 五十路息与子猛烈交尾视频 | 亚洲精品欧美日韩在线播放| 最新中文字幕免费视频| 韩国男女黄色在线观看| 骚逼被大屌狂草视频免费看| 亚洲av可乐操首页| 国产亚洲精品视频合集| 超pen在线观看视频公开97| 欧美精品伦理三区四区| 黄色大片男人操女人逼| 一级黄色av在线观看| 午夜福利人人妻人人澡人人爽| 农村胖女人操逼视频| 3344免费偷拍视频| 国产亚洲精品欧洲在线观看| 91免费观看在线网站| 人妻最新视频在线免费观看| 黄色中文字幕在线播放| 人妻丝袜诱惑我操她视频| 亚洲图片偷拍自拍区| 亚洲欧洲一区二区在线观看| 亚洲av人人澡人人爽人人爱| 农村胖女人操逼视频| 日日爽天天干夜夜操| 精品久久婷婷免费视频| AV无码一区二区三区不卡| 大香蕉伊人国产在线| 天天综合天天综合天天网| 在线观看免费av网址大全| 小穴多水久久精品免费看| 中文字幕奴隷色的舞台50| 国产精品久久久久久美女校花| 四川乱子伦视频国产vip| 91‖亚洲‖国产熟女| 久久久精品精品视频视频| 日本a级视频老女人| 日视频免费在线观看| 日韩中文字幕福利av| 亚洲另类综合一区小说| 国产精品久久9999| 91香蕉成人app下载| 在线视频免费观看网| 天天操,天天干,天天射| 亚洲综合另类精品小说| 午夜毛片不卡在线看| 激情啪啪啪啪一区二区三区 | 日日爽天天干夜夜操| 51国产偷自视频在线播放| 91久久精品色伊人6882| 日本av在线一区二区三区| 激情啪啪啪啪一区二区三区| 午夜成午夜成年片在线观看| 熟女妇女老妇一二三区| 蜜桃久久久久久久人妻| 亚洲最大黄了色网站| 亚洲午夜伦理视频在线| 国产chinesehd精品麻豆| 精品国产午夜视频一区二区| 91精品国产91青青碰| 成人激情文学网人妻| 国产精品久久综合久久| 91在线免费观看成人| 鸡巴操逼一级黄色气| 美女大bxxxx内射| 少妇高潮无套内谢麻豆| 在线免费观看日本伦理| 99热这里只有国产精品6| 久久久久久cao我的性感人妻| 国产精品中文av在线播放| 人妻另类专区欧美制服| 中国黄色av一级片| 成人亚洲精品国产精品| japanese日本熟妇另类| AV天堂一区二区免费试看| 91精品综合久久久久3d动漫| 一区二区三区四区中文| 日韩中文字幕福利av| av男人天堂狠狠干| v888av在线观看视频| 被大鸡吧操的好舒服视频免费| weyvv5国产成人精品的视频| 337p日本大胆欧美人| 一二三中文乱码亚洲乱码one| 亚洲欧美成人综合视频| 绝顶痉挛大潮喷高潮无码 | 亚洲欧美另类手机在线| 中文字幕欧美日韩射射一| 91国语爽死我了不卡| 欧美爆乳肉感大码在线观看| 国产综合高清在线观看| 日本人妻少妇18—xx| 国产三级片久久久久久久| 五月精品丁香久久久久福利社| 男人插女人视频网站| 国产成人小视频在线观看无遮挡| 久久精品国产亚洲精品166m| 只有精品亚洲视频在线观看| 深夜男人福利在线观看| 97人妻夜夜爽二区欧美极品| 国产亚洲精品品视频在线| 日本少妇在线视频大香蕉在线观看| 午夜频道成人在线91| 91色九色porny| 超pen在线观看视频公开97| 亚洲1卡2卡三卡4卡在线观看 | 亚洲嫩模一区二区三区| 亚洲特黄aaaa片| 班长撕开乳罩揉我胸好爽| 日本丰满熟妇大屁股久久| 亚洲第17页国产精品| 少妇与子乱在线观看| 中文字幕国产专区欧美激情| 国产精品久久久久网| 老司机福利精品视频在线| 国产妇女自拍区在线观看| 91高清成人在线视频| 青青社区2国产视频| 精品乱子伦一区二区三区免费播 | 国产在线免费观看成人| 又粗又硬又猛又黄免费30| 最新黄色av网站在线观看| 伊人网中文字幕在线视频| 黑人巨大的吊bdsm| 中文字幕高清在线免费播放| 中文字幕一区二区亚洲一区| 成人精品在线观看视频| 欧美视频不卡一区四区| av大全在线播放免费| av日韩在线观看大全| 欧美爆乳肉感大码在线观看| 老司机在线精品福利视频| 亚洲免费国产在线日韩| 极品粉嫩小泬白浆20p主播| 欧美韩国日本国产亚洲| 国产精品黄片免费在线观看| 中国熟女一区二区性xx| 亚洲福利精品视频在线免费观看| 亚洲狠狠婷婷综合久久app| 动漫黑丝美女的鸡巴| 少妇人妻100系列| 日本人妻欲求不满中文字幕| 欧美特级特黄a大片免费| 精品久久久久久久久久久久人妻| 韩国AV无码不卡在线播放| 大香蕉伊人中文字幕| 日韩欧美一级黄片亚洲| 白嫩白嫩美女极品国产在线观看| 日本熟妇喷水xxx| 日本黄色特一级视频| 日韩av大胆在线观看| 国产综合高清在线观看| 欧美亚洲自偷自拍 在线| 888欧美视频在线| 曰本无码人妻丰满熟妇啪啪| 天天操天天爽天天干| 91国内视频在线观看| 特一级特级黄色网片| 中文字幕av男人天堂| 视频在线亚洲一区二区| 少妇人妻真实精品视频| 欧美亚洲少妇福利视频| 亚洲成人黄色一区二区三区| 精品国产污污免费网站入口自| 国产极品精品免费视频| 不戴胸罩引我诱的隔壁的人妻| 日韩成人综艺在线播放| 都市家庭人妻激情自拍视频| 亚洲熟女女同志女同| 午夜国产福利在线观看| 亚洲精品一线二线在线观看| 色天天天天射天天舔| 大香蕉玖玖一区2区| 中文字幕av熟女人妻| 一本一本久久a久久精品综合不卡| aⅴ精产国品一二三产品| 中文字幕免费福利视频6| yy6080国产在线视频| 含骚鸡巴玩逼逼视频| 亚洲国产欧美国产综合在线| 中文字幕一区二 区二三区四区| 日本韩国免费福利精品| 天天干天天爱天天色| 超碰在线观看免费在线观看| 人妻av无码专区久久绿巨人 | 亚洲精品中文字幕下载| 亚洲狠狠婷婷综合久久app| 欧美日韩熟女一区二区三区| 国产黑丝高跟鞋视频在线播放 | 日本性感美女写真视频| 黑人巨大精品欧美视频| 成人色综合中文字幕| aⅴ精产国品一二三产品| 欧美aa一级一区三区四区| 亚洲欧美成人综合视频| 国产午夜男女爽爽爽爽爽视频| 青青草原色片网站在线观看| 日韩无码国产精品强奸乱伦| 大白屁股精品视频国产| 亚洲精品中文字幕下载| 免费观看理论片完整版| 天堂av中文在线最新版| 99热色原网这里只有精品| 久草极品美女视频在线观看| 特黄老太婆aa毛毛片| 国产成人小视频在线观看无遮挡| 国产成人一区二区三区电影网站 | h国产小视频福利在线观看| 亚洲成人精品女人久久久| 水蜜桃一区二区三区在线观看视频 | 亚洲精品乱码久久久本| asmr福利视频在线观看| 老熟妇xxxhd老熟女| 在线视频免费观看网| 91chinese在线视频| 成人伊人精品色xxxx视频| 嫩草aⅴ一区二区三区| 成人影片高清在线观看| 一色桃子人妻一区二区三区| 欧美特级特黄a大片免费| 亚洲免费成人a v| 国产美女精品福利在线| 人妻av无码专区久久绿巨人| 999九九久久久精品| 国产午夜亚洲精品麻豆| 日本在线一区二区不卡视频| 美女在线观看日本亚洲一区| 超级av免费观看一区二区三区| 亚洲粉嫩av一区二区三区| 天天日天天干天天插舔舔| 操操网操操伊剧情片中文字幕网| 天堂av狠狠操蜜桃| 日韩欧美一级黄片亚洲| 国产精品国产三级麻豆| 全国亚洲男人的天堂| 免费一级黄色av网站| 久久久精品精品视频视频| 黑人乱偷人妻中文字幕| 亚洲第一伊人天堂网| 久草电影免费在线观看| 喷水视频在线观看这里只有精品| 精品美女福利在线观看| 国产精品福利小视频a| 中国把吊插入阴蒂的视频| 在线视频这里只有精品自拍| 中文乱理伦片在线观看| 日韩中文字幕精品淫| 国产美女精品福利在线| 红杏久久av人妻一区| 夜色撩人久久7777| 久久久制服丝袜中文字幕| 色噜噜噜噜18禁止观看| 78色精品一区二区三区| 亚洲人妻国产精品综合| 在线免费观看av日韩| 国产使劲操在线播放| 在线免费视频 自拍| 在线国产精品一区二区三区| 岛国黄色大片在线观看| 亚洲av在线观看尤物| 成年人啪啪视频在线观看| 最新欧美一二三视频| 国产第一美女一区二区三区四区| 中文字幕日韩无敌亚洲精品| 1769国产精品视频免费观看| 久久亚洲天堂中文对白| 亚洲精品在线资源站| 91人妻精品一区二区久久| 香蕉aⅴ一区二区三区| 蜜桃臀av蜜桃臀av| 国产亚洲成人免费在线观看| 喷水视频在线观看这里只有精品| 日本午夜福利免费视频| 欧洲日韩亚洲一区二区三区| 国产无遮挡裸体免费直播视频| 日本三极片视频网站观看| 狠狠躁夜夜躁人人爽天天久天啪| 中国熟女@视频91| 91免费观看在线网站| 五月天久久激情视频| 综合国产成人在线观看| 91片黄在线观看喷潮| 中文字幕无码一区二区免费| 男人在床上插女人视频| 国产一级精品综合av| 热久久只有这里有精品| 93精品视频在线观看| 亚洲精品久久视频婷婷| av亚洲中文天堂字幕网| 性欧美激情久久久久久久| AV无码一区二区三区不卡| 在线播放国产黄色av| 午夜蜜桃一区二区三区| 国产高清女主播在线| 国产精彩福利精品视频| 亚洲国产欧美国产综合在线| 精彩视频99免费在线| 91亚洲国产成人精品性色| 伊人开心婷婷国产av| 无码中文字幕波多野不卡| 人妻少妇一区二区三区蜜桃| 香蕉91一区二区三区| 国产真实乱子伦a视频| 青青草成人福利电影| 在线播放国产黄色av| 国产视频在线视频播放| 欧亚乱色一区二区三区| 亚洲欧美国产麻豆综合| 人人超碰国字幕观看97| 91精品国产综合久久久蜜| 国产乱子伦一二三区| 中文字幕高清免费在线人妻| 精品一区二区三区在线观看| 大鸡吧插入女阴道黄色片| 青春草视频在线免费播放| aaa久久久久久久久| 在线观看视频网站麻豆| 国产欧美精品不卡在线| 大学生A级毛片免费视频| 一区二区三区激情在线| 中文字幕在线视频一区二区三区| 国产精品久久久久久美女校花| 日韩影片一区二区三区不卡免费| 韩国AV无码不卡在线播放| 国产V亚洲V天堂无码欠欠| 日韩欧美一级aa大片| 欧美 亚洲 另类综合| 亚洲av色香蕉一区二区三区 | 内射久久久久综合网| 国产亚洲精品欧洲在线观看| 亚洲最大免费在线观看| 天天操夜夜操天天操天天操| 极品粉嫩小泬白浆20p主播| 日本丰满熟妇大屁股久久| 99精品一区二区三区的区| 欧美日韩人妻久久精品高清国产 | 一区二区视频在线观看视频在线| 亚洲精品久久视频婷婷| 精品人妻伦一二三区久| 亚洲中文精品字幕在线观看| 国产精品视频男人的天堂| 一区二区三区蜜臀在线| 93精品视频在线观看| 五十路av熟女松本翔子| 亚洲精品久久视频婷婷| 大白屁股精品视频国产| 青草亚洲视频在线观看| 国产高潮无码喷水AV片在线观看| 97年大学生大白天操逼| 黑人3p华裔熟女普通话| 骚货自慰被发现爆操| 最近中文2019年在线看| 欧美偷拍自拍色图片| 国产剧情演绎系列丝袜高跟| 久久久久91精品推荐99| 亚洲 欧美 精品 激情 偷拍| 中文字幕AV在线免费看 | 熟女俱乐部一二三区| 操操网操操伊剧情片中文字幕网 | 亚洲熟女久久久36d| 国产又大又黄免费观看| 欧美日韩亚洲国产无线码| 国产丰满熟女成人视频| 在线免费观看亚洲精品电影| 久久麻豆亚洲精品av| 任我爽精品视频在线播放| 老司机99精品视频在线观看| 亚洲成人免费看电影| 视频一区 视频二区 视频| 直接能看的国产av| 这里只有精品双飞在线播放| 动漫av网站18禁| 偷拍自拍亚洲美腿丝袜| 免费福利av在线一区二区三区| 制服丝袜在线人妻中文字幕| 成人国产影院在线观看| 91精品国产观看免费| japanese日本熟妇另类| 国产精品欧美日韩区二区| 亚洲欧美清纯唯美另类| 四虎永久在线精品免费区二区| 日日操夜夜撸天天干| 绯色av蜜臀vs少妇| sw137 中文字幕 在线| 免费看美女脱光衣服的视频| 天天干天天爱天天色| 黑人变态深video特大巨大| 不卡精品视频在线观看| 久久久久久9999久久久久| 亚洲视频在线观看高清| 日本免费午夜视频网站| 亚洲精品久久视频婷婷| 国产激情av网站在线观看| 午夜久久久久久久99| av大全在线播放免费| 99热99这里精品6国产| 99精品国产免费久久| 少妇人妻100系列| 亚洲av无码成人精品区辽| 漂亮 人妻被中出中文| 绝顶痉挛大潮喷高潮无码 | 在线免费91激情四射 | 插逼视频双插洞国产操逼插洞| 日日夜夜大香蕉伊人| 免费在线播放a级片| 色偷偷伊人大杳蕉综合网| 性欧美激情久久久久久久| 国产成人自拍视频播放 | 亚洲av无乱一区二区三区性色| 国产内射中出在线观看| 99久久久无码国产精品性出奶水 | 91久久精品色伊人6882| 亚洲av在线观看尤物| 精品人妻一二三区久久| 亚洲区美熟妇久久久久| 在线免费观看靠比视频的网站 | AV天堂一区二区免费试看| 在线观看免费岛国av| 天天射夜夜操综合网| 老司机你懂得福利视频| 亚洲综合在线观看免费| 偷拍自拍 中文字幕| 偷拍自拍视频图片免费| 人人妻人人澡人人爽人人dvl| 无忧传媒在线观看视频| 亚洲成人熟妇一区二区三区| 亚洲一区二区三区偷拍女厕91| 中文字幕在线乱码一区二区| 2022中文字幕在线| 日本高清成人一区二区三区| 欧美精产国品一二三产品价格| 成人24小时免费视频| 在线免费观看国产精品黄色| 日韩伦理短片在线观看| 亚洲1069综合男同| aaa久久久久久久久| 婷婷久久久久深爱网| 早川濑里奈av黑人番号| nagger可以指黑人吗| 亚洲综合在线视频可播放| 巨乳人妻日下部加奈被邻居中出| 成人亚洲国产综合精品| 亚洲va天堂va国产va久| 99热这里只有国产精品6| 性色av一区二区三区久久久| 天天干天天操天天插天天日| 亚洲 中文 自拍 另类 欧美 | 亚洲午夜电影在线观看| 男生用鸡操女生视频动漫| 99的爱精品免费视频| 99热碰碰热精品a中文| 久久精品美女免费视频| 国产av自拍偷拍盛宴| 亚洲卡1卡2卡三卡四老狼| 又黄又刺激的午夜小视频| 又黄又刺激的午夜小视频| 日韩特级黄片高清在线看| 成人av久久精品一区二区| 青青青激情在线观看视频| 日本脱亚入欧是指什么| 55夜色66夜色国产精品站| 国产精品人妻熟女毛片av久| 天天色天天操天天舔| 欧美成人综合视频一区二区 | 欧美特级特黄a大片免费| 亚洲欧美成人综合视频| 成年女人免费播放视频| 99视频精品全部15| 三级av中文字幕在线观看| 99精品国产aⅴ在线观看| 九一传媒制片厂视频在线免费观看| 精品suv一区二区69| 亚洲国产成人在线一区| 国产精品成人xxxx| 国产福利小视频二区| 国产黑丝高跟鞋视频在线播放| 91超碰青青中文字幕| 国产真实灌醉下药美女av福利| 午夜成午夜成年片在线观看| 久久精品美女免费视频| 很黄很污很色的午夜网站在线观看| 亚洲av第国产精品| 亚洲av无乱一区二区三区性色| 久久精品国产999| 日韩av免费观看一区| 人人在线视频一区二区| 日本午夜爽爽爽爽爽视频在线观看| 狠狠躁夜夜躁人人爽天天久天啪| 日韩熟女av天堂系列| av在线免费中文字幕| 蜜桃专区一区二区在线观看| 91精品国产麻豆国产| 91精品视频在线观看免费| 欧美精品亚洲精品日韩在线| 欧美视频中文一区二区三区| 人人超碰国字幕观看97| 日韩欧美国产一区ab| 91麻豆精品传媒国产黄色片| 国产熟妇人妻ⅹxxxx麻豆| 精产国品久久一二三产区区别| 国产精品国产三级国产午| 亚洲国产欧美一区二区三区久久| 亚洲粉嫩av一区二区三区| 成人性黑人一级av| 东京热男人的av天堂| 3D动漫精品啪啪一区二区下载| 日韩三级黄色片网站| 亚洲一区二区三区uij| 欧美精品资源在线观看| 2021国产一区二区| 黄工厂精品视频在线观看| 亚洲av在线观看尤物| 久久午夜夜伦痒痒想咳嗽P| 在线制服丝袜中文字幕| 国产精品久久综合久久| 在线观看日韩激情视频| 最新欧美一二三视频 | 一个人免费在线观看ww视频| 亚洲欧洲av天堂综合| 国产精品亚洲а∨天堂免| 青青尤物在线观看视频网站| 这里有精品成人国产99| 少妇系列一区二区三区视频| 天天日天天干天天干天天日| 欧美在线偷拍视频免费看| 国产第一美女一区二区三区四区| 欧美一区二区三区在线资源| 少妇人妻久久久久视频黄片| 欧美伊人久久大香线蕉综合| 国产丰满熟女成人视频| 国产在线免费观看成人| 欧美黑人与人妻精品| 岛国黄色大片在线观看| 国产妇女自拍区在线观看| 精品一区二区三区午夜| 国产精品久久久久网| 久久这里只有精彩视频免费| 都市家庭人妻激情自拍视频| 亚洲一区二区三区久久午夜| 久久久久久久久久久久久97| 丝袜美腿欧美另类 中文字幕| 大肉大捧一进一出好爽在线视频| 99精品亚洲av无码国产另类| 强行扒开双腿猛烈进入免费版 | 国产乱子伦一二三区| 中文字幕在线永久免费播放| 亚洲综合在线观看免费| 亚洲精品国产综合久久久久久久久| 亚洲欧美另类自拍偷拍色图| 久久久精品精品视频视频| 三级av中文字幕在线观看| 十八禁在线观看地址免费| 久草电影免费在线观看| 免费费一级特黄真人片 | 国产在线一区二区三区麻酥酥 | 人妻少妇av在线观看| 国产av福利网址大全| 99久久久无码国产精品性出奶水 | 精品av久久久久久久| 97人人妻人人澡人人爽人人精品| 一区二区三区久久久91| 91精品视频在线观看免费| 国产高潮无码喷水AV片在线观看| 国产精品系列在线观看一区二区| 美女大bxxxx内射| 中国视频一区二区三区| 亚洲高清免费在线观看视频| 亚洲精品一区二区三区老狼| 天天通天天透天天插| 天干天天天色天天日天天射| 亚洲欧美色一区二区| 91精品国产综合久久久蜜 | 天天干天天操天天摸天天射| 中文字幕 亚洲av| 一区二区三区视频,福利一区二区| huangse网站在线观看| 亚洲图库另类图片区| 五月激情婷婷久久综合网| 日韩成人免费电影二区| 亚洲一区二区三区精品视频在线| 天天干天天操天天摸天天射| 人妻无码中文字幕专区| 红杏久久av人妻一区| 中国产一级黄片免费视频播放| 99久久99一区二区三区| 欧美男人大鸡吧插女人视频| 国产又粗又猛又爽又黄的视频美国| 免费观看国产综合视频| 亚洲视频乱码在线观看| 又色又爽又黄的美女裸体| 日韩特级黄片高清在线看| 久草视频在线看免费| 香港一级特黄大片在线播放| 免费无码人妻日韩精品一区二区 | 韩国女主播精品视频网站| 中国视频一区二区三区| aⅴ五十路av熟女中出| 精品国产高潮中文字幕| 午夜久久香蕉电影网| 好男人视频在线免费观看网站| 岛国黄色大片在线观看| 精品一区二区三区午夜| 成人蜜桃美臀九一一区二区三区 | 久久热久久视频在线观看| 国产97在线视频观看| 亚洲天堂成人在线观看视频网站| 国产在线自在拍91国语自产精品| 国产又色又刺激在线视频| 久草极品美女视频在线观看| 国产janese在线播放| 老司机深夜免费福利视频在线观看| 骚货自慰被发现爆操| www天堂在线久久| 亚洲中文字幕校园春色| 中文字幕高清免费在线人妻| 一区二区视频在线观看视频在线| 9国产精品久久久久老师| 亚洲午夜电影之麻豆| 日本18禁久久久久久| 精品视频国产在线观看| 一区二区三区麻豆福利视频| 亚洲精品午夜久久久久| 欧美国产亚洲中英文字幕| 懂色av之国产精品| 国产真实乱子伦a视频| 亚洲码av无色中文| 在线国产中文字幕视频| 亚洲熟妇久久无码精品| 青娱乐蜜桃臀av色| 天天操天天污天天射| 青春草视频在线免费播放| 日本性感美女写真视频| 午夜精品一区二区三区更新| 偷拍自拍福利视频在线观看| 日本美女成人在线视频| 亚洲av无乱一区二区三区性色| 久久精品久久精品亚洲人| 蜜臀av久久久久久久| 亚洲在线免费h观看网站| 区一区二区三国产中文字幕| 男女啪啪视频免费在线观看| 亚洲护士一区二区三区| 亚洲区欧美区另类最新章节| 同居了嫂子在线播高清中文| 久久久久久久久久一区二区三区| 欧美女同性恋免费a| 最新91九色国产在线观看| 51国产偷自视频在线播放| 欧美日韩熟女一区二区三区| 欧美成人小视频在线免费看| 久久午夜夜伦痒痒想咳嗽P| 欧美综合婷婷欧美综合| 久久综合老鸭窝色综合久久| 乱亲女秽乱长久久久| 色综合天天综合网国产成人| 亚洲狠狠婷婷综合久久app| 欧美成人综合视频一区二区| 成人区人妻精品一区二视频| 国产+亚洲+欧美+另类| 欧美一级色视频美日韩| 亚洲粉嫩av一区二区三区| av天堂资源最新版在线看| 亚洲人妻av毛片在线| 中文乱理伦片在线观看| 91精品激情五月婷婷在线| 免费观看成年人视频在线观看| 成年人免费看在线视频| 日本在线一区二区不卡视频| 91精品国产91青青碰| 青青社区2国产视频| 极品粉嫩小泬白浆20p主播| 天天射夜夜操综合网| jul—619中文字幕在线| 欧美日本在线视频一区| 天天干夜夜操啊啊啊| 天堂av狠狠操蜜桃| 福利视频一区二区三区筱慧| 国产综合视频在线看片| 亚洲av色图18p| 亚洲午夜电影在线观看| 欧美黑人巨大性xxxxx猛交| 天天日天天干天天搡| 青青青青操在线观看免费| 日本高清在线不卡一区二区 | 97人妻无码AV碰碰视频| 免费人成黄页网站在线观看国产| 天天日天天爽天天爽| 特大黑人巨大xxxx| 日韩亚洲高清在线观看| 在线视频国产欧美日韩| 91‖亚洲‖国产熟女| 国产aⅴ一线在线观看| 大香蕉大香蕉在线看| 人人爱人人妻人人澡39| 亚洲图片偷拍自拍区| 97国产精品97久久| 天天操天天干天天日狠狠插| 亚洲国产欧美一区二区三区…| 十八禁在线观看地址免费| av线天堂在线观看| 国产中文精品在线观看| 日本少妇精品免费视频| 亚洲欧美激情中文字幕| 中文字幕日韩精品就在这里| 4个黑人操素人视频网站精品91| 国产精品大陆在线2019不卡| 综合色区亚洲熟妇shxstz| 美女福利视频网址导航| 国产麻豆剧果冻传媒app| 欧美色呦呦最新网址| 精品欧美一区二区vr在线观看| 98视频精品在线观看| 亚洲 欧美 精品 激情 偷拍| 丁香花免费在线观看中文字幕| 极品性荡少妇一区二区色欲| 自拍偷拍一区二区三区图片 | 天天干天天操天天插天天日| 伊人开心婷婷国产av| 国产精品视频一区在线播放| 91麻豆精品久久久久| 日本av高清免费网站| av线天堂在线观看| 欧美国品一二三产区区别| 超污视频在线观看污污污| 在线国产日韩欧美视频| 免费在线黄色观看网站| 亚洲高清自偷揄拍自拍| 99精品国产免费久久| 国产一区自拍黄视频免费观看| 自拍偷拍亚洲另类色图| 综合国产成人在线观看| 2012中文字幕在线高清| 一区二区在线观看少妇| 日本一区美女福利视频| 天码人妻一区二区三区在线看| 国产高清精品极品美女| 欧美偷拍自拍色图片| 特大黑人巨大xxxx| 中文字幕一区二区人妻电影冢本| 亚洲Av无码国产综合色区| 2020av天堂网在线观看| 亚洲精品久久综合久| 9久在线视频只有精品| 青娱乐极品视频青青草| av老司机精品在线观看| 免费黄色成人午夜在线网站| 久久久久久久久久性潮| 国产午夜亚洲精品不卡在线观看|