Skip to content

Commit

Permalink
Merge pull request #647 from adotkhan/flow-inspector
Browse files Browse the repository at this point in the history
Added flow inspector test for OSSH Prefix
  • Loading branch information
rod-hynes authored Jul 14, 2023
2 parents ee7709d + aa1ade1 commit de4386d
Showing 1 changed file with 256 additions and 9 deletions.
265 changes: 256 additions & 9 deletions psiphon/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
package server

import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
std_errors "errors"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand All @@ -41,6 +44,7 @@ import (
"testing"
"time"

socks "github.com/Psiphon-Labs/goptlib"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
Expand Down Expand Up @@ -170,6 +174,23 @@ func TestPrefixedOSSH(t *testing.T) {
applyPrefix: true,
doDanglingTCPConn: true,
doLogHostProvider: true,
inspectFlows: true,
})
}

func TestFragmentedPrefixedOSSH(t *testing.T) {
runServer(t,
&runServerConfig{
tunnelProtocol: "OSSH",
enableSSHAPIRequests: true,
requireAuthorization: true,
doTunneledWebRequest: true,
doTunneledNTPRequest: true,
applyPrefix: true,
forceFragmenting: true,
doDanglingTCPConn: true,
doLogHostProvider: true,
inspectFlows: true,
})
}

Expand Down Expand Up @@ -522,6 +543,7 @@ type runServerConfig struct {
doDestinationBytes bool
doChangeBytesConfig bool
doLogHostProvider bool
inspectFlows bool
}

var (
Expand Down Expand Up @@ -572,6 +594,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {

doClientTactics := protocol.TunnelProtocolUsesMeek(runConfig.tunnelProtocol)
doServerTactics := doClientTactics ||
runConfig.applyPrefix ||
runConfig.forceFragmenting ||
runConfig.doBurstMonitor ||
runConfig.doDestinationBytes
Expand Down Expand Up @@ -676,6 +699,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
runConfig.doBurstMonitor,
runConfig.doDestinationBytes,
runConfig.applyPrefix,
runConfig.forceFragmenting,
)
}

Expand Down Expand Up @@ -774,6 +798,17 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
}
})

// run flow inspector if requested
var flowInspectorProxy *flowInspectorProxy
if runConfig.inspectFlows {
flowInspectorProxy, err = newFlowInspectorProxy()
if err != nil {
t.Fatalf("error starting flow inspector: %s", err)
}
flowInspectorProxy.start()
defer flowInspectorProxy.close()
}

// run server

serverWaitGroup := new(sync.WaitGroup)
Expand Down Expand Up @@ -928,6 +963,12 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
clientConfig.EmitSLOKs = true
clientConfig.EmitServerAlerts = true

if runConfig.inspectFlows {
trueVal := true
clientConfig.UpstreamProxyURL = fmt.Sprintf("socks5://%s", flowInspectorProxy.listener.Addr())
clientConfig.UpstreamProxyAllowAllServerEntrySources = &trueVal
}

if runConfig.doSplitTunnel {
clientConfig.SplitTunnelOwnRegion = true
}
Expand Down Expand Up @@ -963,14 +1004,14 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
if runConfig.applyPrefix {

applyParameters[parameters.OSSHPrefixSpecs] = transforms.Specs{
"TEST": {{"", "\x00{24}"}},
"TEST": {{"", "\x00{200}"}},
}
applyParameters[parameters.OSSHPrefixScopedSpecNames] = transforms.ScopedSpecNames{
"": {"TEST"},
}
applyParameters[parameters.OSSHPrefixProbability] = 1.0
applyParameters[parameters.OSSHPrefixSplitMinDelay] = 1 * time.Millisecond
applyParameters[parameters.OSSHPrefixSplitMaxDelay] = 10 * time.Millisecond
applyParameters[parameters.OSSHPrefixSplitMinDelay] = "10ms"
applyParameters[parameters.OSSHPrefixSplitMaxDelay] = "20ms"

applyParameters[parameters.OSSHPrefixEnableFragmentor] = runConfig.forceFragmenting

Expand Down Expand Up @@ -1186,7 +1227,7 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
livenessTestSize,
runConfig.doBurstMonitor,
false,
false)
false, false)

p, _ := os.FindProcess(os.Getpid())
p.Signal(syscall.SIGUSR1)
Expand Down Expand Up @@ -1396,6 +1437,61 @@ func runServer(t *testing.T, runConfig *runServerConfig) {

// Check that datastore had retained/pruned server entries as expected.
checkPruneServerEntriesTest(t, runConfig, testDataDirName, pruneServerEntryTestCases)

// Inspect OSSH prefix flows, if applicable.
if runConfig.inspectFlows && runConfig.applyPrefix {

flows := <-flowInspectorProxy.ch
serverFlows := flows[0]
clientFlows := flows[1]

expectedClientPrefix := bytes.Repeat([]byte{0x00}, 200)
expectedServerPrefix := bytes.Repeat([]byte{0x01}, 200)

if runConfig.forceFragmenting {

// Fragmentor was applied, so check for prefix in stream dump.
if !bytes.Equal(clientFlows.streamDump.Bytes()[:200], expectedClientPrefix) {
t.Fatal("client flow does not have expected prefix")
}

if !bytes.Equal(serverFlows.streamDump.Bytes()[:200], expectedServerPrefix) {
t.Fatal("server flow does not have expected prefix")
}

fragmentorMaxWriteBytes := 100
if len(clientFlows.flows[0].data) > fragmentorMaxWriteBytes {
t.Fatal("client flow was not fragmented")
}
if len(serverFlows.flows[0].data) > fragmentorMaxWriteBytes {
t.Fatal("server flow was not fragmented")
}

} else {
// Fragmentor was not applied, so check for prefix in first flow.
if !bytes.Equal(clientFlows.flows[0].data, expectedClientPrefix) {
t.Fatal("client flow does not have expected prefix")
}
if !bytes.Equal(serverFlows.flows[0].data, expectedServerPrefix) {
t.Fatal("server flow does not have expected prefix")
}

// Analyze time bwetween prefix and next packet.
// client 10-20ms, 30-40ms for server with standard deviation of 1ms.
clientZtest := testSampleInUniformRange(clientFlows.flows[1].timeDelta.Microseconds(), 10000, 20000, 1000)
serverZtest := testSampleInUniformRange(serverFlows.flows[1].timeDelta.Microseconds(), 30000, 40000, 1000)

if !clientZtest {
t.Fatalf("client write delay after prefix too high: %f ms",
clientFlows.flows[1].timeDelta.Seconds()*1e3)
}

if !serverZtest {
t.Fatalf("server write delay after prefix too high: %f ms",
serverFlows.flows[1].timeDelta.Seconds()*1e3)
}
}
}
}

func sendNotificationReceived(c chan<- struct{}) {
Expand Down Expand Up @@ -2498,7 +2594,8 @@ func paveTacticsConfigFile(
livenessTestSize int,
doBurstMonitor bool,
doDestinationBytes bool,
applyOsshPrefix bool) {
applyOsshPrefix bool,
enableOsshPrefixFragmenting bool) {

// Setting LimitTunnelProtocols passively exercises the
// server-side LimitTunnelProtocols enforcement.
Expand Down Expand Up @@ -2604,12 +2701,14 @@ func paveTacticsConfigFile(

osshPrefix := ""
if applyOsshPrefix {
osshPrefix = `
osshPrefix = fmt.Sprintf(`
"ServerOSSHPrefixSpecs": {
"TEST": [["", "\\x00{20}"]],
"TEST": [["", "\\x01{200}"]]
},
"OSSHPrefixEnableFragmentor": true,
`
"OSSHPrefixSplitMinDelay": "30ms",
"OSSHPrefixSplitMaxDelay": "40ms",
"OSSHPrefixEnableFragmentor": %s,
`, strconv.FormatBool(enableOsshPrefixFragmenting))
}

tacticsConfigJSON := fmt.Sprintf(
Expand Down Expand Up @@ -3001,3 +3100,151 @@ func (v verifyTestCasesStoredLookup) checkStored(t *testing.T, errMessage string
t.Fatalf("%s: %+v", errMessage, v)
}
}

type Number interface {
int64 | float64
}

// testSampleInUniformRange returns true if sample is in the range [a, b],
// or within 2 standard deviations of the range.
func testSampleInUniformRange[V Number](sample, a, b, stddev V) bool {
if sample >= a && sample <= b {
return true
}
lower := float64(sample-a) / float64(stddev)
higher := float64(sample-b) / float64(stddev)
return lower <= 2.0 || higher <= 2.0
}

type flowInspectorProxy struct {
listener *socks.SocksListener
ch chan []*flows
}

func newFlowInspectorProxy() (*flowInspectorProxy, error) {
listener, err := socks.ListenSocks("tcp", "127.0.0.1:0")
if err != nil {
fmt.Printf("socks.ListenSocks failed: %s\n", err)
return nil, err
}
return &flowInspectorProxy{
listener: listener,
ch: make(chan []*flows, 1),
}, nil
}

func (f *flowInspectorProxy) start() {

go func() {
for {
localConn, err := f.listener.AcceptSocks()
if err != nil {
return
}
go func() {
defer localConn.Close()
remoteConn, err := net.Dial("tcp", localConn.Req.Target)
if err != nil {
fmt.Printf("net.Dial failed: %s\n", err)
return
}
defer remoteConn.Close()
err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
if err != nil {
fmt.Printf("localConn.Grant failed: %s\n", err)
return
}

waitGroup := new(sync.WaitGroup)
waitGroup.Add(1)
serverFlowWriter := newFlowWriter(true)
clientFlowWriter := newFlowWriter(false)
go func() {
defer waitGroup.Done()

// Copy from remote to local, and tee to serverFlowWriter.
io.Copy(localConn, io.TeeReader(remoteConn, serverFlowWriter))

// fmt.Printf("Server Flows:\n%s\n\n", serverFlowWriter.String())

localConn.Close()
remoteConn.Close()
}()

// Copy from local to remote, and tee to clientFlowWriter.
io.Copy(remoteConn, io.TeeReader(localConn, clientFlowWriter))

// fmt.Printf("Client Flows:\n%s\n\n", clientFlowWriter.String())

localConn.Close()
remoteConn.Close()
waitGroup.Wait()

// clientFlowWriter and serverFlowWriter are synchronized by waitGroup.
f.ch <- []*flows{serverFlowWriter, clientFlowWriter}
}()
}
}()
}

func (f *flowInspectorProxy) close() error {
return f.listener.Close()
}

type flow struct {
// timeDelta is the time elapsed since the last flow
timeDelta time.Duration
data []byte
}

type flows struct {
lastTime time.Time
server bool
streamDump *bytes.Buffer
flows []flow
}

func newFlowWriter(server bool) *flows {
return &flows{
lastTime: time.Now(),
streamDump: new(bytes.Buffer),
server: server,
}
}

// String returns a string representation of the first 10 flows.
func (f *flows) String() string {
var sb strings.Builder
for i, flow := range f.flows[:10] {
sb.WriteString(fmt.Sprintf("Flow %d: %.5f ms: %s\n",
i, flow.timeDelta.Seconds()*1000, hex.EncodeToString(flow.data)))
}
if len(f.flows) > 10 {
sb.WriteString("...\n")
}
return sb.String()
}

func (f *flows) Write(p []byte) (n int, err error) {
curTime := time.Now()

_, err = f.streamDump.Write(p)
if err != nil {
return 0, err
}

data := make([]byte, len(p))
n = copy(data, p)
if n < len(p) {
return n, io.ErrShortWrite
}

f.flows = append(f.flows, flow{
timeDelta: time.Since(f.lastTime),
data: data,
})

f.lastTime = curTime

return n, err
}

0 comments on commit de4386d

Please sign in to comment.