(Series) Efficient Workflows in Go with Temporal: Signals & Selectors

Younis Jad
Lyonas
Published in
8 min readJul 23, 2023

--

Orchestrating Success: the Potential of Temporal Signals & Selectors

This Tutorial is part of my recently published series: (Series) Efficient Workflow in Go with Temporal.io

What is Workflow Signals in Temporal

Temporal Signals, similar to webhooks, offer a mechanism for external entities to impact the behavior and advancement of a workflow. However, there is a notable difference in how they operate. While webhooks typically involve a direct modification or interruption of execution.

What is Workflow Selectors in Temporal

Workflow Selectors in Temporal are a feature that allows workflows to dynamically wait for different events or conditions to occur before proceeding further. They enable the workflow to make decisions based on available signals or events, providing flexibility and adaptability to the workflow execution.

The Power of Workflow Selectors With Signals

Workflow Selectors allow workflows to wait for specific Signals to occur, enabling them to synchronize different paths of execution based on the first event that happens. This flexibility grants workflows the ability to make informed decisions and take appropriate actions based on the available Signals.

With the use of Signals, workflows can receive updates, notifications, or requests from external systems or actors. These Signals can trigger wait conditions within the workflow, allowing it to patiently pause until a defined Signal is received. Once the appropriate Signal is received, the workflow can resume execution, processing the information or executing actions as desired.

Example Workflow Overview

In this section, we will discuss an example workflow that demonstrates the seamless integration between Temporal, a REST API, and a Data Layer. This scenario showcases how these technologies can work together to streamline processes and optimize communication between various components.

E -Commerce Order Temporal + Rest Client Workflow Example

The workflow begins with a client making a request to the server. The server, acting as a mediator, processes the request and responds back to the client. However, in the background, the server also initiates a workflow using Temporal.

The workflow involves multiple tasks, distributed among different services. These tasks include creating an order, generating a transaction, and creating shipping details. The server communicates with Temporal, which acts as a central hub for managing and coordinating these tasks.

At various stages of the workflow, the server sends signals to Temporal, indicating key events such as invoice confirmation and shipping confirmation. These signals trigger corresponding actions in Temporal, enabling the associated services to perform necessary actions and update their respective states.

For instance, when the invoice is confirmed, the service responsible for invoice management receives a signal from Temporal. This prompts the service to confirm the invoice and proceed to the next step. Similarly, when shipping is confirmed, the corresponding service performs the necessary actions and updates the workflow status in Temporal.

Implementation Details

Now, let’s dive into the implementation details of the example workflow. We’ll discuss the different code blocks for the Workflow, Activities, and the REST Client.

Preparing Workflow

Before diving into the implementation details of the workflow, it’s essential to make sure that everything is set up correctly. In this section, we will guide you through the necessary steps to prepare the workflow, ensuring a smooth and efficient execution of tasks in your Temporal-based application.

Generic Activity Options

// @param ctx: workflow context
// @param productsString string a slice of product ids "1,2,3,..etc"
// @return temporal error
func CreateOrderWorkflow(ctx workflow.Context, productsString string) error {

return nil
}

Workflow Configurations

We configure the retry options to ensure that our workflow continuously retries each step of the transaction, shipping, and order process until the desired status change is achieved. By incorporating signals and selectors, we can interpret these status changes and determine when to advance to the next step. The retry mechanism allows for a resilient workflow that persists until the necessary conditions are met.

func CreateOrderWorkflow(ctx workflow.Context, productsString string) error {
options := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 20,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Minute,
BackoffCoefficient: 2,
MaximumAttempts: 7,
MaximumInterval: time.Hour,
NonRetryableErrorTypes: []string{
"activity_error",
},
},
}

// start workflow with activity options
ctx = workflow.WithActivityOptions(ctx, options)

// configure logger
logger := workflow.GetLogger(ctx)
logger.Info("Starting CreateOrderWorkflow")

// start the activites

return nil
}

Now Let’s enhance our workflow by incorporating activities that represent our business logic. Take a look at the sequence diagram to get a visual understanding of how these activities fit within the overall workflow. By aligning our business logic with the sequence of activities, we can effectively orchestrate and streamline the flow of our workflow.

func CreateOrderWorkflow(ctx workflow.Context, productsString string) error {
// ... workflow difnition

// get running workflowId
workflowId := workflow.GetInfo(ctx).WorkflowExecution.ID

// Create Order
orderId := ""
workflow.ExecuteActivity(ctx, activity.CreateOrder, productsString, workflowId).Get(ctx, orderId)

// Create Transaction
workflow.ExecuteActivity(ctx, activity.CreateTransaction, orderId).Get(ctx, nil)

// Create Shipping
workflow.ExecuteActivity(ctx, activity.CreateShipping, orderId).Get(ctx, nil)

// Confirm Transaction invoice
workflow.ExecuteActivity(ctx, activity.ConfirmInvoice, orderId, confirmInvoiceResult).Get(ctx, nil)

// Confirm Shipping fulfillment
workflow.ExecuteActivity(ctx, activity.ConfirmShipping, orderId, confirmShippingResult).Get(ctx, nil)

// Confirm Order
workflow.ExecuteActivity(ctx, activity.ConfirmOrder, orderId).Get(ctx, nil)

// ... workflow difnition
}

If you observe closely, you will notice that we pass the workflowId as a parameter in our activities. We do this because we will leverage this workflowId in our Data layer to signal our workflow from the client at a later stage. This connectivity between the activities and the data layer allows us to effectively communicate and synchronize the workflow’s progress, ensuring seamless coordination and interaction between different components of our system.

Synchronizing Activities with Signals using Selectors

Currently our workflow will run and retry on failure, and most likely will get stuck at confirmInvoice that’s why we need to setup workflow selectors to receive data from our signals, to asynchronously run activities.

Signals & Selectors

Let’s define the ConfirmInvoice selector and create a channel to receive signals. This will allow us to efficiently handle and process signals related to confirming invoices within our workflow.

Confirm Invoice and Shipping Selector and Signal Channel Implementation:

 // result can be string, struct, other data types.
var confirmInvoiceResult string
var confirmShippingResult string

// workflow selector
invoiceSelector := workflow.NewSelector(ctx)
shippingSelector := workflow.NewSelector(ctx)

// workflow named signal channel
invoiceSignalChan := workflow.GetSignalChannel(ctx, "confirmInvoice")
shippingSignalChan := workflow.GetSignalChannel(ctx, "confirmShipping")

// implement selector reciever via signal channel
invoiceSelector.AddReceive(invoiceSignalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &confirmInvoiceResult)
})

shippingSelector.AddReceive(shippingSignalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &confirmInvoiceResult)
})

Finally, let’s enhance our workflow by incorporating signals and selectors. This update will allow us to effectively utilize signals for communication and synchronizing different steps within the workflow.

// workflow/order.go
func CreateOrderWorkflow(ctx workflow.Context, productsString string) error {
options := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 20,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Minute,
BackoffCoefficient: 2,
MaximumAttempts: 7,
MaximumInterval: time.Hour,
NonRetryableErrorTypes: []string{
"activity_error",
},
},
}

// start workflow with activity options
ctx = workflow.WithActivityOptions(ctx, options)

// configure logger
logger := workflow.GetLogger(ctx)
logger.Info("Starting CreateOrderWorkflow")

// get running workflowId
workflowId := workflow.GetInfo(ctx).WorkflowExecution.ID

// result can be string, struct, other data types.
var confirmInvoiceResult string
var confirmShippingResult string

// workflow selector
invoiceSelector := workflow.NewSelector(ctx)
shippingSelector := workflow.NewSelector(ctx)

// workflow named signal channel
invoiceSignalChan := workflow.GetSignalChannel(ctx, "confirmInvoice")
shippingSignalChan := workflow.GetSignalChannel(ctx, "confirmShipping")

// implement selector reciever via signal channel
invoiceSelector.AddReceive(invoiceSignalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &confirmInvoiceResult)
})

shippingSelector.AddReceive(shippingSignalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &confirmInvoiceResult)
})

// start the activites

// Create Order
orderId := ""
workflow.ExecuteActivity(ctx, activity.CreateOrder, productsString, workflowId).Get(ctx, orderId)

// Create Transaction
workflow.ExecuteActivity(ctx, activity.CreateTransaction, orderId).Get(ctx, nil)

// Create Shipping
workflow.ExecuteActivity(ctx, activity.CreateShipping, orderId).Get(ctx, nil)

invoiceSelector.Select(ctx)

// Confirm Transaction invoice
workflow.ExecuteActivity(ctx, activity.ConfirmInvoice, orderId, confirmInvoiceResult).Get(ctx, nil)

shippingSelector.Select(ctx)

// Confirm Shipping fulfillment
workflow.ExecuteActivity(ctx, activity.ConfirmShipping, orderId, confirmShippingResult).Get(ctx, nil)

// Confirm Order
workflow.ExecuteActivity(ctx, activity.ConfirmOrder, orderId).Get(ctx, nil)

return nil
}

In our final code implementation, we have incorporated the selector.Select(ctx) statement to initiate the workflow’s wait for a selector fulfillment signal. This particular statement pauses the workflow, awaiting the arrival of a signal. Once a signal is received, the code proceeds to execute the subsequent activities. This implementation guarantees that our workflow maintains responsiveness and dynamically progresses forward as soon as the required signal is detected.

Preparing Rest Client

Preparing REST ClientTo interact with external systems or services, enabling seamless communication between your Temporal-based application and the desired REST APIs. Get ready to harness the power of REST APIs in your Temporal workflows.

  • Setup Temporal Worker
  • Register Workflows and Activities
  • Create http server and setup routes
  • Start Temporal Worker as a goroutine
// service/main.go

func Server() {
// set up the worker
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()

w := worker.New(c, "cart", worker.Options{})
w.RegisterWorkflow(workflow.CreateOrderWorkflow)
w.RegisterActivity(activity.CreateOrder)
w.RegisterActivity(activity.CreateTransaction)
w.RegisterActivity(activity.CreateShipping)
w.RegisterActivity(activity.ConfirmShipping)
w.RegisterActivity(activity.ConfirmInvoice)
w.RegisterActivity(activity.CreateInvoice)

mux := http.NewServeMux()
mux.HandleFunc("/cart", CartGetHandler) // curl -X GET http://localhost:5000/cart
mux.HandleFunc("/cart/set", CartSetHandler) // curl -X POST http://localhost:5000/cart/set\?products\=1,2,3
mux.HandleFunc("/order/create", OrderHandler) // curl -X POST http://localhost:5000/order/create?cartId=1
mux.HandleFunc("/order/signal", SignalOrderHandler) // curl -X POST http://localhost:5000/order/signal?orderId=1&signalName=confirmInvoice&status=confirmed

server := &http.Server{Addr: ":5000", Handler: mux}

// start the worker and the web server
go func() {
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("unable to start Worker", err)
}
}()

log.Fatal(server.ListenAndServe())
}

REST API As a Client

Create Order Handler

By initiating the creation of an order, we kickstart the Temporal workflow. In other words, starting the order creation process will trigger the Temporal workflow to commence and orchestrate the subsequent steps and activities involved in fulfilling the order.

func OrderHandler(w http.ResponseWriter, r *http.Request) {
productsString := r.URL.Query().Get("products")
// create a new temporal client
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()

_, err = c.ExecuteWorkflow(r.Context(), client.StartWorkflowOptions{
ID: "CreateOrderWorkflow_" + uuid.New().String(),
TaskQueue: "order",
}, workflow.CreateOrderWorkflow, productsString)
if err != nil {
http.Error(w, "unable to start workflow", http.StatusInternalServerError)
return
}

jsonResponse, err := json.Marshal("{status: 'ok'}")
if err != nil {
http.Error(w, "unable to marshal response", http.StatusInternalServerError)
return
}

w.Write(jsonResponse)
}

Signal Handler

a webhook to signal our workflows using workflow and passed params “orderId, signalName, Status as signal data”

the using orderId we get the order details, and extract workflowId which then we will use to signal our workflow with result using the signal name

func SignalOrderHandler(w http.ResponseWriter, r *http.Request) {
orderId := r.URL.Query().Get("orderId")
signalName := r.URL.Query().Get("signal")
orderStatus := r.URL.Query().Get("status")
// create a new temporal client
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()

// get order from data layer
repo := &repository.Order{}
order, err := repo.GetOrder(r.Context(), orderId)
if err != nil {
http.Error(w, "unable to get order", http.StatusInternalServerError)
return
}

workflowId := order.WorkflowId
runId := "" // we did not store runId we can safely leave it empty
err = c.SignalWorkflow(r.Context(), workflowId, runId, signalName, orderStatus)
if err != nil {
http.Error(w, "unable to signal workflow", http.StatusInternalServerError)
return
}

jsonResponse, err := json.Marshal("{status: 'ok'}")
if err != nil {
http.Error(w, "unable to marshal response", http.StatusInternalServerError)
return
}

w.Write(jsonResponse)
}

Conclusion

In conclusion, the utilization of Temporal workflow signals with selectors offers a powerful mechanism to achieve synchronized and dynamic workflow execution. By leveraging these features, developers can design workflows that respond to real-time events, dynamically wait for specific signals, and make informed decisions based on the available information.

Furthermore, the integration of signals allows workflows to receive updates, notifications, or requests from external systems or actors. This empowers workflows to collaboratively interact with other components, incorporating external input to determine the flow and behavior of the execution.

Thank you for reading our article

on Temporal workflow signals with selectors! You can find the full code on our GitHub repository: https://github.com/unijad/temporal-select-signal-tutorial-code.

If you enjoyed the article, consider following and subscribing to My mailing list for future updates. also appreciate your comments and feedback.

--

--

Younis Jad
Lyonas

Tech Evangelist, Experienced software engineer, Passionate about learning and building innovative, scalable solutions.