The Distributed Computing module in Neurenix provides tools and utilities for distributed training and inference across multiple GPUs and compute nodes. This module enables users to scale their machine learning workloads to leverage the computational power of multiple devices, from edge devices to multi-GPU clusters.
Neurenix's distributed computing capabilities use a multi-language architecture: performance-critical operations are implemented in the Rust/C++ Phynexus engine, the Python API provides the user-facing interface, and the distributed systems integration is written in Go for its networking and concurrency features. This architecture is intended to deliver good performance across a wide range of devices and deployment scenarios.
The distributed context manages the communication between processes in a distributed training setup. It handles initialization, synchronization, and cleanup of the distributed environment. The context includes information about the world size (total number of processes), rank (process identifier), and device mapping.
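As a rough illustration of what "auto-detect" could mean for `world_size=-1` and `rank=-1`, the sketch below fills in missing values from environment variables. The variable names `RANK` and `WORLD_SIZE` and the helper `resolve_process_info` are assumptions borrowed from common launcher conventions, not confirmed Neurenix behavior.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ProcessInfo:
    rank: int
    world_size: int

    @property
    def is_main_process(self) -> bool:
        # By convention, rank 0 is the main process.
        return self.rank == 0


def resolve_process_info(
    env: Mapping[str, str],
    rank: int = -1,
    world_size: int = -1,
) -> ProcessInfo:
    """Fill in rank/world_size from the environment when passed as -1.

    RANK and WORLD_SIZE are hypothetical variable names used for illustration.
    """
    if rank < 0:
        rank = int(env.get("RANK", "0"))
    if world_size < 0:
        world_size = int(env.get("WORLD_SIZE", "1"))
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} is outside [0, {world_size})")
    return ProcessInfo(rank=rank, world_size=world_size)


info = resolve_process_info({"RANK": "2", "WORLD_SIZE": "4"})
print(info, info.is_main_process)
```

With an empty environment the sketch falls back to a single-process setup (rank 0, world size 1), which is one plausible default rather than a documented one.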
Data parallelism is a distributed training strategy where the model is replicated across multiple devices, and each device processes a different subset of the training data. The gradients from each device are then synchronized to update the model parameters. This approach is most effective when the dataset is large and the model fits in the memory of a single device.
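The gradient synchronization step can be sketched in plain Python, without any Neurenix calls. The example assumes equal-sized shards and a one-parameter model `y_hat = w * x`. The helper `all_reduce_mean` is a stand-in for a real collective operation and is not part of the documented API.

```python
from typing import List, Sequence, Tuple

Sample = Tuple[float, float]  # (input x, target y)


def local_gradient(w: float, shard: Sequence[Sample]) -> float:
    """Gradient of mean squared error for the model y_hat = w * x."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def shard_data(data: Sequence[Sample], world_size: int) -> List[List[Sample]]:
    """Give each rank every world_size-th sample (equal-sized shards here)."""
    return [list(data[rank::world_size]) for rank in range(world_size)]


def all_reduce_mean(values: Sequence[float]) -> float:
    """Stand-in for an all-reduce that averages one value per rank."""
    return sum(values) / len(values)


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # true weight is 3.0
shards = shard_data(data, world_size=4)

# With equal-sized shards, the averaged gradient equals the full-batch gradient.
full_grad = local_gradient(0.0, data)
synced_grad = all_reduce_mean([local_gradient(0.0, s) for s in shards])

# Every replica applies the same averaged gradient, so all copies stay identical.
w = 0.0
for _ in range(200):
    grads = [local_gradient(w, shard) for shard in shards]
    w -= 0.01 * all_reduce_mean(grads)

print(full_grad, synced_grad, w)
```

If shards differ in size, a plain average of per-rank gradients no longer matches the full-batch gradient exactly; weighting by shard size is one way to correct for that.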
Remote Procedure Call (RPC) enables communication between processes in a distributed environment. It allows one process to execute a function on another process, which is useful for parameter server architectures and distributed inference.
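The call pattern can be illustrated with an in-process toy, where a thread pool plays the role of the remote process. `ToyRpcWorker` and its methods are hypothetical names for this sketch. It performs no networking or serialization, and it is not how Neurenix implements RPC.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict


class ToyRpcWorker:
    """In-process stand-in for one rank that serves registered functions."""

    def __init__(self, rank: int) -> None:
        self.rank = rank
        self._functions: Dict[str, Callable[..., Any]] = {}
        self._executor = ThreadPoolExecutor(max_workers=1)

    def register_function(self, name: str, func: Callable[..., Any]) -> None:
        self._functions[name] = func

    def call_async(self, name: str, *args: Any, **kwargs: Any) -> Future:
        # Functions are looked up by name: in a real system only the name
        # and the arguments would cross the wire.
        func = self._functions[name]
        return self._executor.submit(func, *args, **kwargs)

    def call_sync(self, name: str, *args: Any, **kwargs: Any) -> Any:
        # A synchronous call is an asynchronous call that blocks on the result.
        return self.call_async(name, *args, **kwargs).result()

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)


def scale(values, factor):
    return [v * factor for v in values]


worker = ToyRpcWorker(rank=1)
worker.register_function("scale", scale)

sync_result = worker.call_sync("scale", [1, 2, 3], factor=2)
future = worker.call_async("scale", [4, 5], factor=10)
async_result = future.result()  # the caller could do other work before this
worker.shutdown()
print(sync_result, async_result)
```

Registering functions by name mirrors the `register_function(name, func)` and `rpc_sync(dst_rank, function_name, ...)` signatures listed in this document, but the behavior shown is only a sketch.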
Synchronization primitives ensure consistent behavior across processes in a distributed environment. These include barriers for synchronizing all processes and synchronized batch normalization for consistent normalization statistics across devices.
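To show why synchronized statistics matter, the following sketch combines per-device batch statistics into the statistics of the full global batch. It assumes each device shares its count, mean, and biased variance. The helper names are hypothetical and the exchange itself (normally an all-reduce) is replaced by a plain list.

```python
from statistics import fmean, pvariance
from typing import Sequence, Tuple

Stats = Tuple[int, float, float]  # (count, mean, biased variance)


def local_stats(values: Sequence[float]) -> Stats:
    """Statistics one device computes over its own part of the batch."""
    return len(values), fmean(values), pvariance(values)


def combine_stats(per_device: Sequence[Stats]) -> Tuple[float, float]:
    """Merge per-device statistics into the global mean and variance."""
    total = sum(n for n, _, _ in per_device)
    mean = sum(n * m for n, m, _ in per_device) / total
    # E[x^2] on each device is var + mean^2; pool it, then subtract mean^2.
    second_moment = sum(n * (v + m * m) for n, m, v in per_device) / total
    return mean, second_moment - mean * mean


device_batches = [[1.0, 2.0, 3.0], [10.0, 12.0], [5.0]]
global_mean, global_var = combine_stats([local_stats(b) for b in device_batches])

# Reference: statistics computed directly over the concatenated batch.
flat = [v for batch in device_batches for v in batch]
expected_mean, expected_var = fmean(flat), pvariance(flat)
print(global_mean, global_var)
```

Without this merge, the third device in the example would normalize with a variance of zero, which is the kind of inconsistency synchronized batch normalization avoids.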
Neurenix provides integrations with external distributed computing frameworks, such as Dask for scalable data processing and PyCUDA for low-level GPU operations. These integrations enable users to leverage the strengths of these frameworks within the Neurenix ecosystem.
neurenix.distributed.DistributedContext(
backend="nccl",
init_method=None,
world_size=-1,
rank=-1,
device_id=None,
timeout=1800.0
)
Context manager for distributed training.
Parameters:
- backend: Communication backend ('nccl', 'gloo', or 'mpi')
- init_method: URL specifying how to initialize the process group (optional)
- world_size: Number of processes in the group (default: auto-detect)
- rank: Rank of the current process (default: auto-detect)
- device_id: Device ID for the current process (default: auto-detect)
- timeout: Timeout for operations (default: 1800.0 seconds)
Methods:
- initialize(): Initialize the distributed context
- shutdown(): Shut down the distributed context
- is_initialized: Check if the distributed context is initialized
- is_main_process: Check if this is the main process (rank 0)
neurenix.distributed.init_distributed(
backend="nccl",
init_method=None,
world_size=-1,
rank=-1,
device_id=None
)
Initialize distributed training.
Parameters:
- backend: Communication backend ('nccl', 'gloo', or 'mpi')
- init_method: URL specifying how to initialize the process group (optional)
- world_size: Number of processes in the group (default: auto-detect)
- rank: Rank of the current process (default: auto-detect)
- device_id: Device ID for the current process (default: auto-detect)
Returns:
- Distributed context
neurenix.distributed.get_rank()
neurenix.distributed.get_world_size()
neurenix.distributed.is_main_process()
neurenix.distributed.barrier()
Utility functions for distributed training.
- get_rank(): Get the rank of the current process
- get_world_size(): Get the world size (number of processes)
- is_main_process(): Check if this is the main process (rank 0)
- barrier(): Synchronize all processes
neurenix.distributed.DataParallel(module, device_ids=None)
Data parallel wrapper for modules.
Parameters:
- module: Module to parallelize
- device_ids: List of device IDs to use (default: all available devices)
Methods:
- forward(*args, **kwargs): Forward pass with data parallelism
- parameters(): Get module parameters
- to(device): Move module to device
- train(mode=True): Set module to training mode
- eval(): Set module to evaluation mode
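A data-parallel forward pass is commonly described as scatter, apply, gather. The sketch below shows that flow in plain Python under stated assumptions: the chunking policy (contiguous chunks, with earlier chunks taking any remainder) is a guess, the helpers are hypothetical, and the devices are visited in a loop rather than concurrently.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def scatter(batch: Sequence[T], num_devices: int) -> List[List[T]]:
    """Split a batch into contiguous chunks, one per device.

    Earlier chunks get one extra item when the batch does not divide evenly.
    """
    base, extra = divmod(len(batch), num_devices)
    chunks: List[List[T]] = []
    start = 0
    for device in range(num_devices):
        size = base + (1 if device < extra else 0)
        chunks.append(list(batch[start:start + size]))
        start += size
    return chunks


def data_parallel_forward(
    replica: Callable[[T], T], batch: Sequence[T], num_devices: int
) -> List[T]:
    """Apply the same replica to each chunk, then gather outputs in order."""
    outputs: List[T] = []
    for chunk in scatter(batch, num_devices):
        outputs.extend(replica(x) for x in chunk)
    return outputs


def double(x: int) -> int:
    return 2 * x


batch = list(range(7))
chunks = scatter(batch, 3)
outputs = data_parallel_forward(double, batch, 3)
print(chunks, outputs)
```

Because the chunks are contiguous and gathered in device order, the output order matches the input order, so the wrapper can be used as a drop-in replacement for the unwrapped module.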
neurenix.distributed.RpcContext(
backend="tensorpipe",
init_method=None,
world_size=-1,
rank=-1,
timeout=1800.0
)
RPC context for distributed training.
Parameters:
- backend: RPC backend ('tensorpipe' or 'gloo')
- init_method: URL specifying how to initialize the RPC (optional)
- world_size: Number of processes in the group (default: auto-detect)
- rank: Rank of the current process (default: auto-detect)
- timeout: Timeout for operations (default: 1800.0 seconds)
Methods:
- initialize(): Initialize the RPC context
- shutdown(): Shut down the RPC context
- register_function(name, func): Register a function for RPC
neurenix.distributed.init_rpc(
backend="tensorpipe",
init_method=None,
world_size=-1,
rank=-1
)
neurenix.distributed.rpc_sync(dst_rank, function_name, *args, **kwargs)
neurenix.distributed.rpc_async(dst_rank, function_name, *args, **kwargs)
Functions for RPC communication.
- init_rpc(): Initialize RPC
- rpc_sync(): Synchronous RPC call
- rpc_async(): Asynchronous RPC call
neurenix.distributed.SyncBatchNorm(
num_features,
eps=1e-5,
momentum=0.1,
affine=True,
track_running_stats=True
)
Synchronized Batch Normalization.
Parameters:
- num_features: Number of features
- eps: Small constant for numerical stability (default: 1e-5)
- momentum: Momentum for running statistics (default: 0.1)
- affine: Whether to use learnable affine parameters (default: True)
- track_running_stats: Whether to track running statistics (default: True)
Methods:
- forward(x): Forward pass
neurenix.distributed.DaskCluster(
n_workers=4,
threads_per_worker=2,
memory_limit="4GB",
scheduler_address=None
)
Dask cluster for distributed computing.
Parameters:
- n_workers: Number of workers (default: 4)
- threads_per_worker: Number of threads per worker (default: 2)
- memory_limit: Memory limit per worker (default: "4GB")
- scheduler_address: Address of existing scheduler to connect to (optional)
Methods:
- start(): Start the Dask cluster
- stop(): Stop the Dask cluster
- is_running: Check if the Dask cluster is running
- map(func, *iterables, **kwargs): Map a function to iterables in parallel
- submit(func, *args, **kwargs): Submit a function for execution
- gather(futures): Gather results from futures
- scatter(data, broadcast=False): Scatter data to workers
- replicate(future): Replicate data to all workers
- get_worker_info(): Get worker information
neurenix.distributed.tensor_to_dask(tensor, chunks="auto")
neurenix.distributed.dask_to_tensor(dask_array)
Functions for converting between Neurenix tensors and Dask arrays.
- tensor_to_dask(): Convert a Neurenix tensor to a Dask array
- dask_to_tensor(): Convert a Dask array to a Neurenix tensor
neurenix.distributed.CudaContext(
device_id=0,
enable_profiling=False
)
CUDA context for GPU computing.
Parameters:
- device_id: GPU device ID (default: 0)
- enable_profiling: Whether to enable profiling (default: False)
Methods:
- start(): Start the CUDA context
- stop(): Stop the CUDA context
- is_running: Check if the CUDA context is running
- get_kernel(name, source, function_name): Get a CUDA kernel
- allocate(shape, dtype): Allocate memory on the GPU
- to_gpu(array): Copy array to GPU
- from_gpu(array): Copy array from GPU
- synchronize(): Synchronize the CUDA context
neurenix.distributed.tensor_to_gpu(tensor)
neurenix.distributed.gpu_to_tensor(gpu_array)
Functions for converting between Neurenix tensors and GPU arrays.
- tensor_to_gpu(): Convert a Neurenix tensor to a GPU array
- gpu_to_tensor(): Convert a GPU array to a Neurenix tensor

| Feature | Neurenix | TensorFlow |
|---|---|---|
| Multi-language Architecture | Rust/C++ core, Python API, Go for distributed systems | C++ core with Python API |
| Distributed Training | Native support for multi-GPU and multi-node training | TensorFlow Distributed Strategy |
| Communication Backends | NCCL, Gloo, MPI | gRPC, NCCL |
| RPC Framework | Built-in RPC framework | TensorFlow Serving for inference |
| Edge Device Support | Native optimization for edge devices | TensorFlow Lite for edge devices |
| Hardware Support | CPU, CUDA, ROCm, WebGPU | CPU, CUDA, TPU |
| External Integrations | Dask, PyCUDA | Horovod, Ray |
Neurenix's distributed computing capabilities cover a wider range of hardware platforms and communication backends than TensorFlow's built-in options. The multi-language architecture, with a high-performance Rust/C++ core and Go-powered distributed systems, is designed to perform well across different deployment scenarios. Additionally, Neurenix's integrations with Dask and PyCUDA provide flexibility for distributed data processing and low-level GPU operations.
| Feature | Neurenix | PyTorch |
|---|---|---|
| Multi-language Architecture | Rust/C++ core, Python API, Go for distributed systems | C++ core with Python API |
| Distributed Training | Native support for multi-GPU and multi-node training | PyTorch Distributed Data Parallel |
| Communication Backends | NCCL, Gloo, MPI | NCCL, Gloo, MPI |
| RPC Framework | Built-in RPC framework | PyTorch RPC |
| Hardware Support | CPU, CUDA, ROCm, WebGPU | CPU, CUDA, ROCm |
| Synchronization Primitives | Synchronized Batch Normalization, Barriers | Synchronized Batch Normalization, Barriers |
| External Integrations | Dask, PyCUDA | Ray |
Neurenix and PyTorch both offer comprehensive distributed computing capabilities, with the same communication backends and synchronization primitives. Neurenix's multi-language architecture, with a Go-powered distributed systems layer, is designed for performance and scalability in large-scale deployments. Both frameworks support ROCm; Neurenix additionally targets WebGPU. The Dask integration adds distributed data processing alongside training, a role that Ray typically fills for PyTorch users.
| Feature | Neurenix | Horovod |
|---|---|---|
| Framework Integration | Native part of Neurenix | Add-on for TensorFlow, PyTorch, and MXNet |
| Communication Backends | NCCL, Gloo, MPI | MPI, NCCL, Gloo |
| Ease of Use | Integrated API with automatic device detection | Requires explicit initialization and synchronization |
| Hardware Support | CPU, CUDA, ROCm, WebGPU | CPU, CUDA, ROCm |
| Scalability | Designed for edge devices to clusters | Primarily designed for clusters |
| External Integrations | Dask, PyCUDA | Spark, Ray |
Neurenix provides a more integrated distributed computing solution than Horovod, with automatic device detection and a consistent API across deployment scenarios. While Horovod is primarily designed for large-scale clusters, Neurenix's distributed computing capabilities are meant to scale from edge devices to multi-node clusters. Additionally, Neurenix's multi-language architecture, with a Go-powered distributed systems layer, is designed for performance and scalability in large-scale deployments.
When selecting a communication backend for distributed training, consider the following factors:
To get the best performance from data parallel training, consider these optimizations:
When scaling to multiple nodes, consider these best practices:
When deploying distributed training to edge devices, consider these optimizations:
import neurenix
from neurenix.nn import Module, Linear
from neurenix.distributed import DataParallel
from neurenix.optim import SGD

# Create a simple model
class SimpleModel(Module):
    def __init__(self):
        super().__init__()
        self.fc1 = Linear(784, 256)
        self.fc2 = Linear(256, 10)

    def forward(self, x):
        x = self.fc1(x).relu()
        x = self.fc2(x)
        return x

# Create model
model = SimpleModel()
# Wrap model with DataParallel
model = DataParallel(model)
# Create optimizer
optimizer = SGD(model.parameters(), lr=0.01)

# Training loop
# Note: train_loader and loss_function are placeholders that must be defined elsewhere.
for epoch in range(10):
    for batch_idx, (data, target) in enumerate(train_loader):
        # Forward pass
        output = model(data)
        loss = loss_function(output, target)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print(f"Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item()}")
import neurenix
from neurenix.distributed import init_distributed, get_rank, get_world_size, is_main_process

# Initialize distributed training
dist_ctx = init_distributed(
    backend="nccl",
    init_method="tcp://localhost:23456",
    world_size=4,
    rank=0,  # Set this to the current process rank
)

# Create model and move to device
# Note: create_model, create_optimizer, train_loader, and loss_function are
# placeholders that must be defined elsewhere.
model = create_model()
device = neurenix.get_device(f"cuda:{get_rank()}")
model.to(device)
# Create optimizer
optimizer = create_optimizer(model)

# Training loop
for epoch in range(10):
    for batch_idx, (data, target) in enumerate(train_loader):
        # Move data to device
        data = data.to(device)
        target = target.to(device)
        # Forward pass
        output = model(data)
        loss = loss_function(output, target)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Print progress on main process
        if is_main_process() and batch_idx % 100 == 0:
            print(f"Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item()}")

# Clean up
dist_ctx.shutdown()
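The example above selects the device with `cuda:{get_rank()}`, which only works when every process runs on the same machine. For multi-node jobs, the device index is usually derived from the global rank. The sketch below assumes ranks are assigned contiguously per node and that every node has the same number of GPUs. Both helper names are hypothetical.

```python
def local_device_index(rank: int, gpus_per_node: int) -> int:
    """Map a global rank to a GPU index on its own node."""
    if gpus_per_node <= 0:
        raise ValueError("gpus_per_node must be positive")
    if rank < 0:
        raise ValueError("rank must be non-negative")
    return rank % gpus_per_node


def node_index(rank: int, gpus_per_node: int) -> int:
    """Map a global rank to the index of the node that hosts it."""
    if gpus_per_node <= 0:
        raise ValueError("gpus_per_node must be positive")
    return rank // gpus_per_node


# Eight processes spread over two nodes with four GPUs each.
placement = [(node_index(r, 4), local_device_index(r, 4)) for r in range(8)]
print(placement)
```

Under these assumptions, rank 5 lands on node 1 and would use `cuda:1` rather than the non-existent `cuda:5`.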
import neurenix
from neurenix.distributed import DaskCluster, tensor_to_dask, dask_to_tensor

# Create Dask cluster
with DaskCluster(n_workers=4, threads_per_worker=2) as cluster:
    # Load data
    data = neurenix.tensor.load("large_dataset.npy")
    # Convert to Dask array
    dask_data = tensor_to_dask(data, chunks=(1000, -1))

    # Define processing function
    def process_chunk(chunk):
        # Convert to Neurenix tensor
        tensor = neurenix.tensor.Tensor(chunk)
        # Process tensor
        processed = tensor - tensor.mean(dim=0)
        processed = processed / tensor.std(dim=0)
        return processed.numpy()

    # Process data in parallel
    processed_dask = dask_data.map_blocks(process_chunk)
    # Compute result
    processed_data = dask_to_tensor(processed_dask)
    # Save result
    processed_data.save("processed_dataset.npy")
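One detail of the example above is worth spelling out: `process_chunk` computes the mean and standard deviation of each chunk separately, so the result is a per-chunk standardization rather than a standardization over the whole dataset. The plain-Python sketch below contrasts the two on a single column. It uses the population standard deviation, which is an assumption about what `std` computes.

```python
from statistics import fmean, pstdev
from typing import List, Sequence


def standardize(values: Sequence[float], mean: float, std: float) -> List[float]:
    """Subtract the mean and divide by the standard deviation."""
    return [(v - mean) / std for v in values]


column = [1.0, 2.0, 3.0, 101.0, 102.0, 103.0]
chunks = [column[:3], column[3:]]

# Per-chunk statistics: what a block-wise function computes on its own.
per_chunk = [standardize(c, fmean(c), pstdev(c)) for c in chunks]

# Global statistics: computed once over the full column, then applied per chunk.
g_mean, g_std = fmean(column), pstdev(column)
global_result = [standardize(c, g_mean, g_std) for c in chunks]

print(per_chunk)
print(global_result)
```

With per-chunk statistics, both chunks map to the same values even though their raw values differ by 100. If global standardization is intended, the statistics would need to be computed over the full array first and then passed into the block-wise function.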
import neurenix
from neurenix.distributed import CudaContext, tensor_to_gpu, gpu_to_tensor
import numpy as np

# Define CUDA kernel
kernel_source = """
extern "C" __global__ void add_arrays(float* a, float* b, float* c, int n) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        c[idx] = a[idx] + b[idx];
    }
}
"""

# Create CUDA context
with CudaContext(device_id=0) as ctx:
    # Get kernel
    kernel = ctx.get_kernel("add_arrays", kernel_source, "add_arrays")
    # Create input tensors
    a = neurenix.tensor.randn((1000,))
    b = neurenix.tensor.randn((1000,))
    # Convert to GPU arrays
    a_gpu = tensor_to_gpu(a)
    b_gpu = tensor_to_gpu(b)
    # Allocate output array
    c_gpu = ctx.allocate(a.shape, a.dtype)
    # Launch kernel
    block_size = 256
    grid_size = (a.shape[0] + block_size - 1) // block_size
    kernel.launch((grid_size, 1, 1), (block_size, 1, 1), (a_gpu, b_gpu, c_gpu, np.int32(a.shape[0])))
    # Synchronize
    ctx.synchronize()
    # Convert back to tensor
    c = gpu_to_tensor(c_gpu)
    # Verify result
    expected = a + b
    assert np.allclose(c.numpy(), expected.numpy())
    print("CUDA kernel execution successful!")
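The grid size calculation and the `idx < n` guard in the kernel above work together: the grid is rounded up so every element is covered, and the guard keeps the surplus threads from writing out of bounds. The following plain-Python simulation walks the same index arithmetic on the CPU. The helper names are hypothetical and no GPU is involved.

```python
from typing import List, Sequence, Tuple


def grid_size_for(n: int, block_size: int) -> int:
    """Ceiling division: the smallest number of blocks that covers n elements."""
    return (n + block_size - 1) // block_size


def simulate_add(
    a: Sequence[float], b: Sequence[float], block_size: int
) -> Tuple[List[float], int]:
    """Run the add_arrays index logic sequentially and count launched threads."""
    n = len(a)
    c = [0.0] * n
    launched = 0
    for block in range(grid_size_for(n, block_size)):
        for thread in range(block_size):
            idx = block * block_size + thread  # blockIdx.x * blockDim.x + threadIdx.x
            launched += 1
            if idx < n:  # surplus threads in the last block do nothing
                c[idx] = a[idx] + b[idx]
    return c, launched


a = [float(i) for i in range(1000)]
b = [1.0] * 1000
c, launched = simulate_add(a, b, block_size=256)
print(grid_size_for(1000, 256), launched, c[:3], c[-1])
```

For 1000 elements and a block size of 256, four blocks launch 1024 threads, of which the last 24 are idle because of the bounds check.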
The Distributed Computing module of Neurenix provides a comprehensive set of tools for distributed training and inference across multiple GPUs and compute nodes. Its multi-language architecture with a high-performance Rust/C++ core and Go-powered distributed systems enables optimal performance across a wide range of devices and deployment scenarios.
Compared to other frameworks like TensorFlow, PyTorch, and Horovod, Neurenix's distributed computing capabilities offer advantages in terms of hardware support, communication backends, and external integrations. These features make Neurenix particularly well-suited for deploying AI applications in production environments, from edge devices to multi-GPU clusters.