Skip to content

Commit

Permalink
Redo weather/traffic websocket updates (#74).
Browse files Browse the repository at this point in the history
  • Loading branch information
cyoung committed Oct 8, 2015
1 parent e41b040 commit a2a7b11
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ GOARM ?= 7

all:
GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) go get -t -d -v ./...
GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) go build -ldflags " -X main.stratuxVersion=`git describe --abbrev=0 --tags` -X main.stratuxBuild=`git log -n 1 --pretty=%H`" main/gen_gdl90.go main/traffic.go main/ry835ai.go main/network.go main/managementinterface.go main/sdr.go
GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) go build -ldflags " -X main.stratuxVersion=`git describe --abbrev=0 --tags` -X main.stratuxBuild=`git log -n 1 --pretty=%H`" main/gen_gdl90.go main/traffic.go main/ry835ai.go main/network.go main/managementinterface.go main/sdr.go main/uibroadcast.go
cd dump978 && make lib

test:
Expand Down
17 changes: 4 additions & 13 deletions main/gen_gdl90.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,7 @@ type WeatherMessage struct {
LocaltimeReceived time.Time
}

var weatherMessages []WeatherMessage

// Send update to attached client.
// Send update to connected websockets.
func registerADSBTextMessageReceived(msg string) {
x := strings.Split(msg, " ")
if len(x) < 5 {
Expand All @@ -492,16 +490,10 @@ func registerADSBTextMessageReceived(msg string) {
wm.Data = strings.Join(x[3:], " ")
wm.LocaltimeReceived = time.Now()

//FIXME: Fixed log size currently - determine what works best for the web interface.
n := len(weatherMessages)
if n >= 2500 {
weatherMessages = weatherMessages[1:]
}

weatherMessages = append(weatherMessages, wm)
wmJSON, _ := json.Marshal(&wm)

// Send to weatherUpdate channel for any connected clients.
weatherUpdate <- wm
weatherUpdate.Send(wmJSON)
}

func parseInput(buf string) ([]byte, uint16) {
Expand Down Expand Up @@ -712,7 +704,7 @@ func defaultSettings() {
globalSettings.ES_Enabled = false //TODO
globalSettings.GPS_Enabled = false //TODO
//FIXME: Need to change format below.
globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, nil, time.Time{}, time.Time{}, 0}, {nil, "", 49002, NETWORK_AHRS_FFSIM, nil, time.Time{}, time.Time{}, 0}}
globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, nil, time.Time{}, time.Time{}, 0}, {nil, "", 49002, NETWORK_AHRS_FFSIM, nil, time.Time{}, time.Time{}, 0}}
globalSettings.AHRS_Enabled = false
globalSettings.DEBUG = false
globalSettings.ReplayLog = false //TODO: 'true' for debug builds.
Expand Down Expand Up @@ -826,7 +818,6 @@ func main() {

ADSBTowers = make(map[string]ADSBTower)
MsgLog = make([]msg, 0)
weatherMessages = make([]WeatherMessage, 0)

crcInit() // Initialize CRC16 table.
sdrInit()
Expand Down
47 changes: 30 additions & 17 deletions main/managementinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@ type SettingMessage struct {
}

// Weather updates channel.
var weatherUpdate chan WeatherMessage
var trafficUpdate chan TrafficInfo
var weatherUpdate *uibroadcaster
var trafficUpdate *uibroadcaster

/*
The /weather websocket starts off by sending the current buffer of weather messages, then sends updates as they are received.
*/

func handleWeatherWS(conn *websocket.Conn) {
// Send current buffer.
for _, w := range weatherMessages {
weatherJSON, _ := json.Marshal(&w)
conn.Write(weatherJSON)
}
// Wait for updates and send as they are received.
// Subscribe the socket to receive updates.
weatherUpdate.AddSocket(conn)

// Connection closes when function returns. Since uibroadcast is writing and we don't need to read anything (for now), just keep it busy.
for {
lastUpdate := <-weatherUpdate
weatherJSON, _ := json.Marshal(&lastUpdate)
conn.Write(weatherJSON)
buf := make([]byte, 1024)
_, err := conn.Read(buf)
if err != nil {
break
}
if buf[0] != 0 { // Dummy.
continue
}
time.Sleep(1 * time.Second)
}
}

Expand All @@ -50,10 +54,20 @@ func handleTrafficWS(conn *websocket.Conn) {
conn.Write(trafficJSON)
}
trafficMutex.Unlock()
// Subscribe the socket to receive updates.
trafficUpdate.AddSocket(conn)

// Connection closes when function returns. Since uibroadcast is writing and we don't need to read anything (for now), just keep it busy.
for {
lastUpdate := <-trafficUpdate
trafficJSON, _ := json.Marshal(&lastUpdate)
conn.Write(trafficJSON)
buf := make([]byte, 1024)
_, err := conn.Read(buf)
if err != nil {
break
}
if buf[0] != 0 { // Dummy.
continue
}
time.Sleep(1 * time.Second)
}
}

Expand Down Expand Up @@ -183,8 +197,8 @@ func handleSettingsSetRequest(w http.ResponseWriter, r *http.Request) {
}

func managementInterface() {
weatherUpdate = make(chan WeatherMessage, 1024)
trafficUpdate = make(chan TrafficInfo, 1024)
weatherUpdate = NewUIBroadcaster()
trafficUpdate = NewUIBroadcaster()

http.Handle("/", http.FileServer(http.Dir("/var/www")))
http.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log"))))
Expand All @@ -207,7 +221,6 @@ func managementInterface() {
s.ServeHTTP(w, req)
})


http.HandleFunc("/getSituation", handleSituationRequest)
http.HandleFunc("/getTowers", handleTowersRequest)
http.HandleFunc("/getSettings", handleSettingsGetRequest)
Expand Down
8 changes: 6 additions & 2 deletions main/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"encoding/hex"
"encoding/json"
"math"
"net"
"strconv"
Expand Down Expand Up @@ -102,9 +103,12 @@ func sendTrafficUpdates() {

// Send update to attached client.
func registerTrafficUpdate(ti TrafficInfo) {
if ti.Position_valid { // Don't send unless a valid position exists.
trafficUpdate <- ti
if !ti.Position_valid { // Don't send unless a valid position exists.
return
}

tiJSON, _ := json.Marshal(&ti)
trafficUpdate.Send(tiJSON)
}

func makeTrafficReport(ti TrafficInfo) {
Expand Down
42 changes: 42 additions & 0 deletions main/uibroadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"golang.org/x/net/websocket"
)

type uibroadcaster struct {
sockets []*websocket.Conn
messages chan []byte
}

func NewUIBroadcaster() *uibroadcaster {
ret := &uibroadcaster{
sockets: make([]*websocket.Conn, 0),
messages: make(chan []byte, 1024),
}
go ret.writer()
return ret
}

func (u *uibroadcaster) Send(msg []byte) {
u.messages <- msg
}

func (u *uibroadcaster) AddSocket(sock *websocket.Conn) {
u.sockets = append(u.sockets, sock)
}

func (u *uibroadcaster) writer() {
for {
msg := <-u.messages
// Send to all.
p := make([]*websocket.Conn, 0) // Keep a list of the writeable sockets.
for _, sock := range u.sockets {
_, err := sock.Write(msg)
if err == nil {
p = append(p, sock)
}
}
u.sockets = p // Save the list of writeable sockets.
}
}

0 comments on commit a2a7b11

Please sign in to comment.