\$\begingroup\$

I'm sending and receiving data over an interface (serial in this case) with the following behaviour:

  • The receiver sends back an Ack message if a message is delivered successfully.
  • If an Ack is not received within a timeout, the command should be re-sent, up to a limited number of times.
  • Multiple clients may request sending at the same time, but no new commands should be sent until an Ack for the last command is received or it times out. Other sending commands shall wait until the channel is unreserved or they time out.

The algorithm is fairly simple (feel free to say so if you think it can be improved):

flow diagram

The implementation in Go works as expected. I just feel there might be a smarter way to do it:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

/****** Simulating the receiver behaviour ******/

var receiveChannel = make(chan string, 1)

// Used to read messages from the interface
func Read() string {
    return <-receiveChannel
}

// Used to write messages to the interface
func Write(data string) {
    // Randomly drop 50% of packets
    if n := rand.Intn(100); n < 50 {
        receiveChannel <- "ACK"
    }
}

/*******************************************/

var ackChannel = make(chan bool, 1)
var sendChannel = make(chan string, 1)

func run() {
    for {
        if Read() == "ACK" {
            ackChannel <- true
        }
    }
}

func sendWrapper(data string) error {
    // Reserve the sending channel
    timeoutTimer := time.NewTimer(1 * time.Second)
    select {
    // This should block until sendChannel has a free spot or times out
    case sendChannel <- data:
        timeoutTimer.Stop()
        fmt.Printf("Send chan reserved for command id=%v\n", data)
    case <-timeoutTimer.C:
        return fmt.Errorf("timed out while waiting for send channel to be free. id=%v", data)
    }

    attempts := 2
    err := send(data, attempts)

    // Free up the sending channel
    select {
    case x := <-sendChannel:
        fmt.Printf("Send channel cleared %v\n", x)
    default:
        fmt.Printf("Send channel is empty. This should never happen!\n")
    }

    return err
}

func send(data string, attempts int) error {
    // Send data
    Write(data)

    // Wait for an ACK to be received
    ackTimeoutTimer := time.NewTimer(time.Millisecond * 100)
    select {
    case <-ackChannel:
        ackTimeoutTimer.Stop()
    case <-ackTimeoutTimer.C:
        // Retry again
        if attempts > 1 {
            return send(data, attempts-1)
        }
        return fmt.Errorf("Timed out while waiting for ack. id=%v", data)
    }
    fmt.Printf("Frame sent and acked id=%v\n", data)
    return nil
}

/****** Testing ******/

func main() {
    go run()
    
    // Multiple goroutines sending data
    for i := 0; i < 7; i++ {
        go func() {
            x := i
            if err := sendWrapper(fmt.Sprint(x)); err != nil {
                fmt.Println(err.Error())
            } else {
                fmt.Printf("Sending successful. i=%v\n", x)
            }
        }()
        time.Sleep(10 * time.Millisecond)
    }
    time.Sleep(4 * time.Second)
}
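
One possible simplification, offered as a sketch rather than a drop-in replacement: reserve the line with a one-slot channel used as a semaphore, release it with defer so every return path frees it, and retry in a loop instead of recursing. Link, NewLink, Send, ErrLineBusy and ErrNoAck are hypothetical names, and the acks channel and write function stand in for the run goroutine and Write above. Discarding a stale ACK before each write is an assumption about the protocol, not something the original code does.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinel errors so callers can tell the two failures apart.
var (
	ErrLineBusy = errors.New("timed out waiting for the line to be free")
	ErrNoAck    = errors.New("timed out waiting for ack")
)

// Link is a hypothetical wrapper; its names do not come from the original code.
type Link struct {
	slot  chan struct{}     // capacity 1: whoever holds the token owns the line
	acks  <-chan struct{}   // one value per ACK read from the interface
	write func(data string) // writes one frame to the interface
}

// NewLink builds a Link around an ACK channel and a write function.
func NewLink(acks <-chan struct{}, write func(string)) *Link {
	return &Link{slot: make(chan struct{}, 1), acks: acks, write: write}
}

// Send reserves the line, then writes data up to attempts times until an ACK arrives.
func (l *Link) Send(data string, attempts int, reserveTimeout, ackTimeout time.Duration) error {
	reserve := time.NewTimer(reserveTimeout)
	defer reserve.Stop()
	select {
	case l.slot <- struct{}{}:
		defer func() { <-l.slot }() // the line is released on every return path
	case <-reserve.C:
		return fmt.Errorf("%w: id=%v", ErrLineBusy, data)
	}

	for n := 0; n < attempts; n++ {
		// Assumption: an ACK that arrived after an earlier timeout is stale,
		// so drop it instead of crediting it to this frame.
		select {
		case <-l.acks:
		default:
		}
		l.write(data)

		ack := time.NewTimer(ackTimeout)
		select {
		case <-l.acks:
			ack.Stop()
			return nil
		case <-ack.C:
			// No ACK in time: fall through to the next attempt, if any.
		}
	}
	return fmt.Errorf("%w: id=%v", ErrNoAck, data)
}
```

With the values used above, a call would look like link.Send(id, 2, time.Second, 100*time.Millisecond), and errors.Is(err, ErrNoAck) distinguishes a missing ACK from a busy line.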
\$\endgroup\$
  • \$\begingroup\$ I would not use an async style for a synchronous read/write API, especially if you can handle only one command at a time. IMO, each write should be followed by a read with a timeout: if the read times out, retry; if it is acked, process the next command. Also, it is unclear whether multiple clients will be querying the same interface concurrently. \$\endgroup\$
    – mh-cbon
    Commented Jun 8, 2020 at 19:31
  • \$\begingroup\$ @mh-cbon thanks for the comment. I updated the question to mention that multiple clients may concurrently request to send, but only one command is sent at a time. This is shown in the first conditional block in the diagram. I'm afraid I don't get what you mean by "i would not use async style for a synchronous read/write api". Would you care to elaborate? Sending is synchronous. Requesting to send, however, is not. \$\endgroup\$ Commented Jun 10, 2020 at 13:51
  • \$\begingroup\$ Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers. \$\endgroup\$
    – Mast
    Commented Jun 10, 2020 at 14:12

1 Answer

\$\begingroup\$

You have at least one bug.


The closure from your code:

package main

import (
    "fmt"
    "time"
)

func main() {
    for i := 0; i < 7; i++ {
        go func() {
            x := i
            fmt.Printf("Sending successful. i=%v\n", x)
        }()
    }
    time.Sleep(4 * time.Second)
}

Output:

$ go run closure.go
Sending successful. i=7
Sending successful. i=7
Sending successful. i=7
Sending successful. i=7
Sending successful. i=7
Sending successful. i=7
Sending successful. i=7
$

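The x := i statement is in the wrong place. Inside the goroutine, it runs only once the goroutine is scheduled, by which time the loop may already have changed i (before Go 1.22, all iterations share a single i, and reading it from the goroutine is also a data race). The copy has to be made in the loop body, before the go statement: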

package main

import (
    "fmt"
    "time"
)

func main() {
    for i := 0; i < 7; i++ {
        x := i
        go func() {
            fmt.Printf("Sending successful. i=%v\n", x)
        }()
    }
    time.Sleep(4 * time.Second)
}

Output:

$ go run closure.go
Sending successful. i=4
Sending successful. i=5
Sending successful. i=1
Sending successful. i=3
Sending successful. i=2
Sending successful. i=0
Sending successful. i=6
$
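
Another common way to bind the current value, also described in the Go FAQ entry on closures running as goroutines, is to pass the loop variable as an argument. The sketch below is not part of the code under review: collect is a hypothetical helper, and it uses a sync.WaitGroup in place of time.Sleep so the result can be checked deterministically.

```go
package main

import (
	"sort"
	"sync"
)

// collect starts n goroutines and returns the loop values they saw, sorted.
// It is a hypothetical helper used only to illustrate the argument-passing fix.
func collect(n int) []int {
	var (
		mu   sync.Mutex
		seen []int
		wg   sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		// x is a copy of i, evaluated when the go statement executes,
		// so later changes to i cannot affect this goroutine.
		go func(x int) {
			defer wg.Done()
			mu.Lock()
			seen = append(seen, x)
			mu.Unlock()
		}(i)
	}
	wg.Wait() // wait for every goroutine instead of sleeping
	sort.Ints(seen)
	return seen
}
```

Calling collect(7) returns the values 0 through 6, each exactly once, regardless of the order in which the goroutines ran.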

Reference:

Go: Frequently Asked Questions (FAQ)

What happens with closures running as goroutines?

\$\endgroup\$