Skip to content

Commit 580a38f

Browse files
committed
DGW/mqtt: fixed reconnection to the broker (#21)
1 parent 4029a16 commit 580a38f

1 file changed

Lines changed: 30 additions & 21 deletions

File tree

cmd/device-gateway/mqtt.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import (
88
)
99

1010
type MQTTPublisher struct {
11-
config *MqttProtocol
12-
client *MQTT.MqttClient
13-
dataCh chan AgentResponse
11+
config *MqttProtocol
12+
clientId string
13+
client *MQTT.MqttClient
14+
dataCh chan AgentResponse
1415
}
1516

1617
func newMQTTPublisher(conf *Config) *MQTTPublisher {
@@ -45,17 +46,19 @@ func newMQTTPublisher(conf *Config) *MQTTPublisher {
4546
return nil
4647
}
4748

48-
// Prepare MQTT connection opts
49-
broker := fmt.Sprintf("tcp://%s:%v", config.Host, config.Port)
50-
clientId := conf.Id
51-
connOpts := MQTT.NewClientOptions().AddBroker(broker).SetClientId(clientId).SetCleanSession(true).SetOnConnectionLost(onConnectionLost)
52-
5349
// Create and return publisher
5450
publisher := &MQTTPublisher{
55-
config: &config,
56-
client: MQTT.NewClient(connOpts),
57-
dataCh: make(chan AgentResponse),
51+
config: &config,
52+
clientId: conf.Id,
53+
dataCh: make(chan AgentResponse),
5854
}
55+
56+
// Prepare MQTT connection opts
57+
broker := fmt.Sprintf("tcp://%s:%v", config.Host, config.Port)
58+
connOpts := MQTT.NewClientOptions().AddBroker(broker).SetClientId(publisher.clientId).
59+
SetCleanSession(true).SetOnConnectionLost(publisher.onConnectionLost)
60+
61+
publisher.client = MQTT.NewClient(connOpts)
5962
return publisher
6063
}
6164

@@ -67,7 +70,7 @@ func (self *MQTTPublisher) start() {
6770
log.Println("MQTTPublisher.start()")
6871
// start the connection routine
6972
log.Printf("MQTTPublisher: Will connect to the broker tcp://%s:%v", self.config.Host, self.config.Port)
70-
go connect(self.client, 0)
73+
go self.connect(0)
7174

7275
qos := 1
7376
prefix := self.config.Prefix
@@ -95,30 +98,36 @@ func (self *MQTTPublisher) stop() {
9598
}
9699
}
97100

98-
func connect(client *MQTT.MqttClient, backOff int) {
99-
log.Println("MQTTPublisher.connect() with backOff (sec): ", backOff)
101+
func (self *MQTTPublisher) connect(backOff int) {
102+
log.Printf("MQTTPublisher: connecting to the broker %s:%v, backOff: %v sec\n", self.config.Host, self.config.Port, backOff)
100103
// sleep for backOff seconds
101104
time.Sleep(time.Duration(backOff) * time.Second)
102-
_, err := client.Start()
105+
_, err := self.client.Start()
103106

104107
if err != nil {
105-
log.Printf("Failed to connected to MQTT broker: %v\n", err.Error())
108+
log.Printf("MQTTPublisher: failed to connect: %v\n", err.Error())
106109
// intial backOff 10 sec, every further retry backOff*2 unless <= 10 min
107110
if backOff == 0 {
108111
backOff = 10
109112
} else if backOff <= 600 {
110113
backOff *= 2
111114
}
112-
go connect(client, backOff)
115+
go self.connect(backOff)
113116
return
114117
}
115118

116-
log.Printf("MQTTPublisher: Connected to the broker")
119+
log.Printf("MQTTPublisher: connected to the broker %s:%v", self.config.Host, self.config.Port)
117120
return
118121
}
119122

120-
func onConnectionLost(client *MQTT.MqttClient, reason error) {
123+
func (self *MQTTPublisher) onConnectionLost(client *MQTT.MqttClient, reason error) {
121124
log.Println("MQTTPulbisher: lost connection to the broker: ", reason.Error())
122-
// FIXME: bug in mqtt library (panic on reconnect)?
123-
// go connect(client, 0)
125+
126+
// Initialize a new client and reconnect
127+
broker := fmt.Sprintf("tcp://%s:%v", self.config.Host, self.config.Port)
128+
connOpts := MQTT.NewClientOptions().AddBroker(broker).SetClientId(self.clientId).
129+
SetCleanSession(true).SetOnConnectionLost(self.onConnectionLost)
130+
131+
self.client = MQTT.NewClient(connOpts)
132+
go self.connect(0)
124133
}

0 commit comments

Comments
 (0)