SharpPulsar 2.11.0-rc0097
See the version list below for details.
dotnet add package SharpPulsar --version 2.11.0-rc0097
NuGet\Install-Package SharpPulsar -Version 2.11.0-rc0097
<PackageReference Include="SharpPulsar" Version="2.11.0-rc0097" />
paket add SharpPulsar --version 2.11.0-rc0097
#r "nuget: SharpPulsar, 2.11.0-rc0097"
// Install SharpPulsar as a Cake Addin
#addin nuget:?package=SharpPulsar&version=2.11.0-rc0097&prerelease
// Install SharpPulsar as a Cake Tool
#tool nuget:?package=SharpPulsar&version=2.11.0-rc0097&prerelease
SharpPulsar
SharpPulsar is an Apache Pulsar client built on top of Akka.NET, which can handle millions of Apache Pulsar Producers/Consumers (in theory).
What Is Akka.NET?
Akka.NET is a toolkit and runtime for building highly concurrent, distributed, and fault-tolerant event-driven applications on .NET & Mono. It can support up to 50 million msg/sec on a single machine, with a small memory footprint and ~2.5 million actors (or Apache Pulsar Producers/Consumers) per GB of heap.
What Is Apache Pulsar?
Apache Pulsar is a cloud-native, distributed messaging and streaming platform that is able to support millions of topics while delivering high-throughput and low-latency performance.
Supported features
Client
- TLS
- Authentication (token, tls, OAuth2)
- Multi-Hosts Service URL
- Proxy
- SNI Routing
- Transactions
- Subscription (Durable, Non-durable)
- Cluster-level Auto Failover
Producer
- Exclusive Producer
- Partitioned Topics
- Batching
- Compression (LZ4, ZLIB, ZSTD, SNAPPY)
- Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
- User-defined properties
- Key-based batcher
- Delayed/Scheduled messages
- Interceptors
- Message Router (RoundRobin, ConsistentHashing, Broadcast, Random)
- End-to-end Encryption
- Chunking
- Transactions
Consumer
- User-defined properties
- HasMessageAvailable
- Subscription Type (Exclusive, Failover, Shared, Key_Shared)
- Subscription Mode (Durable, Non-durable)
- Interceptors
- Ack (Ack Individual, Ack Cumulative, Batch-Index Ack)
- Ack Timeout
- Negative Ack
- Dead Letter Policy
- End-to-end Encryption
- SubscriptionInitialPosition
- Partitioned Topics
- Batching
- Compression (LZ4, ZLIB, ZSTD, SNAPPY)
- Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
- Compacted Topics
- Multiple Topics
- Regex Consumer
- Broker Entry Metadata
Reader
- User-defined properties
- HasMessageAvailable
- Schema (Primitive, Avro, Json, KeyValue, AutoSchema)
- Seek (MessageID, Timestamp)
- Multiple Topics
- End-to-end Encryption
- Interceptors
TableView
- Compacted Topics
- Schema (All supported schema types)
- Register Listener
Extras
- Pulsar SQL
- Pulsar Admin REST API
- Function REST API
- EventSource (Reader/SQL)
- OpenTelemetry (ProducerOTelInterceptor, ConsumerOTelInterceptor)
Getting Started
Install the NuGet package SharpPulsar and follow the Tutorials.
// Pulsar client settings builder
var clientConfig = new PulsarClientConfigBuilder()
    .ServiceUrl("pulsar://localhost:6650");

// Pulsar actor system
var pulsarSystem = PulsarSystem.GetInstance(clientConfig);
var pulsarClient = pulsarSystem.NewClient();

// myTopic is assumed to be a string holding the topic name
var consumer = pulsarClient.NewConsumer(new ConsumerConfigBuilder<sbyte[]>()
    .Topic(myTopic)
    .ForceTopicCreation(true)
    .SubscriptionName("myTopic-sub"));

var producer = pulsarClient.NewProducer(new ProducerConfigBuilder<sbyte[]>()
    .Topic(myTopic));

// Publish ten messages
for (var i = 0; i < 10; i++)
{
    var data = Encoding.UTF8.GetBytes($"tuts-{i}").ToSBytes();
    producer.NewMessage().Value(data).Send();
}

// Wait before consuming
Thread.Sleep(TimeSpan.FromSeconds(5));

// Receive, acknowledge, and print the ten messages
for (var i = 0; i < 10; i++)
{
    var message = (Message<sbyte[]>)consumer.Receive();
    consumer.Acknowledge(message);
    var res = Encoding.UTF8.GetString(message.Data.ToBytes());
    Console.WriteLine($"message '{res}' from topic: {message.TopicName}");
}
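The example above moves payloads between `byte[]` and `sbyte[]` via `ToSBytes()` and `ToBytes()`. As a rough illustration of what such a conversion involves, here is a minimal sketch using only the .NET base class library. The class and method names (`SByteConversion`, `ToSigned`, `ToUnsigned`) are hypothetical and are not SharpPulsar's own extension methods; the library's actual implementation may differ.

```csharp
using System;
using System.Text;

public static class SByteConversion
{
    // Hypothetical helper: copies the raw bits of each byte into a signed byte.
    public static sbyte[] ToSigned(byte[] bytes)
    {
        var result = new sbyte[bytes.Length];
        Buffer.BlockCopy(bytes, 0, result, 0, bytes.Length);
        return result;
    }

    // Hypothetical helper: the reverse copy, signed back to unsigned.
    public static byte[] ToUnsigned(sbyte[] bytes)
    {
        var result = new byte[bytes.Length];
        Buffer.BlockCopy(bytes, 0, result, 0, bytes.Length);
        return result;
    }

    public static void Main()
    {
        var payload = ToSigned(Encoding.UTF8.GetBytes("tuts-0"));
        var text = Encoding.UTF8.GetString(ToUnsigned(payload));
        Console.WriteLine(text); // the string survives the round trip
    }
}
```

Because the copy is bitwise, values above 127 become negative in the signed array and are restored unchanged on the way back.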
Logical Types
Avro Logical Types are supported. The message object MUST implement ISpecificRecord.
AvroSchema<LogicalMessage> avroSchema = AvroSchema<LogicalMessage>.Of(
    ISchemaDefinition<LogicalMessage>.Builder()
        .WithPojo(typeof(LogicalMessage))
        .WithJSR310ConversionEnabled(true)
        .Build());

public class LogicalMessage : ISpecificRecord
{
    [LogicalType(LogicalTypeKind.Date)]
    public DateTime CreatedTime { get; set; }

    [LogicalType(LogicalTypeKind.TimestampMicrosecond)]
    public DateTime StampMicros { get; set; }

    [LogicalType(LogicalTypeKind.TimestampMillisecond)]
    public DateTime StampMillis { get; set; }

    [LogicalType(LogicalTypeKind.TimeMicrosecond)]
    public TimeSpan TimeMicros { get; set; }

    [LogicalType(LogicalTypeKind.TimeMillisecond)]
    public TimeSpan TimeMillis { get; set; }

    public AvroDecimal Size { get; set; }

    public string DayOfWeek { get; set; }

    [Ignore]
    public Avro.Schema Schema { get; set; }

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return CreatedTime;
            case 1: return StampMicros;
            case 2: return StampMillis;
            case 3: return TimeMicros;
            case 4: return TimeMillis;
            case 5: return Size;
            case 6: return DayOfWeek;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: CreatedTime = (DateTime)fieldValue; break;
            case 1: StampMicros = (DateTime)fieldValue; break;
            case 2: StampMillis = (DateTime)fieldValue; break;
            case 3: TimeMicros = (TimeSpan)fieldValue; break;
            case 4: TimeMillis = (TimeSpan)fieldValue; break;
            case 5: Size = (AvroDecimal)fieldValue; break;
            case 6: DayOfWeek = (string)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}
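For orientation, the Avro specification defines each of these logical types as an annotation on a primitive: `date` is an int counting days since the Unix epoch, `timestamp-millis` and `timestamp-micros` are longs counting from the epoch in UTC, and `time-millis` (int) and `time-micros` (long) count from midnight. The sketch below shows that mapping with plain .NET types. The class and method names are hypothetical, and it does not claim to reproduce how SharpPulsar or Apache.Avro perform the conversion internally.

```csharp
using System;

public static class AvroLogicalEncoding
{
    private static readonly DateTime UnixEpoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // One microsecond is 10 .NET ticks (a tick is 100 ns).
    private const long TicksPerMicrosecond = 10;

    // date: days since 1970-01-01
    public static int ToDate(DateTime value) =>
        (int)(value.Date - UnixEpoch.Date).TotalDays;

    // timestamp-millis: milliseconds since the epoch (input assumed to be UTC)
    public static long ToTimestampMillis(DateTime utc) =>
        (utc - UnixEpoch).Ticks / TimeSpan.TicksPerMillisecond;

    // timestamp-micros: microseconds since the epoch (input assumed to be UTC)
    public static long ToTimestampMicros(DateTime utc) =>
        (utc - UnixEpoch).Ticks / TicksPerMicrosecond;

    // time-millis: milliseconds after midnight
    public static int ToTimeMillis(TimeSpan timeOfDay) =>
        (int)(timeOfDay.Ticks / TimeSpan.TicksPerMillisecond);

    // time-micros: microseconds after midnight
    public static long ToTimeMicros(TimeSpan timeOfDay) =>
        timeOfDay.Ticks / TicksPerMicrosecond;

    public static void Main()
    {
        var now = DateTime.UtcNow;
        Console.WriteLine($"date={ToDate(now)} ts-millis={ToTimestampMillis(now)}");
        Console.WriteLine($"time-micros={ToTimeMicros(now.TimeOfDay)}");
    }
}
```

Note that the integer division truncates toward zero, so this sketch is only meant for values at or after the epoch.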
KeyValue Schema ALERT!!!!
Because I have become lazy and a lover of "peace of mind":
- For schema type of KEYVALUESCHEMA:
producer.NewMessage().Value<TK, TV>(data).Send();
OR
producer.Send<TK, TV>(data);
TK and TV represent the key and value types of the KEYVALUESCHEMA, respectively.
TableView
var topic = $"persistent://public/default/tableview-{DateTime.Now.Ticks}";
var count = 20;
var keys = await PublishMessages(topic, count, false);
var tv = await _client.NewTableViewBuilder(ISchema<string>.Bytes)
    .Topic(topic)
    .AutoUpdatePartitionsInterval(TimeSpan.FromSeconds(60))
    .CreateAsync();
Console.WriteLine($"start tv size: {tv.Size()}");
tv.ForEachAndListen((k, v) => Console.WriteLine($"{k} -> {Encoding.UTF8.GetString(v)}"));
await Task.Delay(5000);
Console.WriteLine($"Current tv size: {tv.Size()}");
tv.ForEachAndListen((k, v) => Console.WriteLine($"checkpoint {k} -> {Encoding.UTF8.GetString(v)}"));
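Conceptually, a table view keeps the latest value for each key and lets callers observe both the current contents and later updates. The following is a minimal in-memory model of that idea, not SharpPulsar's implementation: `MiniTableView` and its `Apply` method are hypothetical names introduced for illustration, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a table view: last write per key wins.
public sealed class MiniTableView<TValue>
{
    private readonly Dictionary<string, TValue> _latest = new();
    private readonly List<Action<string, TValue>> _listeners = new();

    public int Size => _latest.Count;

    // Stands in for a message arriving from the topic.
    public void Apply(string key, TValue value)
    {
        _latest[key] = value;
        foreach (var listener in _listeners) listener(key, value);
    }

    // Runs the action on every current entry, then on every later update.
    public void ForEachAndListen(Action<string, TValue> action)
    {
        foreach (var entry in _latest) action(entry.Key, entry.Value);
        _listeners.Add(action);
    }
}

public static class MiniTableViewDemo
{
    public static void Main()
    {
        var view = new MiniTableView<string>();
        view.Apply("k1", "v1");
        view.ForEachAndListen((k, v) => Console.WriteLine($"{k} -> {v}"));
        view.Apply("k1", "v2"); // same key: size stays 1, listener fires again
        Console.WriteLine($"size: {view.Size}");
    }
}
```

In this model, each call to `ForEachAndListen` registers one more listener, so calling it twice means later updates are reported twice.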
OpenTelemetry
var exportedItems = new List<Activity>();
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("producer", "consumer")
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("inmemory-test"))
    .AddInMemoryExporter(exportedItems)
    .Build();
var producerBuilder = new ProducerConfigBuilder<byte[]>()
    .Intercept(new ProducerOTelInterceptor<byte[]>("producer", _client.Log))
    .Topic(topic);
var consumerBuilder = new ConsumerConfigBuilder<byte[]>()
    .Intercept(new ConsumerOTelInterceptor<byte[]>("consumer", _client.Log))
    .Topic(topic);
Cluster-level Auto Failover
var config = new PulsarClientConfigBuilder();
var builder = AutoClusterFailover.Builder()
    .Primary(serviceUrl)
    .Secondary(new List<string> { secondary })
    .FailoverDelay(TimeSpan.FromSeconds(failoverDelay))
    .SwitchBackDelay(TimeSpan.FromSeconds(switchBackDelay))
    .CheckInterval(TimeSpan.FromSeconds(checkInterval));
config.ServiceUrlProvider(new AutoClusterFailover((AutoClusterFailoverBuilder)builder));
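To make the roles of the three time settings concrete, here is a simplified model of the decision logic, assuming the behaviour described for cluster-level failover in Apache Pulsar clients: the primary is probed every check interval, the client switches to a secondary once the primary has been unavailable for the failover delay, and it switches back once the primary has been available again for the switch-back delay. `FailoverModel` and `Probe` are hypothetical names; the sketch ignores secondary health and multiple secondaries and is not SharpPulsar's implementation.

```csharp
using System;

// Hypothetical timing model; one Probe call corresponds to one check interval.
public sealed class FailoverModel
{
    private readonly TimeSpan _failoverDelay;
    private readonly TimeSpan _switchBackDelay;
    private DateTime? _primaryDownSince;
    private DateTime? _primaryUpSince;

    public bool UsingSecondary { get; private set; }

    public FailoverModel(TimeSpan failoverDelay, TimeSpan switchBackDelay)
    {
        _failoverDelay = failoverDelay;
        _switchBackDelay = switchBackDelay;
    }

    public void Probe(bool primaryAvailable, DateTime now)
    {
        if (!UsingSecondary)
        {
            if (primaryAvailable) { _primaryDownSince = null; return; }
            _primaryDownSince ??= now; // remember when the outage started
            if (now - _primaryDownSince.Value >= _failoverDelay)
            {
                UsingSecondary = true;
                _primaryUpSince = null;
            }
        }
        else
        {
            if (!primaryAvailable) { _primaryUpSince = null; return; }
            _primaryUpSince ??= now; // remember when the primary recovered
            if (now - _primaryUpSince.Value >= _switchBackDelay)
            {
                UsingSecondary = false;
                _primaryDownSince = null;
            }
        }
    }

    public static void Main()
    {
        var model = new FailoverModel(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60));
        var start = DateTime.UtcNow;
        model.Probe(false, start);
        model.Probe(false, start.AddSeconds(30));
        Console.WriteLine($"using secondary: {model.UsingSecondary}");
    }
}
```

A short check interval makes the model react closer to the configured delays; a long one means a switch can happen up to one interval late.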
[Experimental] Running SharpPulsar Tests in a Docker container (the issue I have faced is how to create a container from within a container)
You can run SharpPulsar tests in a Docker container. A Dockerfile and a docker-compose file are provided in the root folder to help you run these tests in a Docker container.
docker-compose.yml:
version: "2.4"
services:
  akka-test:
    image: sharp-pulsar-test
    build:
      context: .
    cpu_count: 1
    mem_limit: 1g
    environment:
      run_count: 2
      # to filter tests, uncomment
      # test_filter: "--filter FullyQualifiedName=SharpPulsar.Test.MessageChunkingTest"
      test_file: Tests/SharpPulsar.Test/SharpPulsar.Test.csproj
Dockerfile:
FROM mcr.microsoft.com/dotnet/sdk:6.0
ENV test_file="Tests/SharpPulsar.Test/SharpPulsar.Test.csproj"
ENV test_filter=""
ENV run_count=2
RUN mkdir sharppulsar
COPY . ./sharppulsar
RUN ls
WORKDIR /sharppulsar
CMD ["/bin/bash", "-c", "x=1; c=0; while [ $x -le 1 ] && [ $c -le ${run_count} ]; do dotnet test ${test_file} ${test_filter} --framework net6.0 --logger trx; c=$(( $c + 1 )); if [ $? -eq 0 ]; then x=1; else x=0; fi; done"]
How to:
cd into the root directory and execute docker-compose up.
run_count is the number of times you want the test repeated.
test_filter is used when you need to run a specific test instead of all the tests in the test suite.
License
This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
- net7.0
- Akka (>= 1.4.48)
- Akka.Logger.NLog (>= 1.4.10)
- Apache.Avro (>= 1.11.1)
- App.Metrics.Concurrency (>= 4.3.0)
- AvroSchemaGenerator (>= 2.8.1)
- DotNetty.Common (>= 0.7.5)
- Google.Protobuf (>= 3.21.12)
- IdentityModel (>= 6.0.0)
- JsonSubTypes (>= 2.0.1)
- K4os.Compression.LZ4 (>= 1.3.5)
- Microsoft.CSharp (>= 4.7.0)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.0)
- Microsoft.Extensions.Logging.Console (>= 7.0.0)
- Microsoft.IO.RecyclableMemoryStream (>= 2.2.1)
- Microsoft.Rest.ClientRuntime (>= 2.3.24)
- Nager.PublicSuffix (>= 2.4.0)
- Newtonsoft.Json (>= 13.0.2)
- Nito.AsyncEx (>= 5.1.2)
- NodaTime (>= 3.1.6)
- OpenTelemetry (>= 1.3.2)
- Portable.BouncyCastle (>= 1.9.0)
- Pro.NBench.xUnit (>= 2.0.0)
- protobuf-net (>= 3.1.26)
- SharpPulsar.Admin (>= 2.11.0-rc0097)
- SharpPulsar.Trino (>= 2.11.0-rc0097)
- SharpZipLib (>= 1.4.1)
- Snappy.Standard (>= 0.2.0)
- System.Diagnostics.Contracts (>= 4.3.0)
- System.IO.Pipelines (>= 7.0.0)
- System.Net.NetworkInformation (>= 4.3.0)
- System.Reactive (>= 5.0.0)
- System.Runtime.CompilerServices.Unsafe (>= 6.0.0)
- System.Runtime.Serialization.Primitives (>= 4.3.0)
- System.Security.Cryptography.Cng (>= 5.0.0)
- System.Text.Json (>= 7.0.1)
- System.Threading.Tasks.Dataflow (>= 7.0.0)
- zlib.net-mutliplatform (>= 1.0.6)
- ZstdNet (>= 1.4.5)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
2.15.1 | 232 | 2/15/2024 |
2.15.0 | 129 | 2/3/2024 |
2.14.1 | 582 | 9/22/2023 |
2.14.0 | 347 | 7/12/2023 |
2.13.0 | 193 | 5/20/2023 |
2.12.1 | 169 | 5/10/2023 |
2.12.0 | 214 | 5/1/2023 |
2.11.2 | 290 | 3/19/2023 |
2.11.1 | 267 | 3/18/2023 |
2.11.0 | 279 | 3/5/2023 |
2.11.0-rc0117 | 136 | 3/5/2023 |
2.11.0-rc0112 | 173 | 2/2/2023 |
2.11.0-rc0107 | 167 | 2/1/2023 |
2.11.0-rc0105 | 140 | 1/18/2023 |
2.11.0-rc0097 | 166 | 1/15/2023 |
2.10.0-rc1193 | 170 | 1/15/2023 |
2.10.0-rc1191 | 858 | 10/6/2022 |
2.10.0-rc1186 | 160 | 10/4/2022 |
2.10.0-rc1162 | 220 | 9/14/2022 |
2.10.0-rc1158 | 202 | 8/9/2022 |
2.10.0-rc1157 | 178 | 8/9/2022 |
2.10.0-rc1150 | 158 | 8/8/2022 |
2.10.0-rc1136 | 249 | 7/23/2022 |
2.10.0-rc1135 | 194 | 7/23/2022 |
2.10.0-rc1134 | 189 | 7/23/2022 |
2.10.0-rc1133 | 185 | 7/23/2022 |
2.10.0-rc1132 | 144 | 7/21/2022 |
2.10.0-rc1130 | 200 | 7/21/2022 |
2.10.0-rc1129 | 182 | 7/17/2022 |
2.10.0-rc1125 | 217 | 7/10/2022 |
2.10.0-rc1123 | 184 | 7/9/2022 |
2.10.0-rc.1117 | 148 | 7/3/2022 |
2.10.0-rc.1108 | 187 | 6/25/2022 |
2.10.0-rc.1090 | 165 | 6/9/2022 |
2.10.0-rc.1088 | 141 | 6/9/2022 |
2.10.0-rc.1083 | 139 | 6/8/2022 |
2.10.0-rc.1070 | 146 | 6/3/2022 |
2.10.0-rc.1051 | 147 | 5/26/2022 |
2.10.0-rc.1037 | 147 | 5/16/2022 |
2.10.0-rc.1035 | 160 | 5/13/2022 |
2.10.0-rc.1034 | 154 | 5/13/2022 |
2.10.0-rc.1033 | 157 | 5/13/2022 |
2.10.0-rc.1032 | 156 | 5/13/2022 |
2.10.0-rc.1031 | 164 | 5/13/2022 |
2.10.0-rc.1022 | 258 | 4/2/2022 |
2.10.0-rc.1013 | 162 | 3/26/2022 |
2.9.0 | 1,482 | 2/21/2022 |
2.9.0-rc.975 | 187 | 2/13/2022 |
2.9.0-beta.971 | 154 | 2/13/2022 |
2.9.0-beta.47 | 174 | 1/9/2022 |
2.9.0-beta.45 | 170 | 12/29/2021 |
2.9.0-beta.44 | 171 | 12/21/2021 |
2.9.0-beta.43 | 176 | 12/19/2021 |
2.9.0-beta.1 | 154 | 2/13/2022 |
2.2.4 | 511 | 11/22/2021 |
2.2.4-beta.42 | 196 | 11/10/2021 |
2.2.4-beta.41 | 205 | 11/3/2021 |
2.2.4-beta.40 | 185 | 9/29/2021 |
2.2.4-beta | 316 | 9/28/2021 |
2.2.3 | 482 | 9/22/2021 |
2.2.3-beta | 325 | 9/21/2021 |
2.2.2 | 512 | 9/13/2021 |
2.2.2-beta | 299 | 9/13/2021 |
2.2.1 | 553 | 9/11/2021 |
2.2.1-beta | 353 | 9/11/2021 |
2.2.0 | 527 | 9/10/2021 |
2.2.0-beta | 338 | 9/10/2021 |
2.1.0 | 519 | 9/5/2021 |
2.1.0-beta.33 | 202 | 9/5/2021 |
2.0.18 | 509 | 8/14/2021 |
2.0.0-beta.31 | 200 | 8/14/2021 |
2.0.0-beta.30 | 185 | 8/13/2021 |
2.0.0-beta.29 | 194 | 8/12/2021 |
2.0.0-beta.28 | 191 | 8/11/2021 |
2.0.0-beta.27 | 201 | 8/10/2021 |
2.0.0-beta.26 | 190 | 8/9/2021 |
2.0.0-beta.25 | 203 | 8/6/2021 |
2.0.0-beta.24 | 196 | 8/5/2021 |
2.0.0-beta.23 | 207 | 8/4/2021 |
2.0.0-beta.22 | 200 | 8/4/2021 |
2.0.0-beta.20 | 190 | 7/31/2021 |
2.0.0-beta.19 | 246 | 7/30/2021 |
2.0.0-beta.15 | 211 | 5/13/2021 |
2.0.0-beta.14 | 203 | 5/12/2021 |
2.0.0-beta.13 | 202 | 5/11/2021 |
2.0.0-beta.12 | 223 | 5/10/2021 |
2.0.0-beta.11 | 212 | 5/9/2021 |
2.0.0-beta.10 | 246 | 5/7/2021 |
2.0.0-beta.9 | 210 | 4/23/2021 |
2.0.0-beta.8 | 211 | 4/22/2021 |
2.0.0-beta.7 | 215 | 4/22/2021 |
2.0.0-beta.6 | 203 | 4/22/2021 |
2.0.0-beta.5 | 202 | 4/15/2021 |
2.0.0-beta.4 | 213 | 4/14/2021 |
2.0.0-beta | 325 | 4/10/2021 |
1.4.2.1 | 685 | 9/3/2020 |
1.4.2 | 601 | 9/2/2020 |
1.4.1 | 614 | 8/29/2020 |
1.4.0 | 621 | 8/29/2020 |
1.4.0-release.1 | 156 | 2/13/2022 |
1.3.5 | 606 | 6/9/2020 |
1.3.4 | 599 | 6/9/2020 |
1.3.3 | 628 | 6/8/2020 |
1.3.2 | 598 | 6/8/2020 |
1.3.1 | 749 | 6/5/2020 |
1.3.0 | 635 | 6/3/2020 |
1.2.0 | 780 | 5/26/2020 |
1.1.0 | 676 | 5/26/2020 |
1.0.0 | 685 | 5/23/2020 |
0.9.0 | 633 | 5/21/2020 |
0.8.5 | 602 | 5/20/2020 |
0.8.4 | 664 | 5/9/2020 |
0.8.3 | 611 | 5/8/2020 |
0.8.2 | 623 | 5/2/2020 |
0.8.1 | 642 | 4/30/2020 |
0.8.0 | 681 | 4/28/2020 |
0.7.0 | 653 | 4/20/2020 |
0.6.5 | 591 | 4/16/2020 |
0.6.4 | 665 | 4/15/2020 |
0.6.3 | 644 | 4/14/2020 |
0.6.2 | 605 | 4/14/2020 |
0.6.1 | 625 | 4/13/2020 |
0.6.0 | 660 | 4/12/2020 |
0.5.3 | 811 | 4/5/2020 |
0.5.2 | 642 | 3/30/2020 |
0.5.1 | 750 | 3/28/2020 |
0.5.0 | 622 | 3/27/2020 |
0.4.0 | 671 | 3/17/2020 |
0.3.0 | 631 | 3/13/2020 |
0.2.0 | 600 | 3/11/2020 |
0.0.1.1 | 654 | 3/7/2020 |
0.0.1 | 646 | 3/7/2020 |
0.0.1-alpha | 437 | 3/8/2020 |
Added OpenTelemetry