# 数据库分库分表与数据迁移实战

## 引言

随着业务规模增长，单一数据库实例往往难以承受高并发、大数据量的压力。分库分表成为解决这类问题的常用方案，但同时也带来了复杂性。本文将深入探讨分库分表的各种策略，以及数据迁移的最佳实践。

## 一、分库分表策略

### 1.1 垂直拆分

垂直拆分按照业务将表或库拆分到不同的实例：

```
┌─────────────────┐
│   原始数据库    │
├─────────────────┤
│ users表         │
│ orders表        │
│ products表      │
│ logs表          │
└─────────────────┘
        ↓ 垂直拆分
┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│ 用户库          │  │ 订单库          │  │ 产品库          │
├─────────────────┤  ├─────────────────┤  ├─────────────────┤
│ users表         │  │ orders表        │  │ products表      │
└─────────────────┘  └─────────────────┘  └─────────────────┘
```

```go
package sharding

import (
	"context" // NOTE(review): not used by this excerpt; Go rejects unused imports, so drop it if nothing else in the file needs it.
	"database/sql"
	"fmt"
)

// VerticalShardingManager maps a business domain name (for example "users"
// or "orders") to the database handle that owns that domain's tables.
// It is not safe for concurrent use: register every shard before sharing it.
type VerticalShardingManager struct {
	shards map[string]*sql.DB
}

// NewVerticalShardingManager returns a manager with no shards registered.
func NewVerticalShardingManager() *VerticalShardingManager {
	return &VerticalShardingManager{
		shards: make(map[string]*sql.DB),
	}
}

// RegisterShard associates name with db, replacing any previous registration.
func (vsm *VerticalShardingManager) RegisterShard(name string, db *sql.DB) {
	vsm.shards[name] = db
}

// GetShard returns the handle registered under name, or an error if no shard
// has been registered with that name.
func (vsm *VerticalShardingManager) GetShard(name string) (*sql.DB, error) {
	db, ok := vsm.shards[name]
	if !ok {
		return nil, fmt.Errorf("shard not found: %s", name)
	}
	return db, nil
}

// UserRepository holds the handle of the database that stores user data.
type UserRepository struct {
	db *sql.DB
}

// NewUserRepository wraps db in a UserRepository.
func NewUserRepository(db *sql.DB) *UserRepository {
	return &UserRepository{db: db}
}

// OrderRepository holds the handle of the database that stores order data.
type OrderRepository struct {
	db *sql.DB
}

// NewOrderRepository wraps db in an OrderRepository.
func NewOrderRepository(db *sql.DB) *OrderRepository {
	return &OrderRepository{db: db}
}
```

### 1.2 水平拆分

水平拆分按照数据分布策略，将同一表的数据拆分到多个库或表中：

```go
package sharding

import (
	"context" // NOTE(review): not used by this excerpt; see the note on "strings".
	"fmt"
	"hash/fnv"
	"strconv"
	"strings" // NOTE(review): not used by this excerpt; Go rejects unused imports, so drop both if nothing else in the file needs them.
)

// ShardingStrategy decides which shard a record belongs to.
type ShardingStrategy interface {
	// GetShardKey derives the routing key from a record.
	GetShardKey(record interface{}) string
	// GetShardIndex maps shardKey to a shard index. Implementations in this
	// file return -1 when the key cannot be routed.
	GetShardIndex(shardKey string, shardCount int) int
}

// HashSharding spreads keys across shards by FNV-1a hash.
type HashSharding struct {
	shardCount int
}

// NewHashSharding returns a hash strategy. The stored shardCount is
// informational; GetShardIndex uses the count passed by the caller.
func NewHashSharding(shardCount int) *HashSharding {
	return &HashSharding{shardCount: shardCount}
}

// GetShardKey formats record with %v and uses the result as the key.
func (h *HashSharding) GetShardKey(record interface{}) string {
	return fmt.Sprintf("%v", record)
}

// GetShardIndex returns hash(shardKey) mod shardCount, or -1 when shardCount
// is not positive.
func (h *HashSharding) GetShardIndex(shardKey string, shardCount int) int {
	if shardCount <= 0 {
		return -1
	}
	hsh := fnv.New32a()
	hsh.Write([]byte(shardKey)) // hash.Hash.Write never returns an error
	// Reduce in uint32 space: converting the sum to int first can yield a
	// negative index on platforms where int is 32 bits wide.
	return int(hsh.Sum32() % uint32(shardCount))
}

// RangeSharding routes integer keys by ascending upper bounds: a key below
// ranges[0] goes to shard 0, a key below ranges[1] to shard 1, and so on.
// Keys at or above the last bound go to shard len(ranges), so a router needs
// len(ranges)+1 shards to cover every key.
type RangeSharding struct {
	ranges []int64
}

// NewRangeSharding returns a range strategy over the given upper bounds,
// which are expected to be sorted in ascending order.
func NewRangeSharding(ranges []int64) *RangeSharding {
	return &RangeSharding{ranges: ranges}
}

// GetShardKey formats record with %v and uses the result as the key.
func (r *RangeSharding) GetShardKey(record interface{}) string {
	return fmt.Sprintf("%v", record)
}

// GetShardIndex parses shardKey as a base-10 integer and returns the index of
// the first bound greater than it. It returns -1 when shardKey is not an
// integer, instead of silently routing the record to shard 0. shardCount is
// not consulted; the caller must bounds-check the result.
func (r *RangeSharding) GetShardIndex(shardKey string, shardCount int) int {
	key, err := strconv.ParseInt(shardKey, 10, 64)
	if err != nil {
		return -1
	}
	for i, bound := range r.ranges {
		if key < bound {
			return i
		}
	}
	return len(r.ranges)
}

// ShardRouter resolves a shard key to a shard configuration using a strategy.
type ShardRouter struct {
	shards   []*ShardConfig
	strategy ShardingStrategy
}

// ShardConfig names a shard and carries its data source name.
type ShardConfig struct {
	Name string
	DSN  string
}

// NewShardRouter copies shards into a new router that routes with strategy.
// It returns an error when strategy is nil or shards is empty, because such a
// router could not route any key.
func NewShardRouter(shards []ShardConfig, strategy ShardingStrategy) (*ShardRouter, error) {
	if strategy == nil {
		return nil, fmt.Errorf("sharding strategy must not be nil")
	}
	if len(shards) == 0 {
		return nil, fmt.Errorf("at least one shard is required")
	}
	router := &ShardRouter{
		shards:   make([]*ShardConfig, len(shards)),
		strategy: strategy,
	}
	for i := range shards {
		// Take the address of a per-iteration copy. Taking the address of
		// the range variable makes every entry alias one variable before
		// Go 1.22, and pointing into the caller's slice would let later
		// caller mutations leak into the router.
		cfg := shards[i]
		router.shards[i] = &cfg
	}
	return router, nil
}

// GetShard returns the shard selected by the strategy for shardKey, or nil
// when the strategy returns an index outside the configured shards.
func (sr *ShardRouter) GetShard(shardKey string) *ShardConfig {
	index := sr.strategy.GetShardIndex(shardKey, len(sr.shards))
	if index < 0 || index >= len(sr.shards) {
		return nil
	}
	return sr.shards[index]
}

// GetAllShards returns the router's shard list. The slice is not copied.
func (sr *ShardRouter) GetAllShards() []*ShardConfig {
	return sr.shards
}

// GetShardByName returns the first shard whose Name equals name, or nil.
func (sr *ShardRouter) GetShardByName(name string) *ShardConfig {
	for _, shard := range sr.shards {
		if shard.Name == name {
			return shard
		}
	}
	return nil
}
```

## 二、分布式ID生成

### 2.1 Snowflake算法实现

```go
package idgen

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	// ErrTimeBackwards is returned when the clock reads earlier than the
	// timestamp of the previously generated ID.
	ErrTimeBackwards = errors.New("time has gone backwards")
	// ErrIDOverflow is returned when the timestamp does not fit in the bits
	// reserved for it.
	ErrIDOverflow = errors.New("ID overflow")
)

// Snowflake generates 63-bit IDs laid out, from high to low bits, as
// timestamp | nodeID (10 bits) | sequence (12 bits). It is safe for
// concurrent use.
type Snowflake struct {
	mu             sync.Mutex
	lastTimestamp  int64
	sequence       int64
	epoch          int64
	nodeID         int64
	nodeIDBits     int64
	sequenceBits   int64
	nodeIDShift    int64
	timestampShift int64
	sequenceMask   int64
	maxNodeID      int64
}

// NewSnowflake returns a generator for the given node. epoch is a Unix time
// in milliseconds that is subtracted from the current time to form the
// timestamp field. It returns an error when nodeID does not fit in 10 bits.
func NewSnowflake(nodeID int64, epoch int64) (*Snowflake, error) {
	nodeIDBits := int64(10)
	sequenceBits := int64(12)
	maxNodeID := int64(-1) ^ (int64(-1) << nodeIDBits)
	if nodeID < 0 || nodeID > maxNodeID {
		// Format the bound as a number; string(rune(n)) would produce the
		// Unicode character with code point n instead of its digits.
		return nil, fmt.Errorf("nodeID must be between 0 and %d", maxNodeID)
	}
	sequenceMask := int64(-1) ^ (int64(-1) << sequenceBits)
	return &Snowflake{
		epoch:          epoch,
		nodeID:         nodeID,
		nodeIDBits:     nodeIDBits,
		sequenceBits:   sequenceBits,
		nodeIDShift:    sequenceBits,
		timestampShift: sequenceBits + nodeIDBits,
		sequenceMask:   sequenceMask,
		maxNodeID:      maxNodeID,
		lastTimestamp:  -1,
		sequence:       0,
	}, nil
}

// Generate returns the next ID. It returns ErrTimeBackwards when the clock is
// behind the last issued timestamp, and ErrIDOverflow when the timestamp is
// negative or too large for the timestamp field.
func (s *Snowflake) Generate() (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	timestamp := time.Now().UnixMilli() - s.epoch
	if timestamp < s.lastTimestamp {
		return 0, ErrTimeBackwards
	}
	if timestamp == s.lastTimestamp {
		s.sequence = (s.sequence + 1) & s.sequenceMask
		if s.sequence == 0 {
			// Sequence exhausted for this millisecond: wait for the next.
			timestamp = s.waitNextMillis(timestamp)
		}
	} else {
		s.sequence = 0
	}

	// Reject timestamps that would spill into the sign bit or beyond before
	// committing any state, so a failed call cannot emit a wrapped ID.
	maxTimestamp := int64(-1) ^ (int64(-1) << (63 - s.timestampShift))
	if timestamp < 0 || timestamp > maxTimestamp {
		return 0, ErrIDOverflow
	}

	s.lastTimestamp = timestamp
	id := (timestamp << s.timestampShift) | (s.nodeID << s.nodeIDShift) | s.sequence
	return id, nil
}

// waitNextMillis spins until the clock moves past lastTimestamp and returns
// the new timestamp. The caller must hold s.mu.
func (s *Snowflake) waitNextMillis(currentTimestamp int64) int64 {
	for currentTimestamp <= s.lastTimestamp {
		currentTimestamp = time.Now().UnixMilli() - s.epoch
	}
	return currentTimestamp
}

// Parse splits id into its Unix-millisecond timestamp (epoch added back),
// node ID and sequence number.
func (s *Snowflake) Parse(id int64) (timestamp int64, nodeID int64, sequence int64) {
	timestamp = (id >> s.timestampShift) + s.epoch
	nodeID = (id >> s.nodeIDShift) & s.maxNodeID
	sequence = id & s.sequenceMask
	return timestamp, nodeID, sequence
}
```

### 2.2 批量ID生成

```go
package idgen

import (
	"context"
	"errors"
	"sync"
	"time"
)

// IDBatchGenerator keeps a buffer of pre-generated Snowflake IDs that is
// topped up by a background goroutine.
type IDBatchGenerator struct {
	snowflake *Snowflake
	buffer    chan int64
	batchSize int
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
}

// NewIDBatchGenerator returns a generator whose buffer holds up to
// 2*batchSize IDs. It returns an error when batchSize is not positive or when
// the Snowflake parameters are invalid. Call Start to begin refilling.
func NewIDBatchGenerator(nodeID int64, epoch int64, batchSize int) (*IDBatchGenerator, error) {
	if batchSize <= 0 {
		// make(chan, n) panics for negative n, and a zero batch refills nothing.
		return nil, errors.New("batchSize must be positive")
	}
	sf, err := NewSnowflake(nodeID, epoch)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &IDBatchGenerator{
		snowflake: sf,
		buffer:    make(chan int64, batchSize*2),
		batchSize: batchSize,
		ctx:       ctx,
		cancel:    cancel,
	}, nil
}

// Start launches the background refill goroutine. Stop ends it.
func (g *IDBatchGenerator) Start() {
	g.wg.Add(1)
	go g.generateLoop()
}

// generateLoop wakes every 100ms and, when fewer than batchSize IDs are
// buffered, generates up to batchSize more. It exits when the generator's
// context is cancelled.
func (g *IDBatchGenerator) generateLoop() {
	defer g.wg.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-g.ctx.Done():
			return
		case <-ticker.C:
			if len(g.buffer) < g.batchSize {
				for i := 0; i < g.batchSize; i++ {
					id, err := g.snowflake.Generate()
					if err != nil {
						// Best effort: skip this slot and try again on
						// the next iteration or tick.
						continue
					}
					select {
					case g.buffer <- id:
					default: // buffer full; the ID is discarded
					}
				}
			}
		}
	}
}

// GenerateBatch returns count IDs, taking buffered IDs first and generating
// the rest directly. On context cancellation or generator failure it returns
// the IDs collected so far together with the error.
func (g *IDBatchGenerator) GenerateBatch(ctx context.Context, count int) ([]int64, error) {
	ids := make([]int64, 0, max(count, 0))
	for len(ids) < count {
		select {
		case <-ctx.Done():
			return ids, ctx.Err()
		case id := <-g.buffer:
			ids = append(ids, id)
		default:
			id, err := g.snowflake.Generate()
			if err != nil {
				// Surface the failure instead of spinning on a persistent
				// error such as a clock that moved backwards.
				return ids, err
			}
			ids = append(ids, id)
		}
	}
	return ids, nil
}

// Stop cancels the refill goroutine and waits for it to exit. The buffer is
// deliberately left open: receiving from a closed channel yields zero values,
// which GenerateBatch would hand out as IDs, and closing twice would panic.
// IDs still buffered remain available to GenerateBatch.
func (g *IDBatchGenerator) Stop() {
	g.cancel()
	g.wg.Wait()
}
```

## 三、数据迁移策略

### 3.1 双写迁移

```go
package migration

import (
	"context"
	"fmt"
	"sync"
	"time" // NOTE(review): not used by this excerpt; Go rejects unused imports, so drop it if nothing else in the file needs it.
)

// DualWriter forwards writes to the source database, the target database, or
// both, depending on its current mode. The mode may be changed concurrently
// with writes.
type DualWriter struct {
	sourceDB *DB
	targetDB *DB
	mode     WriteMode
	mu       sync.RWMutex
}

// WriteMode selects which database(s) receive writes.
type WriteMode int

const (
	// WriteModeSource writes to the source database only.
	WriteModeSource WriteMode = iota
	// WriteModeTarget writes to the target database only.
	WriteModeTarget
	// WriteModeBoth writes to the source first and then to the target.
	WriteModeBoth
)

// NewDualWriter returns a writer over source and target starting in mode.
func NewDualWriter(source, target *DB, mode WriteMode) *DualWriter {
	return &DualWriter{
		sourceDB: source,
		targetDB: target,
		mode:     mode,
	}
}

// SetMode switches the write mode for subsequent writes.
func (dw *DualWriter) SetMode(mode WriteMode) {
	dw.mu.Lock()
	defer dw.mu.Unlock()
	dw.mode = mode
}

// GetMode returns the current write mode.
func (dw *DualWriter) GetMode() WriteMode {
	dw.mu.RLock()
	defer dw.mu.RUnlock()
	return dw.mode
}

// apply runs write against the database(s) selected by the current mode.
// In WriteModeBoth the two writes are not atomic: a source failure is
// returned as is and the target is not touched, while a target failure is
// wrapped so the caller can tell that the source write already succeeded.
func (dw *DualWriter) apply(op string, write func(db *DB) error) error {
	switch mode := dw.GetMode(); mode {
	case WriteModeSource:
		return write(dw.sourceDB)
	case WriteModeTarget:
		return write(dw.targetDB)
	case WriteModeBoth:
		if err := write(dw.sourceDB); err != nil {
			return err
		}
		if err := write(dw.targetDB); err != nil {
			return fmt.Errorf("%s applied to source but not target: %w", op, err)
		}
		return nil
	default:
		// An unrecognised mode must not look like a successful write.
		return fmt.Errorf("unknown write mode: %d", mode)
	}
}

// Insert inserts data into table on the database(s) selected by the mode.
func (dw *DualWriter) Insert(ctx context.Context, table string, data map[string]interface{}) error {
	return dw.apply("insert", func(db *DB) error {
		return db.Insert(ctx, table, data)
	})
}

// Update updates row id of table on the database(s) selected by the mode.
func (dw *DualWriter) Update(ctx context.Context, table string, id interface{}, data map[string]interface{}) error {
	return dw.apply("update", func(db *DB) error {
		return db.Update(ctx, table, id, data)
	})
}

// Delete deletes row id of table on the database(s) selected by the mode.
func (dw *DualWriter) Delete(ctx context.Context, table string, id interface{}) error {
	return dw.apply("delete", func(db *DB) error {
		return db.Delete(ctx, table, id)
	})
}

// DB is a placeholder database handle whose write methods are stubs.
// NOTE(review): the listings below also call FetchBatch, InsertBatch, Count,
// FetchAll, FetchByID, FetchSample and Exists on *DB; those methods are not
// defined anywhere in this article.
type DB struct {
	Name string
}

// Insert is a stub that always reports "not implemented".
func (db *DB) Insert(ctx context.Context, table string, data map[string]interface{}) error {
	return fmt.Errorf("not implemented")
}

// Update is a stub that always reports "not implemented".
func (db *DB) Update(ctx context.Context, table string, id interface{}, data map[string]interface{}) error {
	return fmt.Errorf("not implemented")
}

// Delete is a stub that always reports "not implemented".
func (db *DB) Delete(ctx context.Context, table string, id interface{}) error {
	return fmt.Errorf("not implemented")
}
```

### 3.2 数据同步工具

```go
package migration

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// maxRecordedErrors bounds how many recent errors SyncProgress retains.
const maxRecordedErrors = 100

// DataSync copies rows from a source database to a target database in
// batches and tracks progress.
type DataSync struct {
	sourceDB    *DB
	targetDB    *DB
	batchSize   int
	parallelism int // NOTE(review): stored but not used by any code shown here
	progress    *SyncProgress
}

// SyncProgress holds counters for a running sync. SyncTable updates
// SyncedRecords, FailedRecords and Errors under mu; nothing in this listing
// sets TotalRecords, so the caller is presumably expected to fill it in.
type SyncProgress struct {
	mu            sync.RWMutex
	TotalRecords  int64
	SyncedRecords int64
	FailedRecords int64
	StartTime     time.Time
	Errors        []SyncError
}

// SyncError records one row that could not be written to the target.
type SyncError struct {
	RecordID interface{}
	Error    error
	Time     time.Time
}

// NewDataSync returns a sync job whose progress clock starts now.
func NewDataSync(source, target *DB, batchSize, parallelism int) *DataSync {
	return &DataSync{
		sourceDB:    source,
		targetDB:    target,
		batchSize:   batchSize,
		parallelism: parallelism,
		progress: &SyncProgress{
			StartTime: time.Now(),
			Errors:    make([]SyncError, 0),
		},
	}
}

// GetProgress returns the live progress object, not a snapshot. The pointer
// itself never changes, so no lock is needed to return it; read the counters
// through GetProgressPercent and GetETA, which take the lock.
func (ds *DataSync) GetProgress() *SyncProgress {
	return ds.progress
}

// SyncTable copies table from source to target batch by batch, paging by the
// "id" column of the last row of each batch. A failed batch insert is retried
// row by row, and rows that still fail are recorded in the progress errors
// rather than aborting the sync. It returns an error when the context is
// cancelled, a fetch fails, or the paging cursor cannot advance.
//
// NOTE(review): whereClause is passed to FetchBatch verbatim; confirm it is
// never built from untrusted input.
func (ds *DataSync) SyncTable(ctx context.Context, table string, whereClause string) error {
	var lastID interface{} // nil on the first call, i.e. start from the beginning
	batchNum := 0
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		records, err := ds.sourceDB.FetchBatch(ctx, table, whereClause, lastID, ds.batchSize)
		if err != nil {
			return fmt.Errorf("failed to fetch batch %d: %w", batchNum, err)
		}
		if len(records) == 0 {
			break
		}

		failed := 0
		if err := ds.targetDB.InsertBatch(ctx, table, records); err != nil {
			// Fall back to single-row inserts to isolate the bad rows.
			for _, record := range records {
				if err := ds.targetDB.Insert(ctx, table, record); err != nil {
					ds.recordError(record["id"], err)
					failed++
				}
			}
		}

		lastID = records[len(records)-1]["id"]
		if lastID == nil {
			// A nil cursor would fetch the first batch again, forever.
			return fmt.Errorf("batch %d: last record has no \"id\" value, cannot advance", batchNum)
		}

		ds.progress.mu.Lock()
		// Count only rows that reached the target; failures are tracked
		// separately in FailedRecords.
		ds.progress.SyncedRecords += int64(len(records) - failed)
		ds.progress.mu.Unlock()

		batchNum++
		if len(records) < ds.batchSize {
			break // short batch: the source is exhausted
		}
	}
	return nil
}

// recordError counts a failed row and keeps the most recent
// maxRecordedErrors failures.
func (ds *DataSync) recordError(recordID interface{}, err error) {
	ds.progress.mu.Lock()
	defer ds.progress.mu.Unlock()
	ds.progress.FailedRecords++
	ds.progress.Errors = append(ds.progress.Errors, SyncError{
		RecordID: recordID,
		Error:    err,
		Time:     time.Now(),
	})
	if len(ds.progress.Errors) > maxRecordedErrors {
		ds.progress.Errors = ds.progress.Errors[1:]
	}
}

// GetProgressPercent returns SyncedRecords as a percentage of TotalRecords,
// or 0 when TotalRecords has not been set.
func (p *SyncProgress) GetProgressPercent() float64 {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.TotalRecords <= 0 {
		return 0
	}
	return float64(p.SyncedRecords) / float64(p.TotalRecords) * 100
}

// GetETA estimates the remaining time from the average rate so far. It
// returns 0 when nothing has been synced yet or nothing remains, which
// includes the case where TotalRecords has not been set.
func (p *SyncProgress) GetETA() time.Duration {
	p.mu.RLock()
	defer p.mu.RUnlock()
	remaining := float64(p.TotalRecords - p.SyncedRecords)
	elapsed := time.Since(p.StartTime).Seconds()
	if p.SyncedRecords <= 0 || remaining <= 0 || elapsed <= 0 {
		return 0
	}
	rate := float64(p.SyncedRecords) / elapsed
	return time.Duration(remaining / rate * float64(time.Second))
}
```

### 3.3 一致性校验

```go
package migration

import (
	"context"
	"fmt"
	"reflect"
)

// DataVerifier compares a table in the source and target databases.
type DataVerifier struct {
	sourceDB *DB
	targetDB *DB
}

// NewDataVerifier returns a verifier over source and target.
func NewDataVerifier(source, target *DB) *DataVerifier {
	return &DataVerifier{
		sourceDB: source,
		targetDB: target,
	}
}

// VerificationResult summarises one table comparison.
type VerificationResult struct {
	Table        string
	Status       VerificationStatus
	SourceCount  int64
	TargetCount  int64
	MatchedRows  int64
	MissingRows  int64 // source rows in excess of the target count; never negative
	MismatchRows int64 // source rows with no identical target row (absent or different)
	Errors       []error
}

// VerificationStatus is the outcome of a verification.
type VerificationStatus string

const (
	StatusPassed  VerificationStatus = "passed"
	StatusFailed  VerificationStatus = "failed"
	StatusPartial VerificationStatus = "partial"
)

// VerifyTable compares row counts and then every source row against the
// target row with the same primary key. It returns an error only when a count
// query fails; a failure during the row comparison is reported in
// result.Errors with Status set to StatusFailed.
func (v *DataVerifier) VerifyTable(ctx context.Context, table string, primaryKey string) (*VerificationResult, error) {
	result := &VerificationResult{
		Table:  table,
		Status: StatusPassed,
		Errors: make([]error, 0),
	}

	sourceCount, err := v.sourceDB.Count(ctx, table)
	if err != nil {
		return nil, fmt.Errorf("failed to count source: %w", err)
	}
	result.SourceCount = sourceCount

	targetCount, err := v.targetDB.Count(ctx, table)
	if err != nil {
		return nil, fmt.Errorf("failed to count target: %w", err)
	}
	result.TargetCount = targetCount

	if sourceCount != targetCount {
		result.Status = StatusFailed
		// A larger target is a count mismatch but not "missing" rows.
		if sourceCount > targetCount {
			result.MissingRows = sourceCount - targetCount
		}
	}

	matched, err := v.verifyDataMatch(ctx, table, primaryKey)
	result.MatchedRows = matched
	if err != nil {
		// The comparison did not complete, so a mismatch count derived from
		// it would be misleading; report the error and stop here.
		result.Errors = append(result.Errors, err)
		result.Status = StatusFailed
		return result, nil
	}
	if result.MatchedRows != sourceCount {
		result.Status = StatusFailed
		result.MismatchRows = sourceCount - result.MatchedRows
	}
	return result, nil
}

// verifyDataMatch returns how many source rows have an identical row in the
// target. A target lookup that fails counts as not matched. It loads the
// whole source table through FetchAll.
func (v *DataVerifier) verifyDataMatch(ctx context.Context, table, primaryKey string) (int64, error) {
	sourceRows, err := v.sourceDB.FetchAll(ctx, table)
	if err != nil {
		return 0, err
	}
	var matched int64
	for _, row := range sourceRows {
		if err := ctx.Err(); err != nil {
			return matched, err
		}
		targetRow, err := v.targetDB.FetchByID(ctx, table, row[primaryKey])
		if err != nil {
			continue
		}
		if v.rowsMatch(row, targetRow) {
			matched++
		}
	}
	return matched, nil
}

// rowsMatch reports whether source and target have the same columns with
// deeply equal values.
func (v *DataVerifier) rowsMatch(source, target map[string]interface{}) bool {
	if len(source) != len(target) {
		return false
	}
	for key, sourceVal := range source {
		targetVal, ok := target[key]
		if !ok {
			return false
		}
		// reflect.DeepEqual instead of !=: comparing interface values whose
		// dynamic type is not comparable (for example []byte) panics.
		if !reflect.DeepEqual(sourceVal, targetVal) {
			return false
		}
	}
	return true
}

// QuickCheck fetches sampleSize rows from the source and reports whether
// every one of them exists in the target. It stops at the first missing row.
func (v *DataVerifier) QuickCheck(ctx context.Context, table string, sampleSize int) (bool, error) {
	sourceSample, err := v.sourceDB.FetchSample(ctx, table, sampleSize)
	if err != nil {
		return false, err
	}
	for _, row := range sourceSample {
		exists, err := v.targetDB.Exists(ctx, table, row)
		if err != nil {
			return false, err
		}
		if !exists {
			return false, nil
		}
	}
	return true, nil
}
```

## 四、路由中间件实现

### 4.1 分片路由中间件

```go
package middleware

import (
	"context"
	"fmt"
	"strings"
)

// NOTE(review): ShardRouter and ShardConfig are declared in package sharding
// in section 1.2; this listing needs to import that package (or live in it)
// before it compiles.

// ShardRouterMiddleware attaches routing decisions to request contexts and
// renders per-shard SQL.
type ShardRouterMiddleware struct {
	router *ShardRouter
}

// NewShardRouterMiddleware returns a middleware that routes with router.
func NewShardRouterMiddleware(router *ShardRouter) *ShardRouterMiddleware {
	return &ShardRouterMiddleware{router: router}
}

// shardContextKeyType is the private type of the context key, so that no
// other package can collide with it. The type and the constant cannot share
// one name: Go rejects that as a redeclaration.
type shardContextKeyType string

// ShardContextKey is the context key under which SelectShard stores the
// selected *ShardConfig.
const ShardContextKey shardContextKeyType = "shard"

// SelectShard resolves shardKey to a shard and returns a context carrying it.
// Routing uses the same key string as BuildShardQuery, so both always agree
// on the shard; table appears only in the error message. On failure the
// original ctx is returned together with the error.
func (m *ShardRouterMiddleware) SelectShard(ctx context.Context, table string, shardKey interface{}) (context.Context, error) {
	shard := m.router.GetShard(fmt.Sprintf("%v", shardKey))
	if shard == nil {
		key := fmt.Sprintf("%s:%v", table, shardKey)
		return ctx, fmt.Errorf("no shard found for key: %s", key)
	}
	return context.WithValue(ctx, ShardContextKey, shard), nil
}

// ExtractShard returns the shard stored in ctx by SelectShard, or an error
// when ctx carries none.
func (m *ShardRouterMiddleware) ExtractShard(ctx context.Context) (*ShardConfig, error) {
	shard, ok := ctx.Value(ShardContextKey).(*ShardConfig)
	if !ok || shard == nil {
		return nil, fmt.Errorf("shard not found in context")
	}
	return shard, nil
}

// QueryTemplate is a SQL statement containing the placeholders {{table}}
// and/or {{shard_table}}.
type QueryTemplate struct {
	SQL         string
	ShardKey    string
	TargetTable string
}

// BuildShardQuery renders template for the shard that owns shardKey. Every
// {{table}} becomes TargetTable and every {{shard_table}} becomes
// TargetTable + "_" + the shard name. TargetTable and the shard name are
// inserted verbatim, so they must be trusted identifiers and never user input.
func (m *ShardRouterMiddleware) BuildShardQuery(template *QueryTemplate, shardKey interface{}) (string, error) {
	if template == nil {
		return "", fmt.Errorf("query template must not be nil")
	}
	shard := m.router.GetShard(fmt.Sprintf("%v", shardKey))
	if shard == nil {
		return "", fmt.Errorf("no shard found for key: %v", shardKey)
	}
	// Replace all occurrences in one pass; a statement may name the table
	// more than once (joins, subqueries).
	replacer := strings.NewReplacer(
		"{{table}}", template.TargetTable,
		"{{shard_table}}", template.TargetTable+"_"+shard.Name,
	)
	return replacer.Replace(template.SQL), nil
}
```

## 五、总结

分库分表是应对大数据量、高并发的有效手段：

**分片策略选择**

- 哈希分片：数据分布均匀，适合随机访问
- 范围分片：适合按时间顺序访问，但可能产生热点
- 目录分片：灵活，但需要额外的查找开销

**ID生成方案**

- Snowflake：高性能、有序
- UUID：完全独立但无序
- 数据库自增：简单，但分布式下有瓶颈

**数据迁移流程**

- 双写方案：业务切换平滑，但需要处理数据一致性
- 增量同步：使用CDC或日志同步
- 一致性校验：确保数据完整

**注意事项**

- 跨分片查询代价高，尽量避免
- 事务边界需要重新设计
- 索引维护更加复杂

合理的分库分表设计能够显著提升系统容量和性能。