Distributed Stress Testing with htp
Distributed load testing across globally deployed servers can provide invaluable insights into how a service performs under various network conditions. Tools like htp, a Go-based CLI for HTTP requests and measurements, become even more powerful when orchestrated across multiple servers. This article explores how to set up a queue-based publish-subscribe system using NATS to broadcast commands to all servers running htp, and how to persist the results for generating comprehensive performance reports.
The Queue-Based Publish-Subscribe Architecture
Using a publish-subscribe messaging system provides a scalable and efficient way to orchestrate commands across a distributed network of servers. A central orchestrator publishes commands to a message queue, and each server running a wrapper service subscribes to relevant message topics to receive and execute those commands. This decouples the orchestrator from the servers, simplifying scaling and management.
Key Components:
- Message Queue (NATS): Facilitates publish-subscribe messaging.
- Wrapper Service: Runs on each server to subscribe to the message queue, execute htp commands, and publish results back.
- Orchestrator: Publishes htp commands to the queue.
- Persistent Storage: Stores execution results for analysis and reporting.
Setting Up a Wrapper Service for htp
Each server runs a wrapper service that listens for published htp commands. Upon receiving a command, it executes htp locally and then publishes the results back to the message queue.
Pseudo-code for the Wrapper Service:
package main

import (
	"encoding/json"
	"log"
	"os/exec"
	"time"

	"github.com/nats-io/nats.go"
)

// HTPCommand is the payload published by the orchestrator.
type HTPCommand struct {
	Action string   `json:"action"`
	Args   []string `json:"args"`
}

// HTPResult is the payload published back after executing htp locally.
type HTPResult struct {
	ServerID  string `json:"server_id"`
	Action    string `json:"action"`
	Output    string `json:"output"`
	Timestamp int64  `json:"timestamp"`
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error connecting to NATS: %v", err)
	}
	defer nc.Close()

	serverID := "server-region-identifier" // e.g., "server-us-east"

	_, err = nc.Subscribe("htp.commands", func(msg *nats.Msg) {
		var cmd HTPCommand
		if err := json.Unmarshal(msg.Data, &cmd); err != nil {
			log.Printf("Invalid command payload: %v", err)
			return
		}

		// Run htp with the requested action and arguments, capturing stdout and stderr.
		out, err := exec.Command("htp", append([]string{cmd.Action}, cmd.Args...)...).CombinedOutput()
		if err != nil {
			log.Printf("Error executing htp on %s: %v\nOutput: %s", serverID, err, string(out))
		}

		result := HTPResult{
			ServerID:  serverID,
			Action:    cmd.Action,
			Output:    string(out),
			Timestamp: time.Now().Unix(),
		}

		resultData, err := json.Marshal(result)
		if err != nil {
			log.Printf("Error marshaling result: %v", err)
			return
		}

		// Publish the result for the results consumer to persist.
		if err := nc.Publish("htp.results", resultData); err != nil {
			log.Printf("Error publishing result: %v", err)
		}
	})
	if err != nil {
		log.Fatalf("Error subscribing to subject: %v", err)
	}

	log.Printf("Wrapper service on %s subscribed to 'htp.commands'...", serverID)
	select {} // block forever
}
Containerization and Deployment with Fly.io
The wrapper service, along with htp, can be containerized and deployed to multiple regions using Fly.io for global distribution.
Example Dockerfile:
# Build stage: compile the wrapper service.
FROM golang:1.19-alpine AS builder
RUN apk add --no-cache git
WORKDIR /app
COPY . .
RUN go build -o wrapper-service main.go

# Runtime stage: a minimal image containing the wrapper service and the htp binary.
# This assumes a Linux htp binary is present in the build context alongside the source.
FROM alpine
RUN apk add --no-cache ca-certificates
COPY --from=builder /app/wrapper-service /usr/local/bin/wrapper-service
COPY --from=builder /app/htp /usr/local/bin/htp
RUN chmod +x /usr/local/bin/wrapper-service /usr/local/bin/htp
CMD ["wrapper-service"]
Deployment Steps:
- Initialize Fly Application: In the project directory, run fly launch and configure the application.
- Configure Regions: Use fly regions add to add deployment regions (e.g., iad, syd, fra).
- Deploy: Use fly deploy to build and deploy the container image globally.
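In the wrapper pseudo-code above the server identifier is hardcoded; when deploying across regions it is more practical to derive it from the environment. The sketch below assumes a NATS_URL variable that you set yourself (e.g., with fly secrets set) and uses the FLY_REGION variable that the Fly.io runtime provides, falling back to sensible defaults:
package main

import (
	"fmt"
	"os"

	"github.com/nats-io/nats.go"
)

func main() {
	// NATS_URL is an assumed variable you configure yourself; fall back to the
	// default local URL when it is not set.
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = nats.DefaultURL
	}

	// FLY_REGION is set by the Fly.io runtime; use it to build a per-region server ID.
	serverID := "server-local"
	if region := os.Getenv("FLY_REGION"); region != "" {
		serverID = "server-" + region // e.g., "server-iad"
	}

	fmt.Printf("connecting to %s as %s\n", natsURL, serverID)
	// nats.Connect(natsURL) and the subscription logic from the wrapper service follow here.
}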
Orchestrator for Publishing Commands
An orchestrator publishes htp commands to the htp.commands subject. All subscribed servers then execute the commands.
Orchestrator Pseudo-code:
package main

import (
	"encoding/json"
	"log"

	"github.com/nats-io/nats.go"
)

// HTPCommand mirrors the payload expected by the wrapper services.
type HTPCommand struct {
	Action string   `json:"action"`
	Args   []string `json:"args"`
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error connecting to NATS: %v", err)
	}
	defer nc.Close()

	// The invocation every wrapper will run: htp stress -d 30 -l 1000 https://example.com
	command := HTPCommand{
		Action: "stress",
		Args:   []string{"-d", "30", "-l", "1000", "https://example.com"},
	}

	cmdData, err := json.Marshal(command)
	if err != nil {
		log.Fatalf("Error serializing command: %v", err)
	}

	if err := nc.Publish("htp.commands", cmdData); err != nil {
		log.Fatalf("Error publishing command: %v", err)
	}

	// Flush so the message is actually sent before the program exits.
	if err := nc.Flush(); err != nil {
		log.Fatalf("Error flushing NATS connection: %v", err)
	}

	log.Println("Published 'stress' command to 'htp.commands'")
}
Persisting Results for Reporting
To generate performance reports, results from each server must be persisted. This involves:
- Consuming htp.results: An application or service subscribes to the htp.results subject.
- Storing Results: Results are stored in persistent storage such as a database (e.g., PostgreSQL, MongoDB) or a time-series database (e.g., InfluxDB).
- Generating Reports: The stored data is used to generate comprehensive performance reports.
Pseudo-code for a results consumer:
package main

import (
	"encoding/json"
	"log"

	"github.com/nats-io/nats.go"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// HTPResult is both the NATS payload and the database model. The JSON tags must
// match the fields published by the wrapper service.
type HTPResult struct {
	ID        uint   `gorm:"primaryKey" json:"-"`
	ServerID  string `json:"server_id"`
	Action    string `json:"action"`
	Output    string `json:"output"`
	Timestamp int64  `json:"timestamp"`
}

func main() {
	// Connect to the database
	dsn := "host=localhost user=htpuser password=htppass dbname=htpdb port=5432 sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}

	// Auto-migrate the schema
	if err := db.AutoMigrate(&HTPResult{}); err != nil {
		log.Fatalf("Failed to migrate schema: %v", err)
	}

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("Error connecting to NATS: %v", err)
	}
	defer nc.Close()

	_, err = nc.Subscribe("htp.results", func(msg *nats.Msg) {
		var result HTPResult
		if err := json.Unmarshal(msg.Data, &result); err != nil {
			log.Printf("Invalid result payload: %v", err)
			return
		}

		// Store result in the database
		if err := db.Create(&result).Error; err != nil {
			log.Printf("Error storing result: %v", err)
		} else {
			log.Printf("Stored result from server %s", result.ServerID)
		}
	})
	if err != nil {
		log.Fatalf("Error subscribing to results subject: %v", err)
	}

	log.Println("Results consumer subscribed to 'htp.results'")
	select {} // block forever
}
Explanation:
- This consumer subscribes to "htp.results".
- Received results are unmarshaled and stored in a PostgreSQL database using GORM (an ORM for Go).
- Persisted data can then be used to generate historical performance reports.
Generating Reports
With results persisted, a reporting system can query the database to generate insights such as:
- Performance Over Time: Track how response times change over test durations.
- Regional Variations: Compare performance results from servers in different regions to identify network bottlenecks.
- Error Rates and Response Distribution: Analyze failures and response distribution across different endpoints.
For instance, a simple reporting script could aggregate average response times per server per test, and output them in CSV or JSON format. More sophisticated solutions might involve integrating with dashboard tools like Grafana for real-time visualizations.
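As a minimal sketch of that idea, assuming the GORM schema from the results consumer above, the script below aggregates result counts per server and action and writes them as CSV (detailed timings live in the raw htp output, so extracting averages would additionally require parsing that text):
package main

import (
	"encoding/csv"
	"log"
	"os"
	"strconv"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// HTPResult mirrors the table created by the results consumer.
type HTPResult struct {
	ID        uint `gorm:"primaryKey"`
	ServerID  string
	Action    string
	Output    string
	Timestamp int64
}

// serverSummary holds one aggregated row: how many results each server reported per action.
type serverSummary struct {
	ServerID string
	Action   string
	Runs     int64
}

func main() {
	dsn := "host=localhost user=htpuser password=htppass dbname=htpdb port=5432 sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}

	// Aggregate the stored results per server and action.
	var summaries []serverSummary
	err = db.Model(&HTPResult{}).
		Select("server_id, action, COUNT(*) AS runs").
		Group("server_id, action").
		Scan(&summaries).Error
	if err != nil {
		log.Fatalf("Query failed: %v", err)
	}

	// Write the summary as CSV to stdout.
	w := csv.NewWriter(os.Stdout)
	defer w.Flush()
	w.Write([]string{"server_id", "action", "runs"})
	for _, s := range summaries {
		w.Write([]string{s.ServerID, s.Action, strconv.FormatInt(s.Runs, 10)})
	}
}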
Key Takeaways and Further Improvements
Scalability: Utilizing a publish-subscribe queue system (like NATS) decouples the orchestrator from servers and simplifies scaling. You can easily add new servers that subscribe to htp.commands without modifying the orchestrator.
Persistence: Storing execution results in a database allows for comprehensive performance analysis and reporting. Adjusting the database schema and the result structure can further enrich the data collected, for example with request latencies or error codes, as sketched below.
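One way to do that is to extend the HTPResult payload and table with a few structured fields. The field names below are illustrative rather than part of htp's output format; the wrapper service would have to populate them by parsing htp's output or by measuring the run itself.
// HTPResult extended with structured metrics (illustrative field names).
type HTPResult struct {
	ID           uint    `gorm:"primaryKey" json:"-"`
	ServerID     string  `json:"server_id"`
	Action       string  `json:"action"`
	Output       string  `json:"output"`
	Timestamp    int64   `json:"timestamp"`
	DurationMs   int64   `json:"duration_ms"`    // wall-clock duration of the htp run
	Requests     int64   `json:"requests"`       // total requests issued, if reported
	Errors       int64   `json:"errors"`         // failed requests, if reported
	AvgLatencyMs float64 `json:"avg_latency_ms"` // average response latency, if reported
}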
Reliability: Message queues like NATS provide a reliable messaging backbone. In production, consider deploying a NATS cluster for fault tolerance and ensuring the results consumer is robust against message or connection failures.
Security and Best Practices: Secure connections and authentication should be considered when deploying in production. Use TLS for NATS connections and secure database credentials. Additionally, implement error handling and retries in the wrapper service and results consumer for robustness.
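As a sketch of what a hardened connection can look like with the nats.go client (the URL, CA bundle, and credentials file below are placeholders to substitute with your own), TLS, credential-based authentication, and reconnection settings can be combined as connect options:
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Placeholder URL and file paths: substitute your own cluster address,
	// CA bundle, and credentials file.
	nc, err := nats.Connect("tls://nats.example.com:4222",
		nats.RootCAs("./ca.pem"),                // trust the cluster's CA for TLS
		nats.UserCredentials("./service.creds"), // authenticate with a credentials file
		nats.MaxReconnects(-1),                  // keep trying to reconnect indefinitely
		nats.ReconnectWait(2*time.Second),       // wait between reconnect attempts
		nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
			log.Printf("NATS disconnected: %v", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
		}),
	)
	if err != nil {
		log.Fatalf("Error connecting to NATS: %v", err)
	}
	defer nc.Close()

	// ... subscriptions or publishes as in the earlier examples ...
}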
Conclusion
Implementing a queue-based publish-subscribe system for orchestrating distributed htp commands, along with persistent storage of results, offers a scalable and reliable way to perform advanced performance testing and analysis. By leveraging NATS for messaging, deploying wrapper services on platforms like Fly.io for global distribution, and storing results in a persistent database, testers and developers can run large-scale, geographically diverse load tests and then generate informative reports to guide optimization and scaling decisions. This architecture provides a robust foundation for continuous performance monitoring and can be adapted to suit various testing needs and environments.