Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,39 @@ To disable Cloud Fetch (e.g., when handling smaller datasets or to avoid additio
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?useCloudFetch=false
```

### Telemetry Configuration (Optional)

The driver includes optional telemetry to help improve performance and reliability. Telemetry is **disabled by default** and requires explicit opt-in.

**Opt in to telemetry** (respects server-side feature flags):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=true
```

**Opt out of telemetry** (explicitly disable):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=false
```

**Advanced configuration** (for testing/debugging):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?forceEnableTelemetry=true
```

**What data is collected:**
- ✅ Query latency and performance metrics
- ✅ Error codes (not error messages)
- ✅ Feature usage (CloudFetch, LZ4, etc.)
- ✅ Driver version and environment info

**What is NOT collected:**
- ❌ SQL query text
- ❌ Query results or data values
- ❌ Table/column names
- ❌ User identities or credentials

Telemetry has < 1% performance overhead and uses circuit breaker protection to ensure it never impacts your queries. For more details, see `telemetry/DESIGN.md` and `telemetry/TROUBLESHOOTING.md`.

### Connecting with a new Connector

You can also connect with a new connector object. For example:
Expand Down
73 changes: 56 additions & 17 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ func (c *conn) Close() error {
ctx := driverctx.NewContextWithConnId(context.Background(), c.id)

// Close telemetry and release resources
closeStart := time.Now()
_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})
closeLatencyMs := time.Since(closeStart).Milliseconds()

if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeDeleteSession, closeLatencyMs)
_ = c.telemetry.Close(ctx)
telemetry.ReleaseForConnection(c.cfg.Host)
}

_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})

if err != nil {
log.Err(err).Msg("databricks: failed to close connection")
return dbsqlerrint.NewBadConnectionError(err)
Expand Down Expand Up @@ -123,15 +126,16 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name

corrId := driverctx.CorrelationIdFromContext(ctx)

exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
var pollCount int
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args, &pollCount)
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
stagingErr := c.execStagingOperation(exStmtResp, ctx)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
ctx = c.telemetry.BeforeExecute(ctx, c.id, statementID)
defer func() {
finalErr := err
if stagingErr != nil {
Expand All @@ -140,6 +144,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
c.telemetry.AfterExecute(ctx, finalErr)
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
}()
c.telemetry.AddTag(ctx, "poll_count", pollCount)
}

if exStmtResp != nil && exStmtResp.OperationHandle != nil {
Expand Down Expand Up @@ -181,34 +186,60 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
log, _ := client.LoggerAndContext(ctx, nil)
msg, start := log.Track("QueryContext")

// first we try to get the results synchronously.
// at any point in time that the context is done we must cancel and return
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
// Capture execution start time for telemetry before running the query
executeStart := time.Now()
var pollCount int
exStmtResp, opStatusResp, pollCount, err := c.runQueryWithTelemetry(ctx, query, args, &pollCount)
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
defer log.Duration(msg, start)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
// Use BeforeExecuteWithTime to set the correct start time (before execution)
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
defer func() {
c.telemetry.AfterExecute(ctx, err)
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
}()

c.telemetry.AddTag(ctx, "poll_count", pollCount)
c.telemetry.AddTag(ctx, "operation_type", telemetry.OperationTypeExecuteStatement)

if exStmtResp.DirectResults != nil && exStmtResp.DirectResults.ResultSetMetadata != nil {
resultFormat := exStmtResp.DirectResults.ResultSetMetadata.GetResultFormat()
c.telemetry.AddTag(ctx, "result.format", resultFormat.String())
}
}

if err != nil {
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
var telemetryUpdate func(int, int64)
if c.telemetry != nil {
telemetryUpdate = func(chunkCount int, bytesDownloaded int64) {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, ctx, telemetryUpdate)
return rows, err

}

func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedValue) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, error) {
// runQueryWithTelemetry delegates to runQuery and, in addition to runQuery's
// results, returns the value pointed to by pollCount once runQuery has
// returned. When pollCount is nil the returned count is 0.
func (c *conn) runQueryWithTelemetry(ctx context.Context, query string, args []driver.NamedValue, pollCount *int) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, int, error) {
	execResp, statusResp, runErr := c.runQuery(ctx, query, args, pollCount)
	if pollCount == nil {
		// No counter was supplied, so there is nothing to report.
		return execResp, statusResp, 0, runErr
	}
	return execResp, statusResp, *pollCount, runErr
}

func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedValue, pollCount *int) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, error) {
// first we try to get the results synchronously.
// at any point in time that the context is done we must cancel and return
exStmtResp, err := c.executeStatement(ctx, query, args)
Expand Down Expand Up @@ -240,7 +271,7 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
case cli_service.TOperationState_INITIALIZED_STATE,
cli_service.TOperationState_PENDING_STATE,
cli_service.TOperationState_RUNNING_STATE:
statusResp, err := c.pollOperation(ctx, opHandle)
statusResp, err := c.pollOperationWithCount(ctx, opHandle, pollCount)
if err != nil {
return exStmtResp, statusResp, err
}
Expand Down Expand Up @@ -268,7 +299,7 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
}

} else {
statusResp, err := c.pollOperation(ctx, opHandle)
statusResp, err := c.pollOperationWithCount(ctx, opHandle, pollCount)
if err != nil {
return exStmtResp, statusResp, err
}
Expand Down Expand Up @@ -396,7 +427,7 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
return resp, err
}

func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperationHandle) (*cli_service.TGetOperationStatusResp, error) {
func (c *conn) pollOperationWithCount(ctx context.Context, opHandle *cli_service.TOperationHandle, pollCount *int) (*cli_service.TGetOperationStatusResp, error) {
corrId := driverctx.CorrelationIdFromContext(ctx)
log := logger.WithContext(c.id, corrId, client.SprintGuid(opHandle.OperationId.GUID))
var statusResp *cli_service.TGetOperationStatusResp
Expand All @@ -413,6 +444,10 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
OperationHandle: opHandle,
})

if pollCount != nil {
*pollCount++
}

if statusResp != nil && statusResp.OperationState != nil {
log.Debug().Msgf("databricks: status %s", statusResp.GetOperationState().String())
}
Expand Down Expand Up @@ -455,6 +490,10 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
return statusResp, nil
}

// pollOperation polls the status of opHandle without counting the polls.
// It forwards to pollOperationWithCount with a nil counter.
func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperationHandle) (*cli_service.TGetOperationStatusResp, error) {
	var untracked *int // nil counter: the caller does not need a poll count
	return c.pollOperationWithCount(ctx, opHandle, untracked)
}

func (c *conn) CheckNamedValue(nv *driver.NamedValue) error {
var err error
if parameter, ok := nv.Value.(Parameter); ok {
Expand Down Expand Up @@ -622,7 +661,7 @@ func (c *conn) execStagingOperation(
}

if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, nil, nil)
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
}
Expand Down
16 changes: 8 additions & 8 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
assert.Error(t, err)
assert.Nil(t, exStmtResp)
assert.Nil(t, opStatusResp)
Expand Down Expand Up @@ -875,7 +875,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -921,7 +921,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -968,7 +968,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1021,7 +1021,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.NoError(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down Expand Up @@ -1179,7 +1179,7 @@ func TestConn_runQuery(t *testing.T) {
client: testClient,
cfg: config.WithDefaults(),
}
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)

assert.Error(t, err)
assert.Equal(t, 1, executeStatementCount)
Expand Down
20 changes: 20 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
}

protocolVersion := int64(c.cfg.ThriftProtocolVersion)

sessionStart := time.Now()
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
ClientProtocolI64: &protocolVersion,
Configuration: sessionParams,
Expand All @@ -64,6 +66,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
},
CanUseMultipleCatalogs: &c.cfg.CanUseMultipleCatalogs,
})
sessionLatencyMs := time.Since(sessionStart).Milliseconds()

if err != nil {
return nil, dbsqlerrint.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
}
Expand All @@ -85,14 +89,30 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
}
// else: leave nil to check server feature flag

// Build connection parameters for telemetry
connParams := &telemetry.DriverConnectionParameters{
Host: c.cfg.Host,
Port: c.cfg.Port,
HTTPPath: c.cfg.HTTPPath,
EnableArrow: c.cfg.UseArrowBatches,
EnableMetricViewMetadata: c.cfg.EnableMetricViewMetadata,
SocketTimeoutSeconds: int64(c.cfg.ClientTimeout.Seconds()),
RowsFetchedPerBlock: int64(c.cfg.MaxRows),
}

conn.telemetry = telemetry.InitializeForConnection(
ctx,
c.cfg.Host,
c.cfg.Port,
c.cfg.HTTPPath,
c.cfg.DriverVersion,
c.client,
enableTelemetry,
connParams,
)
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs)
}

log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)
Expand Down
5 changes: 4 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (ucfg UserConfig) WithDefaults() UserConfig {
ucfg.UseLz4Compression = false
ucfg.CloudFetchConfig = CloudFetchConfig{}.WithDefaults()

// Enable telemetry by default (respects server feature flags)
ucfg.EnableTelemetry = true

return ucfg
}

Expand All @@ -197,7 +200,7 @@ func WithDefaults() *Config {
ClientTimeout: 900 * time.Second,
PingTimeout: 60 * time.Second,
CanUseMultipleCatalogs: true,
DriverName: "godatabrickssqlconnector", // important. Do not change
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says "do not change" — do we know why that was?

DriverName: "godatabrickssqlconnector", // Server requires this exact name for validation
ThriftProtocol: "binary",
ThriftTransport: "http",
ThriftProtocolVersion: cli_service.TProtocolVersion_SPARK_CLI_SERVICE_PROTOCOL_V8,
Expand Down
Loading
Loading