provider/openai: - Fix doubled tool call args (argsComplete flag): Ollama sends complete args in the first streaming chunk then repeats them as delta, causing doubled JSON and 400 errors in elfs - Handle fs: prefix (gemma4 uses fs:grep instead of fs.grep) - Add Reasoning field support for Ollama thinking output cmd/gnoma: - Early TTY detection so logger is created with correct destination before any component gets a reference to it (fixes slog WARN bleed into TUI textarea) permission: - Exempt spawn_elfs and agent tools from safety scanner: elf prompt text may legitimately mention .env/.ssh/credentials patterns and should not be blocked tui/app: - /init retry chain: no-tool-calls → spawn_elfs nudge → write nudge (ask for plain text output) → TUI fallback write from streamBuf - looksLikeAgentsMD + extractMarkdownDoc: validate and clean fallback content before writing (reject refusals, strip narrative preambles) - Collapse thinking output to 3 lines; ctrl+o to expand (live stream and committed messages) - Stream-level filter for model pseudo-tool-call blocks: suppresses <<tool_code>>...</tool_code>> and <<function_call>>...<tool_call|> from entering streamBuf across chunk boundaries - sanitizeAssistantText regex covers both block formats - Reset streamFilterClose at every turn start
423 lines
11 KiB
Go
423 lines
11 KiB
Go
package security
|
|
|
|
import (
|
|
"strings"
|
|
"testing"
|
|
|
|
"somegit.dev/Owlibou/gnoma/internal/message"
|
|
)
|
|
|
|
// --- Scanner ---
|
|
|
|
func TestScanner_DetectsAnthropicKey(t *testing.T) {
|
|
s := NewScanner(4.5)
|
|
matches := s.Scan("my key is sk-ant-api03-abcdefghijklmnopqrstuvwxyz")
|
|
if len(matches) == 0 {
|
|
t.Error("should detect Anthropic API key")
|
|
}
|
|
if matches[0].Pattern != "anthropic_api_key" {
|
|
t.Errorf("pattern = %q, want anthropic_api_key", matches[0].Pattern)
|
|
}
|
|
}
|
|
|
|
func TestScanner_DetectsOpenAIKey(t *testing.T) {
|
|
s := NewScanner(4.5)
|
|
matches := s.Scan("key: sk-proj-abcdefghijklmnopqrstuvwxyz123456")
|
|
if len(matches) == 0 {
|
|
t.Error("should detect OpenAI API key")
|
|
}
|
|
}
|
|
|
|
func TestScanner_DetectsAWSKey(t *testing.T) {
|
|
s := NewScanner(4.5)
|
|
matches := s.Scan("AKIAIOSFODNN7EXAMPLE")
|
|
if len(matches) == 0 {
|
|
t.Error("should detect AWS access key")
|
|
}
|
|
if matches[0].Pattern != "aws_access_key" {
|
|
t.Errorf("pattern = %q", matches[0].Pattern)
|
|
}
|
|
}
|
|
|
|
func TestScanner_DetectsGitHubPAT(t *testing.T) {
|
|
s := NewScanner(4.5)
|
|
matches := s.Scan("token: ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij")
|
|
hasGH := false
|
|
for _, m := range matches {
|
|
if m.Pattern == "github_pat" {
|
|
hasGH = true
|
|
break
|
|
}
|
|
}
|
|
if !hasGH {
|
|
t.Error("should detect GitHub PAT")
|
|
}
|
|
}
|
|
|
|
func TestScanner_DetectsPrivateKey(t *testing.T) {
|
|
s := NewScanner(4.5)
|
|
matches := s.Scan("-----BEGIN RSA PRIVATE KEY-----\nMIIE...")
|
|
hasKey := false
|
|
for _, m := range matches {
|
|
if m.Pattern == "private_key" {
|
|
hasKey = true
|
|
break
|
|
}
|
|
}
|
|
if !hasKey {
|
|
t.Error("should detect private key header")
|
|
}
|
|
}
|
|
|
|
func TestScanner_DetectsGenericSecret(t *testing.T) {
|
|
s := NewScanner(4.5)
|
|
matches := s.Scan(`password = "supersecretpassword123"`)
|
|
hasGeneric := false
|
|
for _, m := range matches {
|
|
if m.Pattern == "generic_secret_assign" {
|
|
hasGeneric = true
|
|
break
|
|
}
|
|
}
|
|
if !hasGeneric {
|
|
t.Error("should detect generic secret assignment")
|
|
}
|
|
}
|
|
|
|
func TestScanner_DetectsDatabaseURL(t *testing.T) {
|
|
s := NewScanner(4.5)
|
|
matches := s.Scan("postgres://admin:secretpass@db.example.com:5432/mydb")
|
|
hasDB := false
|
|
for _, m := range matches {
|
|
if m.Pattern == "database_url" {
|
|
hasDB = true
|
|
break
|
|
}
|
|
}
|
|
if !hasDB {
|
|
t.Error("should detect database URL with credentials")
|
|
}
|
|
}
|
|
|
|
func TestScanner_NoFalsePositives(t *testing.T) {
|
|
s := NewScanner(6.0) // high entropy threshold to avoid false positives
|
|
safe := []string{
|
|
"hello world",
|
|
"func main() {}",
|
|
"https://example.com/path",
|
|
"go test ./...",
|
|
"The quick brown fox jumps over the lazy dog",
|
|
}
|
|
for _, text := range safe {
|
|
matches := s.Scan(text)
|
|
if len(matches) > 0 {
|
|
t.Errorf("false positive on %q: %v", text, matches[0].Pattern)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestScanner_Entropy(t *testing.T) {
|
|
s := NewScanner(4.0) // lower threshold for testing
|
|
|
|
// High entropy string (random-looking)
|
|
matches := s.Scan("token: aB3dE5fG7hI9jK1lM3nO5pQ7rS9tU1v")
|
|
hasEntropy := false
|
|
for _, m := range matches {
|
|
if m.Pattern == "high_entropy" {
|
|
hasEntropy = true
|
|
break
|
|
}
|
|
}
|
|
if !hasEntropy {
|
|
t.Error("should detect high entropy string")
|
|
}
|
|
}
|
|
|
|
func TestShannonEntropy(t *testing.T) {
|
|
tests := []struct {
|
|
input string
|
|
minBits float64
|
|
maxBits float64
|
|
}{
|
|
{"aaaa", 0, 0.1}, // very low entropy
|
|
{"abcd", 1.9, 2.1}, // 4 unique chars = ~2 bits
|
|
{"abcdefgh", 2.9, 3.1}, // 8 unique = ~3 bits
|
|
{"aB3dE5fG7hI9jK", 3.5, 4.5}, // mixed case + digits
|
|
}
|
|
for _, tt := range tests {
|
|
e := shannonEntropy(tt.input)
|
|
if e < tt.minBits || e > tt.maxBits {
|
|
t.Errorf("shannonEntropy(%q) = %.2f, want [%.1f, %.1f]", tt.input, e, tt.minBits, tt.maxBits)
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- Redactor ---
|
|
|
|
func TestRedact_SingleMatch(t *testing.T) {
|
|
content := `AKIAIOSFODNN7EXAMPLE is my key`
|
|
s := NewScanner(6.0)
|
|
matches := s.Scan(content)
|
|
|
|
result := Redact(content, matches)
|
|
if strings.Contains(result, "AKIA") {
|
|
t.Error("should have redacted the key")
|
|
}
|
|
if !strings.Contains(result, "[REDACTED]") {
|
|
t.Error("should contain [REDACTED] placeholder")
|
|
}
|
|
}
|
|
|
|
func TestRedact_MultipleMatches(t *testing.T) {
|
|
content := "aws: AKIAIOSFODNN7EXAMPLE github: ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij"
|
|
s := NewScanner(6.0)
|
|
matches := s.Scan(content)
|
|
|
|
result := Redact(content, matches)
|
|
if strings.Contains(result, "AKIA") {
|
|
t.Error("should redact AWS key")
|
|
}
|
|
if strings.Contains(result, "ghp_") {
|
|
t.Error("should redact GitHub PAT")
|
|
}
|
|
count := strings.Count(result, "[REDACTED]")
|
|
if count < 2 {
|
|
t.Errorf("expected at least 2 redactions, got %d in: %q", count, result)
|
|
}
|
|
}
|
|
|
|
func TestRedact_NoMatches(t *testing.T) {
|
|
content := "hello world"
|
|
result := Redact(content, nil)
|
|
if result != content {
|
|
t.Errorf("should return unchanged content, got %q", result)
|
|
}
|
|
}
|
|
|
|
func TestRedact_SkipsWarnAction(t *testing.T) {
|
|
matches := []SecretMatch{
|
|
{Pattern: "test", Action: ActionWarn, Start: 0, End: 5},
|
|
}
|
|
result := Redact("hello world", matches)
|
|
if result != "hello world" {
|
|
t.Errorf("warn-only matches should not be redacted, got %q", result)
|
|
}
|
|
}
|
|
|
|
// --- Unicode Sanitization ---
|
|
|
|
func TestSanitizeUnicode_Normal(t *testing.T) {
|
|
normal := "Hello, world! 123"
|
|
result := SanitizeUnicode(normal)
|
|
if result != normal {
|
|
t.Errorf("normal text should be unchanged, got %q", result)
|
|
}
|
|
}
|
|
|
|
func TestSanitizeUnicode_StripsTags(t *testing.T) {
|
|
// Unicode tag characters (U+E0000-U+E007F) used for ASCII smuggling
|
|
tagged := "Hello" + string([]rune{0xE0048, 0xE0065, 0xE006C, 0xE006C, 0xE006F}) + " world"
|
|
result := SanitizeUnicode(tagged)
|
|
if result != "Hello world" {
|
|
t.Errorf("should strip tag characters, got %q (len=%d)", result, len(result))
|
|
}
|
|
}
|
|
|
|
func TestSanitizeUnicode_StripsZeroWidth(t *testing.T) {
|
|
// Zero-width space (U+200B), zero-width joiner (U+200D)
|
|
zwsp := "Hello\u200B\u200Dworld"
|
|
result := SanitizeUnicode(zwsp)
|
|
if result != "Helloworld" {
|
|
t.Errorf("should strip zero-width characters, got %q", result)
|
|
}
|
|
}
|
|
|
|
func TestSanitizeUnicode_StripsRTL(t *testing.T) {
|
|
// RTL override (U+202E) used for visual spoofing
|
|
rtl := "Hello\u202Eworld"
|
|
result := SanitizeUnicode(rtl)
|
|
if strings.ContainsRune(result, 0x202E) {
|
|
t.Error("should strip RTL override character")
|
|
}
|
|
}
|
|
|
|
func TestSanitizeUnicode_PreservesNewlines(t *testing.T) {
|
|
multiline := "line1\nline2\ttab"
|
|
result := SanitizeUnicode(multiline)
|
|
if result != multiline {
|
|
t.Errorf("should preserve newlines and tabs, got %q", result)
|
|
}
|
|
}
|
|
|
|
func TestSanitizeUnicode_PreservesEmoji(t *testing.T) {
|
|
emoji := "Hello 😊 world"
|
|
result := SanitizeUnicode(emoji)
|
|
if result != emoji {
|
|
t.Errorf("should preserve emoji, got %q", result)
|
|
}
|
|
}
|
|
|
|
// --- Incognito ---
|
|
|
|
func TestIncognito_DefaultOff(t *testing.T) {
|
|
m := NewIncognitoMode()
|
|
if m.Active() {
|
|
t.Error("should default to inactive")
|
|
}
|
|
if !m.ShouldPersist() {
|
|
t.Error("should allow persistence when not incognito")
|
|
}
|
|
if !m.ShouldLearn() {
|
|
t.Error("should allow learning when not incognito")
|
|
}
|
|
}
|
|
|
|
func TestIncognito_Activate(t *testing.T) {
|
|
m := NewIncognitoMode()
|
|
m.Activate()
|
|
|
|
if !m.Active() {
|
|
t.Error("should be active")
|
|
}
|
|
if m.ShouldPersist() {
|
|
t.Error("should not persist in incognito")
|
|
}
|
|
if m.ShouldLearn() {
|
|
t.Error("should not learn in incognito")
|
|
}
|
|
if m.ShouldLogContent() {
|
|
t.Error("should not log content in incognito")
|
|
}
|
|
}
|
|
|
|
func TestIncognito_Toggle(t *testing.T) {
|
|
m := NewIncognitoMode()
|
|
|
|
active := m.Toggle()
|
|
if !active {
|
|
t.Error("first toggle should activate")
|
|
}
|
|
|
|
active = m.Toggle()
|
|
if active {
|
|
t.Error("second toggle should deactivate")
|
|
}
|
|
}
|
|
|
|
// --- Firewall ---
|
|
|
|
func TestFirewall_ScanOutgoing(t *testing.T) {
|
|
fw := NewFirewall(FirewallConfig{
|
|
ScanOutgoing: true,
|
|
EntropyThreshold: 6.0,
|
|
})
|
|
|
|
msgs := []message.Message{
|
|
message.NewUserText("my key is sk-ant-api03-abcdefghijklmnopqrstuvwxyz"),
|
|
}
|
|
|
|
cleaned := fw.ScanOutgoingMessages(msgs)
|
|
text := cleaned[0].TextContent()
|
|
|
|
if strings.Contains(text, "sk-ant-") {
|
|
t.Error("should redact Anthropic key from outgoing message")
|
|
}
|
|
if !strings.Contains(text, "[REDACTED]") {
|
|
t.Errorf("should contain [REDACTED], got %q", text)
|
|
}
|
|
}
|
|
|
|
func TestFirewall_ScanToolResult(t *testing.T) {
|
|
fw := NewFirewall(FirewallConfig{
|
|
ScanToolResults: true,
|
|
EntropyThreshold: 6.0,
|
|
})
|
|
|
|
result := fw.ScanToolResult("contents of .env:\nOPENAI_API_KEY=sk-proj-testkey1234567890abcdef12345")
|
|
if strings.Contains(result, "sk-proj-") {
|
|
t.Error("should redact key from tool result")
|
|
}
|
|
}
|
|
|
|
func TestFirewall_DisabledScanning(t *testing.T) {
|
|
fw := NewFirewall(FirewallConfig{
|
|
ScanOutgoing: false,
|
|
ScanToolResults: false,
|
|
})
|
|
|
|
original := "sk-ant-api03-abcdefghijklmnopqrstuvwxyz"
|
|
msgs := []message.Message{message.NewUserText(original)}
|
|
|
|
cleaned := fw.ScanOutgoingMessages(msgs)
|
|
if cleaned[0].TextContent() != original {
|
|
t.Error("disabled scanning should pass through unchanged")
|
|
}
|
|
|
|
result := fw.ScanToolResult(original)
|
|
if result != original {
|
|
t.Error("disabled scanning should pass through tool results unchanged")
|
|
}
|
|
}
|
|
|
|
func TestFirewall_UnicodeCleanedBeforeSecretScan(t *testing.T) {
|
|
fw := NewFirewall(FirewallConfig{
|
|
ScanOutgoing: true,
|
|
EntropyThreshold: 6.0,
|
|
})
|
|
|
|
// Unicode tags embedded in text
|
|
tagged := "normal text" + string([]rune{0xE0048, 0xE0065}) + " more text"
|
|
msgs := []message.Message{message.NewUserText(tagged)}
|
|
|
|
cleaned := fw.ScanOutgoingMessages(msgs)
|
|
text := cleaned[0].TextContent()
|
|
if strings.ContainsRune(text, 0xE0048) {
|
|
t.Error("unicode tags should be stripped")
|
|
}
|
|
}
|
|
|
|
func TestFirewall_ActionBlockReturnsBlockedString(t *testing.T) {
|
|
// Pattern with ActionBlock should return a blocked marker, not the original content
|
|
fw := NewFirewall(FirewallConfig{
|
|
ScanOutgoing: true,
|
|
EntropyThreshold: 3.0,
|
|
})
|
|
if err := fw.Scanner().AddPattern("test_block", `BLOCK_THIS_SECRET`, ActionBlock); err != nil {
|
|
t.Fatalf("AddPattern: %v", err)
|
|
}
|
|
|
|
msgs := []message.Message{
|
|
message.NewUserText("some text BLOCK_THIS_SECRET more text"),
|
|
}
|
|
cleaned := fw.ScanOutgoingMessages(msgs)
|
|
text := cleaned[0].TextContent()
|
|
|
|
if strings.Contains(text, "BLOCK_THIS_SECRET") {
|
|
t.Error("ActionBlock content should not pass through")
|
|
}
|
|
if !strings.Contains(text, "[BLOCKED:") {
|
|
t.Errorf("expected [BLOCKED: ...] marker, got %q", text)
|
|
}
|
|
}
|
|
|
|
func TestScanner_DedupKeyNoCollision(t *testing.T) {
|
|
// Two matches at byte offsets > 127 in the same pattern should both appear,
|
|
// not get deduplicated because of hash collision in the key.
|
|
s := NewScanner(3.0)
|
|
// Build a string where two matches appear after offset 127
|
|
prefix := strings.Repeat("x", 128) // push matches past offset 127
|
|
input := prefix + "sk-ant-api03-aaaaaaaabbbbbbbbcccccccc " + prefix + "sk-ant-api03-ddddddddeeeeeeeeffffffff"
|
|
matches := s.Scan(input)
|
|
|
|
count := 0
|
|
for _, m := range matches {
|
|
if m.Pattern == "anthropic_api_key" {
|
|
count++
|
|
}
|
|
}
|
|
|
|
if count < 2 {
|
|
t.Errorf("expected 2 distinct Anthropic key matches after offset 127, got %d (dedup key collision?)", count)
|
|
}
|
|
}
|