>(
+ path: Option,
+ ) -> Result> {
+ let config_path = path
+ .map(|p| p.as_ref().to_path_buf())
+ .unwrap_or_else(|| std::path::PathBuf::from("../../conf/config.yaml"));
+
+ if config_path.exists() {
+ crate::config::load_global_config(&config_path)
+ } else {
+ Ok(Self::default())
+ }
+ }
+}
diff --git a/src/config/loader.rs b/src/config/loader.rs
new file mode 100644
index 00000000..b8bf845e
--- /dev/null
+++ b/src/config/loader.rs
@@ -0,0 +1,30 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde_yaml::Value;
+use std::fs;
+use std::path::Path;
+
+/// Read configuration from YAML file
+pub fn read_yaml_file>(path: P) -> Result> {
+ let content = fs::read_to_string(path)?;
+ let value: Value = serde_yaml::from_str(&content)?;
+ Ok(value)
+}
+
+/// Load global configuration from file
+pub fn load_global_config>(
+ path: P,
+) -> Result> {
+ let value = read_yaml_file(path)?;
+ crate::config::GlobalConfig::from_yaml_value(value)
+}
diff --git a/src/config/log_config.rs b/src/config/log_config.rs
new file mode 100644
index 00000000..ecc88c75
--- /dev/null
+++ b/src/config/log_config.rs
@@ -0,0 +1,34 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LogConfig {
+ pub level: String,
+ pub format: String,
+ pub file_path: Option,
+ pub max_file_size: Option,
+ pub max_files: Option,
+}
+
+impl Default for LogConfig {
+ fn default() -> Self {
+ Self {
+ level: "info".to_string(),
+ format: "json".to_string(),
+ file_path: Some("logs/app.log".to_string()),
+ max_file_size: Some(100),
+ max_files: Some(5),
+ }
+ }
+}
diff --git a/src/config/mod.rs b/src/config/mod.rs
new file mode 100644
index 00000000..251c1f3c
--- /dev/null
+++ b/src/config/mod.rs
@@ -0,0 +1,32 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod global_config;
+pub mod loader;
+pub mod log_config;
+pub mod paths;
+pub mod python_config;
+pub mod service_config;
+pub mod storage;
+pub mod wasm_config;
+
+pub use global_config::GlobalConfig;
+pub use loader::load_global_config;
+pub use log_config::LogConfig;
+#[allow(unused_imports)]
+pub use paths::{
+ ENV_CONF, ENV_HOME, find_config_file, get_app_log_path, get_conf_dir, get_data_dir,
+ get_log_path, get_logs_dir, get_project_root, get_python_cache_dir, get_python_cwasm_path,
+ get_python_wasm_path, get_state_dir, get_state_dir_for_base, get_task_dir, get_wasm_cache_dir,
+ resolve_path,
+};
+pub use python_config::PythonConfig;
diff --git a/src/config/paths.rs b/src/config/paths.rs
new file mode 100644
index 00000000..937f6607
--- /dev/null
+++ b/src/config/paths.rs
@@ -0,0 +1,145 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::env;
+use std::fs;
+use std::path::PathBuf;
+use std::sync::OnceLock;
+
+pub const ENV_HOME: &str = "FUNCTION_STREAM_HOME";
+pub const ENV_CONF: &str = "FUNCTION_STREAM_CONF";
+
+static PROJECT_ROOT: OnceLock = OnceLock::new();
+
+pub fn get_project_root() -> &'static PathBuf {
+ PROJECT_ROOT
+ .get_or_init(|| resolve_project_root().expect("CRITICAL: Failed to resolve project root"))
+}
+
+fn resolve_project_root() -> std::io::Result {
+ if let Ok(home) = env::var(ENV_HOME) {
+ let path = PathBuf::from(&home);
+ return path.canonicalize().or(Ok(path));
+ }
+
+ if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") {
+ return Ok(PathBuf::from(manifest_dir));
+ }
+
+ if let Ok(exe_path) = env::current_exe() {
+ let mut path = exe_path;
+ path.pop();
+ if path.file_name().map_or(false, |n| n == "bin") {
+ path.pop();
+ }
+ return Ok(path);
+ }
+
+ env::current_dir()
+}
+
+pub fn resolve_path(input_path: &str) -> PathBuf {
+ let path = PathBuf::from(input_path);
+ if path.is_absolute() {
+ path
+ } else {
+ get_project_root().join(path)
+ }
+}
+
+fn to_absolute_path(input_path: &str) -> PathBuf {
+ resolve_path(input_path)
+}
+
+pub fn find_config_file(config_name: &str) -> Option {
+ if let Ok(conf_env) = env::var(ENV_CONF) {
+ let path = to_absolute_path(&conf_env);
+ if path.is_file() {
+ return Some(path);
+ }
+ if path.is_dir() {
+ let full = path.join(config_name);
+ if full.exists() {
+ return Some(full);
+ }
+ }
+ }
+
+ let search_paths = vec![
+ get_conf_dir().join(config_name),
+ get_project_root().join(config_name),
+ ];
+
+ for path in search_paths {
+ if path.exists() {
+ return Some(path.canonicalize().unwrap_or(path));
+ }
+ }
+
+ None
+}
+
+fn get_or_create_sub_dir(name: &str) -> PathBuf {
+ let dir = get_project_root().join(name);
+ if !dir.exists() {
+ let _ = fs::create_dir_all(&dir);
+ }
+ dir
+}
+
+pub fn get_data_dir() -> PathBuf {
+ get_or_create_sub_dir("data")
+}
+
+pub fn get_logs_dir() -> PathBuf {
+ get_or_create_sub_dir("logs")
+}
+
+pub fn get_conf_dir() -> PathBuf {
+ get_or_create_sub_dir("conf")
+}
+
+pub fn get_task_dir() -> PathBuf {
+ get_or_create_sub_dir("data/task")
+}
+
+pub fn get_state_dir() -> PathBuf {
+ get_or_create_sub_dir("data/state")
+}
+
+pub fn get_state_dir_for_base(base: &str) -> PathBuf {
+ resolve_path(base).join("state")
+}
+
+pub fn get_app_log_path() -> PathBuf {
+ get_logs_dir().join("app.log")
+}
+
+pub fn get_log_path(relative: &str) -> PathBuf {
+ get_logs_dir().join(relative)
+}
+
+pub fn get_wasm_cache_dir() -> PathBuf {
+ get_or_create_sub_dir("data/cache/wasm-incremental")
+}
+
+pub fn get_python_cache_dir() -> PathBuf {
+ get_or_create_sub_dir("data/cache/python-runner")
+}
+
+pub fn get_python_wasm_path() -> PathBuf {
+ get_python_cache_dir().join("functionstream-python-runtime.wasm")
+}
+
+pub fn get_python_cwasm_path() -> PathBuf {
+ get_python_cache_dir().join("functionstream-python-runtime.cwasm")
+}
diff --git a/src/config/python_config.rs b/src/config/python_config.rs
new file mode 100644
index 00000000..9d539c5f
--- /dev/null
+++ b/src/config/python_config.rs
@@ -0,0 +1,69 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+
+pub const DEFAULT_PYTHON_WASM_FILENAME: &str = "functionstream-python-runtime.wasm";
+pub const DEFAULT_PYTHON_CWASM_FILENAME: &str = "functionstream-python-runtime.cwasm";
+
+fn default_python_wasm_path() -> String {
+ super::paths::get_python_wasm_path()
+ .to_string_lossy()
+ .to_string()
+}
+
+fn default_python_cache_dir() -> String {
+ super::paths::get_python_cache_dir()
+ .to_string_lossy()
+ .to_string()
+}
+
+fn default_true() -> bool {
+ true
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PythonConfig {
+ #[serde(default = "default_python_wasm_path")]
+ pub wasm_path: String,
+
+ #[serde(default = "default_python_cache_dir")]
+ pub cache_dir: String,
+
+ #[serde(default = "default_true")]
+ pub enable_cache: bool,
+}
+
+impl Default for PythonConfig {
+ fn default() -> Self {
+ Self {
+ wasm_path: default_python_wasm_path(),
+ cache_dir: default_python_cache_dir(),
+ enable_cache: true,
+ }
+ }
+}
+
+impl PythonConfig {
+ pub fn wasm_path_buf(&self) -> PathBuf {
+ super::paths::resolve_path(&self.wasm_path)
+ }
+
+ pub fn cache_dir_buf(&self) -> PathBuf {
+ super::paths::resolve_path(&self.cache_dir)
+ }
+
+ pub fn cwasm_cache_path(&self) -> PathBuf {
+ super::paths::resolve_path(&self.cache_dir).join(DEFAULT_PYTHON_CWASM_FILENAME)
+ }
+}
diff --git a/src/config/service_config.rs b/src/config/service_config.rs
new file mode 100644
index 00000000..4b58b2c1
--- /dev/null
+++ b/src/config/service_config.rs
@@ -0,0 +1,40 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ServiceConfig {
+ pub service_id: String,
+ pub service_name: String,
+ pub version: String,
+ pub host: String,
+ pub port: u16,
+ pub workers: Option,
+ pub worker_multiplier: Option,
+ pub debug: bool,
+}
+
+impl Default for ServiceConfig {
+ fn default() -> Self {
+ Self {
+ service_id: "default-service".to_string(),
+ service_name: "function-stream".to_string(),
+ version: "0.1.0".to_string(),
+ host: "127.0.0.1".to_string(),
+ port: 8080,
+ workers: None,
+ worker_multiplier: Some(4),
+ debug: false,
+ }
+ }
+}
diff --git a/src/config/storage.rs b/src/config/storage.rs
new file mode 100644
index 00000000..e5186648
--- /dev/null
+++ b/src/config/storage.rs
@@ -0,0 +1,120 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Storage Configuration - Storage configuration
+//
+// Defines configuration structures for state storage and task storage
+
+use serde::{Deserialize, Serialize};
+
+/// State storage factory type
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum StateStorageType {
+ /// Memory storage
+ Memory,
+ /// RocksDB storage
+ RocksDB,
+}
+
+/// RocksDB configuration options
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct RocksDBStorageConfig {
+ // Note: dir_name is no longer used, database is stored directly in {base_dir}/state/{task_name}-{time} directory
+ // Example: data/state/my_task-1234567890
+ /// Maximum number of open files
+ pub max_open_files: Option,
+ /// Write buffer size (bytes)
+ pub write_buffer_size: Option,
+ /// Maximum number of write buffers
+ pub max_write_buffer_number: Option,
+ /// Target file size base (bytes)
+ pub target_file_size_base: Option,
+ /// Maximum bytes for level base (bytes)
+ pub max_bytes_for_level_base: Option,
+ // Note: Compression configuration is not currently supported, uses default none compression
+}
+
+/// State storage configuration
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateStorageConfig {
+ /// Storage type
+ #[serde(default = "default_state_storage_type")]
+ pub storage_type: StateStorageType,
+ /// Base directory path (required for RocksDB)
+ /// Final path format: {base_dir}/state/{task_name}-{created_at}
+ /// Example: if base_dir is "data", task name is "my_task", created_at is 1234567890
+ /// then the full path is: data/state/my_task-1234567890
+ /// Default uses the data directory returned by find_or_create_data_dir()
+ #[serde(default = "default_base_dir")]
+ pub base_dir: Option,
+ /// RocksDB configuration (only used when storage_type is RocksDB)
+ #[serde(default)]
+ pub rocksdb: RocksDBStorageConfig,
+}
+
+fn default_state_storage_type() -> StateStorageType {
+ StateStorageType::RocksDB
+}
+
+fn default_base_dir() -> Option {
+ // Default base directory is "data" (lowercase)
+ // In actual use, if not specified in config, should use the result of find_or_create_data_dir()
+ Some("data".to_string())
+}
+
+impl Default for StateStorageConfig {
+ fn default() -> Self {
+ Self {
+ storage_type: StateStorageType::RocksDB,
+ base_dir: default_base_dir(),
+ rocksdb: RocksDBStorageConfig::default(),
+ }
+ }
+}
+
+/// Task storage type
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum TaskStorageType {
+ /// RocksDB storage
+ RocksDB,
+}
+
+/// Task storage configuration
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TaskStorageConfig {
+ /// Storage type
+ #[serde(default = "default_task_storage_type")]
+ pub storage_type: TaskStorageType,
+ /// Database path (optional, if None, uses default path `data/task/{task_name}`)
+ /// Default path format: `data/task/{task_name}`
+ /// Example: `data/task/my_task`
+ pub db_path: Option,
+ /// RocksDB configuration
+ #[serde(default)]
+ pub rocksdb: RocksDBStorageConfig,
+}
+
+fn default_task_storage_type() -> TaskStorageType {
+ TaskStorageType::RocksDB
+}
+
+impl Default for TaskStorageConfig {
+ fn default() -> Self {
+ Self {
+ storage_type: TaskStorageType::RocksDB,
+ db_path: None,
+ rocksdb: RocksDBStorageConfig::default(),
+ }
+ }
+}
diff --git a/src/config/wasm_config.rs b/src/config/wasm_config.rs
new file mode 100644
index 00000000..f3d3a0f1
--- /dev/null
+++ b/src/config/wasm_config.rs
@@ -0,0 +1,47 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+fn default_wasm_cache_dir() -> String {
+ crate::config::paths::get_wasm_cache_dir()
+ .to_string_lossy()
+ .to_string()
+}
+
+fn default_true() -> bool {
+ true
+}
+
+fn default_max_cache_size() -> u64 {
+ 100 * 1024 * 1024
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct WasmConfig {
+ #[serde(default = "default_wasm_cache_dir")]
+ pub cache_dir: String,
+ #[serde(default = "default_true")]
+ pub enable_cache: bool,
+ #[serde(default = "default_max_cache_size")]
+ pub max_cache_size: u64,
+}
+
+impl Default for WasmConfig {
+ fn default() -> Self {
+ Self {
+ cache_dir: default_wasm_cache_dir(),
+ enable_cache: true,
+ max_cache_size: default_max_cache_size(),
+ }
+ }
+}
diff --git a/common/run.go b/src/coordinator/analyze/analysis.rs
similarity index 63%
rename from common/run.go
rename to src/coordinator/analyze/analysis.rs
index 99031f1f..6542112d 100644
--- a/common/run.go
+++ b/src/coordinator/analyze/analysis.rs
@@ -1,5 +1,3 @@
-// Copyright 2023 StreamNative, Inc.
-//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -12,25 +10,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package common
+use crate::coordinator::statement::Statement;
-import (
- "io"
- "log/slog"
- "os"
-)
+#[derive(Debug)]
+pub struct Analysis {
+ pub statement: Box,
+}
-func RunProcess(startProcess func() (io.Closer, error)) {
- process, err := startProcess()
- if err != nil {
- slog.Error(
- "Failed to start the process",
- slog.Any("error", err),
- )
- os.Exit(1)
- }
+impl Analysis {
+ pub fn new(statement: Box) -> Self {
+ Self { statement }
+ }
- WaitUntilSignal(
- process,
- )
+ pub fn statement(&self) -> &dyn Statement {
+ self.statement.as_ref()
+ }
}
diff --git a/src/coordinator/analyze/analyzer.rs b/src/coordinator/analyze/analyzer.rs
new file mode 100644
index 00000000..30552191
--- /dev/null
+++ b/src/coordinator/analyze/analyzer.rs
@@ -0,0 +1,118 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::Analysis;
+use crate::coordinator::execution_context::ExecutionContext;
+use crate::coordinator::statement::{
+ CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction, Statement,
+ StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
+};
+use std::fmt;
+
+#[derive(Debug, Clone)]
+pub struct AnalyzeError {
+ pub message: String,
+}
+
+impl AnalyzeError {
+ pub fn new(message: impl Into) -> Self {
+ Self {
+ message: message.into(),
+ }
+ }
+}
+
+impl fmt::Display for AnalyzeError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Analyze error: {}", self.message)
+ }
+}
+
+impl std::error::Error for AnalyzeError {}
+
+/// Analyzer performs semantic analysis
+pub struct Analyzer<'a> {
+ #[allow(dead_code)]
+ context: &'a ExecutionContext,
+}
+
+impl<'a> Analyzer<'a> {
+ pub fn new(context: &'a ExecutionContext) -> Self {
+ Self { context }
+ }
+
+ /// Analyze Statement and return Analysis
+ pub fn analyze(&self, stmt: &dyn Statement) -> Result {
+ let visitor_context = StatementVisitorContext::Empty;
+ let analyzed_stmt = match stmt.accept(self, &visitor_context) {
+ StatementVisitorResult::Analyze(result) => result,
+ _ => return Err(AnalyzeError::new("Analyzer should return Analyze result")),
+ };
+ Ok(Analysis::new(analyzed_stmt))
+ }
+}
+
+impl StatementVisitor for Analyzer<'_> {
+ fn visit_create_function(
+ &self,
+ stmt: &CreateFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ // Function source is already validated during parsing (from_properties)
+ // So we just need to check if it exists
+ let _function_source = stmt.get_function_source();
+
+ // Note: name is read from config file, not from SQL statement
+ // So we don't validate name here - it will be validated when config file is read
+ StatementVisitorResult::Analyze(Box::new(stmt.clone()))
+ }
+
+ fn visit_drop_function(
+ &self,
+ stmt: &DropFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Analyze(Box::new(stmt.clone()))
+ }
+
+ fn visit_start_function(
+ &self,
+ stmt: &StartFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Analyze(Box::new(stmt.clone()))
+ }
+
+ fn visit_stop_function(
+ &self,
+ stmt: &StopFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Analyze(Box::new(stmt.clone()))
+ }
+
+ fn visit_show_functions(
+ &self,
+ stmt: &ShowFunctions,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Analyze(Box::new(stmt.clone()))
+ }
+
+ fn visit_create_python_function(
+ &self,
+ stmt: &CreatePythonFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Analyze(Box::new(stmt.clone()))
+ }
+}
diff --git a/src/coordinator/analyze/mod.rs b/src/coordinator/analyze/mod.rs
new file mode 100644
index 00000000..45d1b83b
--- /dev/null
+++ b/src/coordinator/analyze/mod.rs
@@ -0,0 +1,17 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod analysis;
+pub mod analyzer;
+
+pub use analysis::Analysis;
+pub use analyzer::Analyzer;
diff --git a/src/coordinator/coordinator.rs b/src/coordinator/coordinator.rs
new file mode 100644
index 00000000..7f5d4dbb
--- /dev/null
+++ b/src/coordinator/coordinator.rs
@@ -0,0 +1,131 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Instant;
+
+use anyhow::{Context, Result};
+
+use crate::coordinator::analyze::{Analysis, Analyzer};
+use crate::coordinator::dataset::ExecuteResult;
+use crate::coordinator::execution::Executor;
+use crate::coordinator::plan::{LogicalPlanVisitor, LogicalPlanner, PlanNode};
+use crate::coordinator::statement::Statement;
+use crate::runtime::taskexecutor::TaskManager;
+
+use super::execution_context::ExecutionContext;
+
+pub struct Coordinator {}
+
+impl Coordinator {
+ pub fn new() -> Self {
+ Self {}
+ }
+
+ pub fn execute(&self, stmt: &dyn Statement) -> ExecuteResult {
+ let start_time = Instant::now();
+ let context = ExecutionContext::new();
+ let execution_id = context.execution_id;
+
+ match self.execute_pipeline(&context, stmt) {
+ Ok(result) => {
+ log::debug!(
+ "[{}] Execution completed in {}ms",
+ execution_id,
+ start_time.elapsed().as_millis()
+ );
+ result
+ }
+ Err(e) => {
+ log::error!(
+ "[{}] Execution failed after {}ms. Error: {:#}",
+ execution_id,
+ start_time.elapsed().as_millis(),
+ e
+ );
+ ExecuteResult::err(format!("Execution failed: {:#}", e))
+ }
+ }
+ }
+
+ fn execute_pipeline(
+ &self,
+ context: &ExecutionContext,
+ stmt: &dyn Statement,
+ ) -> Result {
+ let analysis = self.step_analyze(context, stmt)?;
+ let plan = self.step_build_logical_plan(&analysis)?;
+ let optimized_plan = self.step_optimize(&analysis, plan)?;
+ self.step_execute(optimized_plan)
+ }
+
+ fn step_analyze(&self, context: &ExecutionContext, stmt: &dyn Statement) -> Result {
+ let start = Instant::now();
+ let analyzer = Analyzer::new(context);
+ let result = analyzer
+ .analyze(stmt)
+ .map_err(|e| anyhow::anyhow!(e))
+ .context("Analyzer phase failed");
+
+ log::debug!(
+ "[{}] Analyze phase finished in {}ms",
+ context.execution_id,
+ start.elapsed().as_millis()
+ );
+ result
+ }
+
+ fn step_build_logical_plan(&self, analysis: &Analysis) -> Result> {
+ let visitor = LogicalPlanVisitor::new();
+ let plan = visitor.visit(analysis);
+ Ok(plan)
+ }
+
+ fn step_optimize(
+ &self,
+ analysis: &Analysis,
+ plan: Box,
+ ) -> Result> {
+ let start = Instant::now();
+ let planner = LogicalPlanner::new();
+ let optimized = planner.optimize(plan, analysis);
+
+ log::debug!(
+ "Optimizer phase finished in {}ms",
+ start.elapsed().as_millis()
+ );
+ Ok(optimized)
+ }
+
+ fn step_execute(&self, plan: Box) -> Result {
+ let start = Instant::now();
+ let task_manager = match TaskManager::get() {
+ Ok(tm) => tm,
+ Err(e) => {
+ return Ok(ExecuteResult::err(format!(
+ "Failed to get TaskManager: {}",
+ e
+ )));
+ }
+ };
+ let executor = Executor::new(task_manager.clone());
+ let result = executor
+ .execute(plan.as_ref())
+ .map_err(|e| anyhow::anyhow!(e))
+ .context("Executor phase failed");
+
+ log::debug!(
+ "Executor phase finished in {}ms",
+ start.elapsed().as_millis()
+ );
+ result
+ }
+}
diff --git a/src/coordinator/dataset/data_set.rs b/src/coordinator/dataset/data_set.rs
new file mode 100644
index 00000000..23519cad
--- /dev/null
+++ b/src/coordinator/dataset/data_set.rs
@@ -0,0 +1,34 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
+
+/// Create an empty RecordBatch (0 columns, 0 rows).
+pub fn empty_record_batch() -> RecordBatch {
+ RecordBatch::new_empty(Arc::new(Schema::empty()))
+}
+
+/// DataSet interface: conversion to Arrow RecordBatch.
+/// Supertrait `Any` allows downcasting `Arc` to concrete types (e.g. `ShowFunctionsResult`).
+pub trait DataSet: Send + Sync + std::any::Any {
+ /// Convert to RecordBatch.
+ fn to_record_batch(&self) -> RecordBatch;
+}
+
+impl DataSet for RecordBatch {
+ fn to_record_batch(&self) -> RecordBatch {
+ self.clone()
+ }
+}
diff --git a/src/coordinator/dataset/execute_result.rs b/src/coordinator/dataset/execute_result.rs
new file mode 100644
index 00000000..08b3da7c
--- /dev/null
+++ b/src/coordinator/dataset/execute_result.rs
@@ -0,0 +1,59 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+use std::sync::Arc;
+
+use super::DataSet;
+
+#[derive(Clone)]
+pub struct ExecuteResult {
+ pub success: bool,
+ pub message: String,
+ pub data: Option>,
+}
+
+impl fmt::Debug for ExecuteResult {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ExecuteResult")
+ .field("success", &self.success)
+ .field("message", &self.message)
+ .field("data", &self.data.as_ref().map(|_| "..."))
+ .finish()
+ }
+}
+
+impl ExecuteResult {
+ pub fn ok(message: impl Into) -> Self {
+ Self {
+ success: true,
+ message: message.into(),
+ data: None,
+ }
+ }
+
+ pub fn ok_with_data(message: impl Into, data: impl DataSet + 'static) -> Self {
+ Self {
+ success: true,
+ message: message.into(),
+ data: Some(Arc::new(data)),
+ }
+ }
+
+ pub fn err(message: impl Into) -> Self {
+ Self {
+ success: false,
+ message: message.into(),
+ data: None,
+ }
+ }
+}
diff --git a/src/coordinator/dataset/mod.rs b/src/coordinator/dataset/mod.rs
new file mode 100644
index 00000000..b72613da
--- /dev/null
+++ b/src/coordinator/dataset/mod.rs
@@ -0,0 +1,19 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod data_set;
+mod execute_result;
+mod show_functions_result;
+
+pub use data_set::{DataSet, empty_record_batch};
+pub use execute_result::ExecuteResult;
+pub use show_functions_result::ShowFunctionsResult;
diff --git a/src/coordinator/dataset/show_functions_result.rs b/src/coordinator/dataset/show_functions_result.rs
new file mode 100644
index 00000000..c16edf6d
--- /dev/null
+++ b/src/coordinator/dataset/show_functions_result.rs
@@ -0,0 +1,62 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{RecordBatch, StringArray};
+use arrow_schema::{DataType, Field, Schema};
+
+use super::DataSet;
+use crate::storage::task::FunctionInfo;
+
+#[derive(Clone, Debug)]
+pub struct ShowFunctionsResult {
+ functions: Vec,
+}
+
+impl ShowFunctionsResult {
+ pub fn new(functions: Vec) -> Self {
+ Self { functions }
+ }
+
+ pub fn functions(&self) -> &[FunctionInfo] {
+ &self.functions
+ }
+}
+
+impl DataSet for ShowFunctionsResult {
+ fn to_record_batch(&self) -> RecordBatch {
+ let names: Vec<&str> = self.functions.iter().map(|f| f.name.as_str()).collect();
+ let types: Vec<&str> = self
+ .functions
+ .iter()
+ .map(|f| f.task_type.as_str())
+ .collect();
+ let statuses: Vec<&str> = self.functions.iter().map(|f| f.status.as_str()).collect();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("task_type", DataType::Utf8, false),
+ Field::new("status", DataType::Utf8, false),
+ ]));
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(StringArray::from(names)),
+ Arc::new(StringArray::from(types)),
+ Arc::new(StringArray::from(statuses)),
+ ],
+ )
+ .unwrap_or_else(|_| RecordBatch::new_empty(Arc::new(Schema::empty())))
+ }
+}
diff --git a/src/coordinator/execution/executor.rs b/src/coordinator/execution/executor.rs
new file mode 100644
index 00000000..2f7e000f
--- /dev/null
+++ b/src/coordinator/execution/executor.rs
@@ -0,0 +1,211 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::coordinator::dataset::{ExecuteResult, ShowFunctionsResult, empty_record_batch};
+use crate::coordinator::plan::{
+ CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, PlanNode, PlanVisitor,
+ PlanVisitorContext, PlanVisitorResult, ShowFunctionsPlan, StartFunctionPlan, StopFunctionPlan,
+};
+use crate::coordinator::statement::{ConfigSource, FunctionSource};
+use crate::runtime::taskexecutor::TaskManager;
+use std::sync::Arc;
+use thiserror::Error;
+use tracing::{debug, info};
+
+#[derive(Error, Debug)]
+pub enum ExecuteError {
+ #[error("Execution failed: {0}")]
+ Internal(String),
+ #[error("IO error during execution: {0}")]
+ Io(#[from] std::io::Error),
+ #[error("Task manager error: {0}")]
+ Task(String),
+ #[error("Validation error: {0}")]
+ Validation(String),
+}
+
+pub struct Executor {
+ task_manager: Arc,
+}
+
+impl Executor {
+ pub fn new(task_manager: Arc) -> Self {
+ Self { task_manager }
+ }
+
+ pub fn execute(&self, plan: &dyn PlanNode) -> Result {
+ let timer = std::time::Instant::now();
+ let context = PlanVisitorContext::new();
+
+ let visitor_result = plan.accept(self, &context);
+
+ match visitor_result {
+ PlanVisitorResult::Execute(result) => {
+ let elapsed = timer.elapsed();
+ debug!(target: "executor", elapsed_ms = elapsed.as_millis(), "Execution completed");
+ result
+ }
+ }
+ }
+}
+
+impl PlanVisitor for Executor {
+ fn visit_create_function(
+ &self,
+ plan: &CreateFunctionPlan,
+ _context: &PlanVisitorContext,
+ ) -> PlanVisitorResult {
+ let result = (|| -> Result {
+ let function_bytes = match &plan.function_source {
+ FunctionSource::Path(path) => std::fs::read(path).map_err(|e| {
+ ExecuteError::Validation(format!("Failed to read function at {}: {}", path, e))
+ })?,
+ FunctionSource::Bytes(bytes) => bytes.clone(),
+ };
+
+ let config_bytes = match &plan.config_source {
+ Some(ConfigSource::Path(path)) => std::fs::read(path).map_err(|e| {
+ ExecuteError::Validation(format!("Failed to read config at {}: {}", path, e))
+ })?,
+ Some(ConfigSource::Bytes(bytes)) => bytes.clone(),
+ None => {
+ return Err(ExecuteError::Validation(
+ "Configuration bytes required for function creation".into(),
+ ));
+ }
+ };
+
+ info!(config_size = config_bytes.len(), "Registering Wasm task");
+ self.task_manager
+ .register_task(&config_bytes, &function_bytes)
+ .map_err(|e| ExecuteError::Task(format!("Registration failed: {:?}", e)))?;
+
+ Ok(ExecuteResult::ok_with_data(
+ "Function registered successfully",
+ empty_record_batch(),
+ ))
+ })();
+
+ PlanVisitorResult::Execute(result)
+ }
+
+ fn visit_drop_function(
+ &self,
+ plan: &DropFunctionPlan,
+ _context: &PlanVisitorContext,
+ ) -> PlanVisitorResult {
+ let result = (|| -> Result {
+ let status = self
+ .task_manager
+ .get_task_status(&plan.name)
+ .map_err(|e| ExecuteError::Task(format!("Task discovery failed: {}", e)))?;
+
+ if status.is_running() {
+ return Err(ExecuteError::Validation(format!(
+ "Task '{}' is currently running. Use FORCE to drop.",
+ plan.name
+ )));
+ }
+
+ self.task_manager
+ .remove_task(&plan.name)
+ .map_err(|e| ExecuteError::Task(format!("Removal failed: {}", e)))?;
+
+ Ok(ExecuteResult::ok_with_data(
+ format!("Function '{}' dropped", plan.name),
+ empty_record_batch(),
+ ))
+ })();
+
+ PlanVisitorResult::Execute(result)
+ }
+
+ fn visit_start_function(
+ &self,
+ plan: &StartFunctionPlan,
+ _context: &PlanVisitorContext,
+ ) -> PlanVisitorResult {
+ let result = self
+ .task_manager
+ .start_task(&plan.name)
+ .map(|_| {
+ ExecuteResult::ok_with_data(
+ format!("Function '{}' started", plan.name),
+ empty_record_batch(),
+ )
+ })
+ .map_err(|e| ExecuteError::Task(e.to_string()));
+
+ PlanVisitorResult::Execute(result)
+ }
+
+ fn visit_show_functions(
+ &self,
+ _plan: &ShowFunctionsPlan,
+ _context: &PlanVisitorContext,
+ ) -> PlanVisitorResult {
+ let result = (|| -> Result {
+ let functions = self.task_manager.list_all_functions();
+
+ Ok(ExecuteResult::ok_with_data(
+ format!("Found {} task(s)", functions.len()),
+ ShowFunctionsResult::new(functions),
+ ))
+ })();
+
+ PlanVisitorResult::Execute(result)
+ }
+
+ fn visit_create_python_function(
+ &self,
+ plan: &CreatePythonFunctionPlan,
+ _context: &PlanVisitorContext,
+ ) -> PlanVisitorResult {
+ let result = (|| -> Result {
+ let modules: Vec<(String, Vec)> = plan
+ .modules
+ .iter()
+ .map(|m| (m.name.clone(), m.bytes.clone()))
+ .collect();
+
+ self.task_manager
+ .register_python_task(plan.config_content.as_bytes(), &modules)
+ .map_err(|e| ExecuteError::Task(format!("Python registration failed: {}", e)))?;
+
+ Ok(ExecuteResult::ok_with_data(
+ format!("Python function '{}' deployed", plan.class_name),
+ empty_record_batch(),
+ ))
+ })();
+
+ PlanVisitorResult::Execute(result)
+ }
+
+ fn visit_stop_function(
+ &self,
+ plan: &StopFunctionPlan,
+ _context: &PlanVisitorContext,
+ ) -> PlanVisitorResult {
+ let result = self
+ .task_manager
+ .stop_task(&plan.name)
+ .map(|_| {
+ ExecuteResult::ok_with_data(
+ format!("Function '{}' stopped", plan.name),
+ empty_record_batch(),
+ )
+ })
+ .map_err(|e| ExecuteError::Task(e.to_string()));
+
+ PlanVisitorResult::Execute(result)
+ }
+}
diff --git a/src/coordinator/execution/mod.rs b/src/coordinator/execution/mod.rs
new file mode 100644
index 00000000..c0890a88
--- /dev/null
+++ b/src/coordinator/execution/mod.rs
@@ -0,0 +1,15 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod executor;
+
+pub use executor::{ExecuteError, Executor};
diff --git a/src/coordinator/execution_context.rs b/src/coordinator/execution_context.rs
new file mode 100644
index 00000000..41e3f106
--- /dev/null
+++ b/src/coordinator/execution_context.rs
@@ -0,0 +1,55 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant};
+
+static EXECUTION_ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
+
+#[derive(Debug)]
+pub struct ExecutionContext {
+ pub execution_id: u64,
+ pub start_time: Instant,
+ pub timeout: Duration,
+}
+
+impl ExecutionContext {
+ pub fn new() -> Self {
+ Self {
+ execution_id: EXECUTION_ID_GENERATOR.fetch_add(1, Ordering::SeqCst),
+ start_time: Instant::now(),
+ timeout: Duration::from_secs(30),
+ }
+ }
+
+ pub fn set_timeout(&mut self, timeout: Duration) {
+ self.timeout = timeout;
+ }
+
+ pub fn elapsed(&self) -> Duration {
+ self.start_time.elapsed()
+ }
+
+ pub fn is_timeout(&self) -> bool {
+ self.elapsed() >= self.timeout
+ }
+
+ pub fn remaining_timeout(&self) -> Duration {
+ self.timeout.saturating_sub(self.elapsed())
+ }
+}
+
+impl Default for ExecutionContext {
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/coordinator/mod.rs b/src/coordinator/mod.rs
new file mode 100644
index 00000000..b07de414
--- /dev/null
+++ b/src/coordinator/mod.rs
@@ -0,0 +1,26 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod analyze;
+mod coordinator;
+mod dataset;
+mod execution;
+mod execution_context;
+mod plan;
+mod statement;
+
+pub use coordinator::Coordinator;
+pub use dataset::{DataSet, ShowFunctionsResult};
+pub use statement::{
+ CreateFunction, CreatePythonFunction, DropFunction, PythonModule, ShowFunctions, StartFunction,
+ Statement, StopFunction,
+};
diff --git a/src/coordinator/plan/create_function_plan.rs b/src/coordinator/plan/create_function_plan.rs
new file mode 100644
index 00000000..1ec72675
--- /dev/null
+++ b/src/coordinator/plan/create_function_plan.rs
@@ -0,0 +1,42 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
+use crate::coordinator::statement::{ConfigSource, FunctionSource};
+use std::collections::HashMap;
+
+#[derive(Debug, Clone)]
+pub struct CreateFunctionPlan {
+ pub function_source: FunctionSource,
+ pub config_source: Option,
+ pub properties: HashMap,
+}
+
+impl CreateFunctionPlan {
+ pub fn new(
+ function_source: FunctionSource,
+ config_source: Option,
+ properties: HashMap,
+ ) -> Self {
+ Self {
+ function_source,
+ config_source,
+ properties,
+ }
+ }
+}
+
+impl PlanNode for CreateFunctionPlan {
+ fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
+ visitor.visit_create_function(self, context)
+ }
+}
diff --git a/src/coordinator/plan/create_python_function_plan.rs b/src/coordinator/plan/create_python_function_plan.rs
new file mode 100644
index 00000000..7591e1bd
--- /dev/null
+++ b/src/coordinator/plan/create_python_function_plan.rs
@@ -0,0 +1,37 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
+use crate::coordinator::statement::PythonModule;
+
+#[derive(Debug, Clone)]
+pub struct CreatePythonFunctionPlan {
+ pub class_name: String,
+ pub modules: Vec,
+ pub config_content: String,
+}
+
+impl CreatePythonFunctionPlan {
+ pub fn new(class_name: String, modules: Vec, config_content: String) -> Self {
+ Self {
+ class_name,
+ modules,
+ config_content,
+ }
+ }
+}
+
+impl PlanNode for CreatePythonFunctionPlan {
+ fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
+ visitor.visit_create_python_function(self, context)
+ }
+}
diff --git a/src/coordinator/plan/drop_function_plan.rs b/src/coordinator/plan/drop_function_plan.rs
new file mode 100644
index 00000000..5af51ed9
--- /dev/null
+++ b/src/coordinator/plan/drop_function_plan.rs
@@ -0,0 +1,30 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
+
+#[derive(Debug, Clone)]
+pub struct DropFunctionPlan {
+ pub name: String,
+}
+
+impl DropFunctionPlan {
+ pub fn new(name: String) -> Self {
+ Self { name }
+ }
+}
+
+impl PlanNode for DropFunctionPlan {
+ fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
+ visitor.visit_drop_function(self, context)
+ }
+}
diff --git a/src/coordinator/plan/logical_plan_visitor.rs b/src/coordinator/plan/logical_plan_visitor.rs
new file mode 100644
index 00000000..536fec37
--- /dev/null
+++ b/src/coordinator/plan/logical_plan_visitor.rs
@@ -0,0 +1,109 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::coordinator::analyze::analysis::Analysis;
+use crate::coordinator::plan::{
+ CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, PlanNode, ShowFunctionsPlan,
+ StartFunctionPlan, StopFunctionPlan,
+};
+use crate::coordinator::statement::{
+ CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction,
+ StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
+};
+
+#[derive(Debug, Default)]
+pub struct LogicalPlanVisitor;
+
+impl LogicalPlanVisitor {
+ pub fn new() -> Self {
+ Self
+ }
+
+ pub fn visit(&self, analysis: &Analysis) -> Box {
+ let context = StatementVisitorContext::Empty;
+ let stmt = analysis.statement();
+
+ let result = stmt.accept(self, &context);
+
+ match result {
+ StatementVisitorResult::Plan(plan) => plan,
+ _ => panic!("LogicalPlanVisitor should return Plan"),
+ }
+ }
+}
+
+impl StatementVisitor for LogicalPlanVisitor {
+ fn visit_create_function(
+ &self,
+ stmt: &CreateFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ let function_source = stmt.get_function_source().clone();
+ let config_source = stmt.get_config_source().cloned();
+ let extra_props = stmt.get_extra_properties().clone();
+
+ // Name will be read from config file during execution
+ StatementVisitorResult::Plan(Box::new(CreateFunctionPlan::new(
+ function_source,
+ config_source,
+ extra_props,
+ )))
+ }
+
+ fn visit_drop_function(
+ &self,
+ stmt: &DropFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Plan(Box::new(DropFunctionPlan::new(stmt.name.clone())))
+ }
+
+ fn visit_start_function(
+ &self,
+ stmt: &StartFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Plan(Box::new(StartFunctionPlan::new(stmt.name.clone())))
+ }
+
+ fn visit_stop_function(
+ &self,
+ stmt: &StopFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Plan(Box::new(StopFunctionPlan::new(stmt.name.clone())))
+ }
+
+ fn visit_show_functions(
+ &self,
+ _stmt: &ShowFunctions,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ StatementVisitorResult::Plan(Box::new(ShowFunctionsPlan::new()))
+ }
+
+ fn visit_create_python_function(
+ &self,
+ stmt: &CreatePythonFunction,
+ _context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ let class_name = stmt.get_class_name().to_string();
+ let modules = stmt.get_modules().to_vec();
+ let config_content = stmt.get_config_content().to_string();
+
+ StatementVisitorResult::Plan(Box::new(CreatePythonFunctionPlan::new(
+ class_name,
+ modules,
+ config_content,
+ )))
+ }
+}
diff --git a/src/coordinator/plan/mod.rs b/src/coordinator/plan/mod.rs
new file mode 100644
index 00000000..9aa403b5
--- /dev/null
+++ b/src/coordinator/plan/mod.rs
@@ -0,0 +1,37 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod create_function_plan;
+mod create_python_function_plan;
+mod drop_function_plan;
+mod logical_plan_visitor;
+mod optimizer;
+mod show_functions_plan;
+mod start_function_plan;
+mod stop_function_plan;
+mod visitor;
+
+pub use create_function_plan::CreateFunctionPlan;
+pub use create_python_function_plan::CreatePythonFunctionPlan;
+pub use drop_function_plan::DropFunctionPlan;
+pub use logical_plan_visitor::LogicalPlanVisitor;
+pub use optimizer::LogicalPlanner;
+pub use show_functions_plan::ShowFunctionsPlan;
+pub use start_function_plan::StartFunctionPlan;
+pub use stop_function_plan::StopFunctionPlan;
+pub use visitor::{PlanVisitor, PlanVisitorContext, PlanVisitorResult};
+
+use std::fmt;
+
+pub trait PlanNode: fmt::Debug + Send + Sync {
+ fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult;
+}
diff --git a/src/coordinator/plan/optimizer.rs b/src/coordinator/plan/optimizer.rs
new file mode 100644
index 00000000..e6294072
--- /dev/null
+++ b/src/coordinator/plan/optimizer.rs
@@ -0,0 +1,59 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::coordinator::analyze::Analysis;
+use crate::coordinator::plan::PlanNode;
+use std::fmt;
+
+pub trait PlanOptimizer: fmt::Debug + Send + Sync {
+ fn optimize(&self, plan: Box, analysis: &Analysis) -> Box;
+
+ fn name(&self) -> &str;
+}
+
+#[derive(Debug)]
+pub struct LogicalPlanner {
+ optimizers: Vec>,
+}
+
+impl LogicalPlanner {
+ pub fn new() -> Self {
+ Self {
+ optimizers: Vec::new(),
+ }
+ }
+
+ pub fn with_optimizers(optimizers: Vec>) -> Self {
+ Self { optimizers }
+ }
+
+ pub fn add_optimizer(&mut self, optimizer: Box) {
+ self.optimizers.push(optimizer);
+ }
+
+ pub fn optimize(&self, plan: Box, analysis: &Analysis) -> Box {
+ let mut optimized_plan = plan;
+
+ for optimizer in &self.optimizers {
+ log::debug!("Applying optimizer: {}", optimizer.name());
+ optimized_plan = optimizer.optimize(optimized_plan, analysis);
+ }
+
+ optimized_plan
+ }
+}
+
+impl Default for LogicalPlanner {
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/coordinator/plan/show_functions_plan.rs b/src/coordinator/plan/show_functions_plan.rs
new file mode 100644
index 00000000..e1046401
--- /dev/null
+++ b/src/coordinator/plan/show_functions_plan.rs
@@ -0,0 +1,28 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
+
+#[derive(Debug, Clone, Default)]
+pub struct ShowFunctionsPlan {}
+
+impl ShowFunctionsPlan {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl PlanNode for ShowFunctionsPlan {
+ fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
+ visitor.visit_show_functions(self, context)
+ }
+}
diff --git a/src/coordinator/plan/start_function_plan.rs b/src/coordinator/plan/start_function_plan.rs
new file mode 100644
index 00000000..b1c27125
--- /dev/null
+++ b/src/coordinator/plan/start_function_plan.rs
@@ -0,0 +1,30 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
+
+#[derive(Debug, Clone)]
+pub struct StartFunctionPlan {
+ pub name: String,
+}
+
+impl StartFunctionPlan {
+ pub fn new(name: String) -> Self {
+ Self { name }
+ }
+}
+
+impl PlanNode for StartFunctionPlan {
+ fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
+ visitor.visit_start_function(self, context)
+ }
+}
diff --git a/src/coordinator/plan/stop_function_plan.rs b/src/coordinator/plan/stop_function_plan.rs
new file mode 100644
index 00000000..8f1487e9
--- /dev/null
+++ b/src/coordinator/plan/stop_function_plan.rs
@@ -0,0 +1,38 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
+
+#[derive(Debug, Clone)]
+pub struct StopFunctionPlan {
+ pub name: String,
+ pub graceful: bool,
+}
+
+impl StopFunctionPlan {
+ pub fn new(name: String) -> Self {
+ Self {
+ name,
+ graceful: true,
+ }
+ }
+
+ pub fn with_graceful(name: String, graceful: bool) -> Self {
+ Self { name, graceful }
+ }
+}
+
+impl PlanNode for StopFunctionPlan {
+ fn accept(&self, visitor: &dyn PlanVisitor, context: &PlanVisitorContext) -> PlanVisitorResult {
+ visitor.visit_stop_function(self, context)
+ }
+}
diff --git a/src/coordinator/plan/visitor.rs b/src/coordinator/plan/visitor.rs
new file mode 100644
index 00000000..44059c67
--- /dev/null
+++ b/src/coordinator/plan/visitor.rs
@@ -0,0 +1,87 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{
+ CreateFunctionPlan, CreatePythonFunctionPlan, DropFunctionPlan, ShowFunctionsPlan,
+ StartFunctionPlan, StopFunctionPlan,
+};
+
+/// Context passed to PlanVisitor methods
+///
+/// This context can be extended in the future to include additional information
+/// needed by visitors, such as execution environment, configuration, etc.
+#[derive(Debug, Clone, Default)]
+pub struct PlanVisitorContext {
+ // Future: Add fields as needed, e.g.:
+ // pub execution_env: Option,
+ // pub config: Option,
+}
+
+impl PlanVisitorContext {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+use crate::coordinator::dataset::ExecuteResult;
+use crate::coordinator::execution::ExecuteError;
+
+/// Result returned by PlanVisitor methods
+///
+/// This enum represents all possible return types from PlanVisitor implementations.
+/// Different visitors can return different types, which are wrapped in this enum.
+#[derive(Debug)]
+pub enum PlanVisitorResult {
+ /// Execute result (from Executor)
+ Execute(Result),
+ // Future: Add more result variants as needed, e.g.:
+ // Optimize(BoxedPlanNode),
+ // Analyze(Analysis),
+}
+
+pub trait PlanVisitor {
+ fn visit_create_function(
+ &self,
+ plan: &CreateFunctionPlan,
+ context: &PlanVisitorContext,
+ ) -> PlanVisitorResult;
+
+ fn visit_drop_function(
+ &self,
+ plan: &DropFunctionPlan,
+ context: &PlanVisitorContext,
+ ) -> PlanVisitorResult;
+
+ fn visit_start_function(
+ &self,
+ plan: &StartFunctionPlan,
+ context: &PlanVisitorContext,
+ ) -> PlanVisitorResult;
+
+ fn visit_stop_function(
+ &self,
+ plan: &StopFunctionPlan,
+ context: &PlanVisitorContext,
+ ) -> PlanVisitorResult;
+
+ fn visit_show_functions(
+ &self,
+ plan: &ShowFunctionsPlan,
+ context: &PlanVisitorContext,
+ ) -> PlanVisitorResult;
+
+ fn visit_create_python_function(
+ &self,
+ plan: &CreatePythonFunctionPlan,
+ context: &PlanVisitorContext,
+ ) -> PlanVisitorResult;
+}
diff --git a/src/coordinator/statement/create_function.rs b/src/coordinator/statement/create_function.rs
new file mode 100644
index 00000000..997a67e8
--- /dev/null
+++ b/src/coordinator/statement/create_function.rs
@@ -0,0 +1,137 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};
+use std::collections::HashMap;
+
+/// Source of function data (either file path or bytes)
+#[derive(Debug, Clone)]
+pub enum FunctionSource {
+ Path(String),
+ Bytes(Vec),
+}
+
+/// Source of config data (either file path or bytes)
+#[derive(Debug, Clone)]
+pub enum ConfigSource {
+ Path(String),
+ Bytes(Vec),
+}
+
+#[derive(Debug, Clone)]
+pub struct CreateFunction {
+ pub function_source: FunctionSource,
+ pub config_source: Option,
+ pub properties: HashMap,
+}
+
+impl CreateFunction {
+ pub const PROP_FUNCTION_PATH: &'static str = "function_path";
+
+ pub const PROP_CONFIG_PATH: &'static str = "config_path";
+
+ pub fn from_bytes(function_bytes: Vec, config_bytes: Option>) -> Self {
+ Self {
+ function_source: FunctionSource::Bytes(function_bytes),
+ config_source: config_bytes.map(ConfigSource::Bytes),
+ properties: HashMap::new(),
+ }
+ }
+
+ pub fn from_properties(properties: HashMap) -> Result {
+ let function_source = Self::parse_function_path(&properties)?;
+ let config_source = Self::parse_config_path(&properties);
+ let extra_props = Self::extract_extra_properties(&properties);
+
+ Ok(Self {
+ function_source,
+ config_source,
+ properties: extra_props,
+ })
+ }
+
+ /// Parse function path from properties (SQL only, Path mode)
+ fn parse_function_path(properties: &HashMap) -> Result {
+ // SQL only supports function_path (file path), not function (bytes)
+ if let Some(path) = Self::get_property_ci(properties, Self::PROP_FUNCTION_PATH) {
+ return Ok(FunctionSource::Path(path));
+ }
+
+ Err(format!(
+ "Missing required property '{}' (case-insensitive). SQL only supports path mode, not bytes mode.",
+ Self::PROP_FUNCTION_PATH
+ ))
+ }
+
+ /// Parse config path from properties (SQL only, Path mode)
+ fn parse_config_path(properties: &HashMap) -> Option {
+ // SQL only supports config_path (file path), not config (bytes)
+ if let Some(path) = Self::get_property_ci(properties, Self::PROP_CONFIG_PATH) {
+ return Some(ConfigSource::Path(path));
+ }
+
+ None
+ }
+
+ /// Extract extra properties (excluding function/config related properties)
+ fn extract_extra_properties(properties: &HashMap) -> HashMap {
+ let mut extra_props = properties.clone();
+ // Remove function_path and config_path properties (case-insensitive)
+ let keys_to_remove: Vec = extra_props
+ .keys()
+ .filter(|k| {
+ let k_lower = k.to_lowercase();
+ k_lower == Self::PROP_FUNCTION_PATH || k_lower == Self::PROP_CONFIG_PATH
+ })
+ .cloned()
+ .collect();
+ for key in keys_to_remove {
+ extra_props.remove(&key);
+ }
+ extra_props
+ }
+
+ /// Find property value by case-insensitive key
+ fn get_property_ci(properties: &HashMap, key: &str) -> Option {
+ let key_lower = key.to_lowercase();
+ for (k, v) in properties {
+ if k.to_lowercase() == key_lower {
+ return Some(v.clone());
+ }
+ }
+ None
+ }
+
+ /// Get function source
+ pub fn get_function_source(&self) -> &FunctionSource {
+ &self.function_source
+ }
+
+ /// Get config source
+ pub fn get_config_source(&self) -> Option<&ConfigSource> {
+ self.config_source.as_ref()
+ }
+
+ /// Get extra properties
+ pub fn get_extra_properties(&self) -> &HashMap {
+ &self.properties
+ }
+}
+impl Statement for CreateFunction {
+ fn accept(
+ &self,
+ visitor: &dyn StatementVisitor,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ visitor.visit_create_function(self, context)
+ }
+}
diff --git a/src/coordinator/statement/create_python_function.rs b/src/coordinator/statement/create_python_function.rs
new file mode 100644
index 00000000..378490e1
--- /dev/null
+++ b/src/coordinator/statement/create_python_function.rs
@@ -0,0 +1,59 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};
+
+/// Module information for Python function execution
+#[derive(Debug, Clone)]
+pub struct PythonModule {
+ pub name: String,
+ pub bytes: Vec,
+}
+
+#[derive(Debug, Clone)]
+pub struct CreatePythonFunction {
+ pub class_name: String,
+ pub modules: Vec,
+ pub config_content: String,
+}
+
+impl CreatePythonFunction {
+ pub fn new(class_name: String, modules: Vec, config_content: String) -> Self {
+ Self {
+ class_name,
+ modules,
+ config_content,
+ }
+ }
+
+ pub fn get_class_name(&self) -> &str {
+ &self.class_name
+ }
+
+ pub fn get_modules(&self) -> &[PythonModule] {
+ &self.modules
+ }
+
+ pub fn get_config_content(&self) -> &str {
+ &self.config_content
+ }
+}
+
+impl Statement for CreatePythonFunction {
+ fn accept(
+ &self,
+ visitor: &dyn StatementVisitor,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ visitor.visit_create_python_function(self, context)
+ }
+}
diff --git a/src/coordinator/statement/drop_function.rs b/src/coordinator/statement/drop_function.rs
new file mode 100644
index 00000000..bed85782
--- /dev/null
+++ b/src/coordinator/statement/drop_function.rs
@@ -0,0 +1,33 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};
+
+#[derive(Debug, Clone)]
+pub struct DropFunction {
+ pub name: String,
+}
+
+impl DropFunction {
+ pub fn new(name: String) -> Self {
+ Self { name }
+ }
+}
+impl Statement for DropFunction {
+ fn accept(
+ &self,
+ visitor: &dyn StatementVisitor,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ visitor.visit_drop_function(self, context)
+ }
+}
diff --git a/src/coordinator/statement/mod.rs b/src/coordinator/statement/mod.rs
new file mode 100644
index 00000000..f887209c
--- /dev/null
+++ b/src/coordinator/statement/mod.rs
@@ -0,0 +1,37 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod create_function;
+mod create_python_function;
+mod drop_function;
+mod show_functions;
+mod start_function;
+mod stop_function;
+mod visitor;
+
+pub use create_function::{ConfigSource, CreateFunction, FunctionSource};
+pub use create_python_function::{CreatePythonFunction, PythonModule};
+pub use drop_function::DropFunction;
+pub use show_functions::ShowFunctions;
+pub use start_function::StartFunction;
+pub use stop_function::StopFunction;
+pub use visitor::{StatementVisitor, StatementVisitorContext, StatementVisitorResult};
+
+use std::fmt;
+
+pub trait Statement: fmt::Debug + Send + Sync {
+ fn accept(
+ &self,
+ visitor: &dyn StatementVisitor,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult;
+}
diff --git a/src/coordinator/statement/show_functions.rs b/src/coordinator/statement/show_functions.rs
new file mode 100644
index 00000000..b31983e6
--- /dev/null
+++ b/src/coordinator/statement/show_functions.rs
@@ -0,0 +1,32 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};
+
+#[derive(Debug, Clone, Default)]
+pub struct ShowFunctions;
+
+impl ShowFunctions {
+ pub fn new() -> Self {
+ Self
+ }
+}
+
+impl Statement for ShowFunctions {
+ fn accept(
+ &self,
+ visitor: &dyn StatementVisitor,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ visitor.visit_show_functions(self, context)
+ }
+}
diff --git a/src/coordinator/statement/start_function.rs b/src/coordinator/statement/start_function.rs
new file mode 100644
index 00000000..fe2c7861
--- /dev/null
+++ b/src/coordinator/statement/start_function.rs
@@ -0,0 +1,34 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};
+
+#[derive(Debug, Clone)]
+pub struct StartFunction {
+ pub name: String,
+}
+
+impl StartFunction {
+ pub fn new(name: String) -> Self {
+ Self { name }
+ }
+}
+
+impl Statement for StartFunction {
+ fn accept(
+ &self,
+ visitor: &dyn StatementVisitor,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ visitor.visit_start_function(self, context)
+ }
+}
diff --git a/src/coordinator/statement/stop_function.rs b/src/coordinator/statement/stop_function.rs
new file mode 100644
index 00000000..ee48b378
--- /dev/null
+++ b/src/coordinator/statement/stop_function.rs
@@ -0,0 +1,34 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};
+
+#[derive(Debug, Clone)]
+pub struct StopFunction {
+ pub name: String,
+}
+
+impl StopFunction {
+ pub fn new(name: String) -> Self {
+ Self { name }
+ }
+}
+
+impl Statement for StopFunction {
+ fn accept(
+ &self,
+ visitor: &dyn StatementVisitor,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult {
+ visitor.visit_stop_function(self, context)
+ }
+}
diff --git a/src/coordinator/statement/visitor.rs b/src/coordinator/statement/visitor.rs
new file mode 100644
index 00000000..13ce2cfc
--- /dev/null
+++ b/src/coordinator/statement/visitor.rs
@@ -0,0 +1,90 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::{
+ CreateFunction, CreatePythonFunction, DropFunction, ShowFunctions, StartFunction, StopFunction,
+};
+use crate::coordinator::plan::PlanNode;
+use crate::coordinator::statement::Statement;
+
+/// Context passed to StatementVisitor methods
+///
+/// This enum can be extended in the future to include additional context variants
+/// needed by different visitors, such as analysis context, execution context, etc.
+#[derive(Debug, Clone, Default)]
+pub enum StatementVisitorContext {
+ /// Empty context (default)
+ #[default]
+ Empty,
+ // Future: Add more context variants as needed, e.g.:
+ // Analyze(AnalyzeContext),
+ // Execute(ExecuteContext),
+}
+
+impl StatementVisitorContext {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+/// Result returned by StatementVisitor methods
+///
+/// This enum represents all possible return types from StatementVisitor implementations.
+/// Different visitors can return different types, which are wrapped in this enum.
+#[derive(Debug)]
+pub enum StatementVisitorResult {
+ /// Statement (from Analyzer)
+ Analyze(Box),
+
+ /// Plan node result (from LogicalPlanVisitor)
+ Plan(Box),
+ // Future: Add more result variants as needed, e.g.:
+ // Execute(ExecuteResult),
+}
+
+pub trait StatementVisitor {
+ fn visit_create_function(
+ &self,
+ stmt: &CreateFunction,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult;
+
+ fn visit_drop_function(
+ &self,
+ stmt: &DropFunction,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult;
+
+ fn visit_start_function(
+ &self,
+ stmt: &StartFunction,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult;
+
+ fn visit_stop_function(
+ &self,
+ stmt: &StopFunction,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult;
+
+ fn visit_show_functions(
+ &self,
+ stmt: &ShowFunctions,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult;
+
+ fn visit_create_python_function(
+ &self,
+ stmt: &CreatePythonFunction,
+ context: &StatementVisitorContext,
+ ) -> StatementVisitorResult;
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 00000000..b4d0c8e8
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,21 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Library crate for function-stream
+
+pub mod config;
+pub mod coordinator;
+pub mod logging;
+pub mod runtime;
+pub mod server;
+pub mod sql;
+pub mod storage;
diff --git a/src/logging/mod.rs b/src/logging/mod.rs
new file mode 100644
index 00000000..f0a7b881
--- /dev/null
+++ b/src/logging/mod.rs
@@ -0,0 +1,68 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::config::{LogConfig, get_app_log_path, get_log_path, get_logs_dir};
+use anyhow::Result;
+use std::fs::OpenOptions;
+use std::path::{Path, PathBuf};
+use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt, util::SubscriberInitExt};
+
+pub fn init_logging(config: &LogConfig) -> Result<()> {
+ let (log_dir, log_file) = if let Some(ref file_path) = config.file_path {
+ let path = PathBuf::from(file_path);
+ if path.is_absolute() {
+ let dir = path
+ .parent()
+ .unwrap_or_else(|| Path::new("logs"))
+ .to_path_buf();
+ (dir, path)
+ } else {
+ let full_path = get_log_path(file_path);
+ let dir = full_path.parent().unwrap_or(&get_logs_dir()).to_path_buf();
+ (dir, full_path)
+ }
+ } else {
+ let file = get_app_log_path();
+ (get_logs_dir(), file)
+ };
+
+ std::fs::create_dir_all(&log_dir)?;
+
+ let log_level = config.level.parse::().unwrap_or_else(|_| {
+ EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))
+ });
+
+ let file = OpenOptions::new()
+ .create(true)
+ .append(true)
+ .open(&log_file)?;
+
+ let (non_blocking, _guard) = tracing_appender::non_blocking(file);
+
+ let subscriber = Registry::default()
+ .with(log_level)
+ .with(
+ fmt::layer()
+ .with_writer(non_blocking)
+ .with_ansi(false)
+ .json(),
+ )
+ .with(fmt::layer().with_writer(std::io::stdout).with_ansi(true));
+
+ subscriber.init();
+
+ tracing::info!("Logging initialized, log file: {}", log_file.display());
+
+ std::mem::forget(_guard);
+
+ Ok(())
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 00000000..1454f132
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,222 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod config;
+mod coordinator;
+mod logging;
+mod runtime;
+mod server;
+mod sql;
+mod storage;
+
+use anyhow::{Context, Result};
+use std::thread;
+use tokio::sync::oneshot;
+
+pub struct ServerHandle {
+ join_handle: Option>,
+ shutdown_tx: Option>,
+ error_rx: oneshot::Receiver,
+}
+
+impl ServerHandle {
+ pub fn stop(mut self) {
+ log::info!("Initiating server shutdown sequence...");
+
+ if let Some(tx) = self.shutdown_tx.take() {
+ if tx.send(()).is_err() {
+ log::warn!("Server shutdown signal failed to send (receiver dropped)");
+ }
+ }
+
+ if let Some(handle) = self.join_handle.take() {
+ log::info!("Waiting for server thread to finalize...");
+ if let Err(e) = handle.join() {
+ log::error!("Failed to join server thread: {:?}", e);
+ }
+ }
+
+ log::info!("Server shutdown completed.");
+ }
+
+ pub async fn wait_for_error(&mut self) -> Result<()> {
+ if let Ok(err) = (&mut self.error_rx).await {
+ return Err(err);
+ }
+ Ok(())
+ }
+}
+
+async fn wait_for_signal() -> Result {
+ #[cfg(unix)]
+ {
+ use tokio::signal::unix::{SignalKind, signal};
+ let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM")?;
+ let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT")?;
+ let mut sighup = signal(SignalKind::hangup()).context("Failed to register SIGHUP")?;
+
+ tokio::select! {
+ _ = sigterm.recv() => Ok("SIGTERM".to_string()),
+ _ = sigint.recv() => Ok("SIGINT".to_string()),
+ _ = sighup.recv() => Ok("SIGHUP".to_string()),
+ }
+ }
+ #[cfg(not(unix))]
+ {
+ tokio::signal::ctrl_c()
+ .await
+ .context("Failed to listen for Ctrl+C")?;
+ Ok("Ctrl+C".to_string())
+ }
+}
+
+fn spawn_server_thread(config: config::GlobalConfig) -> Result {
+ let (shutdown_tx, shutdown_rx) = oneshot::channel();
+ let (error_tx, error_rx) = oneshot::channel();
+
+ let cpu_count = num_cpus::get();
+ let worker_threads = config.service.workers.unwrap_or_else(|| {
+ let multiplier = config.service.worker_multiplier.unwrap_or(4);
+ cpu_count * multiplier
+ });
+
+ log::info!(
+ "Spawning gRPC server thread (Workers: {}, Cores: {})",
+ worker_threads,
+ cpu_count
+ );
+
+ let handle = thread::Builder::new()
+ .name("grpc-runtime".to_string())
+ .spawn(move || {
+ let rt = match tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(worker_threads)
+ .thread_name("grpc-worker")
+ .enable_all()
+ .build()
+ {
+ Ok(rt) => rt,
+ Err(e) => {
+ let _ = error_tx.send(anyhow::anyhow!("Failed to build runtime: {}", e));
+ return;
+ }
+ };
+
+ rt.block_on(async {
+ if let Err(e) = server::start_server_with_shutdown(&config, shutdown_rx, None).await
+ {
+ log::error!("Server runtime loop crashed: {}", e);
+ let _ = error_tx.send(e);
+ }
+ });
+ })
+ .context("Failed to spawn server thread")?;
+
+ Ok(ServerHandle {
+ join_handle: Some(handle),
+ shutdown_tx: Some(shutdown_tx),
+ error_rx,
+ })
+}
+
+fn setup_environment() -> Result {
+ let data_dir = config::get_data_dir();
+ let conf_dir = config::get_conf_dir();
+
+ let config = if let Some(path) = config::find_config_file("config.yaml") {
+ log::info!("Loading configuration from: {}", path.display());
+ config::load_global_config(&path)
+ .map_err(|e| anyhow::anyhow!("{}", e))
+ .context("Configuration load failed")?
+ } else {
+ log::warn!("Configuration file not found, defaulting to built-in values.");
+ config::GlobalConfig::default()
+ };
+
+ logging::init_logging(&config.logging).context("Logging initialization failed")?;
+
+ log::debug!(
+ "Environment initialized. Data: {}, Conf: {}",
+ data_dir.display(),
+ conf_dir.display()
+ );
+ Ok(config)
+}
+
+fn main() -> Result<()> {
+ // 1. Bootstrap
+ let config = match setup_environment() {
+ Ok(c) => c,
+ Err(e) => {
+ eprintln!("Bootstrap failure: {:#}", e);
+ std::process::exit(1);
+ }
+ };
+
+ config
+ .validate()
+ .map_err(|e| anyhow::anyhow!(e))
+ .context("Configuration validation failed")?;
+
+ proctitle::set_title(format!("function-stream-{}", config.service.service_id));
+ log::info!(
+ "Starting Service [Name: {}, ID: {}] on {}:{}",
+ config.service.service_name,
+ config.service.service_id,
+ config.service.host,
+ config.service.port
+ );
+
+ // 2. Component Initialization
+ let registry = server::register_components();
+ registry
+ .initialize_all(&config)
+ .context("Component initialization failed")?;
+
+ // 3. Server Startup
+ let mut server_handle = spawn_server_thread(config.clone())?;
+ log::info!("Service is running and accepting requests.");
+
+ // 4. Main Event Loop
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .context("Control plane runtime failed")?;
+
+ let exit_result = rt.block_on(async {
+ tokio::select! {
+ // Case A: Server crashed internally
+ err = server_handle.wait_for_error() => {
+ log::error!("Server process exited unexpectedly.");
+ Err(err.unwrap_err())
+ }
+ // Case B: System signal received
+ sig = wait_for_signal() => {
+ log::info!("Received signal: {}. shutting down...", sig.unwrap_or_default());
+ Ok(())
+ }
+ }
+ });
+
+ // 5. Teardown
+ match exit_result {
+ Ok(_) => {
+ server_handle.stop();
+ log::info!("Service stopped gracefully.");
+ Ok(())
+ }
+ Err(e) => {
+ log::error!("Service terminated with error: {:#}", e);
+ std::process::exit(1);
+ }
+ }
+}
diff --git a/src/runtime/buffer_and_event/buffer_or_event.rs b/src/runtime/buffer_and_event/buffer_or_event.rs
new file mode 100644
index 00000000..50ae0701
--- /dev/null
+++ b/src/runtime/buffer_and_event/buffer_or_event.rs
@@ -0,0 +1,92 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[derive(Debug)]
+pub struct BufferOrEvent {
+ /// Buffer data (byte array, if is_buffer() returns true)
+ buffer: Option>,
+ /// Whether event has priority (can skip buffer)
+ has_priority: bool,
+ /// Channel/partition information (optional)
+ channel_info: Option,
+ /// Size (bytes)
+ size: usize,
+ /// Whether more data is available
+ more_available: bool,
+ /// Whether more priority events are available
+ more_priority_events: bool,
+}
+
+impl BufferOrEvent {
+ /// Create BufferOrEvent of buffer type
+ pub fn new_buffer(
+ buffer: Vec,
+ channel_info: Option,
+ more_available: bool,
+ more_priority_events: bool,
+ ) -> Self {
+ let size = buffer.len();
+ Self {
+ buffer: Some(buffer),
+ has_priority: false,
+ channel_info,
+ size,
+ more_available,
+ more_priority_events,
+ }
+ }
+
+ /// Check if it's a buffer
+ pub fn is_buffer(&self) -> bool {
+ self.buffer.is_some()
+ }
+
+ /// Check if event has priority
+ pub fn has_priority(&self) -> bool {
+ self.has_priority
+ }
+
+ /// Get buffer data (if it's a buffer, returns reference to byte array)
+ pub fn get_buffer(&self) -> Option<&[u8]> {
+ self.buffer.as_deref()
+ }
+
+ /// Get buffer data ownership (if it's a buffer)
+ pub fn into_buffer(self) -> Option> {
+ self.buffer
+ }
+
+ /// Get channel/partition information
+ pub fn get_channel_info(&self) -> Option<&str> {
+ self.channel_info.as_deref()
+ }
+
+ /// Get size (bytes)
+ pub fn get_size(&self) -> usize {
+ self.size
+ }
+
+ /// Whether more data is available
+ pub fn more_available(&self) -> bool {
+ self.more_available
+ }
+
+ /// Whether more priority events are available
+ pub fn more_priority_events(&self) -> bool {
+ self.more_priority_events
+ }
+
+ /// Set whether more data is available
+ pub fn set_more_available(&mut self, more_available: bool) {
+ self.more_available = more_available;
+ }
+}
diff --git a/src/runtime/buffer_and_event/mod.rs b/src/runtime/buffer_and_event/mod.rs
new file mode 100644
index 00000000..cca736f6
--- /dev/null
+++ b/src/runtime/buffer_and_event/mod.rs
@@ -0,0 +1,22 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// BufferAndEvent module - Buffer and event module
+//
+// Provides BufferOrEvent implementation, unified representation of data received from network or message queue
+// Can be a buffer containing data records, or an event
+
+mod buffer_or_event;
+pub mod stream_element;
+
+pub use buffer_or_event::BufferOrEvent;
+// StreamRecord is now in the stream_element submodule, exported through stream_element
diff --git a/src/runtime/buffer_and_event/stream_element/mod.rs b/src/runtime/buffer_and_event/stream_element/mod.rs
new file mode 100644
index 00000000..5dd4cf1c
--- /dev/null
+++ b/src/runtime/buffer_and_event/stream_element/mod.rs
@@ -0,0 +1,13 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod stream_element;
diff --git a/src/runtime/buffer_and_event/stream_element/stream_element.rs b/src/runtime/buffer_and_event/stream_element/stream_element.rs
new file mode 100644
index 00000000..7baefaf0
--- /dev/null
+++ b/src/runtime/buffer_and_event/stream_element/stream_element.rs
@@ -0,0 +1,26 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Debug;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum StreamElementType {
+ Record,
+ Watermark,
+ LatencyMarker,
+ RecordAttributes,
+ WatermarkStatus,
+}
+
+pub trait StreamElement: Send + Sync + Debug {
+ fn get_type(&self) -> StreamElementType;
+}
diff --git a/src/runtime/common/component_state.rs b/src/runtime/common/component_state.rs
new file mode 100644
index 00000000..ea251054
--- /dev/null
+++ b/src/runtime/common/component_state.rs
@@ -0,0 +1,193 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Component State - Task component state machine
+//
+// Defines common state and control mechanisms for all task components (Input, Output, Processor, etc.)
+//
+// This is a pure state machine definition without any interface constraints
+// Each component can choose how to use these states according to its own needs
+
+/// Control task channel capacity (maximum number of tasks in fixed-length channel)
+/// Since control tasks (CheckPoint, Stop, Close) have low frequency, capacity doesn't need to be too large
+pub const CONTROL_TASK_CHANNEL_CAPACITY: usize = 10;
+
+/// Task component state
+///
+/// Represents the lifecycle state of task components (Input, Output, Processor, etc.)
+///
+/// State transition diagram:
+/// ```ignore
+/// Uninitialized -> Initialized -> Starting -> Running
+/// |
+/// v
+/// Checkpointing (checkpointing)
+/// |
+/// v
+/// Stopping -> Stopped
+/// |
+/// v
+/// Closing -> Closed
+///
+/// Error (any state can transition to error)
+/// ```
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+pub enum ComponentState {
+ /// Uninitialized
+ #[default]
+ Uninitialized,
+ /// Initialized
+ Initialized,
+ /// Starting
+ Starting,
+ /// Running
+ Running,
+ /// Checkpointing
+ Checkpointing,
+ /// Stopping
+ Stopping,
+ /// Stopped
+ Stopped,
+ /// Closing
+ Closing,
+ /// Closed
+ Closed,
+ /// Error state
+ Error {
+ /// Error message
+ error: String,
+ },
+}
+
+impl ComponentState {
+ /// Check if state can accept new operations
+ pub fn can_accept_operations(&self) -> bool {
+ matches!(
+ self,
+ ComponentState::Initialized | ComponentState::Running | ComponentState::Stopped
+ )
+ }
+
+ /// Check if state is running
+ pub fn is_running(&self) -> bool {
+ matches!(
+ self,
+ ComponentState::Running | ComponentState::Checkpointing
+ )
+ }
+
+ /// Check if state is closed
+ pub fn is_closed(&self) -> bool {
+ matches!(self, ComponentState::Closed)
+ }
+
+ /// Check if state is in error state
+ pub fn is_error(&self) -> bool {
+ matches!(self, ComponentState::Error { .. })
+ }
+
+ /// Check if can transition from current state to target state
+ pub fn can_transition_to(&self, target: &ComponentState) -> bool {
+ use ComponentState::*;
+
+ match (self, target) {
+ // Can transition from Uninitialized to Initialized
+ (Uninitialized, Initialized) => true,
+
+ // Can transition from Initialized to Starting
+ (Initialized, Starting) => true,
+
+ // Can transition from Starting to Running
+ (Starting, Running) => true,
+
+ // Can transition from Running to Checkpointing
+ (Running, Checkpointing) => true,
+
+ // Can transition from Checkpointing back to Running
+ (Checkpointing, Running) => true,
+
+ // Can transition from Running or Checkpointing to Stopping
+ (Running, Stopping) | (Checkpointing, Stopping) => true,
+
+ // Can transition from Stopping to Stopped
+ (Stopping, Stopped) => true,
+
+ // Can restart from Stopped
+ (Stopped, Starting) => true,
+
+ // Can transition from Running, Checkpointing, or Stopped to Closing
+ (Running, Closing) | (Checkpointing, Closing) | (Stopped, Closing) => true,
+
+ // Can transition from Closing to Closed
+ (Closing, Closed) => true,
+
+ // Any state can transition to Error state
+ (_, Error { .. }) => true,
+
+ // Other transitions are not allowed
+ _ => false,
+ }
+ }
+}
+
+impl std::fmt::Display for ComponentState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ ComponentState::Uninitialized => write!(f, "Uninitialized"),
+ ComponentState::Initialized => write!(f, "Initialized"),
+ ComponentState::Starting => write!(f, "Starting"),
+ ComponentState::Running => write!(f, "Running"),
+ ComponentState::Checkpointing => write!(f, "Checkpointing"),
+ ComponentState::Stopping => write!(f, "Stopping"),
+ ComponentState::Stopped => write!(f, "Stopped"),
+ ComponentState::Closing => write!(f, "Closing"),
+ ComponentState::Closed => write!(f, "Closed"),
+ ComponentState::Error { error } => write!(f, "Error({})", error),
+ }
+ }
+}
+
+/// Control task type
+///
+/// Used to pass various control tasks between component threads and main thread
+/// All task components should support these control tasks
+#[derive(Debug, Clone)]
+pub enum ControlTask {
+ /// Checkpoint task
+ Checkpoint {
+ /// Checkpoint ID
+ checkpoint_id: u64,
+ /// Timestamp (optional)
+ timestamp: Option,
+ },
+ /// Stop task
+ Stop {
+ /// Stop reason (optional)
+ reason: Option,
+ },
+ /// Close task
+ Close {
+ /// Close reason (optional)
+ reason: Option,
+ },
+ /// Error task
+ Error {
+ /// Error message
+ error: String,
+ /// Error type (optional)
+ error_type: Option,
+ /// Whether component should be stopped
+ should_stop: bool,
+ },
+}
diff --git a/src/runtime/common/mod.rs b/src/runtime/common/mod.rs
new file mode 100644
index 00000000..26993dd4
--- /dev/null
+++ b/src/runtime/common/mod.rs
@@ -0,0 +1,21 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Common runtime components module
+//
+// Provides common components and state definitions for runtime
+
+pub mod component_state;
+pub mod task_completion;
+
+pub use component_state::*;
+pub use task_completion::*;
diff --git a/src/runtime/common/task_completion.rs b/src/runtime/common/task_completion.rs
new file mode 100644
index 00000000..da6d19f7
--- /dev/null
+++ b/src/runtime/common/task_completion.rs
@@ -0,0 +1,283 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// TaskCompletionFlag - Task completion flag
+//
+// Used to track whether control tasks have completed processing, supports blocking wait and error message recording
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Condvar, Mutex};
+use std::time::Duration;
+
+/// Default timeout (milliseconds)
+pub const DEFAULT_COMPLETION_TIMEOUT_MS: u64 = 1000;
+
+/// Task completion result
+#[derive(Debug, Clone)]
+pub enum TaskResult {
+ /// Task completed successfully
+ Success,
+ /// Task failed
+ Error(String),
+}
+
+impl TaskResult {
+ /// Check if successful
+ pub fn is_success(&self) -> bool {
+ matches!(self, TaskResult::Success)
+ }
+
+ /// Check if failed
+ pub fn is_error(&self) -> bool {
+ matches!(self, TaskResult::Error(_))
+ }
+
+ /// Get error message
+ pub fn error_message(&self) -> Option<&str> {
+ match self {
+ TaskResult::Error(msg) => Some(msg),
+ TaskResult::Success => None,
+ }
+ }
+}
+
+/// Task completion flag
+///
+/// Used to track whether control tasks have completed processing
+///
+/// Supports:
+/// - Blocking wait (using Condvar, default timeout 1 second)
+/// - Non-blocking check
+/// - Completion notification (wake up all waiting threads)
+/// - Error message recording
+#[derive(Debug, Clone)]
+pub struct TaskCompletionFlag {
+ /// Completion flag
+ completed: Arc,
+ /// Condition variable (for blocking wait notification)
+ condvar: Arc<(Mutex, Condvar)>,
+ /// Task result (success or error message)
+ result: Arc>>,
+}
+
+impl TaskCompletionFlag {
+ /// Create new task completion flag
+ pub fn new() -> Self {
+ Self {
+ completed: Arc::new(AtomicBool::new(false)),
+ condvar: Arc::new((Mutex::new(false), Condvar::new())),
+ result: Arc::new(Mutex::new(None)),
+ }
+ }
+
+ /// Mark task as successfully completed and notify all waiting threads
+ pub fn mark_completed(&self) {
+ self.complete_with_result(TaskResult::Success);
+ }
+
+ /// Mark task as failed and notify all waiting threads
+ ///
+ /// # Arguments
+ /// - `error`: Error message
+ pub fn mark_error(&self, error: String) {
+ self.complete_with_result(TaskResult::Error(error));
+ }
+
+ /// Complete task with specified result
+ fn complete_with_result(&self, task_result: TaskResult) {
+ // Save result
+ {
+ let mut result = self.result.lock().unwrap();
+ *result = Some(task_result);
+ }
+
+ // Set completion flag
+ self.completed.store(true, Ordering::SeqCst);
+
+ // Notify all waiting threads
+ let (lock, cvar) = &*self.condvar;
+ let mut completed = lock.lock().unwrap();
+ *completed = true;
+ cvar.notify_all();
+ }
+
+ /// Check if task is completed (non-blocking)
+ pub fn is_completed(&self) -> bool {
+ self.completed.load(Ordering::SeqCst)
+ }
+
+ /// Check if task completed successfully
+ pub fn is_success(&self) -> bool {
+ if let Ok(result) = self.result.lock() {
+ result.as_ref().map(|r| r.is_success()).unwrap_or(false)
+ } else {
+ false
+ }
+ }
+
+ /// Check if task failed
+ pub fn is_error(&self) -> bool {
+ if let Ok(result) = self.result.lock() {
+ result.as_ref().map(|r| r.is_error()).unwrap_or(false)
+ } else {
+ false
+ }
+ }
+
+ /// Get task result
+ pub fn get_result(&self) -> Option {
+ self.result.lock().ok().and_then(|r| r.clone())
+ }
+
+ /// Get error message
+ pub fn get_error(&self) -> Option {
+ self.result.lock().ok().and_then(|r| {
+ r.as_ref()
+ .and_then(|res| res.error_message().map(|s| s.to_string()))
+ })
+ }
+
+ /// Blocking wait for task completion (default timeout 1 second)
+ ///
+ /// # Returns
+ /// - `Ok(())`: Task completed successfully
+ /// - `Err(String)`: Task failed or timeout
+ pub fn wait(&self) -> Result<(), String> {
+ self.wait_timeout(Duration::from_millis(DEFAULT_COMPLETION_TIMEOUT_MS))
+ }
+
+ /// Blocking wait for task completion (specified timeout)
+ ///
+ /// # Arguments
+ /// - `timeout`: Timeout duration
+ ///
+ /// # Returns
+ /// - `Ok(())`: Task completed successfully
+ /// - `Err(String)`: Task failed or timeout
+ pub fn wait_timeout(&self, timeout: Duration) -> Result<(), String> {
+ // Quick check
+ if self.is_completed() {
+ return self.check_result();
+ }
+
+ let (lock, cvar) = &*self.condvar;
+ let completed = lock.lock().unwrap();
+
+ if *completed {
+ return self.check_result();
+ }
+
+ // Use condition variable to block wait for notification
+ let result = cvar.wait_timeout(completed, timeout).unwrap();
+
+ if *result.0 || self.is_completed() {
+ self.check_result()
+ } else {
+ Err("Task completion timeout".to_string())
+ }
+ }
+
+ /// Check task result
+ fn check_result(&self) -> Result<(), String> {
+ match self.get_result() {
+ Some(TaskResult::Success) => Ok(()),
+ Some(TaskResult::Error(e)) => Err(e),
+ None => Err("Task result not set".to_string()),
+ }
+ }
+
+ /// Blocking wait for task completion (wait forever)
+ ///
+ /// # Returns
+ /// - `Ok(())`: Task completed successfully
+ /// - `Err(String)`: Task failed
+ pub fn wait_forever(&self) -> Result<(), String> {
+ if self.is_completed() {
+ return self.check_result();
+ }
+
+ let (lock, cvar) = &*self.condvar;
+ let mut completed = lock.lock().unwrap();
+
+ while !*completed && !self.is_completed() {
+ completed = cvar.wait(completed).unwrap();
+ }
+
+ self.check_result()
+ }
+}
+
+impl Default for TaskCompletionFlag {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::thread;
+
+ #[test]
+ fn test_task_completion_success() {
+ let flag = TaskCompletionFlag::new();
+
+ assert!(!flag.is_completed());
+ assert!(!flag.is_success());
+
+ flag.mark_completed();
+
+ assert!(flag.is_completed());
+ assert!(flag.is_success());
+ assert!(!flag.is_error());
+ assert!(flag.wait().is_ok());
+ }
+
+ #[test]
+ fn test_task_completion_error() {
+ let flag = TaskCompletionFlag::new();
+
+ flag.mark_error("Test error".to_string());
+
+ assert!(flag.is_completed());
+ assert!(flag.is_error());
+ assert!(!flag.is_success());
+ assert_eq!(flag.get_error(), Some("Test error".to_string()));
+ assert!(flag.wait().is_err());
+ }
+
+ #[test]
+ fn test_task_completion_wait_timeout() {
+ let flag = TaskCompletionFlag::new();
+
+ let result = flag.wait_timeout(Duration::from_millis(10));
+ assert!(result.is_err());
+ assert_eq!(result.unwrap_err(), "Task completion timeout");
+ }
+
+ #[test]
+ fn test_task_completion_cross_thread() {
+ let flag = TaskCompletionFlag::new();
+ let flag_clone = flag.clone();
+
+ let handle = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(50));
+ flag_clone.mark_completed();
+ });
+
+ let result = flag.wait_timeout(Duration::from_secs(1));
+ assert!(result.is_ok());
+
+ handle.join().unwrap();
+ }
+}
diff --git a/src/runtime/input/input_source.rs b/src/runtime/input/input_source.rs
new file mode 100644
index 00000000..edfb1f31
--- /dev/null
+++ b/src/runtime/input/input_source.rs
@@ -0,0 +1,111 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// InputSource - Input source interface
+//
+// Defines the standard interface for input sources, including lifecycle management and data retrieval
+// State is uniformly managed by the runloop thread
+
+use crate::runtime::buffer_and_event::BufferOrEvent;
+use crate::runtime::taskexecutor::InitContext;
+
+// Re-export common component state for compatibility
+pub use crate::runtime::common::ComponentState as InputSourceState;
+
+/// InputSource - Input source interface
+///
+/// Defines the standard interface for input sources, including:
+/// - Lifecycle management (init, start, stop, close)
+/// - Data retrieval (get_next, poll_next)
+/// - Checkpoint support (take_checkpoint, finish_checkpoint)
+///
+/// State is uniformly managed by the runloop thread, callers don't need to directly manipulate state
+pub trait InputSource: Send + Sync {
+ fn init_with_context(
+ &mut self,
+ init_context: &InitContext,
+ ) -> Result<(), Box>;
+
+ /// Start input source
+ ///
+ /// Start reading data from input source
+ /// State is set to Running by the runloop thread
+ fn start(&mut self) -> Result<(), Box>;
+
+ /// Stop input source
+ ///
+ /// Stop reading data from input source, but keep resources available
+ /// State is set to Stopped by the runloop thread
+ fn stop(&mut self) -> Result<(), Box>;
+
+ /// Close input source
+ ///
+ /// Release all resources, the input source will no longer be usable
+ /// State is set to Closed by the runloop thread
+ fn close(&mut self) -> Result<(), Box>;
+
+ /// Get next data
+ ///
+ /// Get next BufferOrEvent from input source
+ /// Returns None to indicate no data is currently available (non-blocking)
+ ///
+ /// # Returns
+ /// - `Ok(Some(BufferOrEvent))`: Data retrieved
+ /// - `Ok(None)`: No data currently available
+ /// - `Err(...)`: Error occurred
+ fn get_next(&mut self) -> Result