Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ else
SHELL := /bin/bash
endif

.PHONY : check lint install-linters dep test test-e2e test-e2e-build test-e2e-run test-e2e-test test-e2e-stop test-e2e-clean build
.PHONY : check lint install-linters dep tidy test test-e2e test-e2e-build test-e2e-run test-e2e-test test-e2e-stop test-e2e-clean build
.PHONY : update-dep update-skywire update-skycoin push-deps sync-upstream-develop

VERSION := $(shell git describe --always)

Expand Down Expand Up @@ -101,17 +102,86 @@ install-linters-windows: ## Install linters on windows
${OPTS} go install golang.org/x/tools/cmd/goimports@latest
${OPTS} go install github.com/incu6us/goimports-reviser@latest

format: ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters).
format: tidy ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters).
${OPTS} goimports -w -local ${DMSG_REPO} ./pkg ./cmd ./internal ./examples
find . -type f -name '*.go' -not -path "./.git/*" -not -path "./vendor/*" -exec goimports-reviser -project-name ${DMSG_REPO} {} \;


format-windows: ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters-windows).
powershell -Command .\scripts\format-windows.ps1

dep: ## Sorts dependencies
tidy: ## Tidies dependencies
${OPTS} go mod tidy -v

dep: tidy ## Sorts and vendors dependencies
${OPTS} go mod vendor -v

update-dep: ## Update all dependencies to latest versions, vendor, and commit
${OPTS} go get -v -u ./...
${OPTS} go mod tidy -v
${OPTS} go mod vendor -v
git add go.mod go.sum vendor
git diff --cached --quiet || git commit -m "update deps"

update-skywire: ## Update skywire to latest develop branch
@echo "Updating skywire to latest develop..."
${OPTS} go get -v github.com/skycoin/skywire@develop
${OPTS} go mod tidy -v
${OPTS} go mod vendor -v
@echo "skywire updated successfully"

update-skycoin: ## Update skycoin to latest develop branch
@echo "Updating skycoin to latest develop..."
${OPTS} go get -v github.com/skycoin/skycoin@develop
${OPTS} go mod tidy -v
${OPTS} go mod vendor -v
@echo "skycoin updated successfully"

push-deps: ## Commit and push dependency updates
@echo "Committing dependency updates..."
git add go.mod go.sum vendor
git diff --cached --quiet || git commit -m "update deps"
git push
@echo "Dependencies pushed successfully"

sync-upstream-develop: ## Sync local develop branch with upstream skycoin/dmsg develop
@normalize() { \
echo "$$1" | sed \
-e 's|git@github.com:|https://github.com/|' \
-e 's|ssh://github.com/|https://github.com/|' \
-e 's|\.git$$||' \
-e 's|https://github.com/||' \
| tr '[:upper:]' '[:lower:]'; \
}; \
UPSTREAM_URL=$$(git remote get-url upstream 2>/dev/null); \
if [ -z "$$UPSTREAM_URL" ]; then \
echo "[error] no 'upstream' remote found. Add it with:"; \
echo " git remote add upstream https://github.com/skycoin/dmsg.git"; \
exit 1; \
fi; \
UPSTREAM_NORM=$$(normalize "$$UPSTREAM_URL"); \
if [ "$$UPSTREAM_NORM" != "skycoin/dmsg" ]; then \
echo "[error] upstream remote does not point to skycoin/dmsg."; \
echo " Found: $$UPSTREAM_URL"; \
exit 1; \
fi; \
ORIGIN_URL=$$(git remote get-url origin 2>/dev/null); \
if [ -z "$$ORIGIN_URL" ]; then \
echo "[error] no 'origin' remote found."; \
exit 1; \
fi; \
ORIGIN_NORM=$$(normalize "$$ORIGIN_URL"); \
if [ "$$ORIGIN_NORM" = "skycoin/dmsg" ]; then \
echo "[error] origin points to skycoin/dmsg directly."; \
echo " This target must be run from a fork, not the canonical repo."; \
exit 1; \
fi; \
echo "[ok] origin is a fork ($$ORIGIN_NORM), upstream is skycoin/dmsg — syncing develop..."; \
git checkout develop && \
git pull && \
git fetch upstream && \
git merge upstream/develop && \
git push

install: ## Install `dmsg-discovery`, `dmsg-server`, `dmsgcurl`,`dmsgpty-cli`, `dmsgpty-host`, `dmsgpty-ui`
${OPTS} go install ${BUILD_OPTS} ./cmd/*
Expand Down
7 changes: 7 additions & 0 deletions cmd/dmsg-server/commands/start/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,17 @@ var RootCmd = &cobra.Command{

srvAPI := dmsgserver.NewServerAPI(r, log, m)

// Convert peer config to dmsg.PeerEntry.
var peers []dmsg.PeerEntry
for _, p := range conf.Peers {
peers = append(peers, dmsg.PeerEntry{PK: p.PubKey, Addr: p.Address})
}

srvConf := dmsg.ServerConfig{
MaxSessions: conf.MaxSessions,
UpdateInterval: conf.UpdateInterval,
AuthPassphrase: authPassphrase,
Peers: peers,
}
srv := dmsg.NewServer(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.Discovery, &http.Client{}, log), &srvConf, m)
srv.SetLogger(log)
Expand Down
24 changes: 23 additions & 1 deletion cmd/dmsg-socks5/commands/dmsg-socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package commands
import (
"context"
"fmt"
"net"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -138,7 +139,6 @@ var serveCmd = &cobra.Command{

ctx, cancel := cmdutil.SignalContext(context.Background(), dlog)
defer cancel()
//TODO: implement whitelist logic

dmsgC, closeDmsg, err := dmsgclient.InitDmsgWithFlags(ctx, dlog, pk, sk, httpClient, pk.String())

Expand Down Expand Up @@ -182,6 +182,28 @@ var serveCmd = &cobra.Command{
}
dlog.Infof("Accepted connection from: %s", respConn.RemoteAddr())

// Enforce whitelist: extract remote PK from the dmsg address.
if len(wlkeys) > 0 {
remotePK, _, splitErr := net.SplitHostPort(respConn.RemoteAddr().String())
if splitErr != nil {
dlog.WithError(splitErr).Warn("Failed to parse remote address, rejecting connection.")
respConn.Close() //nolint:errcheck,gosec
continue
}
allowed := false
for _, key := range wlkeys {
if remotePK == key.String() {
allowed = true
break
}
}
if !allowed {
dlog.WithField("remote_pk", remotePK).Warn("Connection rejected: not in whitelist.")
respConn.Close() //nolint:errcheck,gosec
continue
}
}

conf := &socks5.Config{}
server, err := socks5.New(conf)
if err != nil {
Expand Down
27 changes: 0 additions & 27 deletions cmd/dmsg/commands/kill.go

This file was deleted.

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/bytedance/gopkg v0.1.4 // indirect
github.com/bytedance/sonic v1.15.0 // indirect
github.com/bytedance/sonic/loader v0.5.0 // indirect
github.com/bytedance/sonic/loader v0.5.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
Expand All @@ -53,7 +53,7 @@ require (
github.com/fatih/color v1.19.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.13 // indirect
github.com/gin-contrib/sse v1.1.0 // indirect
github.com/gin-contrib/sse v1.1.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ github.com/bytedance/gopkg v0.1.4 h1:oZnQwnX82KAIWb7033bEwtxvTqXcYMxDBaQxo5JJHWM
github.com/bytedance/gopkg v0.1.4/go.mod h1:v1zWfPm21Fb+OsyXN2VAHdL6TBb2L88anLQgdyje6R4=
github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE=
github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k=
github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE=
github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo=
github.com/bytedance/sonic/loader v0.5.1 h1:Ygpfa9zwRCCKSlrp5bBP/b/Xzc3VxsAW+5NIYXrOOpI=
github.com/bytedance/sonic/loader v0.5.1/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
Expand Down Expand Up @@ -61,8 +61,8 @@ github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM=
github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s=
github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w=
github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM=
github.com/gin-contrib/sse v1.1.1 h1:uGYpNwTacv5R68bSGMapo62iLTRa9l5zxGCps4hK6ko=
github.com/gin-contrib/sse v1.1.1/go.mod h1:QXzuVkA0YO7o/gun03UI1Q+FTI8ZV/n5t03kIQAI89s=
github.com/gin-gonic/gin v1.12.0 h1:b3YAbrZtnf8N//yjKeU2+MQsh2mY5htkZidOM7O0wG8=
github.com/gin-gonic/gin v1.12.0/go.mod h1:VxccKfsSllpKshkBWgVgRniFFAzFb9csfngsqANjnLc=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
Expand Down
5 changes: 3 additions & 2 deletions pkg/dmsg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Client struct {
errCh chan error
done chan struct{}
once sync.Once
wg sync.WaitGroup // tracks background goroutines for clean shutdown
sesMx sync.Mutex
}

Expand Down Expand Up @@ -330,8 +331,7 @@ func (ce *Client) discoverServers(ctx context.Context, all bool) (entries []*dis
return entries, err
}

// Close closes the dmsg client entity.
// TODO(evanlinjin): Have waitgroup.
// Close closes the dmsg client entity and waits for background goroutines to finish.
func (ce *Client) Close() error {
if ce == nil {
return nil
Expand All @@ -354,6 +354,7 @@ func (ce *Client) Close() error {
ce.log.Debug("All sessions closed.")
ce.sessionsMx.Unlock()
ce.porter.CloseAll(ce.log)
ce.wg.Wait()
err = ce.EntityCommon.delEntry(context.Background())
})
return err
Expand Down
21 changes: 17 additions & 4 deletions pkg/dmsg/client_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
return nil, err
}

// Range client's delegated servers.
// Try existing sessions first, falling back to next server on failure.
// 1. Try existing sessions to the target's delegated servers (direct path, cheapest).
for _, srvPK := range entry.Client.DelegatedServers {
if dSes, ok := ce.clientSession(ce.porter, srvPK); ok {
stream, err := dSes.DialStream(addr)
Expand All @@ -47,8 +46,22 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
}
}

// Range client's delegated servers.
// Attempt to connect to a delegated server.
// 2. Try all other existing sessions (mesh path — already connected, no new handshake).
// If servers are meshed, our server forwards the request to the target's server.
for _, ses := range ce.allClientSessions(ce.porter) {
if hasPK(entry.Client.DelegatedServers, ses.RemotePK()) {
continue // already tried above
}
stream, err := ses.DialStream(addr)
if err != nil {
ce.log.WithError(err).WithField("server", ses.RemotePK()).
Debug("DialStream failed via mesh, trying next server")
continue
}
return stream, nil
}

// 3. Last resort: establish new sessions to the target's delegated servers.
for _, srvPK := range entry.Client.DelegatedServers {
dSes, err := ce.EnsureAndObtainSession(ctx, srvPK)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/dmsg/client_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client
return ClientSession{}, errors.New("session already exists")
}

ce.wg.Add(1)
go func() {
defer ce.wg.Done()
defer func() {
if r := recover(); r != nil {
ce.log.Warnf("recovered panic in session serve goroutine: %v", r)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dmsg/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (

DefaultUpdateInterval = time.Minute

DefaultMaxSessions = 100
DefaultMaxSessions = 2048

DefaultDmsgHTTPPort = uint16(80)

Expand Down
17 changes: 17 additions & 0 deletions pkg/dmsg/entity_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type EntityCommon struct {

setSessionCallback func(ctx context.Context) error
delSessionCallback func(ctx context.Context) error

// peerSessionsFunc returns peer server sessions for mesh forwarding.
// Only set on Server entities; nil for clients.
peerSessionsFunc func() []*SessionCommon
}

func (c *EntityCommon) init(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, log logrus.FieldLogger, updateInterval time.Duration) {
Expand Down Expand Up @@ -84,6 +88,19 @@ func (c *EntityCommon) serverSession(pk cipher.PubKey) (ServerSession, bool) {
return ServerSession{SessionCommon: ses}, ok
}

// peerServerSessions returns all peer server sessions for mesh forwarding.
func (c *EntityCommon) peerServerSessions() []ServerSession {
if c.peerSessionsFunc == nil {
return nil
}
raw := c.peerSessionsFunc()
sessions := make([]ServerSession, len(raw))
for i, ses := range raw {
sessions[i] = ServerSession{SessionCommon: ses}
}
return sessions
}

// clientSession obtains a session as a client.
func (c *EntityCommon) clientSession(porter *netutil.Porter, pk cipher.PubKey) (ClientSession, bool) {
ses, ok := c.session(pk)
Expand Down
Loading
Loading