Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e22732aa7c | |||
| 6928b9f1c9 | |||
| 0ab8064a06 | |||
| c5b0011e30 | |||
| dc30e09c77 | |||
| 3b0530a409 | |||
| 29aa8e0de1 | |||
| 910970f45e | |||
| a699495fc2 | |||
| a41bf39325 | |||
| 58712f8364 | |||
| b2a1f141e0 |
32
CHANGELOG.md
32
CHANGELOG.md
@@ -1,3 +1,35 @@
|
||||
## v1.2.0 — 2026-04-02
|
||||
|
||||
Upstream sync with Python SDK v2.2.0. Adds Workflows API and DeleteBatchJob.
|
||||
|
||||
### Added
|
||||
|
||||
- **Workflows API** (new `workflow/` package) — complete workflow orchestration
|
||||
support with 37 service methods across 8 sub-resources:
|
||||
- **Workflows CRUD** — `ListWorkflows`, `GetWorkflow`, `UpdateWorkflow`,
|
||||
`ArchiveWorkflow`, `UnarchiveWorkflow`, `ExecuteWorkflow`,
|
||||
`ExecuteWorkflowAndWait`.
|
||||
- **Registrations** — `ListWorkflowRegistrations`, `GetWorkflowRegistration`,
|
||||
`ExecuteWorkflowRegistration` (deprecated).
|
||||
- **Executions** — `GetWorkflowExecution`, `GetWorkflowExecutionHistory`,
|
||||
`StreamWorkflowExecution`, `SignalWorkflowExecution`,
|
||||
`QueryWorkflowExecution`, `UpdateWorkflowExecution`,
|
||||
`TerminateWorkflowExecution`, `CancelWorkflowExecution`,
|
||||
`ResetWorkflowExecution`, `BatchCancelWorkflowExecutions`,
|
||||
`BatchTerminateWorkflowExecutions`.
|
||||
- **Trace** — `GetWorkflowExecutionTraceOTel`,
|
||||
`GetWorkflowExecutionTraceSummary`, `GetWorkflowExecutionTraceEvents`.
|
||||
- **Events** — `StreamWorkflowEvents`, `ListWorkflowEvents`.
|
||||
- **Deployments** — `ListWorkflowDeployments`, `GetWorkflowDeployment`.
|
||||
- **Metrics** — `GetWorkflowMetrics`.
|
||||
- **Runs** — `ListWorkflowRuns`, `GetWorkflowRun`, `GetWorkflowRunHistory`.
|
||||
- **Schedules** — `ListWorkflowSchedules`, `ScheduleWorkflow`,
|
||||
`UnscheduleWorkflow`.
|
||||
- **Workers** — `GetWorkflowWorkerInfo`.
|
||||
- **`WorkflowEventStream`** — typed SSE stream wrapper with `StreamPayload`
|
||||
envelope, sealed `Event` interface (17 concrete types + `UnknownEvent`).
|
||||
- **`DeleteBatchJob`** — delete a batch job by ID.
|
||||
|
||||
## v1.1.0 — 2026-03-24
|
||||
|
||||
Upstream sync with Python SDK v2.1.3. Adds Connectors, Audio Speech/Voices, and Observability (beta).
|
||||
|
||||
@@ -34,7 +34,7 @@ No Makefile, linter config, or code generation tooling — standard `go test` /
|
||||
|
||||
### Two-layer design: types in sub-packages, methods on `*Client`
|
||||
|
||||
Sub-packages (`chat/`, `agents/`, `conversation/`, `embedding/`, `model/`, `file/`, `finetune/`, `batch/`, `ocr/`, `audio/`, `library/`, `moderation/`, `classification/`, `fim/`) are **types-only** — they define request/response structs and enums but contain no HTTP logic. All service methods live on `*Client` in the root package, prefix-namespaced by domain (e.g. `ChatComplete`, `AgentsComplete`, `CreateFineTuningJob`, `UploadFile`).
|
||||
Sub-packages (`chat/`, `agents/`, `conversation/`, `embedding/`, `model/`, `file/`, `finetune/`, `batch/`, `ocr/`, `audio/`, `library/`, `moderation/`, `classification/`, `fim/`, `connector/`, `observability/`, `workflow/`) are **types-only** — they define request/response structs and enums but contain no HTTP logic. All service methods live on `*Client` in the root package, prefix-namespaced by domain (e.g. `ChatComplete`, `AgentsComplete`, `CreateFineTuningJob`, `UploadFile`).
|
||||
|
||||
### HTTP internals (request.go)
|
||||
|
||||
|
||||
22
README.md
22
README.md
@@ -11,15 +11,15 @@ The most complete Go client for the [Mistral AI API](https://docs.mistral.ai/).
|
||||
|
||||
**Zero dependencies.** The entire SDK — including tests — uses only the Go standard library. No `go.sum`, no transitive dependency tree to audit, no version conflicts, no supply chain risk.
|
||||
|
||||
**Full API coverage.** 116 methods across every Mistral endpoint — including Connectors, Audio Speech/Voices, Conversations, Agents CRUD, Libraries, OCR, Observability, Fine-tuning, and Batch Jobs. No other Go SDK covers Conversations, Connectors, or Observability.
|
||||
**Full API coverage.** 166 methods across every Mistral endpoint — including Workflows, Connectors, Audio Speech/Voices, Conversations, Agents CRUD, Libraries, OCR, Observability, Fine-tuning, and Batch Jobs. No other Go SDK covers Workflows, Conversations, Connectors, or Observability.
|
||||
|
||||
**Typed streaming.** A generic pull-based `Stream[T]` iterator — no channels, no goroutines, no leaks. Just `Next()` / `Current()` / `Err()` / `Close()`.
|
||||
|
||||
**Forward-compatible.** Unknown types (`UnknownEntry`, `UnknownEvent`, `UnknownMessage`, `UnknownChunk`, `UnknownAgentTool`) capture raw JSON instead of returning errors. When Mistral ships a new message role or event type, your code keeps running — it doesn't panic.
|
||||
**Forward-compatible.** Unknown types (`UnknownEntry`, `UnknownEvent`, `UnknownMessage`, `UnknownChunk`, `UnknownAgentTool`, workflow `UnknownEvent`) capture raw JSON instead of returning errors. When Mistral ships a new message role or event type, your code keeps running — it doesn't panic.
|
||||
|
||||
**Hand-written, not generated.** Idiomatic Go with sealed interfaces, discriminated unions, and functional options — not a Speakeasy/OpenAPI auto-gen dump with `any` everywhere.
|
||||
|
||||
**Test-driven.** 193 tests with race detection clean. Every endpoint tested against mock servers; integration tests against the real API.
|
||||
**Test-driven.** 284 tests with race detection clean. Every endpoint tested against mock servers; integration tests against the real API.
|
||||
|
||||
## Install
|
||||
|
||||
@@ -132,7 +132,7 @@ for stream.Next() {
|
||||
|
||||
## API Coverage
|
||||
|
||||
116 public methods on `Client`, grouped by domain:
|
||||
166 public methods on `Client`, grouped by domain:
|
||||
|
||||
| Domain | Methods |
|
||||
|--------|---------|
|
||||
@@ -146,7 +146,7 @@ for stream.Next() {
|
||||
| **Files** | `UploadFile`, `ListFiles`, `GetFile`, `DeleteFile`, `GetFileContent`, `GetFileURL` |
|
||||
| **Embeddings** | `CreateEmbeddings` |
|
||||
| **Fine-tuning** | `CreateFineTuningJob`, `ListFineTuningJobs`, `GetFineTuningJob`, `CancelFineTuningJob`, `StartFineTuningJob`, `UpdateFineTunedModel`, `ArchiveFineTunedModel`, `UnarchiveFineTunedModel` |
|
||||
| **Batch** | `CreateBatchJob`, `ListBatchJobs`, `GetBatchJob`, `CancelBatchJob` |
|
||||
| **Batch** | `CreateBatchJob`, `ListBatchJobs`, `GetBatchJob`, `CancelBatchJob`, `DeleteBatchJob` |
|
||||
| **OCR** | `OCR` |
|
||||
| **Audio (transcription)** | `Transcribe`, `TranscribeStream` |
|
||||
| **Audio (speech)** | `Speech`, `SpeechStream` |
|
||||
@@ -158,6 +158,16 @@ for stream.Next() {
|
||||
| **Observability (events)** | `SearchChatCompletionEvents`, `SearchChatCompletionEventIDs`, `GetChatCompletionEvent`, `GetSimilarChatCompletionEvents`, `JudgeChatCompletionEvent` |
|
||||
| **Observability (judges)** | `CreateJudge`, `ListJudges`, `GetJudge`, `UpdateJudge`, `DeleteJudge`, `JudgeConversation` |
|
||||
| **Observability (datasets)** | `CreateDataset`, `ListDatasets`, `GetDataset`, `UpdateDataset`, `DeleteDataset`, `ExportDatasetToJSONL`, `ListDatasetRecords`, `CreateDatasetRecord`, `GetDatasetRecord`, `UpdateDatasetRecordPayload`, `UpdateDatasetRecordProperties`, `DeleteDatasetRecord`, `BulkDeleteDatasetRecords`, `JudgeDatasetRecord`, `ImportDatasetFromCampaign`, `ImportDatasetFromExplorer`, `ImportDatasetFromFile`, `ImportDatasetFromPlayground`, `ImportDatasetFromDataset`, `ListDatasetTasks`, `GetDatasetTask` |
|
||||
| **Workflows (CRUD)** | `ListWorkflows`, `GetWorkflow`, `UpdateWorkflow`, `ArchiveWorkflow`, `UnarchiveWorkflow`, `ExecuteWorkflow`, `ExecuteWorkflowAndWait` |
|
||||
| **Workflows (registrations)** | `ListWorkflowRegistrations`, `GetWorkflowRegistration`, `ExecuteWorkflowRegistration` |
|
||||
| **Workflows (executions)** | `GetWorkflowExecution`, `GetWorkflowExecutionHistory`, `StreamWorkflowExecution`, `SignalWorkflowExecution`, `QueryWorkflowExecution`, `UpdateWorkflowExecution`, `TerminateWorkflowExecution`, `CancelWorkflowExecution`, `ResetWorkflowExecution`, `BatchCancelWorkflowExecutions`, `BatchTerminateWorkflowExecutions` |
|
||||
| **Workflows (trace)** | `GetWorkflowExecutionTraceOTel`, `GetWorkflowExecutionTraceSummary`, `GetWorkflowExecutionTraceEvents` |
|
||||
| **Workflows (events)** | `StreamWorkflowEvents`, `ListWorkflowEvents` |
|
||||
| **Workflows (deployments)** | `ListWorkflowDeployments`, `GetWorkflowDeployment` |
|
||||
| **Workflows (metrics)** | `GetWorkflowMetrics` |
|
||||
| **Workflows (runs)** | `ListWorkflowRuns`, `GetWorkflowRun`, `GetWorkflowRunHistory` |
|
||||
| **Workflows (schedules)** | `ListWorkflowSchedules`, `ScheduleWorkflow`, `UnscheduleWorkflow` |
|
||||
| **Workflows (workers)** | `GetWorkflowWorkerInfo` |
|
||||
|
||||
## Comparison
|
||||
|
||||
@@ -176,6 +186,7 @@ There is no official Go SDK from Mistral AI (only Python and TypeScript). The ma
|
||||
| Fine-tuning / Batch | Yes | No | No | No |
|
||||
| OCR | Yes | No | No | Yes |
|
||||
| Audio (transcription + TTS + voices) | Yes | No | No | No |
|
||||
| Workflows API | Yes | No | No | No |
|
||||
| Observability (beta) | Yes | No | No | No |
|
||||
| Moderation / Classification | Yes | No | No | No |
|
||||
| Vision (multimodal) | Yes | No | No | Yes |
|
||||
@@ -230,6 +241,7 @@ as its upstream reference for API surface and type definitions.
|
||||
|
||||
| SDK Version | Upstream Python SDK |
|
||||
|-------------|---------------------|
|
||||
| v1.2.0 | v2.2.0 |
|
||||
| v1.1.0 | v2.1.3 |
|
||||
| v1.0.0 | v2.0.4 |
|
||||
|
||||
|
||||
@@ -60,3 +60,10 @@ type ListParams struct {
|
||||
Status []string
|
||||
OrderBy *string
|
||||
}
|
||||
|
||||
// DeleteResponse is the response from deleting a batch job.
|
||||
type DeleteResponse struct {
|
||||
ID string `json:"id"`
|
||||
Object string `json:"object"`
|
||||
Deleted bool `json:"deleted"`
|
||||
}
|
||||
|
||||
@@ -76,3 +76,12 @@ func (c *Client) CancelBatchJob(ctx context.Context, jobID string) (*batch.JobOu
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// DeleteBatchJob deletes a batch job.
|
||||
func (c *Client) DeleteBatchJob(ctx context.Context, jobID string) (*batch.DeleteResponse, error) {
|
||||
var resp batch.DeleteResponse
|
||||
if err := c.doJSON(ctx, "DELETE", fmt.Sprintf("/v1/batch/jobs/%s", jobID), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
@@ -121,3 +121,30 @@ func TestCancelBatchJob_Success(t *testing.T) {
|
||||
t.Errorf("got status %q", job.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteBatchJob_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "DELETE" {
|
||||
t.Errorf("expected DELETE, got %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/batch/jobs/batch-123" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"id": "batch-123", "object": "batch", "deleted": true,
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.DeleteBatchJob(context.Background(), "batch-123")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.ID != "batch-123" {
|
||||
t.Errorf("got id %q", resp.ID)
|
||||
}
|
||||
if !resp.Deleted {
|
||||
t.Error("expected deleted=true")
|
||||
}
|
||||
}
|
||||
|
||||
2966
docs/superpowers/plans/2026-04-02-workflows-api.md
Normal file
2966
docs/superpowers/plans/2026-04-02-workflows-api.md
Normal file
File diff suppressed because it is too large
Load Diff
650
docs/superpowers/specs/2026-04-02-workflows-api-design.md
Normal file
650
docs/superpowers/specs/2026-04-02-workflows-api-design.md
Normal file
@@ -0,0 +1,650 @@
|
||||
# Workflows API Integration — Design Spec
|
||||
|
||||
**Date:** 2026-04-02
|
||||
**Upstream:** Mistral Python SDK v2.2.0 (released 2026-03-31)
|
||||
**SDK version:** v1.2.0
|
||||
**Scope:** Full parity with Python SDK v2.2.0 changes since v2.1.3
|
||||
|
||||
## Summary
|
||||
|
||||
Add the Workflows API (37 new methods) and `DeleteBatchJob` (1 method) to the Go SDK.
|
||||
This is purely additive — no breaking changes to existing API surface.
|
||||
|
||||
## New Package: `workflow/`
|
||||
|
||||
Types-only package following the two-layer architecture. 8 type files + `doc.go`.
|
||||
|
||||
### `workflow/doc.go`
|
||||
|
||||
Package documentation.
|
||||
|
||||
### `workflow/workflow.go` — Core CRUD types
|
||||
|
||||
```go
|
||||
type Workflow struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
DisplayName *string `json:"display_name,omitempty"`
|
||||
Description *string `json:"description,omitempty"`
|
||||
OwnerID string `json:"owner_id"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
AvailableInChatAssistant bool `json:"available_in_chat_assistant"`
|
||||
Archived bool `json:"archived"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
type WorkflowUpdateRequest struct {
|
||||
DisplayName *string `json:"display_name,omitempty"`
|
||||
Description *string `json:"description,omitempty"`
|
||||
AvailableInChatAssistant *bool `json:"available_in_chat_assistant,omitempty"`
|
||||
}
|
||||
|
||||
type WorkflowListResponse struct {
|
||||
Workflows []Workflow `json:"workflows"`
|
||||
NextCursor *string `json:"next_cursor,omitempty"`
|
||||
}
|
||||
|
||||
type WorkflowListParams struct {
|
||||
ActiveOnly *bool
|
||||
IncludeShared *bool
|
||||
AvailableInChatAssistant *bool
|
||||
Archived *bool
|
||||
Cursor *string
|
||||
Limit *int
|
||||
}
|
||||
|
||||
type WorkflowArchiveResponse struct {
|
||||
ID string `json:"id"`
|
||||
Archived bool `json:"archived"`
|
||||
}
|
||||
```
|
||||
|
||||
### `workflow/execution.go` — Execution types
|
||||
|
||||
```go
|
||||
type ExecutionStatus string
|
||||
|
||||
const (
|
||||
ExecutionRunning ExecutionStatus = "RUNNING"
|
||||
ExecutionCompleted ExecutionStatus = "COMPLETED"
|
||||
ExecutionFailed ExecutionStatus = "FAILED"
|
||||
ExecutionCanceled ExecutionStatus = "CANCELED"
|
||||
ExecutionTerminated ExecutionStatus = "TERMINATED"
|
||||
ExecutionContinuedAsNew ExecutionStatus = "CONTINUED_AS_NEW"
|
||||
ExecutionTimedOut ExecutionStatus = "TIMED_OUT"
|
||||
ExecutionRetryingAfterErr ExecutionStatus = "RETRYING_AFTER_ERROR"
|
||||
)
|
||||
|
||||
type ExecutionRequest struct {
|
||||
ExecutionID *string `json:"execution_id,omitempty"`
|
||||
Input map[string]any `json:"input,omitempty"`
|
||||
EncodedInput *NetworkEncodedInput `json:"encoded_input,omitempty"`
|
||||
WaitForResult bool `json:"wait_for_result,omitempty"`
|
||||
TimeoutSeconds *float64 `json:"timeout_seconds,omitempty"`
|
||||
CustomTracingAttributes map[string]string `json:"custom_tracing_attributes,omitempty"`
|
||||
DeploymentName *string `json:"deployment_name,omitempty"`
|
||||
}
|
||||
|
||||
type ExecutionResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result,omitempty"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
}
|
||||
|
||||
type NetworkEncodedInput struct {
|
||||
B64Payload string `json:"b64payload"`
|
||||
EncodingOptions []string `json:"encoding_options,omitempty"`
|
||||
Empty bool `json:"empty,omitempty"`
|
||||
}
|
||||
|
||||
type SignalInvocationBody struct {
|
||||
Name string `json:"name"`
|
||||
Input any `json:"input"`
|
||||
}
|
||||
|
||||
type SignalResponse struct {
|
||||
Message string `json:"message"` // default: "Signal accepted"
|
||||
}
|
||||
|
||||
type QueryInvocationBody struct {
|
||||
Name string `json:"name"`
|
||||
Input any `json:"input,omitempty"`
|
||||
}
|
||||
|
||||
type QueryResponse struct {
|
||||
QueryName string `json:"query_name"`
|
||||
Result any `json:"result"`
|
||||
}
|
||||
|
||||
type UpdateInvocationBody struct {
|
||||
Name string `json:"name"`
|
||||
Input any `json:"input,omitempty"`
|
||||
}
|
||||
|
||||
type UpdateResponse struct {
|
||||
UpdateName string `json:"update_name"`
|
||||
Result any `json:"result"`
|
||||
}
|
||||
|
||||
// Trace response types
|
||||
|
||||
type TraceOTelResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status *ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result"`
|
||||
DataSource string `json:"data_source"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
OTelTraceID *string `json:"otel_trace_id,omitempty"`
|
||||
OTelTraceData any `json:"otel_trace_data,omitempty"`
|
||||
}
|
||||
|
||||
type TraceSummaryResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status *ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
SpanTree any `json:"span_tree,omitempty"`
|
||||
}
|
||||
|
||||
type TraceEventsResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status *ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
Events []json.RawMessage `json:"events,omitempty"`
|
||||
}
|
||||
|
||||
type TraceEventsParams struct {
|
||||
MergeSameIDEvents *bool
|
||||
IncludeInternalEvents *bool
|
||||
}
|
||||
|
||||
type ResetInvocationBody struct {
|
||||
EventID int `json:"event_id"`
|
||||
Reason *string `json:"reason,omitempty"`
|
||||
ExcludeSignals bool `json:"exclude_signals,omitempty"`
|
||||
ExcludeUpdates bool `json:"exclude_updates,omitempty"`
|
||||
}
|
||||
|
||||
type BatchExecutionBody struct {
|
||||
ExecutionIDs []string `json:"execution_ids"`
|
||||
}
|
||||
|
||||
type BatchExecutionResponse struct {
|
||||
Results map[string]BatchExecutionResult `json:"results,omitempty"`
|
||||
}
|
||||
|
||||
type BatchExecutionResult struct {
|
||||
Status string `json:"status"`
|
||||
Error *string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type StreamParams struct {
|
||||
EventSource *EventSource
|
||||
LastEventID *string
|
||||
}
|
||||
```
|
||||
|
||||
### `workflow/event.go` — Sealed event interface + 17 variants
|
||||
|
||||
Discriminator field: `event_type`
|
||||
|
||||
```go
|
||||
type Event interface {
|
||||
workflowEvent()
|
||||
EventType() EventType
|
||||
}
|
||||
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventWorkflowStarted EventType = "WORKFLOW_EXECUTION_STARTED"
|
||||
EventWorkflowCompleted EventType = "WORKFLOW_EXECUTION_COMPLETED"
|
||||
EventWorkflowFailed EventType = "WORKFLOW_EXECUTION_FAILED"
|
||||
EventWorkflowCanceled EventType = "WORKFLOW_EXECUTION_CANCELED"
|
||||
EventWorkflowContinuedAsNew EventType = "WORKFLOW_EXECUTION_CONTINUED_AS_NEW"
|
||||
EventWorkflowTaskTimedOut EventType = "WORKFLOW_TASK_TIMED_OUT"
|
||||
EventWorkflowTaskFailed EventType = "WORKFLOW_TASK_FAILED"
|
||||
EventCustomTaskStarted EventType = "CUSTOM_TASK_STARTED"
|
||||
EventCustomTaskInProgress EventType = "CUSTOM_TASK_IN_PROGRESS"
|
||||
EventCustomTaskCompleted EventType = "CUSTOM_TASK_COMPLETED"
|
||||
EventCustomTaskFailed EventType = "CUSTOM_TASK_FAILED"
|
||||
EventCustomTaskTimedOut EventType = "CUSTOM_TASK_TIMED_OUT"
|
||||
EventCustomTaskCanceled EventType = "CUSTOM_TASK_CANCELED"
|
||||
EventActivityTaskStarted EventType = "ACTIVITY_TASK_STARTED"
|
||||
EventActivityTaskCompleted EventType = "ACTIVITY_TASK_COMPLETED"
|
||||
EventActivityTaskRetrying EventType = "ACTIVITY_TASK_RETRYING"
|
||||
EventActivityTaskFailed EventType = "ACTIVITY_TASK_FAILED"
|
||||
)
|
||||
|
||||
type EventSource string
|
||||
|
||||
const (
|
||||
EventSourceDatabase EventSource = "DATABASE"
|
||||
EventSourceLive EventSource = "LIVE"
|
||||
)
|
||||
|
||||
type Scope string
|
||||
|
||||
const (
|
||||
ScopeActivity Scope = "activity"
|
||||
ScopeWorkflow Scope = "workflow"
|
||||
ScopeAll Scope = "*"
|
||||
)
|
||||
```
|
||||
|
||||
Each concrete event type has common fields + type-specific attributes:
|
||||
|
||||
```go
|
||||
// Common fields embedded in all event types
|
||||
type eventBase struct {
|
||||
ID string `json:"event_id"`
|
||||
Timestamp int64 `json:"event_timestamp"`
|
||||
RootWorkflowExecID string `json:"root_workflow_exec_id"`
|
||||
ParentWorkflowExecID *string `json:"parent_workflow_exec_id"`
|
||||
WorkflowExecID string `json:"workflow_exec_id"`
|
||||
WorkflowRunID string `json:"workflow_run_id"`
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
}
|
||||
|
||||
// Example concrete types:
|
||||
|
||||
type WorkflowExecutionStartedEvent struct {
|
||||
eventBase
|
||||
Attributes WorkflowStartedAttributes `json:"attributes"`
|
||||
}
|
||||
func (WorkflowExecutionStartedEvent) workflowEvent() {}
|
||||
func (WorkflowExecutionStartedEvent) EventType() EventType { return EventWorkflowStarted }
|
||||
|
||||
type WorkflowExecutionCompletedEvent struct {
|
||||
eventBase
|
||||
Attributes WorkflowCompletedAttributes `json:"attributes"`
|
||||
}
|
||||
// ... pattern repeats for all 17 types
|
||||
|
||||
type UnknownEvent struct {
|
||||
eventBase
|
||||
RawType string
|
||||
Raw json.RawMessage
|
||||
}
|
||||
```
|
||||
|
||||
SSE envelope types:
|
||||
|
||||
```go
|
||||
type StreamPayload struct {
|
||||
Stream string `json:"stream"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
WorkflowContext StreamWorkflowContext `json:"workflow_context"`
|
||||
BrokerSequence int `json:"broker_sequence"`
|
||||
Timestamp *string `json:"timestamp,omitempty"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
type StreamWorkflowContext struct {
|
||||
Namespace string `json:"namespace"`
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
WorkflowExecID string `json:"workflow_exec_id"`
|
||||
ParentWorkflowExecID *string `json:"parent_workflow_exec_id,omitempty"`
|
||||
RootWorkflowExecID *string `json:"root_workflow_exec_id,omitempty"`
|
||||
}
|
||||
|
||||
func UnmarshalEvent(data json.RawMessage) (Event, error)
|
||||
// Probes event_type discriminator, dispatches to concrete type.
|
||||
// Unknown event_type returns UnknownEvent.
|
||||
```
|
||||
|
||||
### `workflow/deployment.go`
|
||||
|
||||
```go
|
||||
type Deployment struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
IsActive bool `json:"is_active"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
type DeploymentListResponse struct {
|
||||
Deployments []Deployment `json:"deployments"`
|
||||
}
|
||||
|
||||
type DeploymentListParams struct {
|
||||
ActiveOnly *bool
|
||||
WorkflowName *string
|
||||
}
|
||||
```
|
||||
|
||||
### `workflow/metrics.go`
|
||||
|
||||
```go
|
||||
type Metrics struct {
|
||||
ExecutionCount ScalarMetric `json:"execution_count"`
|
||||
SuccessCount ScalarMetric `json:"success_count"`
|
||||
ErrorCount ScalarMetric `json:"error_count"`
|
||||
AverageLatencyMs ScalarMetric `json:"average_latency_ms"`
|
||||
LatencyOverTime TimeSeriesMetric `json:"latency_over_time"`
|
||||
RetryRate ScalarMetric `json:"retry_rate"`
|
||||
}
|
||||
|
||||
type ScalarMetric struct {
|
||||
Value float64 `json:"value"`
|
||||
}
|
||||
|
||||
type TimeSeriesMetric struct {
|
||||
Value [][]float64 `json:"value"`
|
||||
}
|
||||
|
||||
type MetricsParams struct {
|
||||
StartTime *string
|
||||
EndTime *string
|
||||
}
|
||||
```
|
||||
|
||||
### `workflow/run.go`
|
||||
|
||||
```go
|
||||
type Run struct {
|
||||
ID string `json:"id"`
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
Status ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
}
|
||||
|
||||
type ListRunsResponse struct {
|
||||
Runs []Run `json:"runs"`
|
||||
NextPageToken *string `json:"next_page_token,omitempty"`
|
||||
}
|
||||
|
||||
type RunListParams struct {
|
||||
WorkflowIdentifier *string
|
||||
Search *string
|
||||
Status *string
|
||||
PageSize *int
|
||||
NextPageToken *string
|
||||
}
|
||||
```
|
||||
|
||||
### `workflow/schedule.go`
|
||||
|
||||
```go
|
||||
type ScheduleRequest struct {
|
||||
Schedule ScheduleDefinition `json:"schedule"`
|
||||
WorkflowRegistrationID *string `json:"workflow_registration_id,omitempty"`
|
||||
WorkflowIdentifier *string `json:"workflow_identifier,omitempty"`
|
||||
ScheduleID *string `json:"schedule_id,omitempty"`
|
||||
DeploymentName *string `json:"deployment_name,omitempty"`
|
||||
}
|
||||
|
||||
type ScheduleDefinition struct {
|
||||
Input any `json:"input"`
|
||||
Calendars []ScheduleCalendar `json:"calendars,omitempty"`
|
||||
Intervals []ScheduleInterval `json:"intervals,omitempty"`
|
||||
CronExpressions []string `json:"cron_expressions,omitempty"`
|
||||
Skip []ScheduleCalendar `json:"skip,omitempty"`
|
||||
StartAt *string `json:"start_at,omitempty"`
|
||||
EndAt *string `json:"end_at,omitempty"`
|
||||
Jitter *string `json:"jitter,omitempty"`
|
||||
TimeZoneName *string `json:"time_zone_name,omitempty"`
|
||||
Policy *SchedulePolicy `json:"policy,omitempty"`
|
||||
}
|
||||
|
||||
type ScheduleCalendar struct {
|
||||
Second []ScheduleRange `json:"second,omitempty"`
|
||||
Minute []ScheduleRange `json:"minute,omitempty"`
|
||||
Hour []ScheduleRange `json:"hour,omitempty"`
|
||||
DayOfMonth []ScheduleRange `json:"day_of_month,omitempty"`
|
||||
Month []ScheduleRange `json:"month,omitempty"`
|
||||
Year []ScheduleRange `json:"year,omitempty"`
|
||||
DayOfWeek []ScheduleRange `json:"day_of_week,omitempty"`
|
||||
Comment *string `json:"comment,omitempty"`
|
||||
}
|
||||
|
||||
type ScheduleRange struct {
|
||||
Start int `json:"start"`
|
||||
End int `json:"end,omitempty"`
|
||||
Step int `json:"step,omitempty"`
|
||||
}
|
||||
|
||||
type ScheduleInterval struct {
|
||||
Every string `json:"every"`
|
||||
Offset *string `json:"offset,omitempty"`
|
||||
}
|
||||
|
||||
type SchedulePolicy struct {
|
||||
CatchupWindowSeconds int `json:"catchup_window_seconds,omitempty"`
|
||||
Overlap *int `json:"overlap,omitempty"`
|
||||
PauseOnFailure bool `json:"pause_on_failure,omitempty"`
|
||||
}
|
||||
|
||||
type ScheduleResponse struct {
|
||||
ScheduleID string `json:"schedule_id"`
|
||||
}
|
||||
|
||||
type ScheduleListResponse struct {
|
||||
Schedules []Schedule `json:"schedules"`
|
||||
}
|
||||
|
||||
type Schedule struct {
|
||||
ScheduleID string `json:"schedule_id"`
|
||||
Definition ScheduleDefinition `json:"definition"`
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
```
|
||||
|
||||
### `workflow/registration.go`
|
||||
|
||||
```go
|
||||
type Registration struct {
|
||||
ID string `json:"id"`
|
||||
WorkflowID string `json:"workflow_id"`
|
||||
TaskQueue string `json:"task_queue"`
|
||||
Workflow *Workflow `json:"workflow,omitempty"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
type RegistrationListResponse struct {
|
||||
Registrations []Registration `json:"registrations"`
|
||||
NextCursor *string `json:"next_cursor,omitempty"`
|
||||
}
|
||||
|
||||
type RegistrationListParams struct {
|
||||
WorkflowID *string
|
||||
TaskQueue *string
|
||||
ActiveOnly *bool
|
||||
IncludeShared *bool
|
||||
WorkflowSearch *string
|
||||
Archived *bool
|
||||
WithWorkflow *bool
|
||||
AvailableInChatAssistant *bool
|
||||
Limit *int
|
||||
Cursor *string
|
||||
}
|
||||
|
||||
type RegistrationGetParams struct {
|
||||
WithWorkflow *bool
|
||||
IncludeShared *bool
|
||||
}
|
||||
|
||||
type WorkerInfo struct {
|
||||
SchedulerURL string `json:"scheduler_url"`
|
||||
Namespace string `json:"namespace"`
|
||||
TLS bool `json:"tls"`
|
||||
}
|
||||
```
|
||||
|
||||
## Service Methods (root package)
|
||||
|
||||
### `workflows.go` — 10 methods
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `ListWorkflows` | GET | `/v1/workflows` |
|
||||
| `GetWorkflow` | GET | `/v1/workflows/{id}` |
|
||||
| `UpdateWorkflow` | PUT | `/v1/workflows/{id}` |
|
||||
| `ArchiveWorkflow` | PUT | `/v1/workflows/{id}/archive` |
|
||||
| `UnarchiveWorkflow` | PUT | `/v1/workflows/{id}/unarchive` |
|
||||
| `ExecuteWorkflow` | POST | `/v1/workflows/{id}/execute` |
|
||||
| `ListWorkflowRegistrations` | GET | `/v1/workflows/registrations` |
|
||||
| `GetWorkflowRegistration` | GET | `/v1/workflows/registrations/{id}` |
|
||||
| `ExecuteWorkflowRegistration` | POST | `/v1/workflows/registrations/{id}/execute` |
|
||||
| `ExecuteWorkflowAndWait` | (composite) | execute + poll |
|
||||
|
||||
`ExecuteWorkflowRegistration` is deprecated (doc comment only).
|
||||
|
||||
`ExecuteWorkflowAndWait` calls `ExecuteWorkflow`, then polls `GetWorkflowExecution`
|
||||
in a loop until status is terminal or context is canceled.
|
||||
|
||||
### `workflows_executions.go` — 14 methods
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `GetWorkflowExecution` | GET | `/v1/workflows/executions/{id}` |
|
||||
| `GetWorkflowExecutionHistory` | GET | `/v1/workflows/executions/{id}/history` |
|
||||
| `StreamWorkflowExecution` | GET (SSE) | `/v1/workflows/executions/{id}/stream` |
|
||||
| `SignalWorkflowExecution` | POST | `/v1/workflows/executions/{id}/signals` |
|
||||
| `QueryWorkflowExecution` | POST | `/v1/workflows/executions/{id}/queries` |
|
||||
| `UpdateWorkflowExecution` | POST | `/v1/workflows/executions/{id}/updates` |
|
||||
| `TerminateWorkflowExecution` | POST (204) | `/v1/workflows/executions/{id}/terminate` |
|
||||
| `CancelWorkflowExecution` | POST (204) | `/v1/workflows/executions/{id}/cancel` |
|
||||
| `ResetWorkflowExecution` | POST (204) | `/v1/workflows/executions/{id}/reset` |
|
||||
| `BatchCancelWorkflowExecutions` | POST | `/v1/workflows/executions/cancel` |
|
||||
| `BatchTerminateWorkflowExecutions` | POST | `/v1/workflows/executions/terminate` |
|
||||
| `GetWorkflowExecutionTraceOTel` | GET | `/v1/workflows/executions/{id}/trace/otel` |
|
||||
| `GetWorkflowExecutionTraceSummary` | GET | `/v1/workflows/executions/{id}/trace/summary` |
|
||||
| `GetWorkflowExecutionTraceEvents` | GET | `/v1/workflows/executions/{id}/trace/events` |
|
||||
|
||||
Also contains `WorkflowEventStream` type (wraps `Stream[json.RawMessage]`,
|
||||
dispatches via `workflow.UnmarshalEvent`).
|
||||
|
||||
### `workflows_events.go` — 2 methods
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `StreamWorkflowEvents` | GET (SSE) | `/v1/workflows/events/stream` |
|
||||
| `ListWorkflowEvents` | GET | `/v1/workflows/events/list` |
|
||||
|
||||
### `workflows_deployments.go` — 2 methods
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `ListWorkflowDeployments` | GET | `/v1/workflows/deployments` |
|
||||
| `GetWorkflowDeployment` | GET | `/v1/workflows/deployments/{id}` |
|
||||
|
||||
### `workflows_metrics.go` — 1 method
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `GetWorkflowMetrics` | GET | `/v1/workflows/{name}/metrics` |
|
||||
|
||||
### `workflows_runs.go` — 3 methods
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `ListWorkflowRuns` | GET | `/v1/workflows/runs` |
|
||||
| `GetWorkflowRun` | GET | `/v1/workflows/runs/{id}` |
|
||||
| `GetWorkflowRunHistory` | GET | `/v1/workflows/runs/{id}/history` |
|
||||
|
||||
### `workflows_schedules.go` — 3 methods
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `ListWorkflowSchedules` | GET | `/v1/workflows/schedules` |
|
||||
| `ScheduleWorkflow` | POST | `/v1/workflows/schedules` |
|
||||
| `UnscheduleWorkflow` | DELETE | `/v1/workflows/schedules/{id}` |
|
||||
|
||||
### `workflows_workers.go` — 1 method
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `GetWorkflowWorkerInfo` | GET | `/v1/workflows/workers/whoami` |
|
||||
|
||||
### `batch_api.go` — 1 new method
|
||||
|
||||
| Method | HTTP | Path |
|
||||
|--------|------|------|
|
||||
| `DeleteBatchJob` | DELETE | `/v1/batch/jobs/{id}` |
|
||||
|
||||
New type in `batch/`: `DeleteResponse { ID, Object, Deleted }`.
|
||||
|
||||
## Streaming Design
|
||||
|
||||
`WorkflowEventStream` wraps `Stream[json.RawMessage]` like `EventStream` does for conversations.
|
||||
|
||||
SSE data arrives as `StreamPayload` envelope:
|
||||
```json
|
||||
{
|
||||
"stream": "...",
|
||||
"data": { "event_type": "WORKFLOW_EXECUTION_COMPLETED", ... },
|
||||
"workflow_context": { ... },
|
||||
"broker_sequence": 42,
|
||||
"timestamp": "...",
|
||||
"metadata": {}
|
||||
}
|
||||
```
|
||||
|
||||
`WorkflowEventStream.Next()`:
|
||||
1. Read next SSE `data:` line via inner `Stream[json.RawMessage]`
|
||||
2. Unmarshal into `workflow.StreamPayload`
|
||||
3. Dispatch `payload.Data` via `workflow.UnmarshalEvent` (probes `event_type`)
|
||||
4. Expose both `Current() workflow.Event` and `CurrentPayload() *workflow.StreamPayload`
|
||||
|
||||
Both `StreamWorkflowExecution` and `StreamWorkflowEvents` use GET (not POST)
|
||||
with SSE response. They use `doStream` without a request body — the stream method
|
||||
needs to support GET + query params (verify `doStream` handles nil body for GET).
|
||||
|
||||
## Testing
|
||||
|
||||
One test file per service file. `httptest.NewServer` with inline handlers. Stdlib only.
|
||||
|
||||
Key scenarios:
|
||||
- Query param encoding for list/filter endpoints
|
||||
- PUT body marshaling for update/archive
|
||||
- 204 no-body responses for terminate/cancel/reset
|
||||
- 202 response for signal
|
||||
- SSE streaming with StreamPayload envelope + event type dispatch
|
||||
- UnknownEvent forward compatibility
|
||||
- ExecuteWorkflowAndWait polling loop (mock multiple get-execution responses)
|
||||
- Sealed interface UnmarshalEvent for all 17 event types + unknown
|
||||
- Batch operations with map response
|
||||
|
||||
## Version & Docs
|
||||
|
||||
- Bump version constant in `mistral.go` to `1.2.0`
|
||||
- Update `CLAUDE.md` sub-packages list to include `workflow/`
|
||||
- Update `CHANGELOG.md` with v1.2.0 entry
|
||||
- Upstream sync reference: Python SDK v2.2.0
|
||||
|
||||
## Non-Goals
|
||||
|
||||
- No pagination helpers (cursor chaining) — callers manage pagination manually, same as existing endpoints
|
||||
- No traceparent injection hook — Go callers manage their own tracing headers
|
||||
- No `execute_workflow_and_wait_async` — Go has context cancellation instead
|
||||
- No WebSocket/realtime workflow support (not in Python SDK either)
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
)
|
||||
|
||||
// Version is the SDK version string.
|
||||
const Version = "1.1.0"
|
||||
const Version = "1.2.0"
|
||||
|
||||
const (
|
||||
defaultBaseURL = "https://api.mistral.ai"
|
||||
|
||||
21
workflow/deployment.go
Normal file
21
workflow/deployment.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package workflow
|
||||
|
||||
// Deployment represents a workflow deployment.
|
||||
type Deployment struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
IsActive bool `json:"is_active"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// DeploymentListResponse is the response from listing deployments.
|
||||
type DeploymentListResponse struct {
|
||||
Deployments []Deployment `json:"deployments"`
|
||||
}
|
||||
|
||||
// DeploymentListParams holds query parameters for listing deployments.
|
||||
type DeploymentListParams struct {
|
||||
ActiveOnly *bool
|
||||
WorkflowName *string
|
||||
}
|
||||
5
workflow/doc.go
Normal file
5
workflow/doc.go
Normal file
@@ -0,0 +1,5 @@
|
||||
// Package workflow provides types for the Mistral workflows API.
|
||||
//
|
||||
// Workflows support orchestrating multi-step processes with execution
|
||||
// management, scheduling, event streaming, and observability.
|
||||
package workflow
|
||||
369
workflow/event.go
Normal file
369
workflow/event.go
Normal file
@@ -0,0 +1,369 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// EventType identifies the kind of workflow event.
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventWorkflowStarted EventType = "WORKFLOW_EXECUTION_STARTED"
|
||||
EventWorkflowCompleted EventType = "WORKFLOW_EXECUTION_COMPLETED"
|
||||
EventWorkflowFailed EventType = "WORKFLOW_EXECUTION_FAILED"
|
||||
EventWorkflowCanceled EventType = "WORKFLOW_EXECUTION_CANCELED"
|
||||
EventWorkflowContinuedAsNew EventType = "WORKFLOW_EXECUTION_CONTINUED_AS_NEW"
|
||||
EventWorkflowTaskTimedOut EventType = "WORKFLOW_TASK_TIMED_OUT"
|
||||
EventWorkflowTaskFailed EventType = "WORKFLOW_TASK_FAILED"
|
||||
EventCustomTaskStarted EventType = "CUSTOM_TASK_STARTED"
|
||||
EventCustomTaskInProgress EventType = "CUSTOM_TASK_IN_PROGRESS"
|
||||
EventCustomTaskCompleted EventType = "CUSTOM_TASK_COMPLETED"
|
||||
EventCustomTaskFailed EventType = "CUSTOM_TASK_FAILED"
|
||||
EventCustomTaskTimedOut EventType = "CUSTOM_TASK_TIMED_OUT"
|
||||
EventCustomTaskCanceled EventType = "CUSTOM_TASK_CANCELED"
|
||||
EventActivityTaskStarted EventType = "ACTIVITY_TASK_STARTED"
|
||||
EventActivityTaskCompleted EventType = "ACTIVITY_TASK_COMPLETED"
|
||||
EventActivityTaskRetrying EventType = "ACTIVITY_TASK_RETRYING"
|
||||
EventActivityTaskFailed EventType = "ACTIVITY_TASK_FAILED"
|
||||
)
|
||||
|
||||
// EventSource identifies where an event originated.
|
||||
type EventSource string
|
||||
|
||||
const (
|
||||
EventSourceDatabase EventSource = "DATABASE"
|
||||
EventSourceLive EventSource = "LIVE"
|
||||
)
|
||||
|
||||
// Scope identifies the scope of an event subscription.
|
||||
type Scope string
|
||||
|
||||
const (
|
||||
ScopeActivity Scope = "activity"
|
||||
ScopeWorkflow Scope = "workflow"
|
||||
ScopeAll Scope = "*"
|
||||
)
|
||||
|
||||
// Event is a sealed interface for workflow execution events.
|
||||
type Event interface {
|
||||
workflowEvent()
|
||||
EventType() EventType
|
||||
}
|
||||
|
||||
// eventBase holds fields common to all workflow events.
|
||||
type eventBase struct {
|
||||
ID string `json:"event_id"`
|
||||
Timestamp int64 `json:"event_timestamp"`
|
||||
RootWorkflowExecID string `json:"root_workflow_exec_id"`
|
||||
ParentWorkflowExecID *string `json:"parent_workflow_exec_id"`
|
||||
WorkflowExecID string `json:"workflow_exec_id"`
|
||||
WorkflowRunID string `json:"workflow_run_id"`
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
}
|
||||
|
||||
// WorkflowStartedAttributes holds typed attributes for workflow started events.
|
||||
type WorkflowStartedAttributes struct {
|
||||
TaskID string `json:"task_id"`
|
||||
}
|
||||
|
||||
// JSONPayload holds a typed JSON value.
|
||||
type JSONPayload struct {
|
||||
Value any `json:"value"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// WorkflowCompletedAttributes holds typed attributes for workflow completed events.
|
||||
type WorkflowCompletedAttributes struct {
|
||||
TaskID string `json:"task_id"`
|
||||
Result JSONPayload `json:"result"`
|
||||
}
|
||||
|
||||
// WorkflowFailedAttributes holds typed attributes for workflow failed events.
|
||||
type WorkflowFailedAttributes struct {
|
||||
TaskID string `json:"task_id"`
|
||||
Failure any `json:"failure"`
|
||||
}
|
||||
|
||||
// WorkflowExecutionStartedEvent signals that a workflow execution has started.
|
||||
type WorkflowExecutionStartedEvent struct {
|
||||
eventBase
|
||||
Attributes WorkflowStartedAttributes `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*WorkflowExecutionStartedEvent) workflowEvent() {}
|
||||
func (*WorkflowExecutionStartedEvent) EventType() EventType { return EventWorkflowStarted }
|
||||
|
||||
// WorkflowExecutionCompletedEvent signals that a workflow execution has completed.
|
||||
type WorkflowExecutionCompletedEvent struct {
|
||||
eventBase
|
||||
Attributes WorkflowCompletedAttributes `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*WorkflowExecutionCompletedEvent) workflowEvent() {}
|
||||
func (*WorkflowExecutionCompletedEvent) EventType() EventType { return EventWorkflowCompleted }
|
||||
|
||||
// WorkflowExecutionFailedEvent signals that a workflow execution has failed.
|
||||
type WorkflowExecutionFailedEvent struct {
|
||||
eventBase
|
||||
Attributes WorkflowFailedAttributes `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*WorkflowExecutionFailedEvent) workflowEvent() {}
|
||||
func (*WorkflowExecutionFailedEvent) EventType() EventType { return EventWorkflowFailed }
|
||||
|
||||
// WorkflowExecutionCanceledEvent signals that a workflow execution was canceled.
|
||||
type WorkflowExecutionCanceledEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*WorkflowExecutionCanceledEvent) workflowEvent() {}
|
||||
func (*WorkflowExecutionCanceledEvent) EventType() EventType { return EventWorkflowCanceled }
|
||||
|
||||
// WorkflowExecutionContinuedAsNewEvent signals that a workflow continued as a new execution.
|
||||
type WorkflowExecutionContinuedAsNewEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*WorkflowExecutionContinuedAsNewEvent) workflowEvent() {}
|
||||
func (*WorkflowExecutionContinuedAsNewEvent) EventType() EventType { return EventWorkflowContinuedAsNew }
|
||||
|
||||
// WorkflowTaskTimedOutEvent signals that a workflow task timed out.
|
||||
type WorkflowTaskTimedOutEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*WorkflowTaskTimedOutEvent) workflowEvent() {}
|
||||
func (*WorkflowTaskTimedOutEvent) EventType() EventType { return EventWorkflowTaskTimedOut }
|
||||
|
||||
// WorkflowTaskFailedEvent signals that a workflow task failed.
|
||||
type WorkflowTaskFailedEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*WorkflowTaskFailedEvent) workflowEvent() {}
|
||||
func (*WorkflowTaskFailedEvent) EventType() EventType { return EventWorkflowTaskFailed }
|
||||
|
||||
// CustomTaskStartedEvent signals that a custom task has started.
|
||||
type CustomTaskStartedEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*CustomTaskStartedEvent) workflowEvent() {}
|
||||
func (*CustomTaskStartedEvent) EventType() EventType { return EventCustomTaskStarted }
|
||||
|
||||
// CustomTaskInProgressEvent signals that a custom task is in progress.
|
||||
type CustomTaskInProgressEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*CustomTaskInProgressEvent) workflowEvent() {}
|
||||
func (*CustomTaskInProgressEvent) EventType() EventType { return EventCustomTaskInProgress }
|
||||
|
||||
// CustomTaskCompletedEvent signals that a custom task has completed.
|
||||
type CustomTaskCompletedEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*CustomTaskCompletedEvent) workflowEvent() {}
|
||||
func (*CustomTaskCompletedEvent) EventType() EventType { return EventCustomTaskCompleted }
|
||||
|
||||
// CustomTaskFailedEvent signals that a custom task has failed.
|
||||
type CustomTaskFailedEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*CustomTaskFailedEvent) workflowEvent() {}
|
||||
func (*CustomTaskFailedEvent) EventType() EventType { return EventCustomTaskFailed }
|
||||
|
||||
// CustomTaskTimedOutEvent signals that a custom task timed out.
|
||||
type CustomTaskTimedOutEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*CustomTaskTimedOutEvent) workflowEvent() {}
|
||||
func (*CustomTaskTimedOutEvent) EventType() EventType { return EventCustomTaskTimedOut }
|
||||
|
||||
// CustomTaskCanceledEvent signals that a custom task was canceled.
|
||||
type CustomTaskCanceledEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*CustomTaskCanceledEvent) workflowEvent() {}
|
||||
func (*CustomTaskCanceledEvent) EventType() EventType { return EventCustomTaskCanceled }
|
||||
|
||||
// ActivityTaskStartedEvent signals that an activity task has started.
|
||||
type ActivityTaskStartedEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*ActivityTaskStartedEvent) workflowEvent() {}
|
||||
func (*ActivityTaskStartedEvent) EventType() EventType { return EventActivityTaskStarted }
|
||||
|
||||
// ActivityTaskCompletedEvent signals that an activity task has completed.
|
||||
type ActivityTaskCompletedEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*ActivityTaskCompletedEvent) workflowEvent() {}
|
||||
func (*ActivityTaskCompletedEvent) EventType() EventType { return EventActivityTaskCompleted }
|
||||
|
||||
// ActivityTaskRetryingEvent signals that an activity task is being retried.
|
||||
type ActivityTaskRetryingEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*ActivityTaskRetryingEvent) workflowEvent() {}
|
||||
func (*ActivityTaskRetryingEvent) EventType() EventType { return EventActivityTaskRetrying }
|
||||
|
||||
// ActivityTaskFailedEvent signals that an activity task has failed.
|
||||
type ActivityTaskFailedEvent struct {
|
||||
eventBase
|
||||
Attributes json.RawMessage `json:"attributes"`
|
||||
}
|
||||
|
||||
func (*ActivityTaskFailedEvent) workflowEvent() {}
|
||||
func (*ActivityTaskFailedEvent) EventType() EventType { return EventActivityTaskFailed }
|
||||
|
||||
// UnknownEvent holds an event with an unrecognized event_type.
|
||||
// This prevents the SDK from breaking when new event types are added.
|
||||
type UnknownEvent struct {
|
||||
eventBase
|
||||
RawType string
|
||||
Raw json.RawMessage
|
||||
}
|
||||
|
||||
func (*UnknownEvent) workflowEvent() {}
|
||||
func (e *UnknownEvent) EventType() EventType { return EventType(e.RawType) }
|
||||
|
||||
// UnmarshalEvent dispatches JSON to the concrete Event type
|
||||
// based on the "event_type" discriminator field.
|
||||
func UnmarshalEvent(data []byte) (Event, error) {
|
||||
var probe struct {
|
||||
Type string `json:"event_type"`
|
||||
}
|
||||
if err := json.Unmarshal(data, &probe); err != nil {
|
||||
return nil, fmt.Errorf("mistral: unmarshal workflow event: %w", err)
|
||||
}
|
||||
switch probe.Type {
|
||||
case string(EventWorkflowStarted):
|
||||
var e WorkflowExecutionStartedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventWorkflowCompleted):
|
||||
var e WorkflowExecutionCompletedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventWorkflowFailed):
|
||||
var e WorkflowExecutionFailedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventWorkflowCanceled):
|
||||
var e WorkflowExecutionCanceledEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventWorkflowContinuedAsNew):
|
||||
var e WorkflowExecutionContinuedAsNewEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventWorkflowTaskTimedOut):
|
||||
var e WorkflowTaskTimedOutEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventWorkflowTaskFailed):
|
||||
var e WorkflowTaskFailedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventCustomTaskStarted):
|
||||
var e CustomTaskStartedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventCustomTaskInProgress):
|
||||
var e CustomTaskInProgressEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventCustomTaskCompleted):
|
||||
var e CustomTaskCompletedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventCustomTaskFailed):
|
||||
var e CustomTaskFailedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventCustomTaskTimedOut):
|
||||
var e CustomTaskTimedOutEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventCustomTaskCanceled):
|
||||
var e CustomTaskCanceledEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventActivityTaskStarted):
|
||||
var e ActivityTaskStartedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventActivityTaskCompleted):
|
||||
var e ActivityTaskCompletedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventActivityTaskRetrying):
|
||||
var e ActivityTaskRetryingEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
case string(EventActivityTaskFailed):
|
||||
var e ActivityTaskFailedEvent
|
||||
return &e, json.Unmarshal(data, &e)
|
||||
default:
|
||||
var base eventBase
|
||||
if err := json.Unmarshal(data, &base); err != nil {
|
||||
return nil, fmt.Errorf("mistral: unmarshal workflow event base: %w", err)
|
||||
}
|
||||
return &UnknownEvent{
|
||||
eventBase: base,
|
||||
RawType: probe.Type,
|
||||
Raw: json.RawMessage(data),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// StreamPayload is a single SSE payload from the workflow event stream.
|
||||
type StreamPayload struct {
|
||||
Stream string `json:"stream"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
WorkflowContext StreamWorkflowContext `json:"workflow_context"`
|
||||
BrokerSequence int64 `json:"broker_sequence"`
|
||||
}
|
||||
|
||||
// StreamWorkflowContext holds context for a workflow event stream.
|
||||
type StreamWorkflowContext struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
}
|
||||
|
||||
// EventStreamParams holds query parameters for streaming workflow events.
|
||||
type EventStreamParams struct {
|
||||
Scope *Scope
|
||||
ActivityName *string
|
||||
ActivityID *string
|
||||
WorkflowName *string
|
||||
WorkflowExecID *string
|
||||
RootWorkflowExecID *string
|
||||
ParentWorkflowExecID *string
|
||||
Stream *string
|
||||
StartSeq *int
|
||||
MetadataFilters map[string]any
|
||||
WorkflowEventTypes []EventType
|
||||
LastEventID *string
|
||||
}
|
||||
|
||||
// EventListParams holds query parameters for listing workflow events.
|
||||
type EventListParams struct {
|
||||
RootWorkflowExecID *string
|
||||
WorkflowExecID *string
|
||||
WorkflowRunID *string
|
||||
Limit *int
|
||||
Cursor *string
|
||||
}
|
||||
|
||||
// EventListResponse is the response from listing workflow events.
|
||||
type EventListResponse struct {
|
||||
Events []json.RawMessage `json:"events"`
|
||||
NextCursor *string `json:"next_cursor,omitempty"`
|
||||
}
|
||||
158
workflow/event_test.go
Normal file
158
workflow/event_test.go
Normal file
@@ -0,0 +1,158 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUnmarshalEvent_WorkflowExecutionCompleted(t *testing.T) {
|
||||
data := []byte(`{
|
||||
"event_id": "evt-1",
|
||||
"event_timestamp": 1711929600000000000,
|
||||
"root_workflow_exec_id": "root-1",
|
||||
"parent_workflow_exec_id": null,
|
||||
"workflow_exec_id": "exec-1",
|
||||
"workflow_run_id": "run-1",
|
||||
"workflow_name": "my-workflow",
|
||||
"event_type": "WORKFLOW_EXECUTION_COMPLETED",
|
||||
"attributes": {"task_id": "t1", "result": {"value": {"answer": 42}, "type": "json"}}
|
||||
}`)
|
||||
event, err := UnmarshalEvent(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
e, ok := event.(*WorkflowExecutionCompletedEvent)
|
||||
if !ok {
|
||||
t.Fatalf("expected *WorkflowExecutionCompletedEvent, got %T", event)
|
||||
}
|
||||
if e.ID != "evt-1" {
|
||||
t.Errorf("got ID %q", e.ID)
|
||||
}
|
||||
if e.WorkflowName != "my-workflow" {
|
||||
t.Errorf("got WorkflowName %q", e.WorkflowName)
|
||||
}
|
||||
if e.EventType() != EventWorkflowCompleted {
|
||||
t.Errorf("got EventType %q", e.EventType())
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnmarshalEvent_CustomTaskStarted(t *testing.T) {
|
||||
data := []byte(`{
|
||||
"event_id": "evt-2",
|
||||
"event_timestamp": 1711929600000000000,
|
||||
"root_workflow_exec_id": "root-1",
|
||||
"parent_workflow_exec_id": "parent-1",
|
||||
"workflow_exec_id": "exec-1",
|
||||
"workflow_run_id": "run-1",
|
||||
"workflow_name": "my-workflow",
|
||||
"event_type": "CUSTOM_TASK_STARTED",
|
||||
"attributes": {}
|
||||
}`)
|
||||
event, err := UnmarshalEvent(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
e, ok := event.(*CustomTaskStartedEvent)
|
||||
if !ok {
|
||||
t.Fatalf("expected *CustomTaskStartedEvent, got %T", event)
|
||||
}
|
||||
parent := "parent-1"
|
||||
if e.ParentWorkflowExecID == nil || *e.ParentWorkflowExecID != parent {
|
||||
t.Errorf("expected parent %q, got %v", parent, e.ParentWorkflowExecID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnmarshalEvent_ActivityTaskRetrying(t *testing.T) {
|
||||
data := []byte(`{
|
||||
"event_id": "evt-3",
|
||||
"event_timestamp": 1711929600000000000,
|
||||
"root_workflow_exec_id": "root-1",
|
||||
"parent_workflow_exec_id": null,
|
||||
"workflow_exec_id": "exec-1",
|
||||
"workflow_run_id": "run-1",
|
||||
"workflow_name": "my-workflow",
|
||||
"event_type": "ACTIVITY_TASK_RETRYING",
|
||||
"attributes": {}
|
||||
}`)
|
||||
event, err := UnmarshalEvent(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, ok := event.(*ActivityTaskRetryingEvent); !ok {
|
||||
t.Fatalf("expected *ActivityTaskRetryingEvent, got %T", event)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnmarshalEvent_UnknownType(t *testing.T) {
|
||||
data := []byte(`{
|
||||
"event_id": "evt-4",
|
||||
"event_timestamp": 1711929600000000000,
|
||||
"root_workflow_exec_id": "root-1",
|
||||
"parent_workflow_exec_id": null,
|
||||
"workflow_exec_id": "exec-1",
|
||||
"workflow_run_id": "run-1",
|
||||
"workflow_name": "my-workflow",
|
||||
"event_type": "FUTURE_EVENT_TYPE",
|
||||
"attributes": {}
|
||||
}`)
|
||||
event, err := UnmarshalEvent(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
unk, ok := event.(*UnknownEvent)
|
||||
if !ok {
|
||||
t.Fatalf("expected *UnknownEvent, got %T", event)
|
||||
}
|
||||
if unk.RawType != "FUTURE_EVENT_TYPE" {
|
||||
t.Errorf("got RawType %q", unk.RawType)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnmarshalEvent_AllTypes(t *testing.T) {
|
||||
types := []struct {
|
||||
eventType string
|
||||
wantType string
|
||||
}{
|
||||
{"WORKFLOW_EXECUTION_STARTED", "*workflow.WorkflowExecutionStartedEvent"},
|
||||
{"WORKFLOW_EXECUTION_COMPLETED", "*workflow.WorkflowExecutionCompletedEvent"},
|
||||
{"WORKFLOW_EXECUTION_FAILED", "*workflow.WorkflowExecutionFailedEvent"},
|
||||
{"WORKFLOW_EXECUTION_CANCELED", "*workflow.WorkflowExecutionCanceledEvent"},
|
||||
{"WORKFLOW_EXECUTION_CONTINUED_AS_NEW", "*workflow.WorkflowExecutionContinuedAsNewEvent"},
|
||||
{"WORKFLOW_TASK_TIMED_OUT", "*workflow.WorkflowTaskTimedOutEvent"},
|
||||
{"WORKFLOW_TASK_FAILED", "*workflow.WorkflowTaskFailedEvent"},
|
||||
{"CUSTOM_TASK_STARTED", "*workflow.CustomTaskStartedEvent"},
|
||||
{"CUSTOM_TASK_IN_PROGRESS", "*workflow.CustomTaskInProgressEvent"},
|
||||
{"CUSTOM_TASK_COMPLETED", "*workflow.CustomTaskCompletedEvent"},
|
||||
{"CUSTOM_TASK_FAILED", "*workflow.CustomTaskFailedEvent"},
|
||||
{"CUSTOM_TASK_TIMED_OUT", "*workflow.CustomTaskTimedOutEvent"},
|
||||
{"CUSTOM_TASK_CANCELED", "*workflow.CustomTaskCanceledEvent"},
|
||||
{"ACTIVITY_TASK_STARTED", "*workflow.ActivityTaskStartedEvent"},
|
||||
{"ACTIVITY_TASK_COMPLETED", "*workflow.ActivityTaskCompletedEvent"},
|
||||
{"ACTIVITY_TASK_RETRYING", "*workflow.ActivityTaskRetryingEvent"},
|
||||
{"ACTIVITY_TASK_FAILED", "*workflow.ActivityTaskFailedEvent"},
|
||||
}
|
||||
for _, tt := range types {
|
||||
t.Run(tt.eventType, func(t *testing.T) {
|
||||
data, _ := json.Marshal(map[string]any{
|
||||
"event_id": "evt",
|
||||
"event_timestamp": 1711929600000000000,
|
||||
"root_workflow_exec_id": "root",
|
||||
"parent_workflow_exec_id": nil,
|
||||
"workflow_exec_id": "exec",
|
||||
"workflow_run_id": "run",
|
||||
"workflow_name": "wf",
|
||||
"event_type": tt.eventType,
|
||||
"attributes": map[string]any{},
|
||||
})
|
||||
event, err := UnmarshalEvent(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got := fmt.Sprintf("%T", event)
|
||||
if got != tt.wantType {
|
||||
t.Errorf("event_type %q: got %s, want %s", tt.eventType, got, tt.wantType)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
163
workflow/execution.go
Normal file
163
workflow/execution.go
Normal file
@@ -0,0 +1,163 @@
|
||||
package workflow
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
// ExecutionStatus is the status of a workflow execution.
|
||||
type ExecutionStatus string
|
||||
|
||||
const (
|
||||
ExecutionRunning ExecutionStatus = "RUNNING"
|
||||
ExecutionCompleted ExecutionStatus = "COMPLETED"
|
||||
ExecutionFailed ExecutionStatus = "FAILED"
|
||||
ExecutionCanceled ExecutionStatus = "CANCELED"
|
||||
ExecutionTerminated ExecutionStatus = "TERMINATED"
|
||||
ExecutionContinuedAsNew ExecutionStatus = "CONTINUED_AS_NEW"
|
||||
ExecutionTimedOut ExecutionStatus = "TIMED_OUT"
|
||||
ExecutionRetryingAfterErr ExecutionStatus = "RETRYING_AFTER_ERROR"
|
||||
)
|
||||
|
||||
// ExecutionRequest is the request body for executing a workflow.
|
||||
type ExecutionRequest struct {
|
||||
ExecutionID *string `json:"execution_id,omitempty"`
|
||||
Input map[string]any `json:"input,omitempty"`
|
||||
EncodedInput *NetworkEncodedInput `json:"encoded_input,omitempty"`
|
||||
WaitForResult bool `json:"wait_for_result,omitempty"`
|
||||
TimeoutSeconds *float64 `json:"timeout_seconds,omitempty"`
|
||||
CustomTracingAttributes map[string]string `json:"custom_tracing_attributes,omitempty"`
|
||||
DeploymentName *string `json:"deployment_name,omitempty"`
|
||||
}
|
||||
|
||||
// ExecutionResponse is the response from a workflow execution.
|
||||
type ExecutionResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result,omitempty"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
}
|
||||
|
||||
// NetworkEncodedInput holds a base64-encoded payload for workflow input.
|
||||
type NetworkEncodedInput struct {
|
||||
B64Payload string `json:"b64payload"`
|
||||
EncodingOptions []string `json:"encoding_options,omitempty"`
|
||||
Empty bool `json:"empty,omitempty"`
|
||||
}
|
||||
|
||||
// SignalInvocationBody is the request body for signaling a workflow execution.
|
||||
type SignalInvocationBody struct {
|
||||
Name string `json:"name"`
|
||||
Input any `json:"input"`
|
||||
}
|
||||
|
||||
// SignalResponse is the response from signaling a workflow execution.
|
||||
type SignalResponse struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
// QueryInvocationBody is the request body for querying a workflow execution.
|
||||
type QueryInvocationBody struct {
|
||||
Name string `json:"name"`
|
||||
Input any `json:"input,omitempty"`
|
||||
}
|
||||
|
||||
// QueryResponse is the response from querying a workflow execution.
|
||||
type QueryResponse struct {
|
||||
QueryName string `json:"query_name"`
|
||||
Result any `json:"result"`
|
||||
}
|
||||
|
||||
// UpdateInvocationBody is the request body for updating a workflow execution.
|
||||
type UpdateInvocationBody struct {
|
||||
Name string `json:"name"`
|
||||
Input any `json:"input,omitempty"`
|
||||
}
|
||||
|
||||
// UpdateResponse is the response from updating a workflow execution.
|
||||
type UpdateResponse struct {
|
||||
UpdateName string `json:"update_name"`
|
||||
Result any `json:"result"`
|
||||
}
|
||||
|
||||
// ResetInvocationBody is the request body for resetting a workflow execution.
|
||||
type ResetInvocationBody struct {
|
||||
EventID int `json:"event_id"`
|
||||
Reason *string `json:"reason,omitempty"`
|
||||
ExcludeSignals bool `json:"exclude_signals,omitempty"`
|
||||
ExcludeUpdates bool `json:"exclude_updates,omitempty"`
|
||||
}
|
||||
|
||||
// BatchExecutionBody is the request body for batch execution operations.
|
||||
type BatchExecutionBody struct {
|
||||
ExecutionIDs []string `json:"execution_ids"`
|
||||
}
|
||||
|
||||
// BatchExecutionResponse is the response from batch execution operations.
|
||||
type BatchExecutionResponse struct {
|
||||
Results map[string]BatchExecutionResult `json:"results,omitempty"`
|
||||
}
|
||||
|
||||
// BatchExecutionResult is the result of a single batch operation.
|
||||
type BatchExecutionResult struct {
|
||||
Status string `json:"status"`
|
||||
Error *string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// StreamParams holds query parameters for streaming workflow executions.
|
||||
type StreamParams struct {
|
||||
EventSource *EventSource
|
||||
LastEventID *string
|
||||
}
|
||||
|
||||
// TraceOTelResponse is the response from the OTel trace endpoint.
|
||||
type TraceOTelResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status *ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result"`
|
||||
DataSource string `json:"data_source"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
OTelTraceID *string `json:"otel_trace_id,omitempty"`
|
||||
OTelTraceData any `json:"otel_trace_data,omitempty"`
|
||||
}
|
||||
|
||||
// TraceSummaryResponse is the response from the trace summary endpoint.
|
||||
type TraceSummaryResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status *ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
SpanTree any `json:"span_tree,omitempty"`
|
||||
}
|
||||
|
||||
// TraceEventsResponse is the response from the trace events endpoint.
|
||||
type TraceEventsResponse struct {
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
RootExecutionID string `json:"root_execution_id"`
|
||||
Status *ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
Result any `json:"result"`
|
||||
ParentExecutionID *string `json:"parent_execution_id,omitempty"`
|
||||
TotalDurationMs *int `json:"total_duration_ms,omitempty"`
|
||||
Events []json.RawMessage `json:"events,omitempty"`
|
||||
}
|
||||
|
||||
// TraceEventsParams holds query parameters for the trace events endpoint.
|
||||
type TraceEventsParams struct {
|
||||
MergeSameIDEvents *bool
|
||||
IncludeInternalEvents *bool
|
||||
}
|
||||
27
workflow/metrics.go
Normal file
27
workflow/metrics.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package workflow
|
||||
|
||||
// Metrics holds workflow performance metrics.
|
||||
type Metrics struct {
|
||||
ExecutionCount ScalarMetric `json:"execution_count"`
|
||||
SuccessCount ScalarMetric `json:"success_count"`
|
||||
ErrorCount ScalarMetric `json:"error_count"`
|
||||
AverageLatencyMs ScalarMetric `json:"average_latency_ms"`
|
||||
LatencyOverTime TimeSeriesMetric `json:"latency_over_time"`
|
||||
RetryRate ScalarMetric `json:"retry_rate"`
|
||||
}
|
||||
|
||||
// ScalarMetric holds a single numeric metric value.
|
||||
type ScalarMetric struct {
|
||||
Value float64 `json:"value"`
|
||||
}
|
||||
|
||||
// TimeSeriesMetric holds a time series of [timestamp, value] pairs.
|
||||
type TimeSeriesMetric struct {
|
||||
Value [][]float64 `json:"value"`
|
||||
}
|
||||
|
||||
// MetricsParams holds query parameters for workflow metrics.
|
||||
type MetricsParams struct {
|
||||
StartTime *string
|
||||
EndTime *string
|
||||
}
|
||||
44
workflow/registration.go
Normal file
44
workflow/registration.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package workflow
|
||||
|
||||
// Registration represents a workflow registration.
|
||||
type Registration struct {
|
||||
ID string `json:"id"`
|
||||
WorkflowID string `json:"workflow_id"`
|
||||
TaskQueue string `json:"task_queue"`
|
||||
Workflow *Workflow `json:"workflow,omitempty"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// RegistrationListResponse is the response from listing workflow registrations.
|
||||
type RegistrationListResponse struct {
|
||||
Registrations []Registration `json:"registrations"`
|
||||
NextCursor *string `json:"next_cursor,omitempty"`
|
||||
}
|
||||
|
||||
// RegistrationListParams holds query parameters for listing registrations.
|
||||
type RegistrationListParams struct {
|
||||
WorkflowID *string
|
||||
TaskQueue *string
|
||||
ActiveOnly *bool
|
||||
IncludeShared *bool
|
||||
WorkflowSearch *string
|
||||
Archived *bool
|
||||
WithWorkflow *bool
|
||||
AvailableInChatAssistant *bool
|
||||
Limit *int
|
||||
Cursor *string
|
||||
}
|
||||
|
||||
// RegistrationGetParams holds query parameters for getting a registration.
|
||||
type RegistrationGetParams struct {
|
||||
WithWorkflow *bool
|
||||
IncludeShared *bool
|
||||
}
|
||||
|
||||
// WorkerInfo holds information about the current worker.
|
||||
type WorkerInfo struct {
|
||||
SchedulerURL string `json:"scheduler_url"`
|
||||
Namespace string `json:"namespace"`
|
||||
TLS bool `json:"tls"`
|
||||
}
|
||||
26
workflow/run.go
Normal file
26
workflow/run.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package workflow
|
||||
|
||||
// Run represents a workflow run.
|
||||
type Run struct {
|
||||
ID string `json:"id"`
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
ExecutionID string `json:"execution_id"`
|
||||
Status ExecutionStatus `json:"status"`
|
||||
StartTime string `json:"start_time"`
|
||||
EndTime *string `json:"end_time,omitempty"`
|
||||
}
|
||||
|
||||
// ListRunsResponse is the response from listing workflow runs.
|
||||
type ListRunsResponse struct {
|
||||
Runs []Run `json:"runs"`
|
||||
NextPageToken *string `json:"next_page_token,omitempty"`
|
||||
}
|
||||
|
||||
// RunListParams holds query parameters for listing workflow runs.
|
||||
type RunListParams struct {
|
||||
WorkflowIdentifier *string
|
||||
Search *string
|
||||
Status *string
|
||||
PageSize *int
|
||||
NextPageToken *string
|
||||
}
|
||||
75
workflow/schedule.go
Normal file
75
workflow/schedule.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package workflow
|
||||
|
||||
// ScheduleRequest is the request body for scheduling a workflow.
|
||||
type ScheduleRequest struct {
|
||||
Schedule ScheduleDefinition `json:"schedule"`
|
||||
WorkflowRegistrationID *string `json:"workflow_registration_id,omitempty"`
|
||||
WorkflowIdentifier *string `json:"workflow_identifier,omitempty"`
|
||||
ScheduleID *string `json:"schedule_id,omitempty"`
|
||||
DeploymentName *string `json:"deployment_name,omitempty"`
|
||||
}
|
||||
|
||||
// ScheduleDefinition describes when and how a workflow should be scheduled.
|
||||
type ScheduleDefinition struct {
|
||||
Input any `json:"input"`
|
||||
Calendars []ScheduleCalendar `json:"calendars,omitempty"`
|
||||
Intervals []ScheduleInterval `json:"intervals,omitempty"`
|
||||
CronExpressions []string `json:"cron_expressions,omitempty"`
|
||||
Skip []ScheduleCalendar `json:"skip,omitempty"`
|
||||
StartAt *string `json:"start_at,omitempty"`
|
||||
EndAt *string `json:"end_at,omitempty"`
|
||||
Jitter *string `json:"jitter,omitempty"`
|
||||
TimeZoneName *string `json:"time_zone_name,omitempty"`
|
||||
Policy *SchedulePolicy `json:"policy,omitempty"`
|
||||
}
|
||||
|
||||
// ScheduleCalendar defines calendar-based schedule entries.
|
||||
type ScheduleCalendar struct {
|
||||
Second []ScheduleRange `json:"second,omitempty"`
|
||||
Minute []ScheduleRange `json:"minute,omitempty"`
|
||||
Hour []ScheduleRange `json:"hour,omitempty"`
|
||||
DayOfMonth []ScheduleRange `json:"day_of_month,omitempty"`
|
||||
Month []ScheduleRange `json:"month,omitempty"`
|
||||
Year []ScheduleRange `json:"year,omitempty"`
|
||||
DayOfWeek []ScheduleRange `json:"day_of_week,omitempty"`
|
||||
Comment *string `json:"comment,omitempty"`
|
||||
}
|
||||
|
||||
// ScheduleRange defines a numeric range for calendar schedules.
|
||||
type ScheduleRange struct {
|
||||
Start int `json:"start"`
|
||||
End int `json:"end,omitempty"`
|
||||
Step int `json:"step,omitempty"`
|
||||
}
|
||||
|
||||
// ScheduleInterval defines an interval-based schedule.
|
||||
type ScheduleInterval struct {
|
||||
Every string `json:"every"`
|
||||
Offset *string `json:"offset,omitempty"`
|
||||
}
|
||||
|
||||
// SchedulePolicy controls schedule overlap and failure behavior.
|
||||
type SchedulePolicy struct {
|
||||
CatchupWindowSeconds int `json:"catchup_window_seconds,omitempty"`
|
||||
Overlap *int `json:"overlap,omitempty"`
|
||||
PauseOnFailure bool `json:"pause_on_failure,omitempty"`
|
||||
}
|
||||
|
||||
// ScheduleResponse is the response from creating a workflow schedule.
|
||||
type ScheduleResponse struct {
|
||||
ScheduleID string `json:"schedule_id"`
|
||||
}
|
||||
|
||||
// ScheduleListResponse is the response from listing workflow schedules.
|
||||
type ScheduleListResponse struct {
|
||||
Schedules []Schedule `json:"schedules"`
|
||||
}
|
||||
|
||||
// Schedule represents a workflow schedule.
|
||||
type Schedule struct {
|
||||
ScheduleID string `json:"schedule_id"`
|
||||
Definition ScheduleDefinition `json:"definition"`
|
||||
WorkflowName string `json:"workflow_name"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
44
workflow/workflow.go
Normal file
44
workflow/workflow.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package workflow
|
||||
|
||||
// Workflow represents a workflow definition.
|
||||
type Workflow struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
DisplayName *string `json:"display_name,omitempty"`
|
||||
Description *string `json:"description,omitempty"`
|
||||
OwnerID string `json:"owner_id"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
AvailableInChatAssistant bool `json:"available_in_chat_assistant"`
|
||||
Archived bool `json:"archived"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// WorkflowUpdateRequest is the request body for updating a workflow.
|
||||
type WorkflowUpdateRequest struct {
|
||||
DisplayName *string `json:"display_name,omitempty"`
|
||||
Description *string `json:"description,omitempty"`
|
||||
AvailableInChatAssistant *bool `json:"available_in_chat_assistant,omitempty"`
|
||||
}
|
||||
|
||||
// WorkflowListResponse is the response from listing workflows.
|
||||
type WorkflowListResponse struct {
|
||||
Workflows []Workflow `json:"workflows"`
|
||||
NextCursor *string `json:"next_cursor,omitempty"`
|
||||
}
|
||||
|
||||
// WorkflowListParams holds query parameters for listing workflows.
|
||||
type WorkflowListParams struct {
|
||||
ActiveOnly *bool
|
||||
IncludeShared *bool
|
||||
AvailableInChatAssistant *bool
|
||||
Archived *bool
|
||||
Cursor *string
|
||||
Limit *int
|
||||
}
|
||||
|
||||
// WorkflowArchiveResponse is the response from archiving/unarchiving a workflow.
|
||||
type WorkflowArchiveResponse struct {
|
||||
ID string `json:"id"`
|
||||
Archived bool `json:"archived"`
|
||||
}
|
||||
168
workflows.go
Normal file
168
workflows.go
Normal file
@@ -0,0 +1,168 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// ListWorkflows lists workflows.
|
||||
func (c *Client) ListWorkflows(ctx context.Context, params *workflow.WorkflowListParams) (*workflow.WorkflowListResponse, error) {
|
||||
path := "/v1/workflows"
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.ActiveOnly != nil {
|
||||
q.Set("active_only", strconv.FormatBool(*params.ActiveOnly))
|
||||
}
|
||||
if params.IncludeShared != nil {
|
||||
q.Set("include_shared", strconv.FormatBool(*params.IncludeShared))
|
||||
}
|
||||
if params.AvailableInChatAssistant != nil {
|
||||
q.Set("available_in_chat_assistant", strconv.FormatBool(*params.AvailableInChatAssistant))
|
||||
}
|
||||
if params.Archived != nil {
|
||||
q.Set("archived", strconv.FormatBool(*params.Archived))
|
||||
}
|
||||
if params.Cursor != nil {
|
||||
q.Set("cursor", *params.Cursor)
|
||||
}
|
||||
if params.Limit != nil {
|
||||
q.Set("limit", strconv.Itoa(*params.Limit))
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.WorkflowListResponse
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflow retrieves a workflow by identifier.
|
||||
func (c *Client) GetWorkflow(ctx context.Context, workflowIdentifier string) (*workflow.Workflow, error) {
|
||||
var resp workflow.Workflow
|
||||
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/%s", workflowIdentifier), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// UpdateWorkflow updates a workflow.
|
||||
func (c *Client) UpdateWorkflow(ctx context.Context, workflowIdentifier string, req *workflow.WorkflowUpdateRequest) (*workflow.Workflow, error) {
|
||||
var resp workflow.Workflow
|
||||
if err := c.doJSON(ctx, "PUT", fmt.Sprintf("/v1/workflows/%s", workflowIdentifier), req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// ArchiveWorkflow archives a workflow.
|
||||
func (c *Client) ArchiveWorkflow(ctx context.Context, workflowIdentifier string) (*workflow.WorkflowArchiveResponse, error) {
|
||||
var resp workflow.WorkflowArchiveResponse
|
||||
if err := c.doJSON(ctx, "PUT", fmt.Sprintf("/v1/workflows/%s/archive", workflowIdentifier), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// UnarchiveWorkflow unarchives a workflow.
|
||||
func (c *Client) UnarchiveWorkflow(ctx context.Context, workflowIdentifier string) (*workflow.WorkflowArchiveResponse, error) {
|
||||
var resp workflow.WorkflowArchiveResponse
|
||||
if err := c.doJSON(ctx, "PUT", fmt.Sprintf("/v1/workflows/%s/unarchive", workflowIdentifier), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// ExecuteWorkflow executes a workflow.
|
||||
func (c *Client) ExecuteWorkflow(ctx context.Context, workflowIdentifier string, req *workflow.ExecutionRequest) (*workflow.ExecutionResponse, error) {
|
||||
var resp workflow.ExecutionResponse
|
||||
if err := c.doJSON(ctx, "POST", fmt.Sprintf("/v1/workflows/%s/execute", workflowIdentifier), req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// ListWorkflowRegistrations lists workflow registrations.
|
||||
func (c *Client) ListWorkflowRegistrations(ctx context.Context, params *workflow.RegistrationListParams) (*workflow.RegistrationListResponse, error) {
|
||||
path := "/v1/workflows/registrations"
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.WorkflowID != nil {
|
||||
q.Set("workflow_id", *params.WorkflowID)
|
||||
}
|
||||
if params.TaskQueue != nil {
|
||||
q.Set("task_queue", *params.TaskQueue)
|
||||
}
|
||||
if params.ActiveOnly != nil {
|
||||
q.Set("active_only", strconv.FormatBool(*params.ActiveOnly))
|
||||
}
|
||||
if params.IncludeShared != nil {
|
||||
q.Set("include_shared", strconv.FormatBool(*params.IncludeShared))
|
||||
}
|
||||
if params.WorkflowSearch != nil {
|
||||
q.Set("workflow_search", *params.WorkflowSearch)
|
||||
}
|
||||
if params.Archived != nil {
|
||||
q.Set("archived", strconv.FormatBool(*params.Archived))
|
||||
}
|
||||
if params.WithWorkflow != nil {
|
||||
q.Set("with_workflow", strconv.FormatBool(*params.WithWorkflow))
|
||||
}
|
||||
if params.AvailableInChatAssistant != nil {
|
||||
q.Set("available_in_chat_assistant", strconv.FormatBool(*params.AvailableInChatAssistant))
|
||||
}
|
||||
if params.Limit != nil {
|
||||
q.Set("limit", strconv.Itoa(*params.Limit))
|
||||
}
|
||||
if params.Cursor != nil {
|
||||
q.Set("cursor", *params.Cursor)
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.RegistrationListResponse
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowRegistration retrieves a workflow registration by ID.
|
||||
func (c *Client) GetWorkflowRegistration(ctx context.Context, registrationID string, params *workflow.RegistrationGetParams) (*workflow.Registration, error) {
|
||||
path := fmt.Sprintf("/v1/workflows/registrations/%s", registrationID)
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.WithWorkflow != nil {
|
||||
q.Set("with_workflow", strconv.FormatBool(*params.WithWorkflow))
|
||||
}
|
||||
if params.IncludeShared != nil {
|
||||
q.Set("include_shared", strconv.FormatBool(*params.IncludeShared))
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.Registration
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// ExecuteWorkflowRegistration executes a workflow via its registration.
|
||||
//
|
||||
// Deprecated: Use ExecuteWorkflow instead. This method will be removed in a future release.
|
||||
func (c *Client) ExecuteWorkflowRegistration(ctx context.Context, registrationID string, req *workflow.ExecutionRequest) (*workflow.ExecutionResponse, error) {
|
||||
var resp workflow.ExecutionResponse
|
||||
if err := c.doJSON(ctx, "POST", fmt.Sprintf("/v1/workflows/registrations/%s/execute", registrationID), req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
41
workflows_deployments.go
Normal file
41
workflows_deployments.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// ListWorkflowDeployments lists workflow deployments.
|
||||
func (c *Client) ListWorkflowDeployments(ctx context.Context, params *workflow.DeploymentListParams) (*workflow.DeploymentListResponse, error) {
|
||||
path := "/v1/workflows/deployments"
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.ActiveOnly != nil {
|
||||
q.Set("active_only", strconv.FormatBool(*params.ActiveOnly))
|
||||
}
|
||||
if params.WorkflowName != nil {
|
||||
q.Set("workflow_name", *params.WorkflowName)
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.DeploymentListResponse
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowDeployment retrieves a workflow deployment by ID.
|
||||
func (c *Client) GetWorkflowDeployment(ctx context.Context, deploymentID string) (*workflow.Deployment, error) {
|
||||
var resp workflow.Deployment
|
||||
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/deployments/%s", deploymentID), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
57
workflows_deployments_test.go
Normal file
57
workflows_deployments_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestListWorkflowDeployments_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/deployments" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"deployments": []map[string]any{
|
||||
{"id": "dep-1", "name": "prod", "is_active": true, "created_at": "2026-01-01", "updated_at": "2026-01-01"},
|
||||
},
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.ListWorkflowDeployments(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Deployments) != 1 {
|
||||
t.Fatalf("got %d deployments", len(resp.Deployments))
|
||||
}
|
||||
if resp.Deployments[0].Name != "prod" {
|
||||
t.Errorf("got name %q", resp.Deployments[0].Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkflowDeployment_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/deployments/dep-1" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"id": "dep-1", "name": "prod", "is_active": true,
|
||||
"created_at": "2026-01-01", "updated_at": "2026-01-01",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
dep, err := client.GetWorkflowDeployment(context.Background(), "dep-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if dep.ID != "dep-1" {
|
||||
t.Errorf("got id %q", dep.ID)
|
||||
}
|
||||
}
|
||||
99
workflows_events.go
Normal file
99
workflows_events.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// StreamWorkflowEvents streams workflow events via SSE.
|
||||
func (c *Client) StreamWorkflowEvents(ctx context.Context, params *workflow.EventStreamParams) (*WorkflowEventStream, error) {
|
||||
path := "/v1/workflows/events/stream"
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.Scope != nil {
|
||||
q.Set("scope", string(*params.Scope))
|
||||
}
|
||||
if params.ActivityName != nil {
|
||||
q.Set("activity_name", *params.ActivityName)
|
||||
}
|
||||
if params.ActivityID != nil {
|
||||
q.Set("activity_id", *params.ActivityID)
|
||||
}
|
||||
if params.WorkflowName != nil {
|
||||
q.Set("workflow_name", *params.WorkflowName)
|
||||
}
|
||||
if params.WorkflowExecID != nil {
|
||||
q.Set("workflow_exec_id", *params.WorkflowExecID)
|
||||
}
|
||||
if params.RootWorkflowExecID != nil {
|
||||
q.Set("root_workflow_exec_id", *params.RootWorkflowExecID)
|
||||
}
|
||||
if params.ParentWorkflowExecID != nil {
|
||||
q.Set("parent_workflow_exec_id", *params.ParentWorkflowExecID)
|
||||
}
|
||||
if params.Stream != nil {
|
||||
q.Set("stream", *params.Stream)
|
||||
}
|
||||
if params.StartSeq != nil {
|
||||
q.Set("start_seq", strconv.Itoa(*params.StartSeq))
|
||||
}
|
||||
if params.MetadataFilters != nil {
|
||||
data, _ := json.Marshal(params.MetadataFilters)
|
||||
q.Set("metadata_filters", string(data))
|
||||
}
|
||||
if len(params.WorkflowEventTypes) > 0 {
|
||||
types := make([]string, len(params.WorkflowEventTypes))
|
||||
for i, et := range params.WorkflowEventTypes {
|
||||
types[i] = string(et)
|
||||
}
|
||||
q.Set("workflow_event_types", strings.Join(types, ","))
|
||||
}
|
||||
if params.LastEventID != nil {
|
||||
q.Set("last_event_id", *params.LastEventID)
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
resp, err := c.doStream(ctx, "GET", path, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newWorkflowEventStream(resp.Body), nil
|
||||
}
|
||||
|
||||
// ListWorkflowEvents lists workflow events.
|
||||
func (c *Client) ListWorkflowEvents(ctx context.Context, params *workflow.EventListParams) (*workflow.EventListResponse, error) {
|
||||
path := "/v1/workflows/events/list"
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.RootWorkflowExecID != nil {
|
||||
q.Set("root_workflow_exec_id", *params.RootWorkflowExecID)
|
||||
}
|
||||
if params.WorkflowExecID != nil {
|
||||
q.Set("workflow_exec_id", *params.WorkflowExecID)
|
||||
}
|
||||
if params.WorkflowRunID != nil {
|
||||
q.Set("workflow_run_id", *params.WorkflowRunID)
|
||||
}
|
||||
if params.Limit != nil {
|
||||
q.Set("limit", strconv.Itoa(*params.Limit))
|
||||
}
|
||||
if params.Cursor != nil {
|
||||
q.Set("cursor", *params.Cursor)
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.EventListResponse
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
40
workflows_events_test.go
Normal file
40
workflows_events_test.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
func TestListWorkflowEvents_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/events/list" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
if r.URL.Query().Get("limit") != "50" {
|
||||
t.Errorf("got limit %q", r.URL.Query().Get("limit"))
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"events": []map[string]any{{"event_type": "WORKFLOW_EXECUTION_STARTED"}},
|
||||
"next_cursor": "cur-1",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
limit := 50
|
||||
resp, err := client.ListWorkflowEvents(context.Background(), &workflow.EventListParams{Limit: &limit})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Events) != 1 {
|
||||
t.Fatalf("got %d events", len(resp.Events))
|
||||
}
|
||||
if resp.NextCursor == nil || *resp.NextCursor != "cur-1" {
|
||||
t.Errorf("got cursor %v", resp.NextCursor)
|
||||
}
|
||||
}
|
||||
255
workflows_executions.go
Normal file
255
workflows_executions.go
Normal file
@@ -0,0 +1,255 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// GetWorkflowExecution retrieves a workflow execution by ID.
|
||||
func (c *Client) GetWorkflowExecution(ctx context.Context, executionID string) (*workflow.ExecutionResponse, error) {
|
||||
var resp workflow.ExecutionResponse
|
||||
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/executions/%s", executionID), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowExecutionHistory retrieves the history of a workflow execution.
|
||||
func (c *Client) GetWorkflowExecutionHistory(ctx context.Context, executionID string, decodePayloads *bool) (json.RawMessage, error) {
|
||||
path := fmt.Sprintf("/v1/workflows/executions/%s/history", executionID)
|
||||
if decodePayloads != nil {
|
||||
path += "?decode_payloads=" + strconv.FormatBool(*decodePayloads)
|
||||
}
|
||||
var resp json.RawMessage
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// StreamWorkflowExecution streams events for a workflow execution via SSE.
|
||||
func (c *Client) StreamWorkflowExecution(ctx context.Context, executionID string, params *workflow.StreamParams) (*WorkflowEventStream, error) {
|
||||
path := fmt.Sprintf("/v1/workflows/executions/%s/stream", executionID)
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.EventSource != nil {
|
||||
q.Set("event_source", string(*params.EventSource))
|
||||
}
|
||||
if params.LastEventID != nil {
|
||||
q.Set("last_event_id", *params.LastEventID)
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
resp, err := c.doStream(ctx, "GET", path, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newWorkflowEventStream(resp.Body), nil
|
||||
}
|
||||
|
||||
// SignalWorkflowExecution sends a signal to a workflow execution.
|
||||
func (c *Client) SignalWorkflowExecution(ctx context.Context, executionID string, req *workflow.SignalInvocationBody) (*workflow.SignalResponse, error) {
|
||||
var resp workflow.SignalResponse
|
||||
if err := c.doJSON(ctx, "POST", fmt.Sprintf("/v1/workflows/executions/%s/signals", executionID), req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// QueryWorkflowExecution queries a workflow execution.
|
||||
func (c *Client) QueryWorkflowExecution(ctx context.Context, executionID string, req *workflow.QueryInvocationBody) (*workflow.QueryResponse, error) {
|
||||
var resp workflow.QueryResponse
|
||||
if err := c.doJSON(ctx, "POST", fmt.Sprintf("/v1/workflows/executions/%s/queries", executionID), req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// UpdateWorkflowExecution sends an update to a workflow execution.
|
||||
func (c *Client) UpdateWorkflowExecution(ctx context.Context, executionID string, req *workflow.UpdateInvocationBody) (*workflow.UpdateResponse, error) {
|
||||
var resp workflow.UpdateResponse
|
||||
if err := c.doJSON(ctx, "POST", fmt.Sprintf("/v1/workflows/executions/%s/updates", executionID), req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// TerminateWorkflowExecution terminates a workflow execution.
|
||||
func (c *Client) TerminateWorkflowExecution(ctx context.Context, executionID string) error {
|
||||
resp, err := c.do(ctx, "POST", fmt.Sprintf("/v1/workflows/executions/%s/terminate", executionID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 {
|
||||
return parseAPIError(resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelWorkflowExecution cancels a workflow execution.
|
||||
func (c *Client) CancelWorkflowExecution(ctx context.Context, executionID string) error {
|
||||
resp, err := c.do(ctx, "POST", fmt.Sprintf("/v1/workflows/executions/%s/cancel", executionID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 {
|
||||
return parseAPIError(resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetWorkflowExecution resets a workflow execution to a specific event.
|
||||
func (c *Client) ResetWorkflowExecution(ctx context.Context, executionID string, req *workflow.ResetInvocationBody) error {
|
||||
return c.doJSON(ctx, "POST", fmt.Sprintf("/v1/workflows/executions/%s/reset", executionID), req, nil)
|
||||
}
|
||||
|
||||
// BatchCancelWorkflowExecutions cancels multiple workflow executions.
|
||||
func (c *Client) BatchCancelWorkflowExecutions(ctx context.Context, req *workflow.BatchExecutionBody) (*workflow.BatchExecutionResponse, error) {
|
||||
var resp workflow.BatchExecutionResponse
|
||||
if err := c.doJSON(ctx, "POST", "/v1/workflows/executions/cancel", req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// BatchTerminateWorkflowExecutions terminates multiple workflow executions.
|
||||
func (c *Client) BatchTerminateWorkflowExecutions(ctx context.Context, req *workflow.BatchExecutionBody) (*workflow.BatchExecutionResponse, error) {
|
||||
var resp workflow.BatchExecutionResponse
|
||||
if err := c.doJSON(ctx, "POST", "/v1/workflows/executions/terminate", req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowExecutionTraceOTel retrieves the OpenTelemetry trace for a workflow execution.
|
||||
func (c *Client) GetWorkflowExecutionTraceOTel(ctx context.Context, executionID string) (*workflow.TraceOTelResponse, error) {
|
||||
var resp workflow.TraceOTelResponse
|
||||
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/executions/%s/trace/otel", executionID), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowExecutionTraceSummary retrieves the trace summary for a workflow execution.
|
||||
func (c *Client) GetWorkflowExecutionTraceSummary(ctx context.Context, executionID string) (*workflow.TraceSummaryResponse, error) {
|
||||
var resp workflow.TraceSummaryResponse
|
||||
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/executions/%s/trace/summary", executionID), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowExecutionTraceEvents retrieves the trace events for a workflow execution.
|
||||
func (c *Client) GetWorkflowExecutionTraceEvents(ctx context.Context, executionID string, params *workflow.TraceEventsParams) (*workflow.TraceEventsResponse, error) {
|
||||
path := fmt.Sprintf("/v1/workflows/executions/%s/trace/events", executionID)
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.MergeSameIDEvents != nil {
|
||||
q.Set("merge_same_id_events", strconv.FormatBool(*params.MergeSameIDEvents))
|
||||
}
|
||||
if params.IncludeInternalEvents != nil {
|
||||
q.Set("include_internal_events", strconv.FormatBool(*params.IncludeInternalEvents))
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.TraceEventsResponse
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// WorkflowEventStream wraps the generic Stream to provide typed workflow events
|
||||
// with StreamPayload envelope metadata.
|
||||
type WorkflowEventStream struct {
|
||||
stream *Stream[json.RawMessage]
|
||||
event workflow.Event
|
||||
payload *workflow.StreamPayload
|
||||
err error
|
||||
}
|
||||
|
||||
func newWorkflowEventStream(body readCloser) *WorkflowEventStream {
|
||||
return &WorkflowEventStream{
|
||||
stream: newStream[json.RawMessage](body),
|
||||
}
|
||||
}
|
||||
|
||||
// Next advances to the next event. Returns false when done or on error.
|
||||
func (s *WorkflowEventStream) Next() bool {
|
||||
if s.err != nil {
|
||||
return false
|
||||
}
|
||||
if !s.stream.Next() {
|
||||
s.err = s.stream.Err()
|
||||
return false
|
||||
}
|
||||
var payload workflow.StreamPayload
|
||||
if err := json.Unmarshal(s.stream.Current(), &payload); err != nil {
|
||||
s.err = fmt.Errorf("mistral: decode workflow stream payload: %w", err)
|
||||
return false
|
||||
}
|
||||
event, err := workflow.UnmarshalEvent(payload.Data)
|
||||
if err != nil {
|
||||
s.err = err
|
||||
return false
|
||||
}
|
||||
s.event = event
|
||||
s.payload = &payload
|
||||
return true
|
||||
}
|
||||
|
||||
// Current returns the most recently read workflow event.
|
||||
func (s *WorkflowEventStream) Current() workflow.Event { return s.event }
|
||||
|
||||
// CurrentPayload returns the full StreamPayload envelope of the current event.
|
||||
func (s *WorkflowEventStream) CurrentPayload() *workflow.StreamPayload { return s.payload }
|
||||
|
||||
// Err returns any error encountered during streaming.
|
||||
func (s *WorkflowEventStream) Err() error { return s.err }
|
||||
|
||||
// Close releases the underlying connection.
|
||||
func (s *WorkflowEventStream) Close() error { return s.stream.Close() }
|
||||
|
||||
// ExecuteWorkflowAndWait executes a workflow and polls until completion.
|
||||
func (c *Client) ExecuteWorkflowAndWait(ctx context.Context, workflowIdentifier string, req *workflow.ExecutionRequest) (*workflow.ExecutionResponse, error) {
|
||||
execResp, err := c.ExecuteWorkflow(ctx, workflowIdentifier, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for {
|
||||
if isTerminal(execResp.Status) {
|
||||
return execResp, nil
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
execResp, err = c.GetWorkflowExecution(ctx, execResp.ExecutionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isTerminal(s workflow.ExecutionStatus) bool {
|
||||
switch s {
|
||||
case workflow.ExecutionCompleted, workflow.ExecutionFailed,
|
||||
workflow.ExecutionCanceled, workflow.ExecutionTerminated,
|
||||
workflow.ExecutionTimedOut:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
266
workflows_executions_test.go
Normal file
266
workflows_executions_test.go
Normal file
@@ -0,0 +1,266 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
func TestGetWorkflowExecution_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/executions/exec-1" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"workflow_name": "my-flow", "execution_id": "exec-1",
|
||||
"root_execution_id": "exec-1", "status": "COMPLETED",
|
||||
"start_time": "2026-01-01T00:00:00Z",
|
||||
"end_time": "2026-01-01T00:01:00Z",
|
||||
"result": map[string]any{"answer": 42},
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.GetWorkflowExecution(context.Background(), "exec-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.Status != workflow.ExecutionCompleted {
|
||||
t.Errorf("got status %q", resp.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignalWorkflowExecution_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/executions/exec-1/signals" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
var body map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&body)
|
||||
if body["name"] != "approval" {
|
||||
t.Errorf("got name %v", body["name"])
|
||||
}
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
json.NewEncoder(w).Encode(map[string]any{"message": "Signal accepted"})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.SignalWorkflowExecution(context.Background(), "exec-1", &workflow.SignalInvocationBody{
|
||||
Name: "approval",
|
||||
Input: map[string]any{"approved": true},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.Message != "Signal accepted" {
|
||||
t.Errorf("got message %q", resp.Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTerminateWorkflowExecution_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/executions/exec-1/terminate" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
err := client.TerminateWorkflowExecution(context.Background(), "exec-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchCancelWorkflowExecutions_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/executions/cancel" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"results": map[string]any{
|
||||
"exec-1": map[string]any{"status": "success"},
|
||||
"exec-2": map[string]any{"status": "failure", "error": "not found"},
|
||||
},
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.BatchCancelWorkflowExecutions(context.Background(), &workflow.BatchExecutionBody{
|
||||
ExecutionIDs: []string{"exec-1", "exec-2"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.Results["exec-1"].Status != "success" {
|
||||
t.Errorf("got exec-1 status %q", resp.Results["exec-1"].Status)
|
||||
}
|
||||
if resp.Results["exec-2"].Error == nil || *resp.Results["exec-2"].Error != "not found" {
|
||||
t.Errorf("expected exec-2 error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamWorkflowExecution_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "GET" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/executions/exec-1/stream" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
flusher, _ := w.(http.Flusher)
|
||||
|
||||
payloads := []map[string]any{
|
||||
{
|
||||
"stream": "events",
|
||||
"data": map[string]any{
|
||||
"event_id": "evt-1", "event_timestamp": 1711929600000000000,
|
||||
"root_workflow_exec_id": "exec-1", "parent_workflow_exec_id": nil,
|
||||
"workflow_exec_id": "exec-1", "workflow_run_id": "run-1",
|
||||
"workflow_name": "my-flow", "event_type": "WORKFLOW_EXECUTION_STARTED",
|
||||
"attributes": map[string]any{},
|
||||
},
|
||||
"workflow_context": map[string]any{
|
||||
"namespace": "default", "workflow_name": "my-flow", "workflow_exec_id": "exec-1",
|
||||
},
|
||||
"broker_sequence": 1,
|
||||
},
|
||||
{
|
||||
"stream": "events",
|
||||
"data": map[string]any{
|
||||
"event_id": "evt-2", "event_timestamp": 1711929601000000000,
|
||||
"root_workflow_exec_id": "exec-1", "parent_workflow_exec_id": nil,
|
||||
"workflow_exec_id": "exec-1", "workflow_run_id": "run-1",
|
||||
"workflow_name": "my-flow", "event_type": "WORKFLOW_EXECUTION_COMPLETED",
|
||||
"attributes": map[string]any{"result": map[string]any{"value": 42, "type": "json"}},
|
||||
},
|
||||
"workflow_context": map[string]any{
|
||||
"namespace": "default", "workflow_name": "my-flow", "workflow_exec_id": "exec-1",
|
||||
},
|
||||
"broker_sequence": 2,
|
||||
},
|
||||
}
|
||||
for _, p := range payloads {
|
||||
data, _ := json.Marshal(p)
|
||||
fmt.Fprintf(w, "data: %s\n\n", data)
|
||||
flusher.Flush()
|
||||
}
|
||||
fmt.Fprint(w, "data: [DONE]\n\n")
|
||||
flusher.Flush()
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
stream, err := client.StreamWorkflowExecution(context.Background(), "exec-1", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
var events []workflow.Event
|
||||
var lastPayload *workflow.StreamPayload
|
||||
for stream.Next() {
|
||||
events = append(events, stream.Current())
|
||||
lastPayload = stream.CurrentPayload()
|
||||
}
|
||||
if stream.Err() != nil {
|
||||
t.Fatal(stream.Err())
|
||||
}
|
||||
if len(events) != 2 {
|
||||
t.Fatalf("got %d events, want 2", len(events))
|
||||
}
|
||||
if _, ok := events[0].(*workflow.WorkflowExecutionStartedEvent); !ok {
|
||||
t.Errorf("expected *WorkflowExecutionStartedEvent, got %T", events[0])
|
||||
}
|
||||
if _, ok := events[1].(*workflow.WorkflowExecutionCompletedEvent); !ok {
|
||||
t.Errorf("expected *WorkflowExecutionCompletedEvent, got %T", events[1])
|
||||
}
|
||||
if lastPayload.WorkflowContext.WorkflowName != "my-flow" {
|
||||
t.Errorf("got workflow context name %q", lastPayload.WorkflowContext.WorkflowName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkflowExecutionTraceOTel_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/executions/exec-1/trace/otel" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"workflow_name": "my-flow", "execution_id": "exec-1",
|
||||
"root_execution_id": "exec-1", "status": "COMPLETED",
|
||||
"start_time": "2026-01-01T00:00:00Z", "data_source": "temporal",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.GetWorkflowExecutionTraceOTel(context.Background(), "exec-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.DataSource != "temporal" {
|
||||
t.Errorf("got data_source %q", resp.DataSource)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteWorkflowAndWait_Success(t *testing.T) {
|
||||
calls := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
case r.Method == "POST" && r.URL.Path == "/v1/workflows/wf-1/execute":
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"workflow_name": "my-flow", "execution_id": "exec-1",
|
||||
"root_execution_id": "exec-1", "status": "RUNNING",
|
||||
"start_time": "2026-01-01T00:00:00Z",
|
||||
})
|
||||
case r.Method == "GET" && r.URL.Path == "/v1/workflows/executions/exec-1":
|
||||
calls++
|
||||
status := "RUNNING"
|
||||
if calls >= 2 {
|
||||
status = "COMPLETED"
|
||||
}
|
||||
resp := map[string]any{
|
||||
"workflow_name": "my-flow", "execution_id": "exec-1",
|
||||
"root_execution_id": "exec-1", "status": status,
|
||||
"start_time": "2026-01-01T00:00:00Z",
|
||||
}
|
||||
if status == "COMPLETED" {
|
||||
resp["result"] = map[string]any{"answer": 42}
|
||||
}
|
||||
json.NewEncoder(w).Encode(resp)
|
||||
default:
|
||||
t.Errorf("unexpected %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.ExecuteWorkflowAndWait(context.Background(), "wf-1", &workflow.ExecutionRequest{
|
||||
Input: map[string]any{"prompt": "hello"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.Status != workflow.ExecutionCompleted {
|
||||
t.Errorf("got status %q", resp.Status)
|
||||
}
|
||||
}
|
||||
31
workflows_metrics.go
Normal file
31
workflows_metrics.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// GetWorkflowMetrics retrieves performance metrics for a workflow.
|
||||
func (c *Client) GetWorkflowMetrics(ctx context.Context, workflowName string, params *workflow.MetricsParams) (*workflow.Metrics, error) {
|
||||
path := fmt.Sprintf("/v1/workflows/%s/metrics", workflowName)
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.StartTime != nil {
|
||||
q.Set("start_time", *params.StartTime)
|
||||
}
|
||||
if params.EndTime != nil {
|
||||
q.Set("end_time", *params.EndTime)
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.Metrics
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
44
workflows_metrics_test.go
Normal file
44
workflows_metrics_test.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
func TestGetWorkflowMetrics_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/my-flow/metrics" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
if r.URL.Query().Get("start_time") != "2026-01-01T00:00:00Z" {
|
||||
t.Errorf("got start_time %q", r.URL.Query().Get("start_time"))
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"execution_count": map[string]any{"value": 100},
|
||||
"success_count": map[string]any{"value": 95},
|
||||
"error_count": map[string]any{"value": 5},
|
||||
"average_latency_ms": map[string]any{"value": 1234.5},
|
||||
"latency_over_time": map[string]any{"value": [][]float64{{1711929600, 1200}, {1711929660, 1300}}},
|
||||
"retry_rate": map[string]any{"value": 0.02},
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
start := "2026-01-01T00:00:00Z"
|
||||
resp, err := client.GetWorkflowMetrics(context.Background(), "my-flow", &workflow.MetricsParams{StartTime: &start})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.ExecutionCount.Value != 100 {
|
||||
t.Errorf("got execution_count %v", resp.ExecutionCount.Value)
|
||||
}
|
||||
if resp.AverageLatencyMs.Value != 1234.5 {
|
||||
t.Errorf("got average_latency_ms %v", resp.AverageLatencyMs.Value)
|
||||
}
|
||||
}
|
||||
60
workflows_runs.go
Normal file
60
workflows_runs.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// ListWorkflowRuns lists workflow runs.
|
||||
func (c *Client) ListWorkflowRuns(ctx context.Context, params *workflow.RunListParams) (*workflow.ListRunsResponse, error) {
|
||||
path := "/v1/workflows/runs"
|
||||
if params != nil {
|
||||
q := url.Values{}
|
||||
if params.WorkflowIdentifier != nil {
|
||||
q.Set("workflow_identifier", *params.WorkflowIdentifier)
|
||||
}
|
||||
if params.Search != nil {
|
||||
q.Set("search", *params.Search)
|
||||
}
|
||||
if params.Status != nil {
|
||||
q.Set("status", *params.Status)
|
||||
}
|
||||
if params.PageSize != nil {
|
||||
q.Set("page_size", strconv.Itoa(*params.PageSize))
|
||||
}
|
||||
if params.NextPageToken != nil {
|
||||
q.Set("next_page_token", *params.NextPageToken)
|
||||
}
|
||||
if encoded := q.Encode(); encoded != "" {
|
||||
path += "?" + encoded
|
||||
}
|
||||
}
|
||||
var resp workflow.ListRunsResponse
|
||||
if err := c.doJSON(ctx, "GET", path, nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowRun retrieves a workflow run by ID.
|
||||
func (c *Client) GetWorkflowRun(ctx context.Context, runID string) (*workflow.Run, error) {
|
||||
var resp workflow.Run
|
||||
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/runs/%s", runID), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// GetWorkflowRunHistory retrieves the history of a workflow run.
|
||||
func (c *Client) GetWorkflowRunHistory(ctx context.Context, runID string) (json.RawMessage, error) {
|
||||
var resp json.RawMessage
|
||||
if err := c.doJSON(ctx, "GET", fmt.Sprintf("/v1/workflows/runs/%s/history", runID), nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
58
workflows_runs_test.go
Normal file
58
workflows_runs_test.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestListWorkflowRuns_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/runs" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"runs": []map[string]any{
|
||||
{"id": "run-1", "workflow_name": "my-flow", "execution_id": "exec-1", "status": "COMPLETED", "start_time": "2026-01-01"},
|
||||
},
|
||||
"next_page_token": "tok-1",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.ListWorkflowRuns(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Runs) != 1 {
|
||||
t.Fatalf("got %d runs", len(resp.Runs))
|
||||
}
|
||||
if resp.NextPageToken == nil || *resp.NextPageToken != "tok-1" {
|
||||
t.Errorf("got token %v", resp.NextPageToken)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkflowRun_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/runs/run-1" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"id": "run-1", "workflow_name": "my-flow", "execution_id": "exec-1",
|
||||
"status": "COMPLETED", "start_time": "2026-01-01",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
run, err := client.GetWorkflowRun(context.Background(), "run-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if run.ID != "run-1" {
|
||||
t.Errorf("got id %q", run.ID)
|
||||
}
|
||||
}
|
||||
39
workflows_schedules.go
Normal file
39
workflows_schedules.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// ListWorkflowSchedules lists workflow schedules.
|
||||
func (c *Client) ListWorkflowSchedules(ctx context.Context) (*workflow.ScheduleListResponse, error) {
|
||||
var resp workflow.ScheduleListResponse
|
||||
if err := c.doJSON(ctx, "GET", "/v1/workflows/schedules", nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// ScheduleWorkflow creates a workflow schedule.
|
||||
func (c *Client) ScheduleWorkflow(ctx context.Context, req *workflow.ScheduleRequest) (*workflow.ScheduleResponse, error) {
|
||||
var resp workflow.ScheduleResponse
|
||||
if err := c.doJSON(ctx, "POST", "/v1/workflows/schedules", req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// UnscheduleWorkflow removes a workflow schedule.
|
||||
func (c *Client) UnscheduleWorkflow(ctx context.Context, scheduleID string) error {
|
||||
resp, err := c.do(ctx, "DELETE", fmt.Sprintf("/v1/workflows/schedules/%s", scheduleID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 {
|
||||
return parseAPIError(resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
89
workflows_schedules_test.go
Normal file
89
workflows_schedules_test.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
func TestScheduleWorkflow_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/schedules" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
var body map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&body)
|
||||
schedule, _ := body["schedule"].(map[string]any)
|
||||
cronExprs, _ := schedule["cron_expressions"].([]any)
|
||||
if len(cronExprs) != 1 || cronExprs[0] != "0 9 * * MON-FRI" {
|
||||
t.Errorf("got cron_expressions %v", cronExprs)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{"schedule_id": "sched-1"})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
wfID := "wf-1"
|
||||
resp, err := client.ScheduleWorkflow(context.Background(), &workflow.ScheduleRequest{
|
||||
WorkflowIdentifier: &wfID,
|
||||
Schedule: workflow.ScheduleDefinition{
|
||||
Input: map[string]any{"prompt": "daily report"},
|
||||
CronExpressions: []string{"0 9 * * MON-FRI"},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.ScheduleID != "sched-1" {
|
||||
t.Errorf("got schedule_id %q", resp.ScheduleID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnscheduleWorkflow_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "DELETE" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/schedules/sched-1" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
err := client.UnscheduleWorkflow(context.Background(), "sched-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListWorkflowSchedules_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/schedules" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"schedules": []map[string]any{
|
||||
{"schedule_id": "sched-1", "workflow_name": "my-flow", "created_at": "2026-01-01", "updated_at": "2026-01-01"},
|
||||
},
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.ListWorkflowSchedules(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Schedules) != 1 {
|
||||
t.Fatalf("got %d schedules", len(resp.Schedules))
|
||||
}
|
||||
}
|
||||
190
workflows_test.go
Normal file
190
workflows_test.go
Normal file
@@ -0,0 +1,190 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
func TestListWorkflows_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "GET" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
if r.URL.Query().Get("limit") != "10" {
|
||||
t.Errorf("got limit %q", r.URL.Query().Get("limit"))
|
||||
}
|
||||
if r.URL.Query().Get("active_only") != "true" {
|
||||
t.Errorf("got active_only %q", r.URL.Query().Get("active_only"))
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"workflows": []map[string]any{
|
||||
{"id": "wf-1", "name": "my-flow", "owner_id": "u1", "workspace_id": "ws1", "created_at": "2026-01-01", "updated_at": "2026-01-01"},
|
||||
},
|
||||
"next_cursor": "cur-abc",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
active := true
|
||||
limit := 10
|
||||
resp, err := client.ListWorkflows(context.Background(), &workflow.WorkflowListParams{
|
||||
ActiveOnly: &active,
|
||||
Limit: &limit,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Workflows) != 1 {
|
||||
t.Fatalf("got %d workflows", len(resp.Workflows))
|
||||
}
|
||||
if resp.Workflows[0].ID != "wf-1" {
|
||||
t.Errorf("got id %q", resp.Workflows[0].ID)
|
||||
}
|
||||
if resp.NextCursor == nil || *resp.NextCursor != "cur-abc" {
|
||||
t.Errorf("got cursor %v", resp.NextCursor)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkflow_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/wf-1" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"id": "wf-1", "name": "my-flow", "owner_id": "u1", "workspace_id": "ws1",
|
||||
"created_at": "2026-01-01", "updated_at": "2026-01-01",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
wf, err := client.GetWorkflow(context.Background(), "wf-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if wf.Name != "my-flow" {
|
||||
t.Errorf("got name %q", wf.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateWorkflow_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "PUT" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
var body map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&body)
|
||||
if body["display_name"] != "New Name" {
|
||||
t.Errorf("got display_name %v", body["display_name"])
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"id": "wf-1", "name": "my-flow", "display_name": "New Name",
|
||||
"owner_id": "u1", "workspace_id": "ws1",
|
||||
"created_at": "2026-01-01", "updated_at": "2026-01-02",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
name := "New Name"
|
||||
wf, err := client.UpdateWorkflow(context.Background(), "wf-1", &workflow.WorkflowUpdateRequest{
|
||||
DisplayName: &name,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if wf.DisplayName == nil || *wf.DisplayName != "New Name" {
|
||||
t.Errorf("got display_name %v", wf.DisplayName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestArchiveWorkflow_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "PUT" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/wf-1/archive" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{"id": "wf-1", "archived": true})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.ArchiveWorkflow(context.Background(), "wf-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !resp.Archived {
|
||||
t.Error("expected archived=true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteWorkflow_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("got method %s", r.Method)
|
||||
}
|
||||
if r.URL.Path != "/v1/workflows/wf-1/execute" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
var body map[string]any
|
||||
json.NewDecoder(r.Body).Decode(&body)
|
||||
input, _ := body["input"].(map[string]any)
|
||||
if input["prompt"] != "hello" {
|
||||
t.Errorf("got input %v", body["input"])
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"workflow_name": "my-flow", "execution_id": "exec-1",
|
||||
"root_execution_id": "exec-1", "status": "RUNNING",
|
||||
"start_time": "2026-01-01T00:00:00Z",
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.ExecuteWorkflow(context.Background(), "wf-1", &workflow.ExecutionRequest{
|
||||
Input: map[string]any{"prompt": "hello"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.ExecutionID != "exec-1" {
|
||||
t.Errorf("got execution_id %q", resp.ExecutionID)
|
||||
}
|
||||
if resp.Status != workflow.ExecutionRunning {
|
||||
t.Errorf("got status %q", resp.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListWorkflowRegistrations_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/registrations" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"registrations": []map[string]any{
|
||||
{"id": "reg-1", "workflow_id": "wf-1", "task_queue": "default", "created_at": "2026-01-01", "updated_at": "2026-01-01"},
|
||||
},
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
resp, err := client.ListWorkflowRegistrations(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(resp.Registrations) != 1 {
|
||||
t.Fatalf("got %d registrations", len(resp.Registrations))
|
||||
}
|
||||
}
|
||||
16
workflows_workers.go
Normal file
16
workflows_workers.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"somegit.dev/vikingowl/mistral-go-sdk/workflow"
|
||||
)
|
||||
|
||||
// GetWorkflowWorkerInfo retrieves information about the current worker.
|
||||
func (c *Client) GetWorkflowWorkerInfo(ctx context.Context) (*workflow.WorkerInfo, error) {
|
||||
var resp workflow.WorkerInfo
|
||||
if err := c.doJSON(ctx, "GET", "/v1/workflows/workers/whoami", nil, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
35
workflows_workers_test.go
Normal file
35
workflows_workers_test.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package mistral
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGetWorkflowWorkerInfo_Success(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/workflows/workers/whoami" {
|
||||
t.Errorf("got path %s", r.URL.Path)
|
||||
}
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"scheduler_url": "https://scheduler.mistral.ai",
|
||||
"namespace": "default",
|
||||
"tls": true,
|
||||
})
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
client := NewClient("key", WithBaseURL(server.URL))
|
||||
info, err := client.GetWorkflowWorkerInfo(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if info.Namespace != "default" {
|
||||
t.Errorf("got namespace %q", info.Namespace)
|
||||
}
|
||||
if !info.TLS {
|
||||
t.Error("expected tls=true")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user