-
Notifications
You must be signed in to change notification settings - Fork 0
/
kvdbtrx.go
116 lines (93 loc) · 3.1 KB
/
kvdbtrx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"context"
"fmt"
"math"
"net/http"
"strings"
"time"
bt "cloud.google.com/go/bigtable"
"github.com/eoscanada/dhammer"
"github.com/eoscanada/kvdb"
"go.uber.org/zap"
)
// EOSKVDBTrxsValidation is an HTTP handler that scans the EOS KVDB transactions
// table and streams what it finds to the caller over a websocket.
//
// The request is upgraded to a websocket, the transaction key space is split
// into row ranges by createTrxRowSets, and each range is pushed through a
// dhammer hammer whose worker reads the matching rows. One
// WebsocketTypeMessage is sent per range as it is queued, and one
// WebsocketTypeTransaction is sent per value received on the hammer output.
// The handler returns when the hammer signals Done, when its output channel is
// closed, or when the request context is cancelled while ranges are still
// being queued.
func (d *Diagnose) EOSKVDBTrxsValidation(w http.ResponseWriter, req *http.Request) {
	// Row keys are sliced as key[0:64] (transaction ID) and key[65:73] (block
	// number), so any key shorter than 73 bytes cannot be decoded.
	const (
		trxIDLen     = 64
		blockNumFrom = 65
		blockNumTo   = 73
	)

	kvdbInfo, db := d.getEOSDatabase(w, req)
	if kvdbInfo == nil || db == nil {
		// NOTE(review): presumably getEOSDatabase has already answered the
		// request on failure — confirm against its implementation.
		return
	}

	zlog.Info("diagnose - EOS - KVDB Trx Validation", zap.Reflect("connection_info", kvdbInfo))

	conn, err := d.upgrader.Upgrade(w, req, nil)
	if err != nil {
		zlog.Info("unable to upgrade request to websocket", zap.Error(err))
		return
	}
	defer conn.Close()

	reqCtx, cancel := context.WithCancel(req.Context())
	// Always release the derived context when the handler returns; before this
	// only readWebsocket could ever call cancel.
	defer cancel()
	go readWebsocket(conn, cancel)

	startTime := time.Now()

	// processRowRange is the hammer worker: it reads every row of each range it
	// is handed and turns each row key into a *Transaction.
	processRowRange := func(ctx context.Context, ranges []interface{}) ([]interface{}, error) {
		zlog.Info("processing ranges", zap.Int("range_count", len(ranges)), zap.Reflect("ranges", ranges))

		var results []interface{}
		for _, r := range ranges {
			rowRange, ok := r.(bt.RowRange)
			if !ok {
				// A failed assertion would otherwise yield a zero-value range
				// and silently read something other than what was queued.
				return nil, fmt.Errorf("unexpected hammer input type %T, expected a bigtable row range", r)
			}

			readErr := db.Transactions.BaseTable.ReadRows(ctx, rowRange, func(row bt.Row) bool {
				key := row.Key()
				if len(key) < blockNumTo {
					// Skip instead of panicking on a slice out of range.
					zlog.Warn("skipping transaction row with malformed key", zap.String("key", key))
					return true
				}

				trxID := key[0:trxIDLen]
				results = append(results, &Transaction{
					Prefix:   trxID[0:8],
					Id:       trxID,
					BlockNum: kvdb.BlockNum(key[blockNumFrom:blockNumTo]),
				})

				return true
			}, bt.RowFilter(bt.ConditionFilter(bt.ColumnFilter("written"), nil, bt.StripValueFilter())))
			if readErr != nil {
				// Logged here as well as returned: how the hammer surfaces a
				// worker error is not visible from this file.
				zlog.Error("reading transaction rows failed", zap.String("row_range", rowRange.String()), zap.Error(readErr))
				return nil, fmt.Errorf("reading transaction rows in range %s: %w", rowRange.String(), readErr)
			}
		}

		zlog.Info("finished process ranges", zap.Int("trx_count", len(results)))
		return results, nil
	}

	concurrency := 16
	zlog.Info("concurrency count", zap.Int("concurrency_count", concurrency))

	rowRanges := createTrxRowSets(concurrency)

	hammer := dhammer.NewHammer(1, len(rowRanges), processRowRange)
	hammer.Start(reqCtx)

	maybeSendWebsocket(conn, WebsocketTypeProgress, Progress{Elapsed: time.Since(startTime)})

	for _, rowRange := range rowRanges {
		zlog.Info("pushing in hammer", zap.Reflect("row_range", rowRange.String()))
		maybeSendWebsocket(conn, WebsocketTypeMessage, &Message{
			Msg: fmt.Sprintf("Processing group range: start %s", rowRange.String()),
		})

		// Do not block forever on the input channel if the client went away
		// or the hammer already stopped (for example after a worker error).
		select {
		case hammer.In <- rowRange:
		case <-hammer.Done():
			zlog.Info("hammer completion")
			return
		case <-reqCtx.Done():
			return
		}
	}

	hammer.Close()

	// NOTE(review): when Done and Out are both ready the select picks one at
	// random, so pending results could be dropped on completion — confirm
	// whether dhammer closes Out before signalling Done.
	for {
		select {
		case <-hammer.Done():
			zlog.Info("hammer completion")
			return
		case trxInt, ok := <-hammer.Out:
			if !ok {
				return
			}

			trx := trxInt.(*Transaction)
			maybeSendWebsocket(conn, WebsocketTypeTransaction, trx)
		}
	}
}
// createTrxRowSets splits the transaction key space into consecutive Bigtable
// row ranges so they can be scanned independently.
//
// Boundaries are placed at keys of the form "<hex digit>" followed by 63 "0"
// characters and ":", taking every step-th digit of "123456789abcdef", where
// step is ceil(15 / concurrentReadCount). The first range starts at the empty
// key and a final range runs from the last boundary up to 64 "f" characters
// followed by ";".
//
// concurrentReadCount must be between 1 and 16 inclusive, otherwise the
// function panics. The number of ranges returned is derived from step and is
// not always equal to concurrentReadCount (16 yields 16 ranges, 8 yields 9).
func createTrxRowSets(concurrentReadCount int) []bt.RowRange {
	letters := "123456789abcdef"
	maxCount := len(letters) + 1

	// Reject non-positive counts explicitly: they would otherwise produce a
	// zero division and a non-positive step, ending in an opaque index panic.
	// The reported upper bound now matches the bound actually enforced.
	if concurrentReadCount < 1 || concurrentReadCount > maxCount {
		panic(fmt.Errorf("only accepting concurrent between 1 and %d, got %d", maxCount, concurrentReadCount))
	}

	step := int(math.Ceil(float64(len(letters)) / float64(concurrentReadCount)))

	startPrefix := ""
	rowRanges := make([]bt.RowRange, 0, maxCount)
	for i := 0; i < len(letters); i += step {
		endPrefix := string(letters[i]) + strings.Repeat("0", 63) + ":"
		rowRanges = append(rowRanges, bt.NewRange(startPrefix, endPrefix))

		// Each range begins where the previous one ended.
		startPrefix = endPrefix
	}

	// FIXME: Find a way to get up to last possible keys of `a:` set without copying the `prefixSuccessor` method from eosdb
	// Hard-coded for now: ";" is the byte right after ":", so this end key
	// sorts after every key starting with 64 "f" characters and ":".
	rowRanges = append(rowRanges, bt.NewRange(startPrefix, strings.Repeat("f", 64)+";"))

	return rowRanges
}