-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain_sqlite.go
More file actions
263 lines (222 loc) · 8.26 KB
/
main_sqlite.go
File metadata and controls
263 lines (222 loc) · 8.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// IPFS Pinning Service - SQLite Backend
// This is the main entry point using SQLite for data storage
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
openapi "github.com/functionland/pinning-service"
"github.com/gorilla/mux"
ipfsCluster "github.com/ipfs-cluster/ipfs-cluster/api/rest/client"
"github.com/ipfs/kubo/client/rpc"
"github.com/joho/godotenv"
ma "github.com/multiformats/go-multiaddr"
)
// Default test credentials for compliance testing.
//
// These values are hard-coded and are written to the log when test mode is
// active, so they must be treated as public: test mode should never be enabled
// on a deployment that holds real data.
const (
// TestModeUsername is the account name setupTestUser creates and attaches
// the test session to.
TestModeUsername = "test@pinning.local"
// TestModePassword is the password passed to CreateUser for that account.
TestModePassword = "test-password-for-compliance"
// TestModeToken is the fixed session token registered for the test user.
TestModeToken = "test-token-for-ipfs-pinning-compliance"
)
func main() {
log.Printf("IPFS Pinning Service starting (SQLite backend)...")
// Check for test mode
testMode := strings.ToLower(os.Getenv("TEST_MODE")) == "true" || os.Getenv("TEST_MODE") == "1"
if testMode {
log.Printf("⚠️ TEST MODE ENABLED - Using test credentials")
log.Printf(" Test Token: %s", TestModeToken)
}
// Load environment variables from .env file
if err := godotenv.Load(); err != nil {
log.Printf("Warning: .env file not found, using environment variables")
}
// Get configuration from environment variables
dbPath := os.Getenv("DATABASE_PATH")
// Default database path
if dbPath == "" {
dbPath = filepath.Join(".", "data", "pinning.db")
}
// Ensure data directory exists
dataDir := filepath.Dir(dbPath)
if err := os.MkdirAll(dataDir, 0755); err != nil {
log.Fatalf("Failed to create data directory: %v", err)
}
// Initialize SQLite Service
sqliteService, err := openapi.NewSQLiteService(dbPath)
if err != nil {
log.Fatalf("Error initializing SQLite service: %v", err)
}
defer func() {
if err := sqliteService.Close(); err != nil {
log.Printf("Error closing SQLite service: %v", err)
}
}()
log.Printf("SQLite database initialized at: %s", dbPath)
// Initialize User Service with SQLite backend
userService, err := openapi.NewUserServiceSQLite(sqliteService)
if err != nil {
log.Fatalf("Error initializing User service: %v", err)
}
userAPIController := openapi.NewUserAPIControllerSQLite(userService)
// In test mode, create test user and session
if testMode {
ctx := context.Background()
if err := setupTestUser(ctx, sqliteService, userService); err != nil {
log.Printf("Warning: Failed to setup test user: %v", err)
} else {
log.Printf("✓ Test user and token ready")
}
}
// Initialize IPFS node connection
ipfsAddr := os.Getenv("IPFS_API_ADDR")
if ipfsAddr == "" {
ipfsAddr = "/ip4/127.0.0.1/tcp/5001"
}
nodeMultiAddr, err := ma.NewMultiaddr(ipfsAddr)
if err != nil {
log.Fatalf("Invalid IPFS multiaddress %s: %v", ipfsAddr, err)
}
ipfsAPI, err := rpc.NewApi(nodeMultiAddr)
if err != nil {
log.Fatalf("Error connecting to IPFS API: %v", err)
}
// Initialize IPFS Cluster connection
ipfsClusterAddr := os.Getenv("IPFS_CLUSTER_API_ADDR")
if ipfsClusterAddr == "" {
ipfsClusterAddr = "/ip4/127.0.0.1/tcp/9094"
}
clusterMultiAddr, err := ma.NewMultiaddr(ipfsClusterAddr)
if err != nil {
log.Fatalf("Invalid IPFS Cluster multiaddress %s: %v", ipfsClusterAddr, err)
}
ipfsClusterConfig := ipfsCluster.Config{
APIAddr: clusterMultiAddr,
}
ipfsClusterApi, err := ipfsCluster.NewDefaultClient(&ipfsClusterConfig)
if err != nil {
log.Fatalf("Error initializing IPFS cluster API: %v", err)
}
log.Printf("IPFS Cluster API connected at: %s", ipfsClusterAddr)
// Check if direct IPFS pinning is enabled (in addition to IPFS Cluster)
enableIPFSPinning := strings.ToLower(os.Getenv("ENABLE_IPFS_PINNING")) == "true" || os.Getenv("ENABLE_IPFS_PINNING") == "1"
if enableIPFSPinning {
log.Printf("Direct IPFS pinning ENABLED - pins will be sent to both IPFS and IPFS Cluster")
} else {
log.Printf("Direct IPFS pinning DISABLED - pins will only be sent to IPFS Cluster")
}
// Get IPFS HTTP URL for dag/stat API (defaults to http://127.0.0.1:5001)
ipfsHTTPURL := os.Getenv("IPFS_HTTP_URL")
// Initialize PinsAPIService with SQLite backend
pinsAPIService := openapi.NewPinsAPIServiceSQLite(sqliteService, userService, ipfsAPI, ipfsClusterApi, enableIPFSPinning, ipfsHTTPURL)
// Create PinsAPIController
pinsAPIController := openapi.NewPinsAPIController(pinsAPIService)
// Create Admin API Controller (system-only endpoints)
systemKey := os.Getenv("SYSTEM_KEY")
if systemKey == "" {
log.Printf("Warning: SYSTEM_KEY not set, admin endpoints will be inaccessible")
systemKey = "disabled" // Effectively disables admin access
}
adminAPIController := openapi.NewAdminAPIController(sqliteService, systemKey)
adminRouter := openapi.NewAdminRouter(adminAPIController)
// Initialize router
mainRouter := openapi.NewRouter(pinsAPIController)
additionalRouter := openapi.NewAdditionalRouterSQLite(pinsAPIController, userAPIController)
router := mux.NewRouter()
router.PathPrefix("/admin/").Handler(adminRouter) // Admin routes (no auth middleware - uses system key)
router.PathPrefix("/auth/").Handler(additionalRouter)
router.PathPrefix("/").Handler(mainRouter)
// Apply auth middleware
authRouter := openapi.AuthMiddlewareSQLite(sqliteService)(router)
// Apply CORS middleware (required for compliance tests)
corsRouter := corsMiddleware(authRouter)
// Apply request logging middleware (for debugging)
loggingRouter := requestLoggingMiddleware(corsRouter, testMode)
// Get server port from environment or default to 6000
port := os.Getenv("PORT")
if port == "" {
port = "6000"
}
// Create server with timeouts for production
server := &http.Server{
Addr: ":" + port,
Handler: openapi.InjectRequestIntoContext(loggingRouter),
ReadTimeout: 30 * time.Second,
WriteTimeout: 90 * time.Second,
IdleTimeout: 120 * time.Second,
}
// Start server in a goroutine
go func() {
log.Printf("Server listening on port %s", port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}()
// Wait for interrupt signal for graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// Create shutdown context with timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Fatalf("Server forced to shutdown: %v", err)
}
log.Println("Server exited gracefully")
}
// setupTestUser provisions the compliance-test account and its fixed session.
//
// It calls userService.CreateUser with the test credentials, tolerating only
// the error whose text is exactly "user already exists", then registers
// TestModeToken for that user via db.CreateTestSession. On success the test
// username and token are logged. Any other error is returned unchanged.
func setupTestUser(ctx context.Context, db *openapi.SQLiteService, userService *openapi.UserServiceSQLite) error {
	if createErr := userService.CreateUser(ctx, TestModeUsername, TestModePassword); createErr != nil {
		// A pre-existing account is fine; everything else aborts setup.
		if createErr.Error() != "user already exists" {
			return createErr
		}
	}

	// Create or update the session bound to the fixed token.
	if sessionErr := db.CreateTestSession(ctx, TestModeUsername, TestModeToken); sessionErr != nil {
		return sessionErr
	}

	log.Printf(" Test Username: %s", TestModeUsername)
	log.Printf(" Test Token: %s", TestModeToken)
	return nil
}
// corsMiddleware wraps next so that every response carries the permissive CORS
// headers the compliance tests require. OPTIONS (preflight) requests are
// answered directly with 200 OK and never reach next; all other methods are
// forwarded after the headers have been set.
func corsMiddleware(next http.Handler) http.Handler {
	corsHeaders := [...][2]string{
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"},
		{"Access-Control-Allow-Headers", "Accept, Authorization, Content-Type, X-Requested-With"},
		{"Access-Control-Max-Age", "3600"},
	}

	handle := func(w http.ResponseWriter, r *http.Request) {
		respHeader := w.Header()
		for _, pair := range corsHeaders {
			respHeader.Set(pair[0], pair[1])
		}

		if r.Method != http.MethodOptions {
			next.ServeHTTP(w, r)
			return
		}

		// Preflight: the headers above are the whole answer.
		w.WriteHeader(http.StatusOK)
	}
	return http.HandlerFunc(handle)
}
// requestLoggingMiddleware wraps next with optional per-request logging.
//
// When verbose is false the request is forwarded without any output. When
// verbose is true the method, path and remote address are logged, followed by
// the Authorization header: values longer than 20 bytes are abbreviated to
// their first 15 and last 5 bytes, shorter values are logged in full, and a
// missing header is logged as "(none)". next is always invoked afterwards.
func requestLoggingMiddleware(next http.Handler, verbose bool) http.Handler {
	describe := func(r *http.Request) {
		log.Printf("➡️ %s %s (from %s)", r.Method, r.URL.Path, r.RemoteAddr)

		switch credential := r.Header.Get("Authorization"); {
		case credential == "":
			log.Printf(" Auth: (none)")
		case len(credential) > 20:
			head, tail := credential[:15], credential[len(credential)-5:]
			log.Printf(" Auth: %s...%s", head, tail)
		default:
			log.Printf(" Auth: %s", credential)
		}
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if verbose {
			describe(r)
		}
		next.ServeHTTP(w, r)
	})
}