Partition Management using go_partman
go_partman is a Go native implementation of PostgreSQL table partitioning management, inspired by pg_partman. It automatically manages and maintains partitioned tables in PostgreSQL databases by providing the following features:
- Pre-creation of future partitions
- Support for time-based range partitioning
- Configurable tenant-specific retention policies
- Automatic cleanup of old partitions
Installation and Usage
To get started, we first need to install it
1 | go get github.com/jirevwe/go_partman |
Table Requirements
Your Postgres tables must be created as a partitioned table before using go_partman. Examples:
-- Single-tenant table
CREATE TABLE events (
id VARCHAR NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
data JSONB,
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);
-- Multi-tenant table
CREATE TABLE events (
id VARCHAR NOT NULL,
project_id VARCHAR NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
data JSONB,
PRIMARY KEY (id, created_at, project_id)
) PARTITION BY RANGE (project_id, created_at);
Sample code
package main
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/jirevwe/go_partman"
"github.com/jmoiron/sqlx"
"time"
)
func main() {
logger := partman.NewSlogLogger()
pgxCfg, err := pgxpool.ParseConfig("postgres://postgres:postgres@localhost:5432/test?sslmode=disable")
if err != nil {
logger.Fatal(err)
}
pool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
if err != nil {
logger.Fatal(err)
}
sqlDB := stdlib.OpenDBFromPool(pool)
db := sqlx.NewDb(sqlDB, "pgx")
r, err := NewRetentionPolicy(db, logger, time.Minute)
if err != nil {
logger.Fatal(err)
}
r.Start(context.Background(), time.Minute)
// start your server
time.Sleep(30 * time.Second)
}
type Retentioner interface {
Perform(context.Context) error
Start(context.Context, time.Duration)
}
type RetentionPolicy struct {
retentionPeriod time.Duration
partitioner partman.Partitioner
logger *partman.SlogLogger
db *sqlx.DB
}
func NewRetentionPolicy(db *sqlx.DB, logger *partman.SlogLogger, period time.Duration) (*RetentionPolicy, error) {
pm, err := partman.NewManager(
partman.WithDB(db),
partman.WithLogger(logger),
partman.WithConfig(&partman.Config{SampleRate: time.Second}),
partman.WithClock(partman.NewRealClock()),
)
if err != nil {
return nil, err
}
return &RetentionPolicy{
retentionPeriod: period,
partitioner: pm,
logger: logger,
db: db,
}, nil
}
func (r *RetentionPolicy) Start(ctx context.Context, sampleRate time.Duration) {
go func(r *RetentionPolicy) {
ticker := time.NewTicker(sampleRate)
defer ticker.Stop()
// fetch existing partitions on startup,
// this is useful for one time setups,
// but I'll leave it in since it'll no-op after the first time
err := r.partitioner.ImportExistingPartitions(ctx, partman.Table{
Schema: "convoy",
TenantIdColumn: "project_id",
PartitionBy: "created_at",
PartitionType: partman.TypeRange,
RetentionPeriod: r.retentionPeriod,
PartitionInterval: time.Hour * 24,
PartitionCount: 10,
})
if err != nil {
r.logger.Errorf("failed to import existing partitions: %v", err)
}
projectRepo := postgres.NewProjectRepo(r.db)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// watches for newly added tables and automatically adds them
projects, pErr := projectRepo.LoadProjects(context.Background())
if pErr != nil {
r.logger.WithError(pErr).Error("failed to load projects")
}
for _, project := range projects {
err = r.partitioner.AddManagedTable(partman.Table{
Name: "events",
Schema: "convoy",
TenantId: project.UID,
TenantIdColumn: "project_id",
PartitionBy: "created_at",
PartitionType: partman.TypeRange,
RetentionPeriod: r.retentionPeriod,
PartitionInterval: time.Hour * 24,
PartitionCount: 10,
})
if err != nil {
r.logger.WithError(err).Error("failed to add convoy.events to managed tables")
}
err = r.partitioner.AddManagedTable(partman.Table{
Name: "event_deliveries",
Schema: "convoy",
TenantId: project.UID,
TenantIdColumn: "project_id",
PartitionBy: "created_at",
PartitionType: partman.TypeRange,
RetentionPeriod: r.retentionPeriod,
PartitionInterval: time.Hour * 24,
PartitionCount: 10,
})
if err != nil {
r.logger.WithError(err).Error("failed to add convoy.event_deliveries to managed tables")
}
err = r.partitioner.AddManagedTable(partman.Table{
Name: "delivery_attempts",
Schema: "convoy",
TenantId: project.UID,
TenantIdColumn: "project_id",
PartitionBy: "created_at",
PartitionType: partman.TypeRange,
RetentionPeriod: r.retentionPeriod,
PartitionInterval: time.Hour * 24,
PartitionCount: 10,
})
if err != nil {
r.logger.WithError(err).Error("failed to add convoy.delivery_attempts to managed tables")
}
}
}
}
}(r)
}
func (r *RetentionPolicy) Perform(ctx context.Context) error {
return r.partitioner.Maintain(ctx)
}