363 lines
9.7 KiB
Go
363 lines
9.7 KiB
Go
// Package services provides business logic services for the application.
|
|
package services
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Wikid82/charon/backend/internal/logger"
|
|
"github.com/Wikid82/charon/backend/internal/models"
|
|
)
|
|
|
|
// LogWatcher provides real-time tailing of Caddy access logs.
|
|
// It is a singleton service that can have multiple WebSocket clients subscribe
|
|
// to receive security-relevant log entries in real-time.
|
|
type LogWatcher struct {
|
|
mu sync.RWMutex
|
|
subscribers map[chan models.SecurityLogEntry]struct{}
|
|
logPath string
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
started bool
|
|
}
|
|
|
|
// NewLogWatcher creates a new LogWatcher instance for the given log file path.
|
|
func NewLogWatcher(logPath string) *LogWatcher {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &LogWatcher{
|
|
subscribers: make(map[chan models.SecurityLogEntry]struct{}),
|
|
logPath: logPath,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
// Start begins tailing the log file. This method is idempotent.
|
|
func (w *LogWatcher) Start(ctx context.Context) error {
|
|
w.mu.Lock()
|
|
if w.started {
|
|
w.mu.Unlock()
|
|
return nil
|
|
}
|
|
w.started = true
|
|
w.mu.Unlock()
|
|
|
|
go w.tailFile()
|
|
logger.Log().WithField("path", w.logPath).Info("LogWatcher started")
|
|
return nil
|
|
}
|
|
|
|
// Stop halts the log watcher and closes all subscriber channels.
|
|
func (w *LogWatcher) Stop() {
|
|
w.cancel()
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
for ch := range w.subscribers {
|
|
close(ch)
|
|
delete(w.subscribers, ch)
|
|
}
|
|
w.started = false
|
|
logger.Log().Info("LogWatcher stopped")
|
|
}
|
|
|
|
// Subscribe adds a new subscriber and returns a channel for receiving log entries.
|
|
// The caller is responsible for calling Unsubscribe when done.
|
|
func (w *LogWatcher) Subscribe() <-chan models.SecurityLogEntry {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
ch := make(chan models.SecurityLogEntry, 100)
|
|
w.subscribers[ch] = struct{}{}
|
|
logger.Log().WithField("subscriber_count", len(w.subscribers)).Debug("New subscriber added to LogWatcher")
|
|
return ch
|
|
}
|
|
|
|
// Unsubscribe removes a subscriber channel.
|
|
func (w *LogWatcher) Unsubscribe(ch <-chan models.SecurityLogEntry) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
// Type assert to get the writable channel for map lookup
|
|
// The channel passed in is receive-only, but we stored the bidirectional channel
|
|
for subCh := range w.subscribers {
|
|
// Compare the underlying channel - convert bidirectional to receive-only for comparison
|
|
recvOnlyCh := (<-chan models.SecurityLogEntry)(subCh) //nolint:gocritic // Type conversion required for channel comparison
|
|
if recvOnlyCh == ch {
|
|
close(subCh)
|
|
delete(w.subscribers, subCh)
|
|
logger.Log().WithField("subscriber_count", len(w.subscribers)).Debug("Subscriber removed from LogWatcher")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// broadcast sends a log entry to all subscribers.
|
|
// Non-blocking: if a subscriber's channel is full, the entry is dropped for that subscriber.
|
|
func (w *LogWatcher) broadcast(entry models.SecurityLogEntry) {
|
|
w.mu.RLock()
|
|
defer w.mu.RUnlock()
|
|
|
|
for ch := range w.subscribers {
|
|
select {
|
|
case ch <- entry:
|
|
// Successfully sent
|
|
default:
|
|
// Channel is full, skip (prevents blocking other subscribers)
|
|
}
|
|
}
|
|
}
|
|
|
|
// tailFile continuously reads new entries from the log file.
|
|
// It handles file rotation and missing files gracefully.
|
|
func (w *LogWatcher) tailFile() {
|
|
for {
|
|
select {
|
|
case <-w.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Wait for file to exist
|
|
if _, err := os.Stat(w.logPath); os.IsNotExist(err) {
|
|
logger.Log().WithField("path", w.logPath).Debug("Log file not found, waiting...")
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
|
|
// Open the file
|
|
file, err := os.Open(w.logPath)
|
|
if err != nil {
|
|
logger.Log().WithError(err).WithField("path", w.logPath).Error("Failed to open log file for tailing")
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
|
|
// Seek to end of file (we only want new entries)
|
|
if _, err := file.Seek(0, io.SeekEnd); err != nil {
|
|
logger.Log().WithError(err).Warn("Failed to seek to end of log file")
|
|
}
|
|
|
|
w.readLoop(file)
|
|
if err := file.Close(); err != nil {
|
|
logger.Log().WithError(err).Warn("Failed to close log file")
|
|
}
|
|
|
|
// Brief pause before reopening (handles log rotation)
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
|
|
// readLoop reads lines from the file until EOF or error.
|
|
func (w *LogWatcher) readLoop(file *os.File) {
|
|
reader := bufio.NewReader(file)
|
|
for {
|
|
select {
|
|
case <-w.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
line, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
// No new data, wait and retry
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
}
|
|
// File may have been rotated or truncated
|
|
logger.Log().WithError(err).Debug("Error reading log file, will reopen")
|
|
return
|
|
}
|
|
|
|
// Skip empty lines
|
|
line = strings.TrimSpace(line)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
entry := w.ParseLogEntry(line)
|
|
if entry != nil {
|
|
w.broadcast(*entry)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ParseLogEntry converts a Caddy JSON log line into a SecurityLogEntry.
|
|
// Returns nil if the line cannot be parsed.
|
|
func (w *LogWatcher) ParseLogEntry(line string) *models.SecurityLogEntry {
|
|
var caddyLog models.CaddyAccessLog
|
|
if err := json.Unmarshal([]byte(line), &caddyLog); err != nil {
|
|
logger.Log().WithError(err).WithField("line", line[:minInt(100, len(line))]).Debug("Failed to parse log line as JSON")
|
|
return nil
|
|
}
|
|
|
|
// Convert Caddy timestamp (Unix float) to RFC3339
|
|
timestamp := time.Unix(int64(caddyLog.Ts), int64((caddyLog.Ts-float64(int64(caddyLog.Ts)))*1e9))
|
|
|
|
// Extract User-Agent from headers
|
|
userAgent := ""
|
|
if ua, ok := caddyLog.Request.Headers["User-Agent"]; ok && len(ua) > 0 {
|
|
userAgent = ua[0]
|
|
}
|
|
|
|
entry := &models.SecurityLogEntry{
|
|
Timestamp: timestamp.Format(time.RFC3339),
|
|
Level: caddyLog.Level,
|
|
Logger: caddyLog.Logger,
|
|
ClientIP: caddyLog.Request.RemoteIP,
|
|
Method: caddyLog.Request.Method,
|
|
URI: caddyLog.Request.URI,
|
|
Status: caddyLog.Status,
|
|
Duration: caddyLog.Duration,
|
|
Size: int64(caddyLog.Size),
|
|
UserAgent: userAgent,
|
|
Host: caddyLog.Request.Host,
|
|
Source: "normal",
|
|
Blocked: false,
|
|
Details: make(map[string]any),
|
|
}
|
|
|
|
// Detect security events based on status codes and response headers
|
|
w.detectSecurityEvent(entry, &caddyLog)
|
|
|
|
return entry
|
|
}
|
|
|
|
// detectSecurityEvent analyzes the log entry and sets security-related fields.
|
|
func (w *LogWatcher) detectSecurityEvent(entry *models.SecurityLogEntry, caddyLog *models.CaddyAccessLog) {
|
|
loggerLower := strings.ToLower(caddyLog.Logger)
|
|
|
|
// Check for WAF/Coraza indicators (highest priority for 403s)
|
|
if strings.Contains(loggerLower, "waf") ||
|
|
strings.Contains(loggerLower, "coraza") ||
|
|
hasHeader(caddyLog.RespHeaders, "X-Coraza-Id") ||
|
|
hasHeader(caddyLog.RespHeaders, "X-Coraza-Rule-Id") {
|
|
entry.Blocked = true
|
|
entry.Source = "waf"
|
|
entry.Level = "warn"
|
|
entry.BlockReason = "WAF rule triggered"
|
|
|
|
// Try to extract rule ID from headers
|
|
if ruleID, ok := caddyLog.RespHeaders["X-Coraza-Id"]; ok && len(ruleID) > 0 {
|
|
entry.Details["rule_id"] = ruleID[0]
|
|
}
|
|
if ruleID, ok := caddyLog.RespHeaders["X-Coraza-Rule-Id"]; ok && len(ruleID) > 0 {
|
|
entry.Details["rule_id"] = ruleID[0]
|
|
}
|
|
return
|
|
}
|
|
|
|
// Check for CrowdSec indicators
|
|
if strings.Contains(loggerLower, "crowdsec") ||
|
|
strings.Contains(loggerLower, "bouncer") ||
|
|
hasHeader(caddyLog.RespHeaders, "X-Crowdsec-Decision") ||
|
|
hasHeader(caddyLog.RespHeaders, "X-Crowdsec-Origin") {
|
|
entry.Blocked = true
|
|
entry.Source = "crowdsec"
|
|
entry.Level = "warn"
|
|
entry.BlockReason = "CrowdSec decision"
|
|
|
|
// Extract CrowdSec-specific headers
|
|
if origin, ok := caddyLog.RespHeaders["X-Crowdsec-Origin"]; ok && len(origin) > 0 {
|
|
entry.Details["crowdsec_origin"] = origin[0]
|
|
}
|
|
return
|
|
}
|
|
|
|
// Check for ACL blocks
|
|
if strings.Contains(loggerLower, "acl") ||
|
|
hasHeader(caddyLog.RespHeaders, "X-Acl-Denied") ||
|
|
hasHeader(caddyLog.RespHeaders, "X-Blocked-By-Acl") {
|
|
entry.Blocked = true
|
|
entry.Source = "acl"
|
|
entry.Level = "warn"
|
|
entry.BlockReason = "Access list denied"
|
|
return
|
|
}
|
|
|
|
// Check for rate limiting (429 Too Many Requests)
|
|
if caddyLog.Status == 429 {
|
|
entry.Blocked = true
|
|
entry.Source = "ratelimit"
|
|
entry.Level = "warn"
|
|
entry.BlockReason = "Rate limit exceeded"
|
|
|
|
// Extract rate limit headers if present
|
|
if remaining, ok := caddyLog.RespHeaders["X-Ratelimit-Remaining"]; ok && len(remaining) > 0 {
|
|
entry.Details["ratelimit_remaining"] = remaining[0]
|
|
}
|
|
if reset, ok := caddyLog.RespHeaders["X-Ratelimit-Reset"]; ok && len(reset) > 0 {
|
|
entry.Details["ratelimit_reset"] = reset[0]
|
|
}
|
|
if limit, ok := caddyLog.RespHeaders["X-Ratelimit-Limit"]; ok && len(limit) > 0 {
|
|
entry.Details["ratelimit_limit"] = limit[0]
|
|
}
|
|
return
|
|
}
|
|
|
|
// Check for other 403s (generic security block)
|
|
if caddyLog.Status == 403 {
|
|
entry.Blocked = true
|
|
entry.Source = "cerberus"
|
|
entry.Level = "warn"
|
|
entry.BlockReason = "Access denied"
|
|
return
|
|
}
|
|
|
|
// Check for authentication failures
|
|
if caddyLog.Status == 401 {
|
|
entry.Level = "warn"
|
|
entry.Source = "auth"
|
|
entry.Details["auth_failure"] = true
|
|
return
|
|
}
|
|
|
|
// Check for server errors
|
|
if caddyLog.Status >= 500 {
|
|
entry.Level = "error"
|
|
return
|
|
}
|
|
|
|
// Normal traffic - set appropriate level based on status
|
|
entry.Source = "normal"
|
|
entry.Blocked = false
|
|
if caddyLog.Status >= 400 {
|
|
entry.Level = "warn"
|
|
} else {
|
|
entry.Level = "info"
|
|
}
|
|
}
|
|
|
|
// hasHeader checks if a header map contains a specific key (case-insensitive).
|
|
func hasHeader(headers map[string][]string, key string) bool {
|
|
if headers == nil {
|
|
return false
|
|
}
|
|
// Direct lookup first
|
|
if _, ok := headers[key]; ok {
|
|
return true
|
|
}
|
|
// Case-insensitive fallback
|
|
for k := range headers {
|
|
if strings.EqualFold(k, key) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// minInt returns the minimum of two integers.
|
|
func minInt(a, b int) int {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|