FIFOs

A FIFO DSD is useful for buffering data flowing into or out of a PE, extending the small hardware queues used for fabric communication. In particular, a FIFO can prevent stalls in the communication fabric when input or output happens in bursts. It is also possible to operate on the values while they flow through the FIFO, as this code sample demonstrates.
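Concretely, a FIFO is created by allocating it over an ordinary memory buffer; the resulting FIFO can then serve as the source or destination of DSD operations. The following is a minimal sketch of that idea; the names and the 256-element size are illustrative and not part of this example.

// Back a FIFO with a plain 256-element buffer.
var backing = @zeros([256]f16);
const sketch_fifo = @allocate_fifo(backing);

// The FIFO can now appear wherever a DSD operand is expected, e.g. as the
// destination of an asynchronous copy from a fabric input DSD:
//   @fmovh(sketch_fifo, some_fabin_dsd, .{ .async = true });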

This example illustrates a typical pattern in the use of FIFOs, where wavelets received from the fabric are forwarded to a task that performs some computation. Specifically, incoming data from the host is stored in the FIFO, which relieves the sender from being blocked until the receiver has consumed all wavelets. While the incoming wavelets are asynchronously received into the FIFO buffer, we also start a second asynchronous DSD operation that pulls data from the FIFO and forwards it to a wavelet-triggered task.
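In outline, the pattern is two asynchronous DSD operations launched back to back: one fills the FIFO from the fabric, the other drains it toward the color whose wavelets activate the processing task. A hedged sketch with placeholder names follows; the working version is main_task in buffer.csl below.

// Sketch only: h2d_in_dsd, loop_out_dsd, my_fifo and dsd_ten stand for the
// definitions that buffer.csl makes for real.
task start_pipeline() void {
  // fabric --> FIFO: receive the incoming wavelets into the FIFO.
  @faddh(my_fifo, h2d_in_dsd, dsd_ten, .{ .async = true });
  // FIFO --> task color: pop values out of the FIFO and send them on the
  // color that activates the processing task.
  @fnegh(loop_out_dsd, my_fifo, .{ .async = true });
}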

This example also illustrates another common pattern, in which a PE triggers a wavelet-triggered task with its own wavelets by sending them to the router, which immediately routes them back to the compute element. In our example, the wavelet-triggered task simply computes the cube of each wavelet's data before sending the result to the host.
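The routing behind this self-activation is a single local color configuration: the router receives wavelets coming up the ramp from the compute element and transmits them straight back down it. A minimal sketch with illustrative names:

comptime {
  // rx = RAMP: accept wavelets sent by this PE's compute element;
  // tx = RAMP: route them straight back down, activating loop_task.
  @set_local_color_config(loop_color, .{.routes = .{.rx = .{RAMP}, .tx = .{RAMP}}});
  @bind_task(loop_task, loop_color);
}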

code.csl

// Resources used to route the data between the host and the device.
//

// color map
//
// color  var    color  var              color  var              color  var
//   0 H2D          9   out_color          18                      27   reserved (memcpy)
//   1 D2H         10                      19                      28   reserved (memcpy)
//   2 LAUNCH      11                      20                      29   reserved
//   3             12                      21    reserved (memcpy) 30   reserved (memcpy)
//   4             13                      22    reserved (memcpy) 31   reserved
//   5             14                      23    reserved (memcpy) 32
//   6             15                      24                      33
//   7             16                      25                      34
//   8 main_color  17                      26                      35

param num_elements_to_process : i16;

param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;
param LAUNCH_ID: i16;

const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);
const LAUNCH: color = @get_color(LAUNCH_ID);

const out_color: color = @get_color(9);
const main_color: color = @get_color(8);

const memcpy = @import_module( "<memcpy_multi/get_params>", .{
    .width = 1,
    .height = 1
    });

layout {
  @set_rectangle(1,1);

  const memcpy_params = memcpy.get_params(0);

  @set_tile_code(0, 0, "buffer.csl", .{
    .num_elements_to_process = num_elements_to_process,
    .out_color = out_color,
    .main_task_color = main_color,
    .memcpy_params = memcpy_params,
    .MEMCPYH2D_DATA_1 = MEMCPYH2D_DATA_1,
    .MEMCPYD2H_DATA_1 = MEMCPYD2H_DATA_1,
    .LAUNCH = LAUNCH
  });
}

buffer.csl

// Not a complete program; the top-level source file is code.csl.

// The communication pattern:
// streaming H2D --> FIFO --> C9 (out_color) --> process_task --> streaming D2H
//
// WARNING: this program can stall if the input length exceeds the FIFO's capacity.
// H2D and D2H are serialized: logically speaking, it is NOT valid to send to and
// receive from the same PE at the same time.
// If the input length does not exceed the FIFO's capacity, the H2D stream moves
// all data into the FIFO, and the D2H stream then consumes the results of
// process_task(), so microthread 2 can keep popping data out of the FIFO.
// Otherwise, the H2D stream cannot finish because the FIFO is full, while the
// D2H stream has not yet started to consume the data from process_task().
//
// If neither the FIFO's input is an H2D stream nor its output feeds a D2H
// stream, there is no limit on the size.
// If only one of the two is a host stream, whether a stall can occur depends
// on the particular case.

param num_elements_to_process : i16;

param out_color: color;
param main_task_color: color;

param memcpy_params: comptime_struct;
param LAUNCH: color;
param MEMCPYH2D_DATA_1: color;
param MEMCPYD2H_DATA_1: color;

// memcpy module reserves input queue 0 and output queue 0
const sys_mod = @import_module( "<memcpy_multi/memcpy>", @concat_structs(memcpy_params, .{
     .MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
     .MEMCPYD2H_1 = MEMCPYD2H_DATA_1,
     .LAUNCH = LAUNCH
      }));

// A 1024-element buffer backs the FIFO used to stage the incoming H2D stream.
var fifo_buffer = @zeros([1024]f16);
const fifo = @allocate_fifo(fifo_buffer);

// fabin DSD that receives the H2D wavelets on input queue 1 (microthread UT1).
const in_queue = @get_input_queue(1);
const in_dsd = @get_dsd(fabin_dsd, .{.extent = num_elements_to_process,
                                     .fabric_color = MEMCPYH2D_DATA_1,
                                     .input_queue = in_queue});
comptime {
  // The H2D data is read into the FIFO by the fabin DSD on UT1 (input queue 1),
  // not by a wavelet-triggered task, so the color must be blocked.
  @block(MEMCPYH2D_DATA_1);
}

// fabout DSD that sends the FIFO's contents on out_color via output queue 2.
const out_queue = @get_output_queue(2);
const out_dsd = @get_dsd(fabout_dsd, .{.extent = num_elements_to_process,
                                       .fabric_color = out_color,
                                       .output_queue = out_queue});
comptime {
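  // Loop out_color through the router: wavelets sent by the compute element
  // (rx = RAMP) come straight back to it (tx = RAMP) and activate process_task.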
  @set_local_color_config(out_color, .{.routes = .{.rx = .{RAMP}, .tx = .{RAMP}}});
  @bind_task(process_task, out_color);
}

// Broadcast DSD that supplies the constant 10.0 for every input element.
const ten = [1]f16 {10.0};
const dsd_ten = @get_dsd(mem1d_dsd, .{.tensor_access = |i|{num_elements_to_process} -> ten[0]});

task main_task() void {
  // Move from the fabric to the FIFO
  // adding 10.0 to each element at the same time
  @faddh(fifo, in_dsd, dsd_ten, .{.async = true});

  // Move from the FIFO to process_task,
  // negating values at the same time
  @fnegh(out_dsd, fifo, .{.async = true});
}

comptime {
  @bind_task(main_task, main_task_color);
  // Activate main_task at startup so both asynchronous DSD operations are
  // launched and the FIFO is ready to receive the streaming H2D data.
  @activate(main_task_color);
}

// Single-wavelet fabout DSD used to stream each result back to the host (D2H).
const result_dsd = @get_dsd(fabout_dsd, .{
  .extent = 1,
  .fabric_color = MEMCPYD2H_DATA_1,
  .output_queue = @get_output_queue(3)
});

var buf = @zeros([1]f16);
const buf_dsd = @get_dsd(mem1d_dsd, .{.tensor_access = |i|{1} -> buf[i]});

task process_task(element:f16) void {
  @block(out_color); // necessary to avoid re-entry of the same microthread
  buf[0] = element * element * element;
  // WARNING: a blocking @fmovh would stall here if the input length exceeded
  // the internal queue capacity, because the main thread would stall and the
  // D2H stream would never get a chance to start.
  @fmovh(result_dsd, buf_dsd, .{.async = true, .unblock = out_color} );
}

run.py

#!/usr/bin/env cs_python

import argparse
import json
import numpy as np

from cerebras.sdk.sdk_utils import memcpy_view
from cerebras.sdk.runtime import runtime_utils # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module

parser = argparse.ArgumentParser()
parser.add_argument('--name', help='the test name')
parser.add_argument("--cmaddr", help="IP:port for CS system")
args = parser.parse_args()
dirname = args.name

# Parse the compile metadata
with open(f"{dirname}/out.json", encoding="utf-8") as json_file:
  compile_data = json.load(json_file)
params = compile_data["params"]
MEMCPYH2D_DATA_1 = int(params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(params["MEMCPYD2H_DATA_1_ID"])
size = int(params["num_elements_to_process"])
print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")

assert size <= 1024, "size cannot exceed the capacity of the FIFO"

memcpy_dtype = MemcpyDataType.MEMCPY_16BIT
runner = SdkRuntime(dirname, cmaddr=args.cmaddr)

runner.load()
runner.run()

np.random.seed(seed=7)

input_tensor = np.random.random(size).astype(np.float16)
print("step 1: streaming H2D")
# "input_tensor" is an 1d array
# The type of input_tensor is f16, we need to extend it to uint32
# There are two kind of extension when using the utility function input_array_to_u32
#    input_array_to_u32(np_arr: np.ndarray, sentinel: Optional[int], fast_dim_sz: int)
# 1) zero extension:
#    sentinel = None
# 2) upper 16-bit is the index of the array:
#    sentinel is Not None
#
# In this example, the upper 16-bit is don't care because buffer.csl only
# reads lower 16-bit
tensors_u32 = runtime_utils.input_array_to_u32(input_tensor, 1, size)
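# nonblock=True returns as soon as the H2D command is queued, so the D2H call
# below can be issued while the H2D stream is still draining into the FIFO.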
runner.memcpy_h2d(MEMCPYH2D_DATA_1, tensors_u32, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=True)

print("step 2: streaming D2H")
# The D2H buffer must be of type u32
out_tensors_u32 = np.zeros(size, np.uint32)
runner.memcpy_d2h(out_tensors_u32, MEMCPYD2H_DATA_1, 0, 0, 1, 1, size, \
    streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=False)
# strip the upper 16 bits of each u32, keeping the lower 16-bit float16 payload
result_tensor = memcpy_view(out_tensors_u32, np.dtype(np.float16))

runner.stop()

# The device adds 10.0 to each element, negates it, and then cubes it.
add_ten_negate = -(input_tensor + 10.0)
expected = add_ten_negate * add_ten_negate * add_ten_negate

np.testing.assert_equal(result_tensor, expected)
print("SUCCESS!")

commands.sh

#!/usr/bin/env bash

set -e

# Compile code.csl for a 1x1 program rectangle, passing the color IDs and the
# input size as compile-time parameters.
cslc ./code.csl \
--fabric-dims=8,3 --fabric-offsets=4,1 \
--params=num_elements_to_process:12 \
-o out \
--params=MEMCPYH2D_DATA_1_ID:0 \
--params=MEMCPYD2H_DATA_1_ID:1 --params=LAUNCH_ID:2 \
--memcpy --channels=1 --width-west-buf=0 --width-east-buf=0
cs_python run.py --name out