Overview

Flexprice is a comprehensive usage-based billing and subscription management platform built in Go. The system handles the complete billing lifecycle from event ingestion through invoice generation and payment processing, designed for B2B SaaS companies requiring flexible pricing models.

High-Level Architecture

Flexprice implements a three-tier layered architecture with clear separation of concerns, bootstrapped using the Uber FX dependency injection framework.

Architecture Layers

Technology Stack

Core Technologies

Flexprice uses a modern Go stack with specialized components.

Key Dependencies:
  • Gin - HTTP routing and middleware
  • Ent - Type-safe ORM for PostgreSQL
  • Uber FX - Dependency injection framework
  • ClickHouse - Columnar database for analytics
  • Kafka (Sarama) - Event streaming platform
  • Temporal - Workflow orchestration engine
  • Stripe - Payment processing integration

Dual-Database Strategy

Flexprice employs a specialized database strategy for different workloads:

PostgreSQL (Transactional Data)

  • Purpose: ACID transactions, relational data
  • Data: Subscriptions, invoices, customers, plans, prices, wallets
  • Access Pattern: Read-write, complex joins, referential integrity
  • ORM: Ent for type-safe queries

ClickHouse (Analytical Data)

  • Purpose: High-volume inserts, aggregation queries
  • Data: Events (raw), events_lazy, processed_events, system_events
  • Access Pattern: Append-only writes, time-series queries
  • Tables: Optimized for usage metering and analytics

Event Processing Pipeline

The core of Flexprice’s usage-based billing is an asynchronous event processing pipeline powered by Kafka.

Pipeline Stages

Deployment Modes

Flexprice supports multiple deployment configurations to separate concerns and scale independently:

Available Modes

Mode | Components | Purpose
local | API Server + Router (with event consumption handlers) + Temporal Worker | Full development environment with all services enabled
api | API Server + Router (event consumption disabled) | Handle HTTP requests with routing but no event processing
consumer | Router only (with event consumption handlers) | Dedicated event processing without HTTP API
temporal_worker | Temporal workers only | Background workflows and scheduled tasks execution

Mode Implementation Details

  • local: Starts API server, registers all router handlers with event consumption enabled (true), starts router, and starts Temporal worker
  • api: Starts API server, registers router handlers with event consumption disabled (false), starts router only
  • consumer: Requires Kafka consumer, registers all router handlers with event consumption enabled (true), starts router without API server
  • temporal_worker: Starts only Temporal worker service, no API server or router
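
The mode descriptions above amount to a mapping from mode name to the components that start. The following is a minimal sketch of that mapping, not Flexprice's actual startup code: the `Components` struct, its field names, and `componentsFor` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Components lists which parts of the process a deployment mode starts.
// The struct and its field names are illustrative, not Flexprice's types.
type Components struct {
	APIServer      bool // HTTP API server
	Router         bool // pub/sub message router
	EventConsumers bool // router handlers registered with event consumption enabled
	TemporalWorker bool // Temporal worker service
}

var errUnknownMode = errors.New("unknown deployment mode")

// componentsFor mirrors the four mode descriptions listed above.
func componentsFor(mode string) (Components, error) {
	switch mode {
	case "local":
		return Components{APIServer: true, Router: true, EventConsumers: true, TemporalWorker: true}, nil
	case "api":
		return Components{APIServer: true, Router: true}, nil
	case "consumer":
		return Components{Router: true, EventConsumers: true}, nil
	case "temporal_worker":
		return Components{TemporalWorker: true}, nil
	default:
		return Components{}, fmt.Errorf("%w: %q", errUnknownMode, mode)
	}
}

func main() {
	for _, mode := range []string{"local", "api", "consumer", "temporal_worker"} {
		c, err := componentsFor(mode)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%-16s %+v\n", mode, c)
	}
}
```

Rejecting unknown modes with an error, rather than falling back to a default, keeps a misconfigured deployment from silently starting the wrong set of components.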

Service Layer Architecture

The service layer implements business logic through 30+ domain services organized by domain:

Core Services

  • Billing Service: Handles invoice generation, usage calculations, and billing cycles
  • Event Service: Processes incoming usage events and manages event lifecycle
  • Subscription Service: Manages customer subscriptions, upgrades, and downgrades
  • Customer Service: Customer management and tenant operations
  • Payment Service: Payment processing and gateway integrations
  • Plan Service: Pricing plan management and feature configuration
  • Wallet Service: Prepaid credits and wallet balance management

Service Factory Pattern

All services are instantiated through factory functions that receive a centralized ServiceParams struct containing shared dependencies:
type ServiceParams struct {
    // Individual Repository Interfaces
    AuthRepo           auth.Repository
    UserRepo           user.Repository
    EventRepo          events.Repository
    ProcessedEventRepo events.ProcessedEventRepository
    // ... 40+ more domain specific repositories

    // Publishers
    EventPublisher   publisher.EventPublisher
    WebhookPublisher webhookPublisher.WebhookPublisher

    // Services
    Logger       *logger.Logger
    Config       *config.Configuration
    DB           postgres.IClient
    PDFGenerator pdf.Generator
    S3           s3.Service

    // HTTP client
    Client httpclient.Client

    // Proration
    ProrationCalculator proration.Calculator

    // Integration Factory
    IntegrationFactory *integration.Factory
}
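
To show how a factory function might consume such a struct, here is a minimal sketch with a trimmed-down `ServiceParams`. Everything in it (`Logger`, `CustomerRepo`, `memoryCustomerRepo`, `NewCustomerService`, `DisplayName`) is a hypothetical stand-in, not Flexprice's real API.

```go
package main

import "fmt"

// Logger is a stand-in for the shared logger dependency.
type Logger struct{ prefix string }

func (l *Logger) Info(msg string) { fmt.Println(l.prefix + msg) }

// CustomerRepo is a stand-in for one of the repository interfaces.
type CustomerRepo interface {
	Get(id string) (string, error)
}

// memoryCustomerRepo is a map-backed repository used only for this example.
type memoryCustomerRepo map[string]string

func (m memoryCustomerRepo) Get(id string) (string, error) {
	name, ok := m[id]
	if !ok {
		return "", fmt.Errorf("customer %q not found", id)
	}
	return name, nil
}

// ServiceParams is a trimmed-down version of the shared dependency struct.
type ServiceParams struct {
	Logger       *Logger
	CustomerRepo CustomerRepo
}

// CustomerService is the interface callers depend on.
type CustomerService interface {
	DisplayName(id string) (string, error)
}

// customerService embeds ServiceParams, so every shared dependency is
// reachable as a field without copying each one individually.
type customerService struct {
	ServiceParams
}

// NewCustomerService is the factory: it takes the shared params and
// returns the service behind its interface.
func NewCustomerService(params ServiceParams) CustomerService {
	return &customerService{ServiceParams: params}
}

func (s *customerService) DisplayName(id string) (string, error) {
	s.Logger.Info("looking up customer " + id)
	return s.CustomerRepo.Get(id)
}

func main() {
	params := ServiceParams{
		Logger:       &Logger{prefix: "[demo] "},
		CustomerRepo: memoryCustomerRepo{"cust_1": "Acme Corp"},
	}
	svc := NewCustomerService(params)
	fmt.Println(svc.DisplayName("cust_1"))
}
```

Passing one struct instead of a long parameter list means adding a new shared dependency does not change every factory signature.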

API Layer & Routing

The API layer uses the Gin framework with a comprehensive middleware stack for security, logging, and request processing:

Middleware Stack

  1. Recovery Middleware: Panic recovery and graceful error handling
  2. CORS Middleware: Cross-origin resource sharing configuration
  3. Rate Limiting: Request throttling per API key/tenant
  4. Authentication: JWT and API key validation
  5. Tenant Isolation: Multi-tenant context enforcement
  6. Request Logging: Structured logging for all requests
  7. Metrics Collection: Prometheus metrics for observability
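
The order of the stack matters: a request rejected by an early middleware never reaches the later ones. The sketch below illustrates this with the standard library's `net/http` rather than Gin, so that it runs without dependencies. The `chain`, `step`, `requireAPIKey`, and `run` helpers are hypothetical, and each `step` only records its name instead of doing real work.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Middleware wraps a handler with extra behavior.
type Middleware func(http.Handler) http.Handler

// chain applies middlewares so that the first one listed runs first.
func chain(h http.Handler, mws ...Middleware) http.Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// step returns a placeholder middleware that only records its name.
func step(name string, trace *[]string) Middleware {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*trace = append(*trace, name)
			next.ServeHTTP(w, r)
		})
	}
}

// requireAPIKey rejects requests without an X-API-Key header.
// A real implementation would validate the key, not just check presence.
func requireAPIKey(trace *[]string) Middleware {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*trace = append(*trace, "auth")
			if r.Header.Get("X-API-Key") == "" {
				http.Error(w, "missing API key", http.StatusUnauthorized)
				return // later middleware and the handler never run
			}
			next.ServeHTTP(w, r)
		})
	}
}

// run sends one request through the chain and reports the status code
// and the order in which the stages were visited.
func run(apiKey string) (int, []string) {
	var trace []string
	final := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		trace = append(trace, "handler")
		w.WriteHeader(http.StatusOK)
	})
	h := chain(final,
		step("recovery", &trace),
		step("cors", &trace),
		step("rate_limit", &trace),
		requireAPIKey(&trace),
		step("tenant", &trace),
		step("logging", &trace),
		step("metrics", &trace),
	)
	req := httptest.NewRequest(http.MethodGet, "/v1/customers", nil)
	if apiKey != "" {
		req.Header.Set("X-API-Key", apiKey)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, trace
}

func main() {
	fmt.Println(run("some-key")) // passes through every stage
	fmt.Println(run(""))         // stops at the auth stage
}
```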

Authentication & Authorization

Flexprice supports multiple authentication methods:
  • API Keys: Tenant-scoped keys for programmatic access
  • JWT Tokens: User session tokens with role-based permissions
  • Webhook Signatures: HMAC verification for incoming webhooks
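
As an illustration of HMAC verification for incoming webhooks, here is a minimal sketch using Go's standard library. The document does not state which hash or encoding Flexprice uses, so HMAC-SHA256 with hex encoding is an assumption, and `sign` and `verify` are hypothetical helper names.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns the hex-encoded HMAC-SHA256 of payload under secret.
// SHA-256 and hex are assumptions made for this example.
func sign(secret, payload []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the MAC and compares it in constant time, which
// avoids leaking how many leading bytes of a forged signature matched.
func verify(secret, payload []byte, signature string) bool {
	got, err := hex.DecodeString(signature)
	if err != nil {
		return false // not valid hex, so it cannot be a valid signature
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	secret := []byte("whsec_example")
	body := []byte(`{"type":"invoice.paid"}`)

	sig := sign(secret, body)
	fmt.Println("valid signature accepted:", verify(secret, body, sig))
	fmt.Println("tampered body accepted:", verify(secret, []byte(`{"type":"invoice.void"}`), sig))
}
```

The signature must be computed over the raw request body exactly as received; re-serializing parsed JSON before verifying would change the bytes and break the comparison.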

API Endpoints Structure

/v1
├── /auth             # Authentication endpoints
├── /customers        # Customer management
├── /plans            # Pricing plans and features
├── /subscriptions    # Subscription lifecycle
├── /events           # Event ingestion
├── /invoices         # Invoice generation and retrieval
├── /usage            # Usage queries and aggregations
├── /payments         # Payment processing
└── /webhooks         # Webhook management

Multi-Tenancy Architecture

Flexprice implements row-level multi-tenancy with environment isolation enforced at multiple architectural layers:

Tenant Isolation Strategy

  • Database Level: All tables include tenant_id and environment columns
  • Middleware Level: Request context includes tenant/environment validation
  • Service Level: All business logic operations are tenant-scoped
  • Repository Level: Automatic filtering on all database queries

Middleware Enforcement

Tenant and environment isolation is enforced through the AuthenticateMiddleware, which handles both authentication and tenant context extraction:
func AuthenticateMiddleware(cfg *config.Configuration, secretService service.SecretService, logger *logger.Logger) gin.HandlerFunc {
    return func(c *gin.Context) {
        // Check for API key in X-API-Key header
        
        // Check for JWT token in Authorization header
        
        c.Next()
    }
}
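
One way tenant context can travel from middleware to the lower layers is through `context.Context`. The sketch below is an assumption about the general technique, not Flexprice's code: `ctxKey`, `withTenant`, `tenantFrom`, and `scopedLookup` are hypothetical names, and the returned filter string stands in for a real query.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ctxKey is an unexported type so these keys cannot collide with
// context keys defined in other packages.
type ctxKey string

const (
	ctxTenantID      ctxKey = "tenant_id"
	ctxEnvironmentID ctxKey = "environment_id"
)

var errNoTenant = errors.New("tenant or environment missing from context")

// withTenant returns a context carrying tenant and environment, as an
// authentication middleware might do after validating credentials.
func withTenant(ctx context.Context, tenantID, envID string) context.Context {
	ctx = context.WithValue(ctx, ctxTenantID, tenantID)
	return context.WithValue(ctx, ctxEnvironmentID, envID)
}

// tenantFrom reads both values back. The checked type assertions return
// an error instead of panicking when a value is absent.
func tenantFrom(ctx context.Context) (string, string, error) {
	tenantID, okT := ctx.Value(ctxTenantID).(string)
	envID, okE := ctx.Value(ctxEnvironmentID).(string)
	if !okT || !okE || tenantID == "" || envID == "" {
		return "", "", errNoTenant
	}
	return tenantID, envID, nil
}

// scopedLookup simulates one request: it optionally attaches tenant
// context, then builds the filter a repository would apply.
func scopedLookup(tenantID, envID, customerID string) (string, error) {
	ctx := context.Background()
	if tenantID != "" {
		ctx = withTenant(ctx, tenantID, envID)
	}
	t, e, err := tenantFrom(ctx)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("tenant_id=%s AND environment=%s AND id=%s", t, e, customerID), nil
}

func main() {
	fmt.Println(scopedLookup("tenant_a", "production", "cust_1"))
	fmt.Println(scopedLookup("", "", "cust_1"))
}
```

Failing closed when the tenant is missing is the important property here: a query without a tenant filter would read across tenants.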

Authentication Methods

API keys are configured with tenant and user association:
  • Tenant API Keys: Scoped to specific tenant and environment
  • User Tokens: Associated with user permissions within tenant
  • System Keys: Cross-tenant access for administrative operations

Repository Layer

The repository layer provides data access abstraction with automatic tenant/environment filtering:

Repository Interface

type Repository interface {
    // All methods automatically filter by tenant/environment from context
    GetCustomer(ctx context.Context, customerID string) (*Customer, error)
    CreateInvoice(ctx context.Context, invoice *Invoice) error
    GetUsageEvents(ctx context.Context, filters EventFilters) ([]Event, error)
}

Automatic Tenant Filtering

All repositories automatically append tenant and environment filters to queries using typed context constants:
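func (r *PostgresRepository) GetCustomer(ctx context.Context, customerID string) (*Customer, error) {
    // Checked type assertions return an error instead of panicking
    // when the context carries no tenant or environment
    tenantID, ok := ctx.Value(types.CtxTenantID).(string)
    if !ok {
        return nil, errors.New("tenant ID missing from context")
    }
    environment, ok := ctx.Value(types.CtxEnvironmentID).(string)
    if !ok {
        return nil, errors.New("environment ID missing from context")
    }

    return r.db.Customer.
        Query().
        Where(customer.ID(customerID)).
        Where(customer.TenantID(tenantID)).
        Where(customer.Environment(environment)).
        Only(ctx)
}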
All repositories are created through factory functions that receive shared dependencies like database connections, caching layers, and configuration.

Message Processing with PubSub Router

Flexprice uses Watermill for message routing with built-in retry mechanisms and rate limiting:

Kafka Integration

// NewRouter returns an error alongside the router
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
    return err
}

// Event processing pipeline
router.AddHandler(
    "events_processor",
    "events",
    subscriber,
    "events_post_processing",
    publisher,
    eventProcessingHandler,
)

Message Processing Features

  • Dead Letter Queues: Failed messages are routed to error topics
  • Retry Logic: Exponential backoff for transient failures
  • Rate Limiting: Configurable processing rates per topic
  • Metrics: Built-in monitoring for message processing
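
The retry behavior can be pictured as a loop whose delay doubles after each failure. This is a minimal, dependency-free sketch of exponential backoff, not Watermill's retry middleware. The function names, the 100 ms base delay, and the 1 s cap are all assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// backoff returns the delay before the retry that follows the given
// 0-based attempt: base doubled once per attempt, capped at limit.
func backoff(attempt int, base, limit time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		return limit
	}
	return d
}

// retry calls fn until it succeeds or maxAttempts is exhausted. The sleep
// function is injected so tests can record delays instead of waiting.
func retry(maxAttempts int, base, limit time.Duration, sleep func(time.Duration), fn func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < maxAttempts-1 {
			sleep(backoff(attempt, base, limit))
		}
	}
	// At this point a router would typically hand the message to a dead letter topic.
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// simulate runs a handler that fails the given number of times and
// returns the delays (in milliseconds) that retry asked to sleep.
func simulate(failures, maxAttempts int) ([]int64, error) {
	var delaysMs []int64
	calls := 0
	err := retry(maxAttempts, 100*time.Millisecond, time.Second,
		func(d time.Duration) { delaysMs = append(delaysMs, d.Milliseconds()) },
		func() error {
			calls++
			if calls <= failures {
				return errTransient
			}
			return nil
		})
	return delaysMs, err
}

func main() {
	delays, err := simulate(3, 5)
	fmt.Println("delays (ms):", delays, "error:", err)

	delays, err = simulate(10, 6)
	fmt.Println("delays (ms):", delays, "error:", err)
}
```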

Temporal Workflow Orchestration

Background jobs and scheduled tasks use Temporal for reliable execution and workflow management:

Workflow Types

  • Billing Workflows: Monthly/annual invoice generation
  • Usage Aggregation: Periodic usage calculations
  • Payment Processing: Async payment collection workflows
  • Data Exports: Large dataset exports to S3
  • Webhook Delivery: Reliable event notifications

Workflow Registration

func RegisterWorkflows(worker worker.Worker) {
    // Billing workflows
    worker.RegisterWorkflow(billing.GenerateInvoiceWorkflow)
    worker.RegisterWorkflow(billing.ProcessPaymentWorkflow)
    
    // Usage workflows  
    worker.RegisterWorkflow(usage.AggregateUsageWorkflow)
    
    // Export workflows
    worker.RegisterWorkflow(export.ExportDataWorkflow)
}

Worker Lifecycle

Workers are deployed separately and can be scaled independently:
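func StartTemporalWorker(cfg *config.Configuration) error {
    client := temporal.NewClient()
    // Named w so the worker package stays reachable for worker.InterruptCh below
    w := worker.New(client, "billing-queue", worker.Options{})

    RegisterWorkflows(w)
    RegisterActivities(w)

    // Run blocks until the interrupt channel fires or the worker fails
    return w.Run(worker.InterruptCh())
}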

External Integrations

Flexprice uses a factory pattern for external service integrations, with one integration implementation per provider:
// Integration Factory creates specific integration instances
type Factory struct {
    config *config.Configuration
    logger *logger.Logger
}

// Specific integration getters with proper error handling
func (f *Factory) GetStripeIntegration(ctx context.Context) (*StripeIntegration, error) {
    // Initialize and return Stripe integration with configuration
    return NewStripeIntegration(f.config, f.logger), nil
}

func (f *Factory) GetHubSpotIntegration(ctx context.Context) (*HubSpotIntegration, error) {
    // Initialize and return HubSpot integration with configuration
    return NewHubSpotIntegration(f.config, f.logger), nil
}

func (f *Factory) GetRazorpayIntegration(ctx context.Context) (*RazorpayIntegration, error) {
    // Initialize and return Razorpay integration with configuration
    return NewRazorpayIntegration(f.config, f.logger), nil
}

Webhook Delivery

Flexprice has a flexible webhook delivery system with optional Svix integration. By default, it uses an internal webhook system backed by in-memory or Kafka pub/sub:
type WebhookService struct {
    config    *config.Configuration
    publisher publisher.WebhookPublisher
    handler   handler.Handler
    factory   payload.PayloadBuilderFactory
    client    httpclient.Client
    logger    *logger.Logger
}

func (w *WebhookService) DeliverEvent(event *Event, endpoints []WebhookEndpoint) error {
    // Route based on configuration
    if w.config.Webhook.SvixConfig.Enabled {
        w.logger.Info("Using Svix for webhook delivery")
        return w.deliverViaSvix(event)
    }
    
    w.logger.Info("Using internal webhook system")
    return w.deliverViaInternalSystem(event, endpoints)
}

// Configuration controls webhook delivery method
// svix_config:
//   enabled: false  # Default - uses internal system
//   enabled: true   # Optional - uses Svix for delivery

Infrastructure Services

Monitoring & Observability

Sentry for error tracking and performance monitoring:
sentry.Init(sentry.ClientOptions{
    Dsn: config.SentryDSN,
    Environment: config.Environment,
    TracesSampleRate: 0.1,
})

// Error tracking in handlers (c is the *gin.Context)
if err != nil {
    sentry.CaptureException(err)
    // Gin handlers have no return value; attach the error to the context instead
    c.Error(err).SetType(gin.ErrorTypePublic)
    return
}
Pyroscope for continuous profiling and performance analysis:
pyroscope.Start(pyroscope.Config{
    ApplicationName: "flexprice-api",
    ServerAddress:   config.PyroscopeURL,
    ProfileTypes: []pyroscope.ProfileType{
        pyroscope.ProfileCPU,
        pyroscope.ProfileAllocObjects,
        pyroscope.ProfileAllocSpace,
    },
})

Document Storage

Invoice PDFs are stored in S3 with presigned URL generation for secure access:
func (s *S3Service) StoreInvoice(invoiceID string, pdfData []byte) (string, error) {
    key := fmt.Sprintf("invoices/%s/%s.pdf", s.tenantID, invoiceID)

    _, err := s.client.PutObject(&s3.PutObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
        Body:   bytes.NewReader(pdfData),
    })
    if err != nil {
        // Do not hand out a URL for an object that was never stored
        return "", fmt.Errorf("upload invoice %s: %w", invoiceID, err)
    }

    // Return presigned URL for download
    return s.GeneratePresignedURL(key, 24*time.Hour)
}

Caching Layer

Flexprice uses only in-memory caching for performance optimization. Caching can be enabled or disabled via configuration:
// Cache interface in internal/cache/
type Cache interface {
    // Get retrieves a value from the cache
    // Returns the value and a boolean indicating whether the key was found
    Get(ctx context.Context, key string) (interface{}, bool)

    // Set adds a value to the cache with the specified expiration
    // If expiration is 0, the item never expires (but may be evicted)
    Set(ctx context.Context, key string, value interface{}, expiration time.Duration)

    // Delete removes a key from the cache
    Delete(ctx context.Context, key string)

    // DeleteByPrefix removes all keys with the given prefix
    DeleteByPrefix(ctx context.Context, prefix string)

    // Flush removes all items from the cache
    Flush(ctx context.Context)
}

// Usage example
func (s *PlanService) GetPlan(ctx context.Context, planID string) (*Plan, error) {
    // Check if caching is enabled
    if !s.config.Cache.Enabled {
        return s.repo.GetPlan(ctx, planID)
    }
    
    // Try cache first
    if cached, found := s.cache.Get(ctx, planID); found {
        return cached.(*Plan), nil
    }
    
    // Cache miss - fetch from database
    plan, err := s.repo.GetPlan(ctx, planID)
    if err == nil {
        s.cache.Set(ctx, planID, plan, time.Hour)
    }
    return plan, err
}

// Configuration
// cache:
//   enabled: true/false  # Simple boolean to enable/disable caching
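
For orientation, an in-memory implementation of a cache with these five methods could look like the sketch below. It is an assumption about one possible shape, not the code in internal/cache/: `MemoryCache`, `entry`, `fakeClock`, and the injectable clock are hypothetical, and expired items are only hidden on read rather than evicted.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

type entry struct {
	value     interface{}
	expiresAt time.Time // zero value means the item never expires
}

// MemoryCache is a map guarded by a mutex, with a clock that tests can replace.
type MemoryCache struct {
	mu    sync.RWMutex
	items map[string]entry
	now   func() time.Time
}

func NewMemoryCache(now func() time.Time) *MemoryCache {
	if now == nil {
		now = time.Now
	}
	return &MemoryCache{items: make(map[string]entry), now: now}
}

func (c *MemoryCache) Get(_ context.Context, key string) (interface{}, bool) {
	c.mu.RLock()
	e, ok := c.items[key]
	c.mu.RUnlock()
	if !ok || (!e.expiresAt.IsZero() && !c.now().Before(e.expiresAt)) {
		return nil, false // missing or expired
	}
	return e.value, true
}

func (c *MemoryCache) Set(_ context.Context, key string, value interface{}, expiration time.Duration) {
	e := entry{value: value}
	if expiration > 0 {
		e.expiresAt = c.now().Add(expiration)
	}
	c.mu.Lock()
	c.items[key] = e
	c.mu.Unlock()
}

func (c *MemoryCache) Delete(_ context.Context, key string) {
	c.mu.Lock()
	delete(c.items, key)
	c.mu.Unlock()
}

func (c *MemoryCache) DeleteByPrefix(_ context.Context, prefix string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k := range c.items {
		if strings.HasPrefix(k, prefix) {
			delete(c.items, k)
		}
	}
}

func (c *MemoryCache) Flush(_ context.Context) {
	c.mu.Lock()
	c.items = make(map[string]entry)
	c.mu.Unlock()
}

// fakeClock lets the example move time forward without sleeping.
type fakeClock struct{ t time.Time }

func (f *fakeClock) Now() time.Time          { return f.t }
func (f *fakeClock) Advance(d time.Duration) { f.t = f.t.Add(d) }

func main() {
	ctx := context.Background()
	clock := &fakeClock{}
	cache := NewMemoryCache(clock.Now)

	cache.Set(ctx, "plan:basic", "Basic", time.Hour)
	_, found := cache.Get(ctx, "plan:basic")
	fmt.Println("found before expiry:", found)

	clock.Advance(2 * time.Hour)
	_, found = cache.Get(ctx, "plan:basic")
	fmt.Println("found after expiry:", found)
}
```

In a multi-tenant system, prefixing keys with the tenant and environment (for example "tenant/env/plan:id") keeps one tenant's cached entries from being served to another and makes `DeleteByPrefix` useful for per-tenant invalidation.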

Configuration Management

Configuration is loaded from YAML files with environment variable overrides for deployment flexibility:
# config.yaml
server:
  port: 8080
  host: "0.0.0.0"
  
database:
  postgres:
    host: ${DB_HOST:localhost}
    port: ${DB_PORT:5432}
    database: ${DB_NAME:flexprice}
  clickhouse:
    host: ${CLICKHOUSE_HOST:localhost}
    port: ${CLICKHOUSE_PORT:9000}

kafka:
  brokers: ${KAFKA_BROKERS:localhost:29092}
  topics:
    events: "events"
    events_lazy: "events_lazy"
    processed_events: "events_post_processing"
    system_events: "system_events"

temporal:
  host: ${TEMPORAL_HOST:localhost:7233}
  namespace: ${TEMPORAL_NAMESPACE:default}
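
The `${NAME:default}` placeholders read as "use the environment variable if set, otherwise the default". How Flexprice's loader resolves them is not shown here, so the following is only a sketch of that rule under the assumption that the name ends at the first colon. `expand` and `placeholder` are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// placeholder matches ${NAME} and ${NAME:default}. The default may itself
// contain colons, as in ${KAFKA_BROKERS:localhost:29092}.
var placeholder = regexp.MustCompile(`\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}`)

// expand replaces each placeholder with lookup(NAME) when the variable is
// set, and with the default (possibly empty) otherwise.
func expand(s string, lookup func(string) (string, bool)) string {
	return placeholder.ReplaceAllStringFunc(s, func(match string) string {
		parts := placeholder.FindStringSubmatch(match)
		if value, ok := lookup(parts[1]); ok {
			return value
		}
		return parts[2]
	})
}

func main() {
	// os.LookupEnv has exactly the signature expand expects.
	fmt.Println(expand("host: ${DB_HOST:localhost}", os.LookupEnv))
	fmt.Println(expand("brokers: ${KAFKA_BROKERS:localhost:29092}", os.LookupEnv))
}
```

Taking the lookup as a parameter keeps the function easy to test with a fixed map instead of the process environment.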

Configuration Structure

type Configuration struct {
    Deployment               DeploymentConfig
    Server                   ServerConfig
    Auth                     AuthConfig
    Kafka                    KafkaConfig
    ClickHouse               ClickHouseConfig
    Logging                  LoggingConfig
    Postgres                 PostgresConfig
    Sentry                   SentryConfig
    Pyroscope                PyroscopeConfig
    Event                    EventConfig
    DynamoDB                 DynamoDBConfig
    Temporal                 TemporalConfig
    Webhook                  Webhook
    Secrets                  SecretsConfig
    Billing                  BillingConfig
    S3                       S3Config
    Cache                    CacheConfig
    EventProcessing          EventProcessingConfig 
    EventProcessingLazy      EventProcessingLazyConfig
    EventPostProcessing      EventPostProcessingConfig
    FeatureUsageTracking     FeatureUsageTrackingConfig
    FeatureUsageTrackingLazy FeatureUsageTrackingLazyConfig
    EnvAccess                EnvAccessConfig
    FeatureFlag              FeatureFlagConfig
    Email                    EmailConfig
}

Application Lifecycle

The application lifecycle is managed by Uber FX with hooks for startup and shutdown:
func main() {
    fx.New(
        // Configuration
        fx.Provide(config.Load),
        fx.Provide(logger.New),
        
        // Infrastructure
        fx.Provide(database.NewPostgres),
        fx.Provide(database.NewClickHouse),
        fx.Provide(kafka.NewProducer),
        fx.Provide(temporal.NewClient),
        
        // Services  
        fx.Provide(service.NewBillingService),
        fx.Provide(service.NewEventService),
        
        // HTTP Server
        fx.Provide(server.New),
        fx.Invoke(server.RegisterRoutes),
        
        // Lifecycle hooks
        fx.Invoke(func(lc fx.Lifecycle, server *server.Server) {
            lc.Append(fx.Hook{
                OnStart: server.Start,
                OnStop:  server.Stop,
            })
        }),
    ).Run()
}

Notes

Key Architectural Decisions:
  1. Dual-Database Strategy: PostgreSQL for transactional consistency, ClickHouse for high-volume analytics
  2. Event-Driven Architecture: Kafka enables asynchronous processing, scalability, and fault tolerance
  3. Microservices-Ready: Deployment modes allow splitting components into separate services
  4. Strong Multi-Tenancy: Enforced at middleware, service, and repository layers
  5. Workflow Orchestration: Temporal provides reliable background job execution
  6. Open Integration: Factory pattern allows multiple payment gateway implementations
The architecture is designed for high scalability, reliability, and flexibility to support diverse pricing models while maintaining clean separation of concerns across all layers.