Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions project/dagrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = [
"Xiaolong Fu <njufxl@gmail.com>",
"Zhilei Qiu <qzl2503687@gmail.com>",
]
version = "0.7.0"
version = "0.8.0"
edition = "2024"
license = "MIT OR Apache-2.0"
description = "Dagrs follows the concept of Flow-based Programming and is suitable for the execution of multiple tasks with graph-like dependencies. Dagrs has the characteristics of high performance and asynchronous execution. It provides users with a convenient programming interface."
Expand All @@ -15,7 +15,7 @@ keywords = ["DAG", "task", "async", "fbp", "tokio"]

[dependencies]
dagrs-derive = {workspace = true}
tokio = { workspace = true , features = ["rt", "sync", "rt-multi-thread", "time"] }
tokio = { workspace = true , features = ["macros", "rt", "sync", "rt-multi-thread", "time"] }
log = {workspace = true}
async-trait = {workspace = true}
futures = {workspace = true}
Expand Down
96 changes: 48 additions & 48 deletions project/dagrs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,54 +104,54 @@ Each stage is defined as a task with its dependencies and execution command. The

For more detailed info about this example, please see the [notebook.ipynb](examples/dagrs-sklearn/examples/notebook.ipynb) jupyter notebook file.

## Execution Entry Points
Dagrs no longer creates or owns a Tokio runtime internally. Runtime lifecycle is managed by callers.
### Preferred async entry
Use `async_start().await` when already in an async runtime context:
```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut graph = build_graph_somehow();
graph.async_start().await?;
Ok(())
}
```
### Temporary sync adapter (deprecated)
For synchronous callers, use an externally managed runtime and call `start_with_runtime(&runtime)`:
```rust
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime");
graph.start_with_runtime(&runtime)?;
```
`Graph::start()` has been removed.
`Graph::start_with_runtime()` is deprecated and planned for removal in the next major version.
## Changelog
### v0.7.0
#### 🚀 Runtime/API Changes
- **Runtime Decoupling**: Dagrs no longer creates or owns a Tokio runtime internally.
- **Primary Entry**: `Graph::async_start()` is the recommended execution entry.
- **Legacy Sync Adapter**: `Graph::start_with_runtime(&runtime)` remains temporarily for compatibility but is deprecated.
- **Removed API**: `Graph::start()` has been removed.
#### 💡 Migration
- Existing `Graph::start()` callers should migrate to:
- `async_start().await` (preferred)
- `start_with_runtime(&runtime)` (temporary sync adapter)
## Quick Start

Dagrs execution is async-only. Runtime lifecycle is managed by callers.

```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut graph = build_graph_somehow();
graph.async_start().await?;
Ok(())
}
```

- Execution entry: `Graph::async_start().await`.
- `Graph::start_with_runtime()` has been removed.
- Use async channel APIs (`recv_from().await`, `send_to(...).await`, `broadcast(...).await`, `close(...).await`).

## Changelog

### v0.8.0

#### 🚀 Runtime/API Changes

- **Async-only Execution**: `Graph::async_start()` is the only execution entry point.
- **Removed Sync Adapter**: `Graph::start_with_runtime(&runtime)` has been removed.
- **Removed Blocking Channel APIs**:
- `InChannels` / `TypedInChannels`: removed `blocking_recv_from`, `blocking_map`.
- `OutChannels` / `TypedOutChannels`: removed `blocking_send_to`, `blocking_broadcast`.
- **Internal Async Cleanup**: runtime internals no longer use blocking channel paths.

#### 💡 Migration

- Replace `start_with_runtime(&runtime)` with `async_start().await`.
- Replace blocking channel calls with async methods:
- `recv_from().await`, `recv_any().await`, `map(...).await`
- `send_to(...).await`, `broadcast(...).await`, `close(...).await`

### v0.7.0

#### 🚀 Runtime/API Changes

- **Runtime Decoupling**: Dagrs no longer creates or owns a Tokio runtime internally.
- **Primary Entry**: `Graph::async_start()` is the recommended execution entry.
- **Removed API**: `Graph::start()` has been removed.

#### 💡 Migration

- Existing `Graph::start()` callers should migrate to `async_start().await`.

### v0.6.0

Expand Down
29 changes: 29 additions & 0 deletions project/dagrs/docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,35 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.8.0] - 2026-03-17

### Changed
- **Async-only Execution API**: `Graph::async_start()` is now the only execution entry point.
- **Async-only Channel API**:
- Removed all blocking channel operations from `InChannels` / `OutChannels`.
- Removed all blocking channel operations from `TypedInChannels` / `TypedOutChannels`.
- Channel closing is now asynchronous (`close(...).await`, `close_all().await` in internal paths).
- **Examples & Tests**:
- Migrated all runtime-based sync invocation examples to async (`#[tokio::main]` + `async_start().await`).
- Migrated graph execution tests to async style (`#[tokio::test]`) and removed runtime `block_on` wrappers.

### Removed
- `Graph::start_with_runtime(&runtime)` sync adapter.
- `GraphError::BlockingCallInAsyncContext` (no longer needed after sync adapter removal).
- Internal blocking channel paths (`blocking_lock` / `blocking_recv` / `blocking_send`) in `dagrs` runtime code.

### Migration
- Replace:
- `graph.start_with_runtime(&runtime)?`
- With:
- `graph.async_start().await?`
- Replace any blocking channel usage with async equivalents:
- `recv_from().await`, `recv_any().await`, `map(...).await`
- `send_to(...).await`, `broadcast(...).await`, `close(...).await`

### Planned
- **Visualization (REQ-005)**: Export DAG structure to DOT/Mermaid format (Scheduled for next release).

## [0.7.0] - 2026-03-16

### Added
Expand Down
11 changes: 3 additions & 8 deletions project/dagrs/examples/auto_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ impl MyNode {
}
}

#[allow(deprecated)]
fn main() {
#[tokio::main]
async fn main() {
let mut node_table = NodeTable::default();

let node_name = "auto_node";
Expand All @@ -33,10 +33,5 @@ fn main() {
s -> a b,
b -> a
);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime");

g.start_with_runtime(&runtime).unwrap();
g.async_start().await.unwrap();
}
11 changes: 3 additions & 8 deletions project/dagrs/examples/compute_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ impl Action for Compute {
}
}

#[allow(deprecated)]
fn main() {
#[tokio::main]
async fn main() {
// Initialization log.
env_logger::init();

Expand Down Expand Up @@ -92,13 +92,8 @@ fn main() {
let mut env = EnvVar::new(node_table);
env.set("base", 2usize);
graph.set_env(env);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime");

// Start executing this dag.
match graph.start_with_runtime(&runtime) {
match graph.async_start().await {
Ok(_) => {
let res = graph
.get_results::<usize>()
Expand Down
11 changes: 3 additions & 8 deletions project/dagrs/examples/conditional_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ impl Condition for VerifyGT {
}
}

#[allow(deprecated)]
fn main() {
#[tokio::main]
async fn main() {
// Initialization log.
unsafe {
env::set_var("RUST_LOG", "debug");
Expand Down Expand Up @@ -136,13 +136,8 @@ fn main() {
let mut env = EnvVar::new(node_table);
env.set("base", 2usize);
graph.set_env(env);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime");

// Start executing this dag.
match graph.start_with_runtime(&runtime) {
match graph.async_start().await {
Ok(_) => {
// Since node X's condition (VerifyGT(128)) is not met,
// execution stops at node X and never reaches node G.
Expand Down
10 changes: 3 additions & 7 deletions project/dagrs/examples/custom_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ impl MessageNode {
}
}

#[allow(deprecated)]
fn main() {
#[tokio::main]
async fn main() {
// create an empty `NodeTable`
let mut node_table = NodeTable::new();
// create a `MessageNode`
Expand All @@ -64,11 +64,7 @@ fn main() {
// create a graph with this node and run
let mut graph = Graph::new();
graph.add_node(node);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create tokio runtime");
match graph.start_with_runtime(&runtime) {
match graph.async_start().await {
Ok(_) => {
// verify the output of this node
let outputs = graph.get_outputs();
Expand Down
3 changes: 2 additions & 1 deletion project/dagrs/examples/dagrs-sklearn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ dagrs = {path = "../../"}
thiserror = "2"
yaml-rust = "0.4"
log = "0.4.22"
env_logger = "0.11.6"
env_logger = "0.11.6"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
14 changes: 5 additions & 9 deletions project/dagrs/examples/dagrs-sklearn/examples/sklearn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Action for NodeAction {
let content: Arc<(Vec<String>, Vec<String>)> =
content.unwrap().into_inner().unwrap();
let (stdout, _) = (&content.0, &content.1);
let theta = stdout.get(0).unwrap().clone();
let theta = stdout.first().unwrap().clone();
out_channels.broadcast(Content::new(theta)).await
}
Output::Err(e) => panic!("{}", e),
Expand Down Expand Up @@ -85,8 +85,8 @@ impl Action for RootAction {
}
}

#[allow(deprecated)]
fn main() {
#[tokio::main]
async fn main() {
env_logger::init();

let specific_actions: HashMap<String, Box<dyn Action>> = HashMap::from([
Expand Down Expand Up @@ -140,11 +140,7 @@ fn main() {

let root_id = *env_var.get_node_id("root").unwrap();
dag.set_env(env_var);
let runtime = dagrs::tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
dag.start_with_runtime(&runtime).unwrap();
dag.async_start().await.unwrap();

let outputs = dag.get_outputs();
let result = outputs.get(&root_id).unwrap().get_out().unwrap();
Expand All @@ -153,7 +149,7 @@ fn main() {
let acc = if cfg!(target_os = "windows") {
stdout.get(1).unwrap()
} else {
stdout.get(0).unwrap()
stdout.first().unwrap()
};
assert_eq!("Accuracy: 94.46%", acc);

Expand Down
3 changes: 1 addition & 2 deletions project/dagrs/examples/dagrs-sklearn/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub mod yaml_parser;
pub mod yaml_task;

use thiserror::Error;
use yaml_rust;

use crate::utils::parser::ParseError;

Expand Down Expand Up @@ -111,6 +110,6 @@ impl From<FileContentError> for ParseError {

impl From<FileNotFound> for ParseError {
fn from(value: FileNotFound) -> Self {
ParseError(value.to_string().into())
ParseError(value.to_string())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl YamlParser {
.split(' ')
.collect::<Vec<_>>();

let cmd = cmd_args.get(0).unwrap_or(&"");
let cmd = cmd_args.first().unwrap_or(&"");
let args = cmd_args[1..].iter().map(|s| s.to_string()).collect();

Ok(YamlTask::new(
Expand Down Expand Up @@ -118,7 +118,7 @@ impl Parser for YamlParser {

let succ_id = task.id();
pres.iter().for_each(|p| {
if let Some(p) = edges.get_mut(&p) {
if let Some(p) = edges.get_mut(p) {
p.push(succ_id);
} else {
edges.insert(*p, vec![succ_id]);
Expand Down
Loading
Loading