From wpf-dev-pack
Guides high-performance streaming with System.IO.Pipelines in .NET for network protocols, binary data parsing, and large stream processing.
npx claudepluginhub christian289/dotnet-with-claudecode --plugin wpf-dev-pack
This skill uses the workspace's default tool permissions.
A guide to the System.IO.Pipelines API for building high-performance I/O pipelines.
Generates design tokens/docs from CSS/Tailwind/styled-components codebases, audits visual consistency across 10 dimensions, detects AI slop in UI.
Records polished WebM UI demo videos of web apps using Playwright with cursor overlay, natural pacing, and three-phase scripting. Activates for demo, walkthrough, screen recording, or tutorial requests.
Delivers idiomatic Kotlin patterns for null safety, immutability, sealed classes, coroutines, Flows, extensions, DSL builders, and Gradle DSL. Use when writing, reviewing, refactoring, or designing Kotlin code.
A guide to the System.IO.Pipelines API for building high-performance I/O pipelines.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
| Concept | Description |
|---|---|
| `Pipe` | Memory buffer-based read/write pipe |
| `PipeReader` | Reads data from the pipe |
| `PipeWriter` | Writes data to the pipe |
| `ReadOnlySequence<T>` | Non-contiguous memory sequence |
using System.Buffers;
using System.IO.Pipelines;
/// <summary>
/// Moves the contents of a <see cref="Stream"/> through an in-memory <see cref="Pipe"/>:
/// one task fills the pipe from the stream while a second task drains it and passes
/// each buffer to <c>ProcessBuffer</c>.
/// </summary>
public sealed class PipelineProcessor
{
    /// <summary>
    /// Reads <paramref name="stream"/> to its end and processes everything that was read.
    /// </summary>
    /// <param name="stream">Source stream to read from.</param>
    /// <returns>A task that completes once both the writing and the reading half are done.</returns>
    public async Task ProcessAsync(Stream stream)
    {
        var pipe = new Pipe();

        // Start both halves before awaiting either one so that they run concurrently.
        Task writing = FillPipeAsync(stream, pipe.Writer);
        Task reading = ReadPipeAsync(pipe.Reader);

        await Task.WhenAll(writing, reading);
    }

    /// <summary>
    /// Copies <paramref name="stream"/> into <paramref name="writer"/> until the stream
    /// returns 0 bytes or the flush result reports completion.
    /// </summary>
    private async Task FillPipeAsync(Stream stream, PipeWriter writer)
    {
        const int minimumBufferSize = 512;

        try
        {
            while (true)
            {
                // Ask the writer for a buffer of at least minimumBufferSize bytes.
                Memory<byte> memory = writer.GetMemory(minimumBufferSize);

                int bytesRead = await stream.ReadAsync(memory);
                if (bytesRead == 0)
                {
                    // End of stream.
                    break;
                }

                // Tell the writer how many bytes of the buffer were actually filled.
                writer.Advance(bytesRead);

                // Make the data available to the reader.
                FlushResult result = await writer.FlushAsync();
                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            // Without this the writer would stay open after a failure and the reading
            // half would wait forever for data, so ProcessAsync would never return.
            await writer.CompleteAsync(ex);
            throw;
        }

        // Signal write completion.
        await writer.CompleteAsync();
    }

    /// <summary>
    /// Drains <paramref name="reader"/>, handing every buffer it yields to
    /// <c>ProcessBuffer</c>, until the read result reports completion.
    /// </summary>
    private async Task ReadPipeAsync(PipeReader reader)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;

                ProcessBuffer(buffer);

                // The whole buffer was processed, so mark all of it as consumed.
                reader.AdvanceTo(buffer.End);

                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            // Complete the reader on failure as well, so the writing half is not left
            // waiting on a reader that will never read again.
            await reader.CompleteAsync(ex);
            throw;
        }

        // Signal read completion.
        await reader.CompleteAsync();
    }
}
/// <summary>
/// Reads from <paramref name="reader"/> until the read result reports completion,
/// passing every line that <c>TryReadLine</c> extracts to <c>ProcessLine</c>.
/// </summary>
/// <param name="reader">Pipe reader to drain.</param>
private async Task ReadLinesAsync(PipeReader reader)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // TryReadLine moves `buffer` past each line it returns, so after this loop
            // `buffer` holds only the data that did not form a complete line.
            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                ProcessLine(line);
            }

            // Consumed: everything before the leftover data. Examined: the whole buffer.
            reader.AdvanceTo(buffer.Start, buffer.End);

            // NOTE: leftover data that never formed a complete line is not passed to
            // ProcessLine when the pipe completes.
            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // Complete the reader on failure too; otherwise the writing side can be left
        // waiting on a reader that will never read again.
        await reader.CompleteAsync(ex);
        throw;
    }

    await reader.CompleteAsync();
}
/// <summary>
/// Tries to cut one '\n'-terminated line off the front of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Input data; on success it is moved past the newline.</param>
/// <param name="line">The bytes before the newline, or <c>default</c> when no newline is found.</param>
/// <returns><c>true</c> when a newline was found; otherwise <c>false</c>.</returns>
private bool TryReadLine(
    ref ReadOnlySequence<byte> buffer,
    out ReadOnlySequence<byte> line)
{
    if (buffer.PositionOf((byte)'\n') is { } newline)
    {
        // Everything in front of the newline is the line itself.
        line = buffer.Slice(0, newline);

        // Continue one byte after the newline.
        SequencePosition afterNewline = buffer.GetPosition(1, newline);
        buffer = buffer.Slice(afterNewline);
        return true;
    }

    line = default;
    return false;
}
/// <summary>
/// Passes the contents of <paramref name="buffer"/> to <c>ProcessSpan</c>, one call per segment.
/// </summary>
private void ProcessBuffer(ReadOnlySequence<byte> buffer)
{
    if (!buffer.IsSingleSegment)
    {
        // Several segments: walk them in order.
        foreach (ReadOnlyMemory<byte> chunk in buffer)
        {
            ProcessSpan(chunk.Span);
        }

        return;
    }

    // Exactly one segment: hand over its span directly.
    ProcessSpan(buffer.FirstSpan);
}
/// <summary>
/// Receives from <paramref name="socket"/> into a pipe while <c>ProcessAsync</c> reads from it,
/// and completes when both sides have finished.
/// </summary>
public async Task ProcessSocketAsync(Socket socket)
{
    Pipe channel = new Pipe();

    // Both tasks are started before the await so that they run concurrently.
    Task receiveTask = ReceiveAsync(socket, channel.Writer);
    Task processTask = ProcessAsync(channel.Reader);

    await Task.WhenAll(receiveTask, processTask);
}
/// <summary>
/// Receives from <paramref name="socket"/> into <paramref name="writer"/> until the socket
/// returns 0 bytes or the flush result reports completion.
/// </summary>
/// <param name="socket">Socket to receive from.</param>
/// <param name="writer">Pipe writer that receives the data.</param>
private async Task ReceiveAsync(Socket socket, PipeWriter writer)
{
    try
    {
        while (true)
        {
            // Ask the writer for a buffer of at least 4096 bytes.
            Memory<byte> memory = writer.GetMemory(4096);

            int bytesReceived = await socket.ReceiveAsync(
                memory,
                SocketFlags.None);
            if (bytesReceived == 0)
            {
                // Nothing more was received; stop filling the pipe.
                break;
            }

            // Tell the writer how many bytes were filled, then publish them to the reader.
            writer.Advance(bytesReceived);
            FlushResult result = await writer.FlushAsync();
            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    catch (Exception ex)
    {
        // A failed receive must still complete the writer; otherwise the reading side
        // keeps waiting for data that will never arrive.
        await writer.CompleteAsync(ex);
        throw;
    }

    await writer.CompleteAsync();
}
// Pipe configured explicitly instead of relying on the defaults.
PipeOptions pipeOptions = new PipeOptions(
    // Memory pool the pipe takes its buffers from
    pool: MemoryPool<byte>.Shared,
    // Reader scheduler
    readerScheduler: PipeScheduler.ThreadPool,
    // Writer scheduler
    writerScheduler: PipeScheduler.ThreadPool,
    // Writer pause threshold (64 KiB)
    pauseWriterThreshold: 64 * 1024,
    // Writer resume threshold (32 KiB)
    resumeWriterThreshold: 32 * 1024,
    // Minimum segment size
    minimumSegmentSize: 4096,
    useSynchronizationContext: false);

Pipe pipe = new Pipe(pipeOptions);
<ItemGroup>
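<!-- Ships with the ASP.NET Core shared framework (.NET Core 3.0+); add this reference for project types or targets that do not already include it -->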
<PackageReference Include="System.IO.Pipelines" Version="9.0.*" />
</ItemGroup>
// Rule: every ReadAsync must be followed by an AdvanceTo call once the buffer has been processed.
ReadResult result = await reader.ReadAsync();
// ... processing ...
// `consumed` and `examined` are placeholders for the positions produced by the processing step
// (presumably: data before `consumed` is finished, data before `examined` has been inspected).
reader.AdvanceTo(consumed, examined);
// ❌ Bad example: keeping a reference to the buffer returned by ReadAsync
ReadOnlySequence<byte> saved;
var result = await reader.ReadAsync();
saved = result.Buffer; // Dangerous! Invalidated after AdvanceTo
// ✅ Good example: copy the needed data into an array first
var copy = result.Buffer.ToArray();
// Only after the copy has been taken is the whole buffer marked as consumed.
reader.AdvanceTo(result.Buffer.End);
// Must call CompleteAsync for both Writer and Reader:
// each end of the pipe signals its own completion separately.
await writer.CompleteAsync();
await reader.CompleteAsync();