Skip to content

Commit

Permalink
ARROW-4502: [C#] Add support for zero-copy reads
Browse files Browse the repository at this point in the history
- Update to the latest Google FlatBuffers code to support Spans/Memory.

- Add a constructor for ArrowStreamReader that takes a ReadOnlyMemory<byte>.
- Add a synchronous ReadNextRecordBatch() method.

- Since we are now enabling Spans with FlatBuffers, we need to change the way we write to streams in the ArrowStreamWriter to use Memory<byte> instead of byte[]. This API is in netcoreapp2.1, but not in netstandard, so cross compile for netcoreapp2.1 and add a shim for netstandard.

~Unit tests are coming. I currently haven't found a great way to "read" arrow streams out of thin air. My initial thought is to use the writer to write some made up data, and then read it in using the reader and ensure the values coming back are the same. @wesm - does that sound like a good approach? I was using a binary file (that was written by PyArrow) locally to test this out.~

~I also plan on adding some benchmark tests to compare between the Stream and the ReadOnlyMemory implementations, but again am having trouble with "how to get the stream to read?".~

@stephentoub @pgovind @chutchinson

Author: Eric Erhardt <eric.erhardt@microsoft.com>

Closes #3736 from eerhardt/ZeroCopyReads and squashes the following commits:

21f41ba <Eric Erhardt> Add RAT exclude for csharp benchmark tests csproj
558ec56 <Eric Erhardt> Address PR feedback.
6ebc80e <Eric Erhardt> Add perf benchmarks for the ArrowStreamReader.
18db336 <Eric Erhardt> Respond to PR feedback.
98e1b11 <Eric Erhardt> Add more types to tests.
f6942cf <Eric Erhardt> Add initial unit tests for ArrowStreamReader.
f33e294 <Eric Erhardt> ARROW-4502:  Add support for zero-copy reads
  • Loading branch information
eerhardt authored and wesm committed Mar 7, 2019
1 parent 09466ce commit d2dbf1e
Show file tree
Hide file tree
Showing 22 changed files with 1,479 additions and 598 deletions.
6 changes: 6 additions & 0 deletions csharp/Apache.Arrow.sln
Expand Up @@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow", "src\Apache.
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Tests", "test\Apache.Arrow.Tests\Apache.Arrow.Tests.csproj", "{9CCEC01B-E67A-4726-BE72-7B514F76163F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Benchmarks", "test\Apache.Arrow.Benchmarks\Apache.Arrow.Benchmarks.csproj", "{742DF47D-77C5-4B84-9E0C-69645F1161EA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -21,6 +23,10 @@ Global
{9CCEC01B-E67A-4726-BE72-7B514F76163F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9CCEC01B-E67A-4726-BE72-7B514F76163F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9CCEC01B-E67A-4726-BE72-7B514F76163F}.Release|Any CPU.Build.0 = Release|Any CPU
{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
6 changes: 5 additions & 1 deletion csharp/src/Apache.Arrow/Apache.Arrow.csproj
Expand Up @@ -3,7 +3,7 @@
<Import Project="../../build/Common.props" />

<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<TargetFrameworks>netstandard1.3;netcoreapp2.1</TargetFrameworks>
<LangVersion>7.2</LangVersion>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<Authors>Apache</Authors>
Expand All @@ -15,6 +15,7 @@
<PackageTags>apache arrow</PackageTags>
<Company>Apache</Company>
<Version>0.0.1</Version>
<DefineConstants>$(DefineConstants);UNSAFE_BYTEBUFFER;BYTEBUFFER_NO_BOUNDS_CHECK;ENABLE_SPAN_T</DefineConstants>
</PropertyGroup>

<ItemGroup>
Expand All @@ -39,4 +40,7 @@
</EmbeddedResource>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp2.1'">
<Compile Remove="Extensions\StreamExtensions.netstandard.cs" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion csharp/src/Apache.Arrow/ArrowBuffer.cs
Expand Up @@ -23,7 +23,7 @@ namespace Apache.Arrow
{
public static ArrowBuffer Empty => new ArrowBuffer(Memory<byte>.Empty);

private ArrowBuffer(Memory<byte> data)
internal ArrowBuffer(ReadOnlyMemory<byte> data)
{
Memory = data;
}
Expand Down
54 changes: 54 additions & 0 deletions csharp/src/Apache.Arrow/Extensions/StreamExtensions.netstandard.cs
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Buffers;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace Apache.Arrow
{
// Helpers to write Memory<byte> to Stream on netstandard
internal static class StreamExtensions
{
public static Task WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
{
return stream.WriteAsync(array.Array, array.Offset, array.Count, cancellationToken);
}
else
{
byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
buffer.Span.CopyTo(sharedBuffer);
return FinishWriteAsync(stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer);
}
}

private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
{
try
{
await writeTask.ConfigureAwait(false);
}
finally
{
ArrayPool<byte>.Shared.Return(localBuffer);
}
}
}
}

0 comments on commit d2dbf1e

Please sign in to comment.