Skip to content

Commit f05b114

Browse files
committed
feat: v2.0.0, makefile
1 parent 5ed76c4 commit f05b114

File tree

3 files changed

+138
-55
lines changed

3 files changed

+138
-55
lines changed

README.md

+15-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ This Go program is designed to efficiently process a large dataset of temperatur
1313

1414
Processing Time: 9m21s. Tested with a Ryzen 5800x3d
1515

16-
## Recent Optimizations (v1.1.0)
16+
## v1.1.0
1717

1818
The program has undergone several optimizations to improve its processing time:
1919

@@ -24,6 +24,20 @@ The program has undergone several optimizations to improve its processing time:
2424

2525
Processing Time: 6m53s. Tested with a Ryzen 5800x3d
2626

27+
## v2.0.0
28+
29+
Version 2.0 of the One Billion Row Challenge Processor introduces significant optimizations, leading to a substantial reduction in processing time. This release focuses on enhancing concurrency handling and reducing contention, along with other performance improvements.
30+
31+
## Performance Enhancements
32+
33+
- **Concurrent Map Implementation:** Introduced a sharded concurrent map to reduce lock contention. This allows for more efficient updates to the data structure in a multi-threaded environment.
34+
- **Hash-Based Sharding:** Implemented hash-based sharding for distributing data across multiple shards, further reducing the chance of lock conflicts.
35+
- **Optimized String Processing:** Refined the string handling logic to minimize overhead during file parsing.
36+
- **Buffer Size Adjustments:** Tuned the buffer sizes for channels to balance throughput and memory usage.
37+
- **Efficient Data Aggregation:** Streamlined the data aggregation process for improved efficiency.
38+
39+
Processing Time 5m19s. Tested with a Ryzen 5800x3d
40+
2741
## Requirements
2842

2943
- Go Runtime ofc (1.21)

main.go

+87-54
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,54 @@
11
package main
22

33
import (
4-
"bufio"
5-
"fmt"
6-
"os"
7-
"sort"
8-
"strconv"
9-
"strings"
10-
"sync"
11-
"time"
4+
"bufio"
5+
"fmt"
6+
"hash/fnv"
7+
"os"
8+
"sort"
9+
"strconv"
10+
"strings"
11+
"sync"
12+
"time"
1213
)
1314

14-
// StationData stores min, mean, max, and total sum of temperatures and count of readings for a station
1515
type StationData struct {
16-
min, max, sum, count float64
16+
min, max, sum, count float64
1717
}
1818

19-
const numWorkers = 16 // Number of worker goroutines
19+
const numWorkers = 16
20+
const numShards = 32 // Number of shards in the concurrent map
21+
22+
type ConcurrentMap struct {
23+
shards [numShards]map[string]*StationData
24+
locks [numShards]*sync.Mutex
25+
}
26+
27+
func NewConcurrentMap() *ConcurrentMap {
28+
cMap := &ConcurrentMap{}
29+
for i := 0; i < numShards; i++ {
30+
cMap.shards[i] = make(map[string]*StationData)
31+
cMap.locks[i] = &sync.Mutex{}
32+
}
33+
return cMap
34+
}
35+
36+
func (cMap *ConcurrentMap) GetShard(key string) (shard map[string]*StationData, lock *sync.Mutex) {
37+
hash := fnv.New32()
38+
hash.Write([]byte(key))
39+
shardIndex := hash.Sum32() % numShards
40+
return cMap.shards[shardIndex], cMap.locks[shardIndex]
41+
}
2042

2143
func main() {
2244
startTime := time.Now()
2345

24-
// Adjust this to the path of your data file
25-
fileName := "./data/measurements.txt"
46+
if len(os.Args) < 2 {
47+
fmt.Println("Usage: brc <file_path>")
48+
os.Exit(1)
49+
}
50+
fileName := os.Args[1]
51+
2652
stationData := processFile(fileName)
2753

2854
printResults(stationData)
@@ -31,18 +57,15 @@ func main() {
3157
fmt.Printf("Processing completed in %s\n", duration)
3258
}
3359

34-
func processFile(fileName string) map[string]*StationData {
60+
func processFile(fileName string) *ConcurrentMap {
3561
linesCh := make(chan string, 1000)
36-
3762
var wg sync.WaitGroup
3863
wg.Add(numWorkers)
3964

40-
stationData := make(map[string]*StationData)
41-
var mu sync.Mutex
65+
cMap := NewConcurrentMap()
4266

43-
// Worker pool pattern
4467
for i := 0; i < numWorkers; i++ {
45-
go worker(&wg, linesCh, stationData, &mu)
68+
go worker(&wg, linesCh, cMap)
4669
}
4770

4871
file, err := os.Open(fileName)
@@ -58,17 +81,17 @@ func processFile(fileName string) map[string]*StationData {
5881
close(linesCh)
5982
wg.Wait()
6083

61-
return stationData
84+
return cMap
6285
}
6386

64-
func worker(wg *sync.WaitGroup, lines <-chan string, data map[string]*StationData, mu *sync.Mutex) {
87+
func worker(wg *sync.WaitGroup, lines <-chan string, cMap *ConcurrentMap) {
6588
defer wg.Done()
6689
for line := range lines {
67-
processLine(line, data, mu)
90+
processLine(line, cMap)
6891
}
6992
}
7093

71-
func processLine(line string, data map[string]*StationData, mu *sync.Mutex) {
94+
func processLine(line string, cMap *ConcurrentMap) {
7295
parts := strings.Split(line, ";")
7396
if len(parts) != 2 {
7497
return
@@ -80,40 +103,50 @@ func processLine(line string, data map[string]*StationData, mu *sync.Mutex) {
80103
return
81104
}
82105

83-
mu.Lock()
84-
defer mu.Unlock()
85-
86-
if sd, exists := data[station]; exists {
87-
sd.sum += temp
88-
sd.count++
89-
if temp < sd.min {
90-
sd.min = temp
106+
shard, lock := cMap.GetShard(station)
107+
lock.Lock()
108+
data, exists := shard[station]
109+
if !exists {
110+
data = &StationData{min: temp, max: temp, sum: temp, count: 1}
111+
shard[station] = data
112+
} else {
113+
data.sum += temp
114+
data.count++
115+
if temp < data.min {
116+
data.min = temp
91117
}
92-
if temp > sd.max {
93-
sd.max = temp
118+
if temp > data.max {
119+
data.max = temp
94120
}
95-
} else {
96-
data[station] = &StationData{min: temp, max: temp, sum: temp, count: 1}
97121
}
122+
lock.Unlock()
98123
}
99124

100-
func printResults(stationData map[string]*StationData) {
101-
// Extract keys and sort them
102-
keys := make([]string, 0, len(stationData))
103-
for key := range stationData {
104-
keys = append(keys, key)
105-
}
106-
sort.Strings(keys)
107-
108-
// Generate output
109-
fmt.Print("{")
110-
for i, key := range keys {
111-
data := stationData[key]
112-
mean := data.sum / data.count
113-
fmt.Printf("%s=%.1f/%.1f/%.1f", key, data.min, mean, data.max)
114-
if i < len(keys)-1 {
115-
fmt.Print(", ")
116-
}
117-
}
118-
fmt.Println("}")
125+
func printResults(cMap *ConcurrentMap) {
126+
// Consolidate data from shards
127+
consolidatedData := make(map[string]*StationData)
128+
for _, shard := range cMap.shards {
129+
for station, data := range shard {
130+
consolidatedData[station] = data
131+
}
132+
}
133+
134+
// Sort the station names
135+
keys := make([]string, 0, len(consolidatedData))
136+
for station := range consolidatedData {
137+
keys = append(keys, station)
138+
}
139+
sort.Strings(keys)
140+
141+
// Print sorted results
142+
fmt.Print("{")
143+
for i, key := range keys {
144+
data := consolidatedData[key]
145+
mean := data.sum / data.count
146+
fmt.Printf("%s=%.1f/%.1f/%.1f", key, data.min, mean, data.max)
147+
if i < len(keys)-1 {
148+
fmt.Print(", ")
149+
}
150+
}
151+
fmt.Println("}")
119152
}

makefile

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
2+
BINARY = brc
3+
# Default rule
4+
all: build run clean
5+
6+
# Build
7+
build:
8+
go build -o ${BINARY}
9+
10+
# Run
11+
run:
12+
@echo "Usage: make run FILE=<file_path>"
13+
@if [ "${FILE}" = "" ]; then \
14+
echo "Error: FILE not specified. Use 'make run FILE=<file_path>' to run the project."; \
15+
else \
16+
./${BINARY} ${FILE}; \
17+
fi
18+
19+
# Clean up
20+
clean:
21+
if [ -f ${BINARY} ] ; then rm ${BINARY} ; fi
22+
23+
# Test
24+
test:
25+
go test
26+
27+
# Help
28+
help:
29+
@echo "make: build and run the project"
30+
@echo "make build: build the project"
31+
@echo "make run FILE=<file_path>: run the project with specified file"
32+
@echo "make clean: clean up the binary"
33+
@echo "make test: run tests"
34+
@echo "make help: show this message"
35+
36+
.PHONY: all build run clean test help

0 commit comments

Comments
 (0)