diff --git a/cmd/devtap/lazystore.go b/cmd/devtap/lazystore.go new file mode 100644 index 0000000..4ecc788 --- /dev/null +++ b/cmd/devtap/lazystore.go @@ -0,0 +1,115 @@ +package main + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/killme2008/devtap/internal/store" +) + +const defaultLazyCooldown = 30 * time.Second + +// lazyStore wraps a store.Store that is connected on first use and retried +// with a cooldown on failure. This allows the MCP server to start even when +// the configured store (e.g. GreptimeDB) is temporarily unavailable, and +// reconnect once it becomes reachable. +type lazyStore struct { + mu sync.Mutex + inner store.Store + connectFn func() (store.Store, error) + lastAttempt time.Time + lastErr error + cooldown time.Duration + closed bool +} + +func newLazyStore(connectFn func() (store.Store, error)) *lazyStore { + return &lazyStore{ + connectFn: connectFn, + cooldown: defaultLazyCooldown, + } +} + +// ensureConnected attempts to establish the inner store if not already connected. +// Returns nil if connected, or the connection error if unavailable. +func (ls *lazyStore) ensureConnected() error { + if ls.closed { + return errors.New("store closed") + } + if ls.inner != nil { + return nil + } + if !ls.lastAttempt.IsZero() && time.Since(ls.lastAttempt) < ls.cooldown { + return fmt.Errorf("store unavailable (cooldown): %w", ls.lastErr) + } + ls.lastAttempt = time.Now() + s, err := ls.connectFn() + if err != nil { + ls.lastErr = err + return err + } + ls.lastErr = nil + ls.inner = s + return nil +} + +// reset closes the inner store and nils it out so the next call retries. +// Does NOT clear lastAttempt — the cooldown still applies, which prevents +// hammering if the store accepts connections but fails on queries. For +// long-running processes (MCP server), the elapsed time since the original +// connect is typically >cooldown, so the retry is effectively immediate. +func (ls *lazyStore) reset() { + if ls.inner != nil { + _ = ls.inner.Close() + ls.inner = nil + } +} + +func (ls *lazyStore) Drain(sessionID string, maxLines int) ([]store.LogMessage, error) { + ls.mu.Lock() + defer ls.mu.Unlock() + + if err := ls.ensureConnected(); err != nil { + return nil, err + } + msgs, err := ls.inner.Drain(sessionID, maxLines) + if err != nil { + ls.reset() + return nil, err + } + return msgs, nil +} + +func (ls *lazyStore) Status() (map[string]int, error) { + ls.mu.Lock() + defer ls.mu.Unlock() + + if err := ls.ensureConnected(); err != nil { + return nil, err + } + counts, err := ls.inner.Status() + if err != nil { + ls.reset() + return nil, err + } + return counts, nil +} + +func (ls *lazyStore) Write(_ string, _ store.LogMessage) error { + return errors.New("lazyStore: write not supported (read-only)") +} + +func (ls *lazyStore) Close() error { + ls.mu.Lock() + defer ls.mu.Unlock() + + ls.closed = true + if ls.inner != nil { + err := ls.inner.Close() + ls.inner = nil + return err + } + return nil +} diff --git a/cmd/devtap/lazystore_test.go b/cmd/devtap/lazystore_test.go new file mode 100644 index 0000000..eaaf3c9 --- /dev/null +++ b/cmd/devtap/lazystore_test.go @@ -0,0 +1,345 @@ +package main + +import ( + "errors" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/killme2008/devtap/internal/store" +) + +// mockStore is a minimal store.Store for testing lazyStore behavior. +type mockStore struct { + drainFn func(string, int) ([]store.LogMessage, error) + statusFn func() (map[string]int, error) + closed bool +} + +func (m *mockStore) Write(string, store.LogMessage) error { return nil } +func (m *mockStore) Close() error { m.closed = true; return nil } + +func (m *mockStore) Drain(s string, n int) ([]store.LogMessage, error) { + return m.drainFn(s, n) +} + +func (m *mockStore) Status() (map[string]int, error) { + return m.statusFn() +} + +func TestLazyStore_FirstCallConnects(t *testing.T) { + connectCalls := 0 + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return map[string]int{"s": 0}, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return ms, nil + }) + + _, err := ls.Drain("s", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + // Second call should reuse cached connection. + _, err = ls.Status() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 1 { + t.Fatalf("expected still 1 connect call, got %d", connectCalls) + } +} + +func TestLazyStore_CooldownPreventsRapidRetry(t *testing.T) { + connectCalls := 0 + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return nil, errors.New("connection refused") + }) + ls.cooldown = 1 * time.Hour // very long cooldown + + // First call: attempts connection, fails. + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected error") + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + // Second call within cooldown: should NOT retry, and error should + // include the original connection error for diagnostics. + _, err = ls.Drain("s", 10) + if err == nil { + t.Fatal("expected error") + } + if connectCalls != 1 { + t.Fatalf("expected still 1 connect call (cooldown), got %d", connectCalls) + } + if !strings.Contains(err.Error(), "cooldown") { + t.Fatalf("expected cooldown in error, got: %v", err) + } + if !strings.Contains(err.Error(), "connection refused") { + t.Fatalf("expected original error in cooldown message, got: %v", err) + } +} + +func TestLazyStore_RetriesAfterCooldown(t *testing.T) { + connectCalls := 0 + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return nil, errors.New("connection refused") + }) + ls.cooldown = 1 * time.Millisecond + + // First call fails. + _, _ = ls.Drain("s", 10) + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + time.Sleep(5 * time.Millisecond) + + // After cooldown: should retry. + _, _ = ls.Drain("s", 10) + if connectCalls != 2 { + t.Fatalf("expected 2 connect calls after cooldown, got %d", connectCalls) + } +} + +func TestLazyStore_DrainErrorTriggersReset(t *testing.T) { + connectCalls := 0 + drainErr := true + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { + if drainErr { + return nil, errors.New("query failed") + } + return nil, nil + }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return ms, nil + }) + ls.cooldown = 1 * time.Millisecond + + // First call: connects successfully, Drain fails → reset. + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected drain error") + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + if !ms.closed { + t.Fatal("expected inner store to be closed after reset") + } + + time.Sleep(5 * time.Millisecond) + + // Provide a fresh mock for the reconnect. + ms2 := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + drainErr = false + ls.mu.Lock() + ls.connectFn = func() (store.Store, error) { + connectCalls++ + return ms2, nil + } + ls.mu.Unlock() + + // After cooldown: reconnects. + _, err = ls.Drain("s", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 2 { + t.Fatalf("expected 2 connect calls, got %d", connectCalls) + } +} + +func TestLazyStore_StatusErrorTriggersReset(t *testing.T) { + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { + return nil, errors.New("status failed") + }, + } + ls := newLazyStore(func() (store.Store, error) { + return ms, nil + }) + ls.cooldown = 1 * time.Millisecond + + _, err := ls.Status() + if err == nil { + t.Fatal("expected status error") + } + if !ms.closed { + t.Fatal("expected inner store to be closed after reset") + } +} + +func TestLazyStore_WriteReturnsError(t *testing.T) { + ls := newLazyStore(func() (store.Store, error) { + return nil, nil + }) + err := ls.Write("s", store.LogMessage{}) + if err == nil { + t.Fatal("expected write error") + } +} + +func TestLazyStore_CloseWhenNotConnected(t *testing.T) { + ls := newLazyStore(func() (store.Store, error) { + return nil, errors.New("nope") + }) + // Should not panic or error. + if err := ls.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +// Test that after reset, if the original connection was established > cooldown ago, +// the next call retries immediately (no extra wait). This is because reset() does +// NOT update lastAttempt — the elapsed time since the original connect is already +// past the cooldown window. +func TestLazyStore_ResetAllowsImmediateRetry(t *testing.T) { + connectCalls := 0 + failDrain := true + + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { + if failDrain { + return nil, errors.New("query failed") + } + return []store.LogMessage{{Tag: "ok"}}, nil + }, + statusFn: func() (map[string]int, error) { return nil, nil }, + }, nil + }) + ls.cooldown = 50 * time.Millisecond + + // Connect succeeds, Drain fails → reset. + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected drain error") + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + // Wait longer than cooldown so that time.Since(lastAttempt) > cooldown. + time.Sleep(60 * time.Millisecond) + + // Now Drain succeeds. The retry should happen immediately (no extra cooldown) + // because lastAttempt was set during the original connect, >50ms ago. + failDrain = false + msgs, err := ls.Drain("s", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 2 { + t.Fatalf("expected 2 connect calls (immediate retry after reset), got %d", connectCalls) + } + if len(msgs) != 1 || msgs[0].Tag != "ok" { + t.Fatalf("unexpected messages: %v", msgs) + } +} + +func TestLazyStore_ConcurrentAccess(t *testing.T) { + var connectCalls atomic.Int32 + ls := newLazyStore(func() (store.Store, error) { + connectCalls.Add(1) + return &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { + return []store.LogMessage{{Tag: "test"}}, nil + }, + statusFn: func() (map[string]int, error) { + return map[string]int{"s": 1}, nil + }, + }, nil + }) + ls.cooldown = 1 * time.Millisecond + + const goroutines = 20 + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := 0; i < goroutines; i++ { + go func(i int) { + defer wg.Done() + if i%2 == 0 { + _, _ = ls.Drain("s", 10) + } else { + _, _ = ls.Status() + } + }(i) + } + + wg.Wait() + + // Should have connected exactly once (all goroutines share the cached connection). + if c := connectCalls.Load(); c != 1 { + t.Fatalf("expected 1 connect call, got %d", c) + } +} + +func TestLazyStore_DrainAfterCloseReturnsError(t *testing.T) { + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + return ms, nil + }) + + // Connect, then close, then try to drain. + _, _ = ls.Drain("s", 10) + _ = ls.Close() + + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected error after close") + } + _, err = ls.Status() + if err == nil { + t.Fatal("expected error after close") + } +} + +func TestLazyStore_CloseDelegatesToInner(t *testing.T) { + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + return ms, nil + }) + + // Force connect. + _, _ = ls.Drain("s", 10) + if ms.closed { + t.Fatal("inner should not be closed yet") + } + + if err := ls.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ms.closed { + t.Fatal("expected inner store to be closed") + } +} diff --git a/cmd/devtap/storefactory.go b/cmd/devtap/storefactory.go index 1c609c2..d976aa7 100644 --- a/cmd/devtap/storefactory.go +++ b/cmd/devtap/storefactory.go @@ -153,17 +153,24 @@ func resolveDrainSources(cmd *cobra.Command) ([]mcp.DrainSource, func(), error) return b } - // Configured store unavailable — fall back to local-only single source. + // Configured store unavailable — use lazy wrapper that retries on drain. if configuredErr != nil { - fmt.Fprintf(os.Stderr, "devtap: configured store %q unavailable (%v), using local only\n", + fmt.Fprintf(os.Stderr, "devtap: configured store %q unavailable (%v), will retry on drain\n", configuredBackend, configuredErr) + lazy := newLazyStore(func() (store.Store, error) { + return openStoreStrict(cfg, configuredBackend, storeDir, adapterName) + }) localStore, err := openStoreByBackend(cfg, localBackend, storeDir, adapterName) if err != nil { return nil, nil, fmt.Errorf("open local store: %w", err) } - cleanup := func() { _ = localStore.Close() } + cleanup := func() { + _ = localStore.Close() + _ = lazy.Close() + } return []mcp.DrainSource{ - {Store: localStore, SessionID: localSession, Label: localSession}, + {Store: localStore, SessionID: localSession, Label: "local"}, + {Store: lazy, SessionID: configuredSession, Label: configuredSession}, }, cleanup, nil }