Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions correlation/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ubuntu:24.04
RUN apt-get update
RUN apt-get install -y ca-certificates git wget
FROM ubuntu:22.04
RUN apt update
RUN apt install -y ca-certificates git
COPY correlation /app/
COPY docs/swagger.json /app/docs/
COPY docs/swagger.yaml /app/docs/
Expand All @@ -9,13 +9,4 @@ COPY run.sh /
RUN chmod +x /app/correlation
RUN chmod +x /run.sh
RUN update-ca-certificates
RUN wget -O /app/asn-blocks-v4.csv https://cdn.utmstack.com/geoip/asn-blocks-v4.csv
RUN wget -O /app/asn-blocks-v6.csv https://cdn.utmstack.com/geoip/asn-blocks-v6.csv
RUN wget -O /app/blocks-v4.csv https://cdn.utmstack.com/geoip/blocks-v4.csv
RUN wget -O /app/blocks-v6.csv https://cdn.utmstack.com/geoip/blocks-v6.csv
RUN wget -O /app/locations-en.csv https://cdn.utmstack.com/geoip/locations-en.csv
RUN wget -O /app/ip_blocklist.list https://intelligence.threatwinds.com/feeds/public/ip/cumulative.list
RUN wget -O /app/domain_blocklist.list https://intelligence.threatwinds.com/feeds/public/domain/cumulative.list
RUN wget -O /app/hostname_blocklist.list https://intelligence.threatwinds.com/feeds/public/hostname/cumulative.list

ENTRYPOINT [ "/run.sh" ]
2 changes: 0 additions & 2 deletions correlation/api/newLogHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"encoding/json"
"fmt"
"github.com/utmstack/UTMStack/correlation/ti"
"io"
"log"
"net/http"
Expand Down Expand Up @@ -75,7 +74,6 @@ func NewLog(c *gin.Context) {
}

cache.AddToCache(l)
ti.Enqueue(l)
search.AddToQueue(l)
response["status"] = "queued"
c.JSON(http.StatusOK, response)
Expand Down
42 changes: 21 additions & 21 deletions correlation/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (

const bufferSize int = 1000000

var storageMutex = &sync.RWMutex{}
var cacheStorageMutex = &sync.RWMutex{}

var storage []string
var CacheStorage []string

func Status() {
for {
log.Printf("Logs in cache: %v", len(storage))
if len(storage) != 0 {
est := gjson.Get(storage[0], "@timestamp").String()
log.Printf("Logs in cache: %v", len(CacheStorage))
if len(CacheStorage) != 0 {
est := gjson.Get(CacheStorage[0], "@timestamp").String()
log.Printf("Old document in cache: %s", est)
}
time.Sleep(60 * time.Second)
Expand All @@ -31,8 +31,8 @@ func Status() {

func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
var elements []string
storageMutex.RLock()
defer storageMutex.RUnlock()
cacheStorageMutex.RLock()
defer cacheStorageMutex.RUnlock()

cToBreak := 0
ait := time.Now().UTC().Unix() - func() int64 {
Expand All @@ -43,8 +43,8 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
return seconds
}
}()
for i := len(storage) - 1; i >= 0; i-- {
est := gjson.Get(storage[i], "@timestamp").String()
for i := len(CacheStorage) - 1; i >= 0; i-- {
est := gjson.Get(CacheStorage[i], "@timestamp").String()
eit, err := time.Parse(time.RFC3339Nano, est)
if err != nil {
log.Printf("Could not parse @timestamp: %v", err)
Expand All @@ -61,23 +61,23 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
var allCatch bool
var oneCatch bool
for _, of := range oneOf {
oneCatch = evalElement(storage[i], of.Field, of.Operator, of.Value)
oneCatch = evalElement(CacheStorage[i], of.Field, of.Operator, of.Value)
if oneCatch {
break
}
}
for _, af := range allOf {
allCatch = evalElement(storage[i], af.Field, af.Operator, af.Value)
allCatch = evalElement(CacheStorage[i], af.Field, af.Operator, af.Value)
if !allCatch {
break
}
}
if (len(allOf) == 0 || allCatch) && (len(oneOf) == 0 || oneCatch) {
elements = append(elements, storage[i])
elements = append(elements, CacheStorage[i])
}
}
}

return elements
}

Expand All @@ -97,9 +97,9 @@ func ProcessQueue() {
go func() {
for {
l := <-logs
storageMutex.Lock()
storage = append(storage, l)
storageMutex.Unlock()
cacheStorageMutex.Lock()
CacheStorage = append(CacheStorage, l)
cacheStorageMutex.Unlock()
}
}()
}
Expand All @@ -109,11 +109,11 @@ func Clean() {
for {
var clean bool

if len(storage) > 1 {
if len(CacheStorage) > 1 {
if utils.AssignedMemory >= 80 {
clean = true
} else {
old := gjson.Get(storage[0], "@timestamp").String()
old := gjson.Get(CacheStorage[0], "@timestamp").String()
oldTime, err := time.Parse(time.RFC3339Nano, old)
if err != nil {
log.Printf("Could not parse old log timestamp. Cleaning up")
Expand All @@ -129,9 +129,9 @@ func Clean() {
}

if clean {
storageMutex.Lock()
storage = storage[1:]
storageMutex.Unlock()
cacheStorageMutex.Lock()
CacheStorage = CacheStorage[1:]
cacheStorageMutex.Unlock()
} else {
time.Sleep(5 * time.Second)
}
Expand Down
13 changes: 8 additions & 5 deletions correlation/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package cache
package cache_test

import (
"github.com/utmstack/UTMStack/correlation/rules"
"testing"


"github.com/utmstack/UTMStack/correlation/cache"
"github.com/utmstack/UTMStack/correlation/rules"
)

func TestSearch(t *testing.T) {
Expand All @@ -13,7 +16,7 @@ func TestSearch(t *testing.T) {
`{"@timestamp":"2022-01-01T00:00:03.000Z","field1":"value1","field2":"value2"}`,
`{"@timestamp":"2022-01-01T00:00:04.000Z","field1":"value1","field2":"value2"}`,
}
storage = cacheStorage
cache.CacheStorage = cacheStorage
allOf := []rules.AllOf{
{Field: "field1", Operator: "==", Value: "value1"},
}
Expand All @@ -28,7 +31,7 @@ func TestSearch(t *testing.T) {
`{"@timestamp":"2022-01-01T00:00:01.000Z","field1":"value1","field2":"value2"}`,
`{"@timestamp":"2022-01-01T00:00:00.000Z","field1":"value1","field2":"value2"}`,
}
result := Search(allOf, oneOf, int64(seconds))
result := cache.Search(allOf, oneOf, int64(seconds))
if len(result) != len(expected) {
t.Errorf("Expected %d elements, but got %d", len(expected), len(result))
}
Expand All @@ -37,4 +40,4 @@ func TestSearch(t *testing.T) {
t.Errorf("Expected %s, but got %s", expected[i], r)
}
}
}
}
74 changes: 67 additions & 7 deletions correlation/correlation/analyzer.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,85 @@
package correlation

import (
"github.com/tidwall/gjson"
"net"

"github.com/utmstack/UTMStack/correlation/geo"
"github.com/utmstack/UTMStack/correlation/rules"
"github.com/utmstack/UTMStack/correlation/utils"
"github.com/tidwall/gjson"
)

func processResponse(logs []string, rule rules.Rule, save []utils.SavedField, tmpLogs *[20][]map[string]string,
func processResponse(logs []string, rule rules.Rule, save []rules.SavedField, tmpLogs *[20][]map[string]string,
steps, step, minCount int) {
if len(logs) >= func() int {
switch minCount {
if len(logs) >= func()int{
switch minCount{
case 0:
return 1
default:
return minCount
}
}() {
for _, l := range logs {
fields := utils.ExtractDetails(save, l)
var fields = map[string]string{
"id": gjson.Get(l, "id").String(),
}
//User saved fields
for _, save := range save {
fields[save.Alias] = gjson.Get(l, save.Field).String()
}
// Try to resolve SourceHost if SourceIP exists but not SourceHost
if fields["SourceHost"] == "" && fields["SourceIP"] != "" {
host, _ := net.LookupHost(fields["SourceIP"])
fields["SourceHost"] = host[0]
}
// Try to resolve DestinationHost if DestinationIP exists but not DestinationHost
if fields["DestinationHost"] == "" && fields["DestinationIP"] != "" {
host, _ := net.LookupHost(fields["DestinationIP"])
fields["DestinationHost"] = host[0]
}
// Try to resolve SourceIP if SourceHost exists but not SourceIP
if fields["SourceHost"] != "" && fields["SourceIP"] == "" {
ip, _ := net.LookupIP(fields["SourceHost"])
if len(ip) != 0 && ip[0].String() != "<nil>" {
fields["SourceIP"] = ip[0].String()
}
}
// Try to resolve DestinationIP if DestinationHost exists but not DestinationIP
if fields["DestinationHost"] != "" && fields["DestinationIP"] == "" {
ip, _ := net.LookupIP(fields["DestinationHost"])
if len(ip) != 0 && ip[0].String() != "<nil>" {
fields["DestinationIP"] = ip[0].String()
}
}
// Try to geolocate SourceIP if exists
if fields["SourceIP"] != "" {
location := geo.Geolocate(fields["SourceIP"])
fields["SourceCountry"] = location["country"]
fields["SourceCountryCode"] = location["countryCode"]
fields["SourceCity"] = location["city"]
fields["SourceLat"] = location["latitude"]
fields["SourceLon"] = location["longitude"]
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
fields["SourceASN"] = location["asn"]
fields["SourceASO"] = location["aso"]
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
}
// Try to geolocate DetinationIP if exists
if fields["DestinationIP"] != "" {
location := geo.Geolocate(fields["DestinationIP"])
fields["DestinationCountry"] = location["country"]
fields["DestinationCountryCode"] = location["countryCode"]
fields["DestinationCity"] = location["city"]
fields["DestinationLat"] = location["latitude"]
fields["DestinationLon"] = location["longitude"]
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
fields["DestinationASN"] = location["asn"]
fields["DestinationASO"] = location["aso"]
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
}

// Alert in the last step or save data to the next iteration
// Alert in the last step or save data to next cicle
if steps-1 == step {
// Use content of AlertName as Name if exists
var alertName string
Expand Down
54 changes: 10 additions & 44 deletions correlation/correlation/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package correlation

import (
"encoding/json"
"strconv"
"strings"
"time"

"log"

"github.com/google/uuid"
"github.com/levigross/grequests"
"github.com/utmstack/UTMStack/correlation/geo"
"github.com/utmstack/UTMStack/correlation/search"
"github.com/utmstack/UTMStack/correlation/utils"
"log"
"strconv"
"strings"
"time"
)

type Host struct {
Expand Down Expand Up @@ -64,47 +65,13 @@ type AlertFields struct {
}

func Alert(name, severity, description, solution, category, tactic string, reference []string, dataType, dataSource string,
fields map[string]string) {

// Try to geolocate SourceIP if exists
if fields["SourceIP"] != "" {
location := geo.Geolocate(fields["SourceIP"])
if len(location) != 0 {
fields["SourceCountry"] = location["country"]
fields["SourceCountryCode"] = location["countryCode"]
fields["SourceCity"] = location["city"]
fields["SourceLat"] = location["latitude"]
fields["SourceLon"] = location["longitude"]
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
fields["SourceASN"] = location["asn"]
fields["SourceASO"] = location["aso"]
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
}
}

// Try to geolocate DestinationIP if exists
if fields["DestinationIP"] != "" {
location := geo.Geolocate(fields["DestinationIP"])
if len(location) != 0 {
fields["DestinationCountry"] = location["country"]
fields["DestinationCountryCode"] = location["countryCode"]
fields["DestinationCity"] = location["city"]
fields["DestinationLat"] = location["latitude"]
fields["DestinationLon"] = location["longitude"]
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
fields["DestinationASN"] = location["asn"]
fields["DestinationASO"] = location["aso"]
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
}
}
details map[string]string) {

log.Printf("Reporting alert: %s", name)

if !UpdateAlert(name, severity, fields) {
if !UpdateAlert(name, severity, details) {
NewAlert(name, severity, description, solution, category, tactic, reference, dataType, dataSource,
fields)
details)
}
}

Expand Down Expand Up @@ -241,12 +208,11 @@ func UpdateAlert(name, severity string, details map[string]string) bool {
},
},
})
_ = r.Close()
if err != nil {
log.Printf("Could not update existent alert: %v", err)
return false
}

_ = r.Close()
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions correlation/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ const docTemplate = `{
"description": "{{escape .Description}}",
"title": "{{.Title}}",
"contact": {
"name": "UTMStack LLC",
"email": "contact@utmstack.com"
"name": "Osmany Montero",
"email": "osmany@quantfall.com"
},
"license": {
"name": "AGPLv3"
"name": "Private"
},
"version": "{{.Version}}"
},
Expand Down Expand Up @@ -47,7 +47,7 @@ var SwaggerInfo = &swag.Spec{
BasePath: "/v1",
Schemes: []string{},
Title: "UTMStack's Correlation Engine",
Description: "Rules-based correlation engine for UTMStack.",
Description: "Rules based correlation engine for UTMStack.",
InfoInstanceName: "swagger",
SwaggerTemplate: docTemplate,
LeftDelim: "{{",
Expand Down
Loading