eventual: Design: Async Activities, Timeout, and Heartbeat

Problem Statement:

As a developer/workflow author, I want to create activities that run for indefinite amounts of time, involve human interaction, invoke other services, or wait for the result of outside actions. I should be able to recover from inconsistencies and faults, and I should be able to rely on the service to handle partial failures idempotently.

Stories:

  • Runtime Decision - a single activity can return either a sync result or an async result, decided at runtime.
  • Durable - async activities can be retried and can be given a timeout and a heartbeat.
  • Heartbeat - an activity may get stuck long before its total timeout elapses. Use a heartbeat to report that processing is still making progress.
  • Complete By Client - Any service with permission to write to the workflow should be able to complete an activity with data.
  • Fail By Client - Any service with permission to write to the workflow should be able to fail an activity with an error message.
  • Consistent Token - Activities should generate a token used to complete, fail, or heartbeat them. That token should encode enough information for the caller to reach the right workflow and activity.
  • Checkpointing (future) - Activities may fail during processing; let the activity report a value on heartbeat that is given back to the activity when the workflow resumes it.
  • Cancellation - ??
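The Consistent Token story leaves the token format open. A minimal sketch, assuming the token only needs to carry a workflow execution ID and the activity's sequence number within that execution; `ActivityTokenPayload`, `encodeActivityToken`, and `decodeActivityToken` are hypothetical names, not part of eventual's API. A real token would likely also need to be signed or otherwise opaque so callers cannot forge one.

```typescript
// Hypothetical token payload: enough to route complete/fail/heartbeat calls
// to the right workflow execution and activity.
interface ActivityTokenPayload {
  executionId: string; // which workflow execution owns the activity
  seq: number; // which activity call within that execution
}

function encodeActivityToken({ executionId, seq }: ActivityTokenPayload): string {
  // encodeURIComponent escapes ":", so the separator below is unambiguous.
  return `${encodeURIComponent(executionId)}:${seq}`;
}

function decodeActivityToken(token: string): ActivityTokenPayload {
  const parts = token.split(":");
  if (parts.length !== 2 || !/^\d+$/.test(parts[1])) {
    throw new Error(`malformed activity token: ${token}`);
  }
  return {
    executionId: decodeURIComponent(parts[0]),
    seq: parseInt(parts[1], 10),
  };
}

// Usage: produces "wf%3Aorder-42:3"
const exampleToken = encodeActivityToken({ executionId: "wf:order-42", seq: 3 });
```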

Strawman

workflow(async () => {
  const result = await act1();
});

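const act1 = activity<{ result: string }>({
  heartbeat: { seconds: number },
  timeout: { seconds: number }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
  ...doSomeWork...

  // hand the token to an external system that will finish the activity later
  await sendToQueue({ token: context.activity.token });

  // tell the workflow to wait for completeActivity/failActivity instead of a result
  return makeAsync();
});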

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
  // await so the Lambda does not return before every record is handled
  await Promise.all(event.Records.map(async (record) => {
    const payload = JSON.parse(record.body);

    // complete the activity with a payload
    await workflowClient.completeActivity<typeof act1>(
      payload.token,
      { result: "done" }
    );

    // ...or, instead, fail it with an error message
    // await workflowClient.failActivity(payload.token, "something went wrong");
  }));
};
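The strawman lets one activity decide at runtime whether to return a result or `makeAsync()`. One way the worker could tell the two apart is sketched below, assuming `makeAsync()` returns a marker object; the `AsyncToken` shape, `isAsyncToken`, `ActivityOutcome`, and `handleActivityResult` are hypothetical.

```typescript
// Hypothetical sentinel returned by makeAsync(). The marker property lets the
// runtime tell it apart from any ordinary object an activity returns.
interface AsyncToken {
  readonly __asyncToken: true;
}

function makeAsync(): AsyncToken {
  return { __asyncToken: true };
}

function isAsyncToken(value: unknown): value is AsyncToken {
  return (
    typeof value === "object" &&
    value !== null &&
    (value as { __asyncToken?: unknown }).__asyncToken === true
  );
}

// What the runtime records after the activity handler returns.
type ActivityOutcome<T> =
  | { kind: "completed"; result: T } // sync: record the result immediately
  | { kind: "waiting" }; // async: wait for completeActivity / failActivity (or a timeout)

function handleActivityResult<T>(returned: T | AsyncToken): ActivityOutcome<T> {
  if (isAsyncToken(returned)) {
    return { kind: "waiting" };
  }
  return { kind: "completed", result: returned as T };
}
```

Checking a marker property rather than using `instanceof` keeps the test cheap and still works if the sentinel is created in a different module or bundle copy.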

With heartbeat

workflow(async () => {
  const result = await act1();
});

const act1 = activity<{ result: string }>({
  heartbeat: { seconds: 20 },
  timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
  ...doSomeWork...

  await sendToQueue({ token: context.activity.token });

  // the activity can also report liveness itself before going async
  await sendHeartbeat();

  return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
  await Promise.all(event.Records.map(async (record) => {
    const payload = JSON.parse(record.body);

    let done = false;
    while (!done) {
      // some long process; doNextChunk is a placeholder that returns true when finished
      done = await doNextChunk(payload);
      // heartbeat more often than the 20 second interval configured on act1
      await workflowClient.heartbeatActivity(payload.token);
    }

    // complete the activity with a payload
    await workflowClient.completeActivity<typeof act1>(
      payload.token,
      { result: "done" }
    );
  }));
};
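The heartbeat example implies two deadlines per activity: the overall `timeout`, measured from when the activity started, and the `heartbeat` interval, measured from the most recent heartbeat (or from the start, if none has arrived yet). A minimal sketch of that check as a pure function, assuming times in epoch milliseconds; `ActivityClock`, `DeadlineConfig`, and `checkActivityDeadlines` are hypothetical names.

```typescript
// Hypothetical bookkeeping the service keeps per running activity (epoch ms).
interface ActivityClock {
  startedAt: number;
  lastHeartbeatAt?: number; // time of the latest heartbeatActivity call, if any
}

// Mirrors the options passed to activity() in the strawman.
interface DeadlineConfig {
  heartbeat?: { seconds: number };
  timeout?: { seconds: number };
}

type DeadlineStatus = "ok" | "heartbeat-timeout" | "timeout";

function checkActivityDeadlines(
  clock: ActivityClock,
  config: DeadlineConfig,
  now: number,
): DeadlineStatus {
  // The overall timeout bounds the activity no matter how often it heartbeats.
  if (config.timeout && now - clock.startedAt > config.timeout.seconds * 1000) {
    return "timeout";
  }
  // Without any heartbeat yet, measure the interval from the start time.
  if (config.heartbeat) {
    const last = clock.lastHeartbeatAt ?? clock.startedAt;
    if (now - last > config.heartbeat.seconds * 1000) {
      return "heartbeat-timeout";
    }
  }
  return "ok";
}
```

With `heartbeat: { seconds: 20 }` and `timeout: { seconds: 160 }`, a consumer that stops calling `heartbeatActivity` is detected after about 20 seconds rather than 160.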

With heartbeat checkpoint (future)

workflow(async () => {
  const act = act1();

  // called each time the activity reports a checkpoint via heartbeat
  act.onHeartbeat(async ({ i }) => {
    await reportProgress(i);
  });

  return await act;
});

const reportProgress = activity(...);

const act1 = activity<{ result: string }, { i: number } | undefined>({
  heartbeat: { seconds: 20 },
  timeout: { seconds: 160 }
}, async (context: Context): Promise<{ result: string } | AsyncToken> => {
  ...doSomeWork...

  // pass the last checkpoint (undefined on the first attempt) so the consumer can resume
  await sendToQueue({ token: context.activity.token, start: context.checkpoint });

  // should this be on the context to be typed?
  await sendHeartbeat();

  return makeAsync();
});

// lambda function consuming the queue
const workflowClient = new WorkflowClient();
export const handler = async (event) => {
  await Promise.all(event.Records.map(async (record) => {
    const payload = JSON.parse(record.body);

    const items = [...];

    // resume from the checkpoint sent with the message, not from the Lambda event
    const start = payload.start?.i ?? 0;

    for (let i = start; i < items.length; i++) {
      // some long process on items[i]
      // checkpoint = index of the next item to process
      await workflowClient.heartbeatActivity<typeof act1>(
        payload.token,
        { i: i + 1 }
      );
    }

    // complete the activity with a payload
    await workflowClient.completeActivity<typeof act1>(
      payload.token,
      { result: "done" }
    );
  }));
};
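The checkpoint variant leaves open whether `i` means the last finished item or the next one to process. A sketch of the resume loop, assuming the checkpoint stores the index of the next item to process; `Checkpoint` and `processFromCheckpoint` are hypothetical, and the callbacks stand in for the real work and for `heartbeatActivity`.

```typescript
// Hypothetical checkpoint shape: the index of the next item to process.
interface Checkpoint {
  i: number;
}

// Processes `items` starting from the checkpoint (or 0 on the first attempt),
// reporting a new checkpoint after each finished item. If processing throws,
// the last reported checkpoint points at the item that failed, so a retry
// resumes there without redoing earlier items.
function processFromCheckpoint<T>(
  items: readonly T[],
  checkpoint: Checkpoint | undefined,
  processItem: (item: T) => void,
  reportCheckpoint: (checkpoint: Checkpoint) => void,
): void {
  const start = checkpoint ? checkpoint.i : 0;
  for (let i = start; i < items.length; i++) {
    processItem(items[i]);
    reportCheckpoint({ i: i + 1 });
  }
}
```

A retried activity would pass `context.checkpoint` as the `checkpoint` argument. The item that was in flight when a failure happened is still processed again, so each step should be idempotent, which matches the problem statement's requirement.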

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 18 (18 by maintainers)

Most upvoted comments

Totally agree. From chats with people, timeout and heartbeat are important parts of long-running workflows and would make the service look more legit/complete. Because timers are implemented on top of sleep, timeouts are now easy to build; the only new piece for heartbeat is adding the client operation.
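The comment above notes that activity timeouts can reuse the existing sleep timer. In-process, the same idea is a race between the activity's completion and a sleep. A minimal sketch, where `sleep`, `withTimeout`, and `ActivityTimeoutError` are hypothetical and `setTimeout` stands in for the workflow's durable timer.

```typescript
class ActivityTimeoutError extends Error {
  constructor(readonly timeoutMs: number) {
    super(`activity did not complete within ${timeoutMs}ms`);
    this.name = "ActivityTimeoutError";
  }
}

// Resolves after `ms` milliseconds; stands in for the workflow's durable sleep.
function sleep(ms: number): { promise: Promise<void>; cancel: () => void } {
  let handle: ReturnType<typeof setTimeout> | undefined;
  const promise = new Promise<void>((resolve) => {
    handle = setTimeout(resolve, ms);
  });
  return { promise, cancel: () => clearTimeout(handle) };
}

// Whichever settles first wins: the activity's result or the timer.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  const timer = sleep(ms);
  const timeout = timer.promise.then((): never => {
    throw new ActivityTimeoutError(ms);
  });
  return Promise.race([work, timeout]).then(
    (value) => {
      timer.cancel(); // the activity finished first, so drop the pending timer
      return value;
    },
    (err) => {
      timer.cancel();
      throw err;
    },
  );
}
```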

Will start to work on this and if it proves to be high effort, will push off.