Migrate analytics from SQLite to ClickHouse
SQLite was too slow for analytical aggregations on traffic_events and waf_events (millions of rows, GROUP BY, COUNT DISTINCT). ClickHouse is a columnar OLAP database purpose-built for this workload. - Add ClickHouse container to Docker Compose with health check - Create src/lib/clickhouse/client.ts with singleton client, table DDL, insert helpers, and all analytics query functions - Update log-parser.ts and waf-log-parser.ts to write to ClickHouse - Remove purgeOldEntries — ClickHouse TTL handles 90-day retention - Rewrite analytics-db.ts and waf-events.ts to query ClickHouse - Remove trafficEvents/wafEvents from SQLite schema, add migration - CLICKHOUSE_PASSWORD is required (no hardcoded default) - Update .env.example, README, and test infrastructure API response shapes are unchanged — no frontend modifications needed. Parse state (file offsets) remains in SQLite. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
+32
-219
@@ -1,7 +1,24 @@
|
||||
import { sql, and, gte, lte, eq, inArray } from 'drizzle-orm';
|
||||
import db from './db';
|
||||
import { trafficEvents, proxyHosts, wafEvents } from './db/schema';
|
||||
import { existsSync } from 'node:fs';
|
||||
import db from './db';
|
||||
import { proxyHosts } from './db/schema';
|
||||
import {
|
||||
querySummary,
|
||||
queryTimeline,
|
||||
queryCountries,
|
||||
queryProtocols,
|
||||
queryUserAgents,
|
||||
queryBlocked,
|
||||
queryDistinctHosts,
|
||||
type AnalyticsSummary as CHSummary,
|
||||
type TimelineBucket,
|
||||
type CountryStats,
|
||||
type ProtoStats,
|
||||
type UAStats,
|
||||
type BlockedEvent,
|
||||
type BlockedPage,
|
||||
} from './clickhouse/client';
|
||||
|
||||
export type { TimelineBucket, CountryStats, ProtoStats, UAStats, BlockedEvent, BlockedPage };
|
||||
|
||||
export type Interval = '1h' | '12h' | '24h' | '7d' | '30d';
|
||||
|
||||
@@ -15,250 +32,46 @@ export const INTERVAL_SECONDS: Record<Interval, number> = {
|
||||
'30d': 30 * 86400,
|
||||
};
|
||||
|
||||
function buildWhere(from: number, to: number, hosts: string[]) {
|
||||
const conditions = [gte(trafficEvents.ts, from), lte(trafficEvents.ts, to)];
|
||||
if (hosts.length === 1) {
|
||||
conditions.push(eq(trafficEvents.host, hosts[0]));
|
||||
} else if (hosts.length > 1) {
|
||||
conditions.push(inArray(trafficEvents.host, hosts));
|
||||
}
|
||||
return and(...conditions);
|
||||
}
|
||||
|
||||
// ── Summary ──────────────────────────────────────────────────────────────────
|
||||
|
||||
export interface AnalyticsSummary {
|
||||
totalRequests: number;
|
||||
uniqueIps: number;
|
||||
blockedRequests: number;
|
||||
blockedPercent: number;
|
||||
bytesServed: number;
|
||||
export interface AnalyticsSummary extends CHSummary {
|
||||
loggingDisabled: boolean;
|
||||
}
|
||||
|
||||
export async function getAnalyticsSummary(from: number, to: number, hosts: string[]): Promise<AnalyticsSummary> {
|
||||
const loggingDisabled = !existsSync(LOG_FILE);
|
||||
const where = buildWhere(from, to, hosts);
|
||||
|
||||
const row = db
|
||||
.select({
|
||||
total: sql<number>`count(*)`,
|
||||
uniqueIps: sql<number>`count(distinct ${trafficEvents.clientIp})`,
|
||||
blocked: sql<number>`sum(case when ${trafficEvents.isBlocked} then 1 else 0 end)`,
|
||||
bytes: sql<number>`sum(${trafficEvents.bytesSent})`,
|
||||
})
|
||||
.from(trafficEvents)
|
||||
.where(where)
|
||||
.get();
|
||||
|
||||
// Count WAF blocks in the same time window
|
||||
const wafConditions = [gte(wafEvents.ts, from), lte(wafEvents.ts, to), eq(wafEvents.blocked, true)];
|
||||
if (hosts.length === 1) {
|
||||
wafConditions.push(eq(wafEvents.host, hosts[0]));
|
||||
} else if (hosts.length > 1) {
|
||||
wafConditions.push(inArray(wafEvents.host, hosts));
|
||||
}
|
||||
const wafRow = db
|
||||
.select({ blocked: sql<number>`count(*)` })
|
||||
.from(wafEvents)
|
||||
.where(and(...wafConditions))
|
||||
.get();
|
||||
|
||||
const total = row?.total ?? 0;
|
||||
const geoBlocked = row?.blocked ?? 0;
|
||||
const wafBlocked = wafRow?.blocked ?? 0;
|
||||
const blocked = geoBlocked + wafBlocked;
|
||||
|
||||
return {
|
||||
totalRequests: total,
|
||||
uniqueIps: row?.uniqueIps ?? 0,
|
||||
blockedRequests: blocked,
|
||||
blockedPercent: total > 0 ? Math.round((blocked / total) * 1000) / 10 : 0,
|
||||
bytesServed: row?.bytes ?? 0,
|
||||
loggingDisabled,
|
||||
};
|
||||
const summary = await querySummary(from, to, hosts);
|
||||
return { ...summary, loggingDisabled };
|
||||
}
|
||||
|
||||
// ── Timeline ─────────────────────────────────────────────────────────────────
|
||||
|
||||
export interface TimelineBucket {
|
||||
ts: number;
|
||||
total: number;
|
||||
blocked: number;
|
||||
}
|
||||
|
||||
function bucketSizeForDuration(seconds: number): number {
|
||||
if (seconds <= 3600) return 300;
|
||||
if (seconds <= 43200) return 1800;
|
||||
if (seconds <= 86400) return 3600;
|
||||
if (seconds <= 7 * 86400) return 21600;
|
||||
return 86400;
|
||||
}
|
||||
|
||||
export async function getAnalyticsTimeline(from: number, to: number, hosts: string[]): Promise<TimelineBucket[]> {
|
||||
const bucketSize = bucketSizeForDuration(to - from);
|
||||
const where = buildWhere(from, to, hosts);
|
||||
|
||||
const rows = db
|
||||
.select({
|
||||
bucket: sql<number>`(${trafficEvents.ts} / ${sql.raw(String(bucketSize))})`,
|
||||
total: sql<number>`count(*)`,
|
||||
blocked: sql<number>`sum(case when ${trafficEvents.isBlocked} then 1 else 0 end)`,
|
||||
})
|
||||
.from(trafficEvents)
|
||||
.where(where)
|
||||
.groupBy(sql`(${trafficEvents.ts} / ${sql.raw(String(bucketSize))})`)
|
||||
.orderBy(sql`(${trafficEvents.ts} / ${sql.raw(String(bucketSize))})`)
|
||||
.all();
|
||||
|
||||
return rows.map((r) => ({
|
||||
ts: r.bucket * bucketSize,
|
||||
total: r.total,
|
||||
blocked: r.blocked ?? 0,
|
||||
}));
|
||||
return queryTimeline(from, to, hosts);
|
||||
}
|
||||
|
||||
// ── Countries ────────────────────────────────────────────────────────────────
|
||||
|
||||
export interface CountryStats {
|
||||
countryCode: string;
|
||||
total: number;
|
||||
blocked: number;
|
||||
}
|
||||
|
||||
export async function getAnalyticsCountries(from: number, to: number, hosts: string[]): Promise<CountryStats[]> {
|
||||
const where = buildWhere(from, to, hosts);
|
||||
|
||||
const rows = db
|
||||
.select({
|
||||
countryCode: trafficEvents.countryCode,
|
||||
total: sql<number>`count(*)`,
|
||||
blocked: sql<number>`sum(case when ${trafficEvents.isBlocked} then 1 else 0 end)`,
|
||||
})
|
||||
.from(trafficEvents)
|
||||
.where(where)
|
||||
.groupBy(trafficEvents.countryCode)
|
||||
.orderBy(sql`count(*) desc`)
|
||||
.all();
|
||||
|
||||
return rows.map((r) => ({
|
||||
countryCode: r.countryCode ?? 'XX',
|
||||
total: r.total,
|
||||
blocked: r.blocked ?? 0,
|
||||
}));
|
||||
return queryCountries(from, to, hosts);
|
||||
}
|
||||
|
||||
// ── Protocols ────────────────────────────────────────────────────────────────
|
||||
|
||||
export interface ProtoStats {
|
||||
proto: string;
|
||||
count: number;
|
||||
percent: number;
|
||||
}
|
||||
|
||||
export async function getAnalyticsProtocols(from: number, to: number, hosts: string[]): Promise<ProtoStats[]> {
|
||||
const where = buildWhere(from, to, hosts);
|
||||
|
||||
const rows = db
|
||||
.select({
|
||||
proto: trafficEvents.proto,
|
||||
count: sql<number>`count(*)`,
|
||||
})
|
||||
.from(trafficEvents)
|
||||
.where(where)
|
||||
.groupBy(trafficEvents.proto)
|
||||
.orderBy(sql`count(*) desc`)
|
||||
.all();
|
||||
|
||||
const total = rows.reduce((s, r) => s + r.count, 0);
|
||||
|
||||
return rows.map((r) => ({
|
||||
proto: r.proto || 'Unknown',
|
||||
count: r.count,
|
||||
percent: total > 0 ? Math.round((r.count / total) * 1000) / 10 : 0,
|
||||
}));
|
||||
return queryProtocols(from, to, hosts);
|
||||
}
|
||||
|
||||
// ── User Agents ──────────────────────────────────────────────────────────────
|
||||
|
||||
export interface UAStats {
|
||||
userAgent: string;
|
||||
count: number;
|
||||
percent: number;
|
||||
}
|
||||
|
||||
export async function getAnalyticsUserAgents(from: number, to: number, hosts: string[]): Promise<UAStats[]> {
|
||||
const where = buildWhere(from, to, hosts);
|
||||
|
||||
const rows = db
|
||||
.select({
|
||||
userAgent: trafficEvents.userAgent,
|
||||
count: sql<number>`count(*)`,
|
||||
})
|
||||
.from(trafficEvents)
|
||||
.where(where)
|
||||
.groupBy(trafficEvents.userAgent)
|
||||
.orderBy(sql`count(*) desc`)
|
||||
.limit(10)
|
||||
.all();
|
||||
|
||||
const total = rows.reduce((s, r) => s + r.count, 0);
|
||||
|
||||
return rows.map((r) => ({
|
||||
userAgent: r.userAgent || 'Unknown',
|
||||
count: r.count,
|
||||
percent: total > 0 ? Math.round((r.count / total) * 1000) / 10 : 0,
|
||||
}));
|
||||
return queryUserAgents(from, to, hosts);
|
||||
}
|
||||
|
||||
// ── Blocked events ───────────────────────────────────────────────────────────
|
||||
|
||||
export interface BlockedEvent {
|
||||
id: number;
|
||||
ts: number;
|
||||
clientIp: string;
|
||||
countryCode: string | null;
|
||||
method: string;
|
||||
uri: string;
|
||||
status: number;
|
||||
host: string;
|
||||
}
|
||||
|
||||
export interface BlockedPage {
|
||||
events: BlockedEvent[];
|
||||
total: number;
|
||||
page: number;
|
||||
pages: number;
|
||||
}
|
||||
|
||||
export async function getAnalyticsBlocked(from: number, to: number, hosts: string[], page: number): Promise<BlockedPage> {
|
||||
const pageSize = 10;
|
||||
const where = and(buildWhere(from, to, hosts), eq(trafficEvents.isBlocked, true));
|
||||
|
||||
const totalRow = db.select({ total: sql<number>`count(*)` }).from(trafficEvents).where(where).get();
|
||||
const total = totalRow?.total ?? 0;
|
||||
const pages = Math.max(1, Math.ceil(total / pageSize));
|
||||
const safePage = Math.min(Math.max(1, page), pages);
|
||||
|
||||
const rows = db
|
||||
.select({
|
||||
id: trafficEvents.id,
|
||||
ts: trafficEvents.ts,
|
||||
clientIp: trafficEvents.clientIp,
|
||||
countryCode: trafficEvents.countryCode,
|
||||
method: trafficEvents.method,
|
||||
uri: trafficEvents.uri,
|
||||
status: trafficEvents.status,
|
||||
host: trafficEvents.host,
|
||||
})
|
||||
.from(trafficEvents)
|
||||
.where(where)
|
||||
.orderBy(sql`${trafficEvents.ts} desc`)
|
||||
.limit(pageSize)
|
||||
.offset((safePage - 1) * pageSize)
|
||||
.all();
|
||||
|
||||
return { events: rows, total, page: safePage, pages };
|
||||
return queryBlocked(from, to, hosts, page);
|
||||
}
|
||||
|
||||
// ── Hosts ────────────────────────────────────────────────────────────────────
|
||||
@@ -266,11 +79,11 @@ export async function getAnalyticsBlocked(from: number, to: number, hosts: strin
|
||||
export async function getAnalyticsHosts(): Promise<string[]> {
|
||||
const hostSet = new Set<string>();
|
||||
|
||||
// Hosts that appear in traffic events
|
||||
const trafficRows = db.selectDistinct({ host: trafficEvents.host }).from(trafficEvents).all();
|
||||
for (const r of trafficRows) if (r.host) hostSet.add(r.host);
|
||||
// Hosts from ClickHouse traffic events
|
||||
const chHosts = await queryDistinctHosts();
|
||||
for (const h of chHosts) if (h) hostSet.add(h);
|
||||
|
||||
// All domains configured on proxy hosts (even those with no traffic yet)
|
||||
// All domains configured on proxy hosts (SQLite)
|
||||
const proxyRows = db.select({ domains: proxyHosts.domains }).from(proxyHosts).all();
|
||||
for (const r of proxyRows) {
|
||||
try {
|
||||
|
||||
@@ -0,0 +1,527 @@
|
||||
import { createClient, type ClickHouseClient } from '@clickhouse/client';
|
||||
|
||||
// ── Configuration ───────────────────────────────────────────────────────────
|
||||
|
||||
const CH_URL = process.env.CLICKHOUSE_URL ?? 'http://clickhouse:8123';
|
||||
const CH_USER = process.env.CLICKHOUSE_USER ?? 'cpm';
|
||||
const CH_PASS = process.env.CLICKHOUSE_PASSWORD ?? '';
|
||||
const CH_DB = process.env.CLICKHOUSE_DB ?? 'analytics';
|
||||
|
||||
// ── Singleton client ────────────────────────────────────────────────────────
|
||||
|
||||
let client: ClickHouseClient | null = null;
|
||||
|
||||
export function getClient(): ClickHouseClient {
|
||||
if (!client) {
|
||||
client = createClient({
|
||||
url: CH_URL,
|
||||
username: CH_USER,
|
||||
password: CH_PASS,
|
||||
database: CH_DB,
|
||||
clickhouse_settings: {
|
||||
async_insert: 1,
|
||||
wait_for_async_insert: 0,
|
||||
},
|
||||
});
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
// ── Table creation ──────────────────────────────────────────────────────────
|
||||
|
||||
const TRAFFIC_EVENTS_DDL = `
|
||||
CREATE TABLE IF NOT EXISTS traffic_events (
|
||||
ts DateTime,
|
||||
client_ip String,
|
||||
country_code Nullable(String),
|
||||
host String DEFAULT '',
|
||||
method String DEFAULT '',
|
||||
uri String DEFAULT '',
|
||||
status UInt16 DEFAULT 0,
|
||||
proto String DEFAULT '',
|
||||
bytes_sent UInt64 DEFAULT 0,
|
||||
user_agent String DEFAULT '',
|
||||
is_blocked Bool DEFAULT false
|
||||
) ENGINE = MergeTree()
|
||||
PARTITION BY toYYYYMM(ts)
|
||||
ORDER BY (host, ts)
|
||||
TTL ts + INTERVAL 90 DAY DELETE
|
||||
SETTINGS index_granularity = 8192
|
||||
`;
|
||||
|
||||
const WAF_EVENTS_DDL = `
|
||||
CREATE TABLE IF NOT EXISTS waf_events (
|
||||
ts DateTime,
|
||||
host String DEFAULT '',
|
||||
client_ip String,
|
||||
country_code Nullable(String),
|
||||
method String DEFAULT '',
|
||||
uri String DEFAULT '',
|
||||
rule_id Nullable(Int32),
|
||||
rule_message Nullable(String),
|
||||
severity Nullable(String),
|
||||
raw_data Nullable(String),
|
||||
blocked Bool DEFAULT true
|
||||
) ENGINE = MergeTree()
|
||||
PARTITION BY toYYYYMM(ts)
|
||||
ORDER BY (host, ts)
|
||||
TTL ts + INTERVAL 90 DAY DELETE
|
||||
SETTINGS index_granularity = 8192
|
||||
`;
|
||||
|
||||
export async function initClickHouse(): Promise<void> {
|
||||
const ch = getClient();
|
||||
// Ensure database exists (the default user may need to create it)
|
||||
await ch.command({ query: `CREATE DATABASE IF NOT EXISTS ${CH_DB}` });
|
||||
await ch.command({ query: TRAFFIC_EVENTS_DDL });
|
||||
await ch.command({ query: WAF_EVENTS_DDL });
|
||||
}
|
||||
|
||||
export async function closeClickHouse(): Promise<void> {
|
||||
if (client) {
|
||||
await client.close();
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Insert helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
export interface TrafficEventRow {
|
||||
ts: number;
|
||||
client_ip: string;
|
||||
country_code: string | null;
|
||||
host: string;
|
||||
method: string;
|
||||
uri: string;
|
||||
status: number;
|
||||
proto: string;
|
||||
bytes_sent: number;
|
||||
user_agent: string;
|
||||
is_blocked: boolean;
|
||||
}
|
||||
|
||||
export interface WafEventRow {
|
||||
ts: number;
|
||||
host: string;
|
||||
client_ip: string;
|
||||
country_code: string | null;
|
||||
rule_id: number | null;
|
||||
rule_message: string | null;
|
||||
severity: string | null;
|
||||
raw_data: string | null;
|
||||
blocked: boolean;
|
||||
method: string;
|
||||
uri: string;
|
||||
}
|
||||
|
||||
export async function insertTrafficEvents(rows: TrafficEventRow[]): Promise<void> {
|
||||
if (rows.length === 0) return;
|
||||
const ch = getClient();
|
||||
// Convert unix timestamp to ClickHouse DateTime string
|
||||
const values = rows.map(r => ({
|
||||
...r,
|
||||
ts: new Date(r.ts * 1000).toISOString().replace('T', ' ').slice(0, 19),
|
||||
is_blocked: r.is_blocked ? 1 : 0,
|
||||
}));
|
||||
await ch.insert({ table: 'traffic_events', values, format: 'JSONEachRow' });
|
||||
}
|
||||
|
||||
export async function insertWafEvents(rows: WafEventRow[]): Promise<void> {
|
||||
if (rows.length === 0) return;
|
||||
const ch = getClient();
|
||||
const values = rows.map(r => ({
|
||||
...r,
|
||||
ts: new Date(r.ts * 1000).toISOString().replace('T', ' ').slice(0, 19),
|
||||
blocked: r.blocked ? 1 : 0,
|
||||
}));
|
||||
await ch.insert({ table: 'waf_events', values, format: 'JSONEachRow' });
|
||||
}
|
||||
|
||||
// ── Query helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
function hostFilter(hosts: string[]): string {
|
||||
if (hosts.length === 0) return '';
|
||||
const escaped = hosts.map(h => `'${h.replace(/'/g, "\\'")}'`).join(',');
|
||||
return ` AND host IN (${escaped})`;
|
||||
}
|
||||
|
||||
function timeFilter(from: number, to: number): string {
|
||||
return `ts >= toDateTime(${from}) AND ts <= toDateTime(${to})`;
|
||||
}
|
||||
|
||||
async function queryRows<T>(query: string): Promise<T[]> {
|
||||
const ch = getClient();
|
||||
const result = await ch.query({ query, format: 'JSONEachRow' });
|
||||
return result.json<T>();
|
||||
}
|
||||
|
||||
async function queryRow<T>(query: string): Promise<T | null> {
|
||||
const rows = await queryRows<T>(query);
|
||||
return rows[0] ?? null;
|
||||
}
|
||||
|
||||
// ── Analytics queries (same signatures as old analytics-db.ts) ──────────────
|
||||
|
||||
export interface AnalyticsSummary {
|
||||
totalRequests: number;
|
||||
uniqueIps: number;
|
||||
blockedRequests: number;
|
||||
blockedPercent: number;
|
||||
bytesServed: number;
|
||||
}
|
||||
|
||||
export async function querySummary(from: number, to: number, hosts: string[]): Promise<AnalyticsSummary> {
|
||||
const hf = hostFilter(hosts);
|
||||
|
||||
const traffic = await queryRow<{ total: string; unique_ips: string; blocked: string; bytes: string }>(`
|
||||
SELECT
|
||||
count() AS total,
|
||||
uniq(client_ip) AS unique_ips,
|
||||
countIf(is_blocked) AS blocked,
|
||||
sum(bytes_sent) AS bytes
|
||||
FROM traffic_events
|
||||
WHERE ${timeFilter(from, to)}${hf}
|
||||
`);
|
||||
|
||||
const wafRow = await queryRow<{ waf_blocked: string }>(`
|
||||
SELECT count() AS waf_blocked
|
||||
FROM waf_events
|
||||
WHERE ${timeFilter(from, to)} AND blocked = true${hf}
|
||||
`);
|
||||
|
||||
const total = Number(traffic?.total ?? 0);
|
||||
const geoBlocked = Number(traffic?.blocked ?? 0);
|
||||
const wafBlocked = Number(wafRow?.waf_blocked ?? 0);
|
||||
const blocked = geoBlocked + wafBlocked;
|
||||
|
||||
return {
|
||||
totalRequests: total,
|
||||
uniqueIps: Number(traffic?.unique_ips ?? 0),
|
||||
blockedRequests: blocked,
|
||||
blockedPercent: total > 0 ? Math.round((blocked / total) * 1000) / 10 : 0,
|
||||
bytesServed: Number(traffic?.bytes ?? 0),
|
||||
};
|
||||
}
|
||||
|
||||
export interface TimelineBucket {
|
||||
ts: number;
|
||||
total: number;
|
||||
blocked: number;
|
||||
}
|
||||
|
||||
export function bucketSizeForDuration(seconds: number): number {
|
||||
if (seconds <= 3600) return 300;
|
||||
if (seconds <= 43200) return 1800;
|
||||
if (seconds <= 86400) return 3600;
|
||||
if (seconds <= 7 * 86400) return 21600;
|
||||
return 86400;
|
||||
}
|
||||
|
||||
export async function queryTimeline(from: number, to: number, hosts: string[]): Promise<TimelineBucket[]> {
|
||||
const bucketSize = bucketSizeForDuration(to - from);
|
||||
const hf = hostFilter(hosts);
|
||||
|
||||
const rows = await queryRows<{ bucket: string; total: string; blocked: string }>(`
|
||||
SELECT
|
||||
intDiv(toUInt32(ts), ${bucketSize}) AS bucket,
|
||||
count() AS total,
|
||||
countIf(is_blocked) AS blocked
|
||||
FROM traffic_events
|
||||
WHERE ${timeFilter(from, to)}${hf}
|
||||
GROUP BY bucket
|
||||
ORDER BY bucket
|
||||
`);
|
||||
|
||||
return rows.map(r => ({
|
||||
ts: Number(r.bucket) * bucketSize,
|
||||
total: Number(r.total),
|
||||
blocked: Number(r.blocked),
|
||||
}));
|
||||
}
|
||||
|
||||
export interface CountryStats {
|
||||
countryCode: string;
|
||||
total: number;
|
||||
blocked: number;
|
||||
}
|
||||
|
||||
export async function queryCountries(from: number, to: number, hosts: string[]): Promise<CountryStats[]> {
|
||||
const hf = hostFilter(hosts);
|
||||
|
||||
const rows = await queryRows<{ country_code: string | null; total: string; blocked: string }>(`
|
||||
SELECT
|
||||
country_code,
|
||||
count() AS total,
|
||||
countIf(is_blocked) AS blocked
|
||||
FROM traffic_events
|
||||
WHERE ${timeFilter(from, to)}${hf}
|
||||
GROUP BY country_code
|
||||
ORDER BY total DESC
|
||||
`);
|
||||
|
||||
return rows.map(r => ({
|
||||
countryCode: r.country_code ?? 'XX',
|
||||
total: Number(r.total),
|
||||
blocked: Number(r.blocked),
|
||||
}));
|
||||
}
|
||||
|
||||
export interface ProtoStats {
|
||||
proto: string;
|
||||
count: number;
|
||||
percent: number;
|
||||
}
|
||||
|
||||
export async function queryProtocols(from: number, to: number, hosts: string[]): Promise<ProtoStats[]> {
|
||||
const hf = hostFilter(hosts);
|
||||
|
||||
const rows = await queryRows<{ proto: string; count: string }>(`
|
||||
SELECT
|
||||
proto,
|
||||
count() AS count
|
||||
FROM traffic_events
|
||||
WHERE ${timeFilter(from, to)}${hf}
|
||||
GROUP BY proto
|
||||
ORDER BY count DESC
|
||||
`);
|
||||
|
||||
const total = rows.reduce((s, r) => s + Number(r.count), 0);
|
||||
|
||||
return rows.map(r => ({
|
||||
proto: r.proto || 'Unknown',
|
||||
count: Number(r.count),
|
||||
percent: total > 0 ? Math.round((Number(r.count) / total) * 1000) / 10 : 0,
|
||||
}));
|
||||
}
|
||||
|
||||
export interface UAStats {
|
||||
userAgent: string;
|
||||
count: number;
|
||||
percent: number;
|
||||
}
|
||||
|
||||
export async function queryUserAgents(from: number, to: number, hosts: string[]): Promise<UAStats[]> {
|
||||
const hf = hostFilter(hosts);
|
||||
|
||||
const rows = await queryRows<{ user_agent: string; count: string }>(`
|
||||
SELECT
|
||||
user_agent,
|
||||
count() AS count
|
||||
FROM traffic_events
|
||||
WHERE ${timeFilter(from, to)}${hf}
|
||||
GROUP BY user_agent
|
||||
ORDER BY count DESC
|
||||
LIMIT 10
|
||||
`);
|
||||
|
||||
const total = rows.reduce((s, r) => s + Number(r.count), 0);
|
||||
|
||||
return rows.map(r => ({
|
||||
userAgent: r.user_agent || 'Unknown',
|
||||
count: Number(r.count),
|
||||
percent: total > 0 ? Math.round((Number(r.count) / total) * 1000) / 10 : 0,
|
||||
}));
|
||||
}
|
||||
|
||||
export interface BlockedEvent {
|
||||
id: number;
|
||||
ts: number;
|
||||
clientIp: string;
|
||||
countryCode: string | null;
|
||||
method: string;
|
||||
uri: string;
|
||||
status: number;
|
||||
host: string;
|
||||
}
|
||||
|
||||
export interface BlockedPage {
|
||||
events: BlockedEvent[];
|
||||
total: number;
|
||||
page: number;
|
||||
pages: number;
|
||||
}
|
||||
|
||||
export async function queryBlocked(from: number, to: number, hosts: string[], page: number): Promise<BlockedPage> {
|
||||
const pageSize = 10;
|
||||
const hf = hostFilter(hosts);
|
||||
const where = `${timeFilter(from, to)} AND is_blocked = true${hf}`;
|
||||
|
||||
const totalRow = await queryRow<{ total: string }>(`SELECT count() AS total FROM traffic_events WHERE ${where}`);
|
||||
const total = Number(totalRow?.total ?? 0);
|
||||
const pages = Math.max(1, Math.ceil(total / pageSize));
|
||||
const safePage = Math.min(Math.max(1, page), pages);
|
||||
|
||||
const rows = await queryRows<{
|
||||
ts: string; client_ip: string; country_code: string | null;
|
||||
method: string; uri: string; status: string; host: string;
|
||||
}>(`
|
||||
SELECT toUInt32(ts) AS ts, client_ip, country_code, method, uri, status, host
|
||||
FROM traffic_events
|
||||
WHERE ${where}
|
||||
ORDER BY ts DESC
|
||||
LIMIT ${pageSize} OFFSET ${(safePage - 1) * pageSize}
|
||||
`);
|
||||
|
||||
return {
|
||||
events: rows.map((r, i) => ({
|
||||
id: (safePage - 1) * pageSize + i + 1,
|
||||
ts: Number(r.ts),
|
||||
clientIp: r.client_ip,
|
||||
countryCode: r.country_code,
|
||||
method: r.method,
|
||||
uri: r.uri,
|
||||
status: Number(r.status),
|
||||
host: r.host,
|
||||
})),
|
||||
total,
|
||||
page: safePage,
|
||||
pages,
|
||||
};
|
||||
}
|
||||
|
||||
export async function queryDistinctHosts(): Promise<string[]> {
|
||||
const rows = await queryRows<{ host: string }>(`SELECT DISTINCT host FROM traffic_events WHERE host != ''`);
|
||||
return rows.map(r => r.host);
|
||||
}
|
||||
|
||||
// ── WAF analytics queries ───────────────────────────────────────────────────
|
||||
|
||||
export async function queryWafCount(from: number, to: number): Promise<number> {
|
||||
const row = await queryRow<{ value: string }>(`
|
||||
SELECT count() AS value FROM waf_events WHERE ${timeFilter(from, to)}
|
||||
`);
|
||||
return Number(row?.value ?? 0);
|
||||
}
|
||||
|
||||
export async function queryWafCountWithSearch(search?: string): Promise<number> {
|
||||
const where = search ? wafSearchFilter(search) : '1=1';
|
||||
const row = await queryRow<{ value: string }>(`SELECT count() AS value FROM waf_events WHERE ${where}`);
|
||||
return Number(row?.value ?? 0);
|
||||
}
|
||||
|
||||
function wafSearchFilter(search: string): string {
|
||||
const escaped = search.replace(/'/g, "\\'");
|
||||
return `(host ILIKE '%${escaped}%' OR client_ip ILIKE '%${escaped}%' OR uri ILIKE '%${escaped}%' OR rule_message ILIKE '%${escaped}%')`;
|
||||
}
|
||||
|
||||
export interface TopWafRule {
|
||||
ruleId: number;
|
||||
count: number;
|
||||
message: string | null;
|
||||
}
|
||||
|
||||
export async function queryTopWafRules(from: number, to: number, limit = 10): Promise<TopWafRule[]> {
|
||||
const rows = await queryRows<{ rule_id: string; count: string; message: string | null }>(`
|
||||
SELECT
|
||||
rule_id,
|
||||
count() AS count,
|
||||
any(rule_message) AS message
|
||||
FROM waf_events
|
||||
WHERE ${timeFilter(from, to)} AND rule_id IS NOT NULL
|
||||
GROUP BY rule_id
|
||||
ORDER BY count DESC
|
||||
LIMIT ${limit}
|
||||
`);
|
||||
|
||||
return rows
|
||||
.filter(r => r.rule_id != null)
|
||||
.map(r => ({ ruleId: Number(r.rule_id), count: Number(r.count), message: r.message ?? null }));
|
||||
}
|
||||
|
||||
export interface TopWafRuleWithHosts {
|
||||
ruleId: number;
|
||||
count: number;
|
||||
message: string | null;
|
||||
hosts: { host: string; count: number }[];
|
||||
}
|
||||
|
||||
export async function queryTopWafRulesWithHosts(from: number, to: number, limit = 10): Promise<TopWafRuleWithHosts[]> {
|
||||
const topRules = await queryTopWafRules(from, to, limit);
|
||||
if (topRules.length === 0) return [];
|
||||
|
||||
const ruleIds = topRules.map(r => r.ruleId).join(',');
|
||||
const hostRows = await queryRows<{ rule_id: string; host: string; count: string }>(`
|
||||
SELECT rule_id, host, count() AS count
|
||||
FROM waf_events
|
||||
WHERE ${timeFilter(from, to)} AND rule_id IN (${ruleIds})
|
||||
GROUP BY rule_id, host
|
||||
ORDER BY count DESC
|
||||
`);
|
||||
|
||||
return topRules.map(rule => ({
|
||||
...rule,
|
||||
hosts: hostRows
|
||||
.filter(r => Number(r.rule_id) === rule.ruleId)
|
||||
.map(r => ({ host: r.host, count: Number(r.count) })),
|
||||
}));
|
||||
}
|
||||
|
||||
export async function queryWafCountries(from: number, to: number): Promise<{ countryCode: string; count: number }[]> {
|
||||
const rows = await queryRows<{ country_code: string | null; count: string }>(`
|
||||
SELECT country_code, count() AS count
|
||||
FROM waf_events
|
||||
WHERE ${timeFilter(from, to)}
|
||||
GROUP BY country_code
|
||||
ORDER BY count DESC
|
||||
`);
|
||||
return rows.map(r => ({ countryCode: r.country_code ?? 'XX', count: Number(r.count) }));
|
||||
}
|
||||
|
||||
export async function queryWafRuleMessages(ruleIds: number[]): Promise<Record<number, string | null>> {
|
||||
if (ruleIds.length === 0) return {};
|
||||
const rows = await queryRows<{ rule_id: string; message: string | null }>(`
|
||||
SELECT rule_id, any(rule_message) AS message
|
||||
FROM waf_events
|
||||
WHERE rule_id IN (${ruleIds.join(',')})
|
||||
GROUP BY rule_id
|
||||
`);
|
||||
return Object.fromEntries(
|
||||
rows.filter(r => r.rule_id != null).map(r => [Number(r.rule_id), r.message ?? null])
|
||||
);
|
||||
}
|
||||
|
||||
export interface WafEvent {
|
||||
id: number;
|
||||
ts: number;
|
||||
host: string;
|
||||
clientIp: string;
|
||||
countryCode: string | null;
|
||||
method: string;
|
||||
uri: string;
|
||||
ruleId: number | null;
|
||||
ruleMessage: string | null;
|
||||
severity: string | null;
|
||||
rawData: string | null;
|
||||
blocked: boolean;
|
||||
}
|
||||
|
||||
export async function queryWafEvents(limit = 50, offset = 0, search?: string): Promise<WafEvent[]> {
|
||||
const where = search ? wafSearchFilter(search) : '1=1';
|
||||
const rows = await queryRows<{
|
||||
ts: string; host: string; client_ip: string; country_code: string | null;
|
||||
method: string; uri: string; rule_id: string | null; rule_message: string | null;
|
||||
severity: string | null; raw_data: string | null; blocked: string;
|
||||
}>(`
|
||||
SELECT toUInt32(ts) AS ts, host, client_ip, country_code, method, uri,
|
||||
rule_id, rule_message, severity, raw_data, blocked
|
||||
FROM waf_events
|
||||
WHERE ${where}
|
||||
ORDER BY ts DESC
|
||||
LIMIT ${limit} OFFSET ${offset}
|
||||
`);
|
||||
|
||||
return rows.map((r, i) => ({
|
||||
id: offset + i + 1,
|
||||
ts: Number(r.ts),
|
||||
host: r.host,
|
||||
clientIp: r.client_ip,
|
||||
countryCode: r.country_code ?? null,
|
||||
method: r.method,
|
||||
uri: r.uri,
|
||||
ruleId: r.rule_id != null ? Number(r.rule_id) : null,
|
||||
ruleMessage: r.rule_message ?? null,
|
||||
severity: r.severity ?? null,
|
||||
rawData: r.raw_data ?? null,
|
||||
blocked: Boolean(Number(r.blocked)),
|
||||
}));
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
-- Reference DDL for ClickHouse analytics tables.
|
||||
-- Tables are created programmatically by client.ts initClickHouse().
|
||||
-- This file is for documentation only.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS traffic_events (
|
||||
ts DateTime,
|
||||
client_ip String,
|
||||
country_code Nullable(String),
|
||||
host String DEFAULT '',
|
||||
method String DEFAULT '',
|
||||
uri String DEFAULT '',
|
||||
status UInt16 DEFAULT 0,
|
||||
proto String DEFAULT '',
|
||||
bytes_sent UInt64 DEFAULT 0,
|
||||
user_agent String DEFAULT '',
|
||||
is_blocked Bool DEFAULT false
|
||||
) ENGINE = MergeTree()
|
||||
PARTITION BY toYYYYMM(ts)
|
||||
ORDER BY (host, ts)
|
||||
TTL ts + INTERVAL 90 DAY DELETE
|
||||
SETTINGS index_granularity = 8192;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS waf_events (
|
||||
ts DateTime,
|
||||
host String DEFAULT '',
|
||||
client_ip String,
|
||||
country_code Nullable(String),
|
||||
method String DEFAULT '',
|
||||
uri String DEFAULT '',
|
||||
rule_id Nullable(Int32),
|
||||
rule_message Nullable(String),
|
||||
severity Nullable(String),
|
||||
raw_data Nullable(String),
|
||||
blocked Bool DEFAULT true
|
||||
) ENGINE = MergeTree()
|
||||
PARTITION BY toYYYYMM(ts)
|
||||
ORDER BY (host, ts)
|
||||
TTL ts + INTERVAL 90 DAY DELETE
|
||||
SETTINGS index_granularity = 8192;
|
||||
+2
-43
@@ -220,55 +220,14 @@ export const linkingTokens = sqliteTable("linking_tokens", {
|
||||
expiresAt: text("expires_at").notNull()
|
||||
});
|
||||
|
||||
export const trafficEvents = sqliteTable(
|
||||
'traffic_events',
|
||||
{
|
||||
id: integer('id').primaryKey({ autoIncrement: true }),
|
||||
ts: integer('ts').notNull(),
|
||||
clientIp: text('client_ip').notNull(),
|
||||
countryCode: text('country_code'),
|
||||
host: text('host').notNull().default(''),
|
||||
method: text('method').notNull().default(''),
|
||||
uri: text('uri').notNull().default(''),
|
||||
status: integer('status').notNull().default(0),
|
||||
proto: text('proto').notNull().default(''),
|
||||
bytesSent: integer('bytes_sent').notNull().default(0),
|
||||
userAgent: text('user_agent').notNull().default(''),
|
||||
isBlocked: integer('is_blocked', { mode: 'boolean' }).notNull().default(false),
|
||||
},
|
||||
(table) => ({
|
||||
tsIdx: index('idx_traffic_events_ts').on(table.ts),
|
||||
hostTsIdx: index('idx_traffic_events_host_ts').on(table.host, table.ts),
|
||||
})
|
||||
);
|
||||
// traffic_events and waf_events have been migrated to ClickHouse.
|
||||
// See src/lib/clickhouse/client.ts for the ClickHouse schema.
|
||||
|
||||
export const logParseState = sqliteTable('log_parse_state', {
|
||||
key: text('key').primaryKey(),
|
||||
value: text('value').notNull(),
|
||||
});
|
||||
|
||||
export const wafEvents = sqliteTable(
|
||||
'waf_events',
|
||||
{
|
||||
id: integer('id').primaryKey({ autoIncrement: true }),
|
||||
ts: integer('ts').notNull(),
|
||||
host: text('host').notNull().default(''),
|
||||
clientIp: text('client_ip').notNull(),
|
||||
countryCode: text('country_code'),
|
||||
method: text('method').notNull().default(''),
|
||||
uri: text('uri').notNull().default(''),
|
||||
ruleId: integer('rule_id'),
|
||||
ruleMessage: text('rule_message'),
|
||||
severity: text('severity'),
|
||||
rawData: text('raw_data'),
|
||||
blocked: integer('blocked', { mode: 'boolean' }).notNull().default(true),
|
||||
},
|
||||
(table) => ({
|
||||
tsIdx: index('idx_waf_events_ts').on(table.ts),
|
||||
hostTsIdx: index('idx_waf_events_host_ts').on(table.host, table.ts),
|
||||
})
|
||||
);
|
||||
|
||||
export const wafLogParseState = sqliteTable('waf_log_parse_state', {
|
||||
key: text('key').primaryKey(),
|
||||
value: text('value').notNull(),
|
||||
|
||||
+12
-21
@@ -2,13 +2,13 @@ import { createReadStream, existsSync, statSync } from 'node:fs';
|
||||
import { createInterface } from 'node:readline';
|
||||
import maxmind, { CountryResponse } from 'maxmind';
|
||||
import db from './db';
|
||||
import { trafficEvents, logParseState } from './db/schema';
|
||||
import { eq, sql } from 'drizzle-orm';
|
||||
import { logParseState } from './db/schema';
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { insertTrafficEvents, type TrafficEventRow } from './clickhouse/client';
|
||||
|
||||
const LOG_FILE = '/logs/access.log';
|
||||
const GEOIP_DB = '/usr/share/GeoIP/GeoLite2-Country.mmdb';
|
||||
const BATCH_SIZE = 500;
|
||||
const RETENTION_DAYS = 90;
|
||||
|
||||
// GeoIP reader — null if mmdb not available
|
||||
let geoReader: Awaited<ReturnType<typeof maxmind.open<CountryResponse>>> | null = null;
|
||||
@@ -97,7 +97,7 @@ export function collectBlockedSignatures(lines: string[]): Set<string> {
|
||||
return blocked;
|
||||
}
|
||||
|
||||
export function parseLine(line: string, blocked: Set<string>): typeof trafficEvents.$inferInsert | null {
|
||||
export function parseLine(line: string, blocked: Set<string>): TrafficEventRow | null {
|
||||
let entry: CaddyLogEntry;
|
||||
try {
|
||||
entry = JSON.parse(line);
|
||||
@@ -119,16 +119,16 @@ export function parseLine(line: string, blocked: Set<string>): typeof trafficEve
|
||||
|
||||
return {
|
||||
ts,
|
||||
clientIp,
|
||||
countryCode: clientIp ? lookupCountry(clientIp) : null,
|
||||
client_ip: clientIp,
|
||||
country_code: clientIp ? lookupCountry(clientIp) : null,
|
||||
host: req.host ?? '',
|
||||
method,
|
||||
uri,
|
||||
status,
|
||||
proto: req.proto ?? '',
|
||||
bytesSent: entry.size ?? 0,
|
||||
userAgent: req.headers?.['User-Agent']?.[0] ?? '',
|
||||
isBlocked: blocked.has(key),
|
||||
bytes_sent: entry.size ?? 0,
|
||||
user_agent: req.headers?.['User-Agent']?.[0] ?? '',
|
||||
is_blocked: blocked.has(key),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -153,18 +153,12 @@ async function readLines(startOffset: number): Promise<{ lines: string[]; newOff
|
||||
});
|
||||
}
|
||||
|
||||
function insertBatch(rows: typeof trafficEvents.$inferInsert[]): void {
|
||||
async function insertBatch(rows: TrafficEventRow[]): Promise<void> {
|
||||
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
|
||||
db.insert(trafficEvents).values(rows.slice(i, i + BATCH_SIZE)).run();
|
||||
await insertTrafficEvents(rows.slice(i, i + BATCH_SIZE));
|
||||
}
|
||||
}
|
||||
|
||||
function purgeOldEntries(): void {
|
||||
const cutoff = Math.floor(Date.now() / 1000) - RETENTION_DAYS * 86400;
|
||||
// Use parameterized query instead of string interpolation
|
||||
db.run(sql`DELETE FROM traffic_events WHERE ts < ${cutoff}`);
|
||||
}
|
||||
|
||||
// ── public API ───────────────────────────────────────────────────────────────
|
||||
|
||||
export async function initLogParser(): Promise<void> {
|
||||
@@ -195,15 +189,12 @@ export async function parseNewLogEntries(): Promise<void> {
|
||||
if (lines.length > 0) {
|
||||
const blocked = collectBlockedSignatures(lines);
|
||||
const rows = lines.map(l => parseLine(l, blocked)).filter(r => r !== null);
|
||||
insertBatch(rows);
|
||||
await insertBatch(rows);
|
||||
console.log(`[log-parser] inserted ${rows.length} traffic events (${blocked.size} blocked)`);
|
||||
}
|
||||
|
||||
setState('access_log_offset', String(newOffset));
|
||||
setState('access_log_size', String(currentSize));
|
||||
|
||||
// Purge old entries once per run (cheap since it's indexed)
|
||||
purgeOldEntries();
|
||||
} catch (err) {
|
||||
console.error('[log-parser] error during parse:', err);
|
||||
}
|
||||
|
||||
+20
-119
@@ -1,141 +1,42 @@
|
||||
import db from "../db";
|
||||
import { wafEvents } from "../db/schema";
|
||||
import { desc, like, or, count, and, gte, lte, sql, inArray } from "drizzle-orm";
|
||||
import {
|
||||
queryWafCount,
|
||||
queryWafCountWithSearch,
|
||||
queryTopWafRules,
|
||||
queryTopWafRulesWithHosts,
|
||||
queryWafCountries,
|
||||
queryWafRuleMessages,
|
||||
queryWafEvents,
|
||||
type WafEvent,
|
||||
type TopWafRule,
|
||||
type TopWafRuleWithHosts,
|
||||
} from "../clickhouse/client";
|
||||
|
||||
export type WafEvent = {
|
||||
id: number;
|
||||
ts: number;
|
||||
host: string;
|
||||
clientIp: string;
|
||||
countryCode: string | null;
|
||||
method: string;
|
||||
uri: string;
|
||||
ruleId: number | null;
|
||||
ruleMessage: string | null;
|
||||
severity: string | null;
|
||||
rawData: string | null;
|
||||
blocked: boolean;
|
||||
};
|
||||
|
||||
function buildSearch(search?: string) {
|
||||
if (!search) return undefined;
|
||||
return or(
|
||||
like(wafEvents.host, `%${search}%`),
|
||||
like(wafEvents.clientIp, `%${search}%`),
|
||||
like(wafEvents.uri, `%${search}%`),
|
||||
like(wafEvents.ruleMessage, `%${search}%`)
|
||||
);
|
||||
}
|
||||
export type { WafEvent, TopWafRule, TopWafRuleWithHosts };
|
||||
|
||||
export async function countWafEvents(search?: string): Promise<number> {
|
||||
const [row] = await db
|
||||
.select({ value: count() })
|
||||
.from(wafEvents)
|
||||
.where(buildSearch(search));
|
||||
return row?.value ?? 0;
|
||||
return queryWafCountWithSearch(search);
|
||||
}
|
||||
|
||||
export async function countWafEventsInRange(from: number, to: number): Promise<number> {
|
||||
const [row] = await db
|
||||
.select({ value: count() })
|
||||
.from(wafEvents)
|
||||
.where(and(gte(wafEvents.ts, from), lte(wafEvents.ts, to)));
|
||||
return row?.value ?? 0;
|
||||
return queryWafCount(from, to);
|
||||
}
|
||||
|
||||
export type TopWafRule = { ruleId: number; count: number; message: string | null };
|
||||
|
||||
export async function getTopWafRules(from: number, to: number, limit = 10): Promise<TopWafRule[]> {
|
||||
const rows = await db
|
||||
.select({
|
||||
ruleId: wafEvents.ruleId,
|
||||
count: count(),
|
||||
message: sql<string | null>`MAX(${wafEvents.ruleMessage})`,
|
||||
})
|
||||
.from(wafEvents)
|
||||
.where(and(gte(wafEvents.ts, from), lte(wafEvents.ts, to), sql`${wafEvents.ruleId} IS NOT NULL`))
|
||||
.groupBy(wafEvents.ruleId)
|
||||
.orderBy(desc(count()))
|
||||
.limit(limit);
|
||||
return rows
|
||||
.filter((r): r is typeof r & { ruleId: number } => r.ruleId != null)
|
||||
.map((r) => ({ ruleId: r.ruleId, count: r.count, message: r.message ?? null }));
|
||||
return queryTopWafRules(from, to, limit);
|
||||
}
|
||||
|
||||
export type TopWafRuleWithHosts = {
|
||||
ruleId: number;
|
||||
count: number;
|
||||
message: string | null;
|
||||
hosts: { host: string; count: number }[];
|
||||
};
|
||||
|
||||
export async function getTopWafRulesWithHosts(from: number, to: number, limit = 10): Promise<TopWafRuleWithHosts[]> {
|
||||
const topRules = await getTopWafRules(from, to, limit);
|
||||
if (topRules.length === 0) return [];
|
||||
|
||||
const ruleIds = topRules.map(r => r.ruleId);
|
||||
const hostRows = await db
|
||||
.select({ ruleId: wafEvents.ruleId, host: wafEvents.host, count: count() })
|
||||
.from(wafEvents)
|
||||
.where(and(gte(wafEvents.ts, from), lte(wafEvents.ts, to), inArray(wafEvents.ruleId, ruleIds)))
|
||||
.groupBy(wafEvents.ruleId, wafEvents.host)
|
||||
.orderBy(desc(count()));
|
||||
|
||||
return topRules.map(rule => ({
|
||||
...rule,
|
||||
hosts: hostRows
|
||||
.filter(r => r.ruleId === rule.ruleId)
|
||||
.map(r => ({ host: r.host, count: r.count })),
|
||||
}));
|
||||
return queryTopWafRulesWithHosts(from, to, limit);
|
||||
}
|
||||
|
||||
export async function getWafEventCountries(from: number, to: number): Promise<{ countryCode: string; count: number }[]> {
|
||||
const rows = await db
|
||||
.select({ countryCode: wafEvents.countryCode, count: count() })
|
||||
.from(wafEvents)
|
||||
.where(and(gte(wafEvents.ts, from), lte(wafEvents.ts, to)))
|
||||
.groupBy(wafEvents.countryCode)
|
||||
.orderBy(desc(count()));
|
||||
return rows.map(r => ({ countryCode: r.countryCode ?? 'XX', count: r.count }));
|
||||
return queryWafCountries(from, to);
|
||||
}
|
||||
|
||||
export async function getWafRuleMessages(ruleIds: number[]): Promise<Record<number, string | null>> {
|
||||
if (ruleIds.length === 0) return {};
|
||||
const rows = await db
|
||||
.select({
|
||||
ruleId: wafEvents.ruleId,
|
||||
message: sql<string | null>`MAX(${wafEvents.ruleMessage})`,
|
||||
})
|
||||
.from(wafEvents)
|
||||
.where(inArray(wafEvents.ruleId, ruleIds))
|
||||
.groupBy(wafEvents.ruleId);
|
||||
return Object.fromEntries(
|
||||
rows.filter((r): r is typeof r & { ruleId: number } => r.ruleId != null)
|
||||
.map((r) => [r.ruleId, r.message ?? null])
|
||||
);
|
||||
return queryWafRuleMessages(ruleIds);
|
||||
}
|
||||
|
||||
export async function listWafEvents(limit = 50, offset = 0, search?: string): Promise<WafEvent[]> {
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(wafEvents)
|
||||
.where(buildSearch(search))
|
||||
.orderBy(desc(wafEvents.ts))
|
||||
.limit(limit)
|
||||
.offset(offset);
|
||||
|
||||
return rows.map((r) => ({
|
||||
id: r.id,
|
||||
ts: r.ts,
|
||||
host: r.host,
|
||||
clientIp: r.clientIp,
|
||||
countryCode: r.countryCode ?? null,
|
||||
method: r.method,
|
||||
uri: r.uri,
|
||||
ruleId: r.ruleId ?? null,
|
||||
ruleMessage: r.ruleMessage ?? null,
|
||||
severity: r.severity ?? null,
|
||||
rawData: r.rawData ?? null,
|
||||
blocked: r.blocked ?? true,
|
||||
}));
|
||||
return queryWafEvents(limit, offset, search);
|
||||
}
|
||||
|
||||
+13
-21
@@ -2,14 +2,14 @@ import { createReadStream, existsSync, statSync } from 'node:fs';
|
||||
import { createInterface } from 'node:readline';
|
||||
import maxmind, { CountryResponse } from 'maxmind';
|
||||
import db from './db';
|
||||
import { wafEvents, wafLogParseState } from './db/schema';
|
||||
import { eq, sql } from 'drizzle-orm';
|
||||
import { wafLogParseState } from './db/schema';
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { insertWafEvents, type WafEventRow } from './clickhouse/client';
|
||||
|
||||
const AUDIT_LOG = '/logs/waf-audit.log';
|
||||
const RULES_LOG = '/logs/waf-rules.log';
|
||||
const GEOIP_DB = '/usr/share/GeoIP/GeoLite2-Country.mmdb';
|
||||
const BATCH_SIZE = 200;
|
||||
const RETENTION_DAYS = 90;
|
||||
|
||||
let geoReader: Awaited<ReturnType<typeof maxmind.open<CountryResponse>>> | null = null;
|
||||
const geoCache = new Map<string, string | null>();
|
||||
@@ -130,7 +130,7 @@ interface CorazaAuditEntry {
|
||||
};
|
||||
}
|
||||
|
||||
function parseLine(line: string, ruleMap: Map<string, RuleInfo>): typeof wafEvents.$inferInsert | null {
|
||||
function parseLine(line: string, ruleMap: Map<string, RuleInfo>): WafEventRow | null {
|
||||
let entry: CorazaAuditEntry;
|
||||
try {
|
||||
entry = JSON.parse(line);
|
||||
@@ -172,14 +172,14 @@ function parseLine(line: string, ruleMap: Map<string, RuleInfo>): typeof wafEven
|
||||
return {
|
||||
ts,
|
||||
host,
|
||||
clientIp,
|
||||
countryCode: lookupCountry(clientIp),
|
||||
client_ip: clientIp,
|
||||
country_code: lookupCountry(clientIp),
|
||||
method: req.method ?? '',
|
||||
uri: req.uri ?? '',
|
||||
ruleId: ruleInfo?.ruleId ?? null,
|
||||
ruleMessage: ruleInfo?.ruleMessage ?? null,
|
||||
rule_id: ruleInfo?.ruleId ?? null,
|
||||
rule_message: ruleInfo?.ruleMessage ?? null,
|
||||
severity: ruleInfo?.severity ?? null,
|
||||
rawData: line,
|
||||
raw_data: line,
|
||||
blocked,
|
||||
};
|
||||
}
|
||||
@@ -205,18 +205,12 @@ async function readAuditLog(startOffset: number): Promise<{ lines: string[]; new
|
||||
});
|
||||
}
|
||||
|
||||
function insertBatch(rows: typeof wafEvents.$inferInsert[]): void {
|
||||
async function insertBatch(rows: WafEventRow[]): Promise<void> {
|
||||
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
|
||||
db.insert(wafEvents).values(rows.slice(i, i + BATCH_SIZE)).run();
|
||||
await insertWafEvents(rows.slice(i, i + BATCH_SIZE));
|
||||
}
|
||||
}
|
||||
|
||||
function purgeOldEntries(): void {
|
||||
const cutoff = Math.floor(Date.now() / 1000) - RETENTION_DAYS * 86400;
|
||||
// Use parameterized query instead of string interpolation
|
||||
db.run(sql`DELETE FROM waf_events WHERE ts < ${cutoff}`);
|
||||
}
|
||||
|
||||
// ── public API ────────────────────────────────────────────────────────────────
|
||||
|
||||
export async function initWafLogParser(): Promise<void> {
|
||||
@@ -258,17 +252,15 @@ export async function parseNewWafLogEntries(): Promise<void> {
|
||||
const { lines, newOffset } = await readAuditLog(startOffset);
|
||||
|
||||
if (lines.length > 0) {
|
||||
const rows = lines.map(l => parseLine(l, ruleMap)).filter((r): r is typeof wafEvents.$inferInsert => r !== null);
|
||||
const rows = lines.map(l => parseLine(l, ruleMap)).filter((r): r is WafEventRow => r !== null);
|
||||
if (rows.length > 0) {
|
||||
insertBatch(rows);
|
||||
await insertBatch(rows);
|
||||
console.log(`[waf-log-parser] inserted ${rows.length} WAF events`);
|
||||
}
|
||||
}
|
||||
|
||||
setState('waf_audit_log_offset', String(newOffset));
|
||||
setState('waf_audit_log_size', String(currentSize));
|
||||
|
||||
purgeOldEntries();
|
||||
} catch (err) {
|
||||
console.error('[waf-log-parser] error during parse:', err);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user