The library hangs on the Flush method #68

Closed
NektoDron opened this issue Apr 2, 2020 · 33 comments · Fixed by #70

@NektoDron

NektoDron commented Apr 2, 2020

I tried to upload many records via WriteApi. When I send too many records at once (10,000-100,000), not all of them are received. When I split them into 1,000 records per request, the library hangs on the Flush method. If I don't call Flush before disposing the WriteApi, some data is lost.

The sample code:

            var start = 0;
            for (;;)
            {
                var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray();
                if (historyBars.Length == 0)
                {
                    return;
                }
                if (start != 0)
                    await Task.Delay(100, token);
                start += MaxBarsPerRequest;
                m_logger.LogDebug(
                    $"Write bars for {historyBars.First().Security}. From: {historyBars.First().Date}, To: {historyBars.Last().Date}");
                using var api = m_client.GetWriteApi();
                api.WriteMeasurements(HistoryBarConstant.Bucket, HistoryBarConstant.OrgId, WritePrecision.S,
                    historyBars);
                //api.Flush();
            }
@bednar bednar self-assigned this Apr 2, 2020
@bednar bednar added the bug Something isn't working label Apr 2, 2020
@bednar bednar added this to the 1.7.0 milestone Apr 2, 2020
@bednar
Contributor

bednar commented Apr 2, 2020

@NektoDron thanks! We will take a look.

@bednar
Contributor

bednar commented Apr 3, 2020

Hi @NektoDron,

thanks again for the issue.

We found that our write buffer used an incorrect batch size under heavy load. I merged #69, which fixes it.

The issue is fixed in the 1.7.0 milestone. If you would like to use a preview version, use our nightly build: InfluxDB.Client 1.7.0-dev.512.

One note on your code: the WriteApi should be used as a singleton. Try something like this:

var m_client = InfluxDBClientFactory.Create("http://localhost:9999", "my-token".ToCharArray());
var api = m_client.GetWriteApi(WriteOptions.CreateNew().BatchSize(MaxBarsPerRequest).FlushInterval(10_000).Build());

var start = 0;
for (;;)
{
    var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray();
    if (historyBars.Length == 0)
    {
        break;
    }
    if (start != 0) {
        Trace.WriteLine("Delaying...");
        await Task.Delay(100);
    }
        
    start += MaxBarsPerRequest;
    Trace.WriteLine(
        $"Add bars to buffer From: {historyBars.First().Date}, To: {historyBars.Last().Date}. Remaining {CountToWrite-start}");
    api.WriteMeasurements(HistoryBarConstant.Bucket, HistoryBarConstant.OrgId, WritePrecision.S,
        historyBars);
}

Trace.WriteLine("Flushing data...");

m_client.Dispose();

Trace.WriteLine("Finished");

I am also working on improving the serialization of measurements into LineProtocol... stay tuned ;)

Regards

@NektoDron
Author

Can I use WriteApi from multiple threads?

@bednar
Contributor

bednar commented Apr 6, 2020

Yes, it supports multithreading.

@bednar
Contributor

bednar commented Apr 6, 2020

Hi @NektoDron,

The improved serialization of measurements into LineProtocol - #70 - is merged.

If you would like to use a preview version, use our nightly build: InfluxDB.Client 1.7.0-dev.528.

Regards

@NektoDron
Author

NektoDron commented Apr 6, 2020

No, it doesn't work. I removed the old bucket and created a new one. Then I reloaded all of the data with the new library and with code modified like your sample. In the logs I see that all data up to 2 April has been written successfully. But in the DB I see only half of it or less.

@bednar bednar reopened this Apr 6, 2020
@bednar
Contributor

bednar commented Apr 6, 2020

You could enable detailed logging with Client.SetLogLevel(LogLevel.Body), and you will see the outgoing requests to InfluxDB.

Are your data unique points?

https://docs.influxdata.com/influxdb/v1.7/troubleshooting/frequently-asked-questions/#how-does-influxdb-handle-duplicate-points
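
As the linked FAQ describes, InfluxDB identifies a point by its measurement, tag set, and timestamp, so a second write with the same three values updates the existing point instead of adding a new one. When timestamps are written in seconds, two bars that fall within the same second and carry the same tags share one timestamp. A minimal sketch for counting how many distinct points a batch would produce; the tuple layout and the sample values are made up for illustration and are not taken from the issue or the library:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample data: (security tag, bar time).
var bars = new List<(string Security, DateTime Date)>
{
    ("AAA", new DateTime(2020, 4, 2, 10, 0, 0, 0, DateTimeKind.Utc)),
    ("AAA", new DateTime(2020, 4, 2, 10, 0, 0, 500, DateTimeKind.Utc)), // same second as above
    ("AAA", new DateTime(2020, 4, 2, 10, 0, 1, 0, DateTimeKind.Utc)),
    ("BBB", new DateTime(2020, 4, 2, 10, 0, 0, 0, DateTimeKind.Utc)),   // different tag value
};

// Counts distinct (tag, epoch second) pairs, i.e. the points that would remain
// if every bar went to the same measurement with second precision.
int CountUniquePoints(IEnumerable<(string Security, DateTime Date)> items) =>
    items.Select(b => (b.Security, Seconds: new DateTimeOffset(b.Date).ToUnixTimeSeconds()))
         .Distinct()
         .Count();

Console.WriteLine($"{bars.Count} bars, {CountUniquePoints(bars)} unique points");
```

If the unique count is lower than the number of bars sent, fewer rows in the database than in the log would be expected rather than a sign of lost writes.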

@NektoDron
Author

I see the library is locked here:
https://www.screencast.com/t/JpQgrB61

I will try with the log level.

@bednar
Contributor

bednar commented Apr 6, 2020

This lock means that the library is waiting to flush all data into InfluxDB.

Anyway, thanks for your cooperation!

@NektoDron
Author

log.zip

@NektoDron
Author

How long should the upload take? I have waited several minutes.

@bednar
Contributor

bednar commented Apr 6, 2020

It depends on how much data you want to save into InfluxDB. As you can see in the log:

<-- Header: Date=Mon, 06 Apr 2020 10:43:56 GMT
<-- END
<-- 204
<-- Header: Date=Mon, 06 Apr 2020 10:43:56 GMT
<-- END
<-- 204
<-- Header: Date=Mon, 06 Apr 2020 10:43:56 GMT
<-- END

you are able to do 3 requests per second...

The best approach would be to prepare the LineProtocol directly, to avoid the type overhead in MeasurementMapper and PointData.

Regards
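
Preparing LineProtocol by hand means taking care of two details of the format: commas, equals signs, and spaces in tag values must be backslash-escaped, and the timestamp must be an integer in the chosen precision. A minimal sketch under those rules; the helper names, the "history_bar" measurement, and the sample values are hypothetical, and the resulting string is what would then be handed to the WriteApi as a record:

```csharp
using System;
using System.Globalization;
using System.Text;

// Escapes the characters that have special meaning inside a tag value.
string EscapeTag(string value) =>
    value.Replace(",", "\\,").Replace("=", "\\=").Replace(" ", "\\ ");

// Builds one line: measurement,tag=value field=value,field=value timestamp
string ToLine(string measurement, string security, double open, double close, DateTime utcTime)
{
    var line = new StringBuilder();
    line.Append(measurement);
    line.Append(",security=").Append(EscapeTag(security));
    line.Append(' ');
    // Invariant culture keeps '.' as the decimal separator on every machine.
    line.Append("open=").Append(open.ToString(CultureInfo.InvariantCulture));
    line.Append(",close=").Append(close.ToString(CultureInfo.InvariantCulture));
    line.Append(' ');
    // Whole seconds since the Unix epoch, matching second precision.
    line.Append(new DateTimeOffset(utcTime).ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture));
    return line.ToString();
}

var time = new DateTime(2020, 4, 6, 10, 43, 56, DateTimeKind.Utc);
Console.WriteLine(ToLine("history_bar", "ES M0", 2650.5, 2651.25, time));
```

Using an integer timestamp matters here: a fractional or culture-formatted number in the last position is not a valid LineProtocol timestamp.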

@NektoDron
Author

I have waited for 15 minutes. There are no new records in the log, and the library is stuck in the WaitToCondition method.

@NektoDron
Author

https://www.screencast.com/t/o32qAhK5LH
There are no active threads uploading data, but the thread is locked.

@NektoDron
Author

I ran some experiments. Everything works fine if I don't call Dispose: all data is loaded. But the garbage collector can't dispose the WriteApi objects in the finalizer thread.

@NektoDron
Author

Got any news? I can help if necessary.

@bednar
Contributor

bednar commented Apr 12, 2020

I am out of the office, but I will take a look next week.

Thanks for the good cooperation.

@bednar
Contributor

bednar commented Apr 16, 2020

Hi @NektoDron,

I tried to simulate it by importing a large amount of data in a simple program with a main method, and everything was fine... all threads were disposed :(

Could you please describe your environment?
Which version of dotnet do you use, which platform?

It's strange behaviour, because WaitToCondition in InfluxDB.Client.WriteApi.Dispose is limited to 30 seconds.

Regards
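
A wait that is limited to a fixed time should return control to the caller once the limit passes, whether or not the condition ever became true. A minimal sketch of such a bounded wait built on SpinWait.SpinUntil; the WaitFor helper and the flag are hypothetical, and this is not the library's WaitToCondition implementation:

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: waits until condition() is true or the timeout expires.
// Returns true if the condition was met in time, false on timeout.
bool WaitFor(Func<bool> condition, TimeSpan timeout) =>
    SpinWait.SpinUntil(condition, timeout);

var disposed = false; // stands in for a flag that a completion callback would set

var watch = Stopwatch.StartNew();
var finished = WaitFor(() => disposed, TimeSpan.FromMilliseconds(200));
watch.Stop();

// The flag is never set here, so the call gives up after roughly 200 ms.
Console.WriteLine($"finished={finished}, waited {watch.ElapsedMilliseconds} ms");
```

With a wait of this shape, a flag that is never set costs the caller the full timeout but cannot block it forever.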

@bednar bednar modified the milestones: 1.7.0, 1.8.0 Apr 16, 2020
@bednar
Contributor

bednar commented Apr 20, 2020

Hi @NektoDron,

The problem was a wrong implementation of WaitToCondition. It is fixed by #81.

If you would like to use a preview version, use our nightly build: InfluxDB.Client 1.8.0-dev.639.

Regards

@NektoDron
Author

It doesn't work. The problem is the same. There is a deadlock in Dispose.

@NektoDron
Author

It works, but the write freezes the thread for 30 seconds. It's not a good solution.

Flushing batches before shutdown.
Exception thrown: 'System.ArgumentNullException' in System.Private.CoreLib.dll
TSCloud.TSSignal.Agent Error: 0 : The WriteApi can't be gracefully dispose! - 30000ms elapsed.
Flushing batches before shutdown.
Exception thrown: 'System.ArgumentNullException' in System.Private.CoreLib.dll
TSCloud.TSSignal.Agent Error: 0 : The WriteApi can't be gracefully dispose! - 30000ms elapsed.
Flushing batches before shutdown.

@NektoDron
Author

NektoDron commented Apr 20, 2020

I think the bug is in the disposing logic.

2020-04-20_1309

@bednar
Contributor

bednar commented Apr 20, 2020

It looks like this is related to exception handling. Could you share the exception log from your debug output?

@NektoDron
Author

I modified the code to this, and now it works fine for me:
2020-04-20_1325

PS. The exceptions are not from this part.

@chungou1996

@NektoDron
Are you running on the UI thread (WPF or WinForms)?
If so, the asynchronous method will hang.

@NektoDron
Author

I'm running a .NET Core console application.

@chungou1996

I can't reproduce your bug, but I am very interested in this question. Can you upload the code where you have the problem?
My test code looks like this:

        async static void Write()
        {
            var bars = Enumerable.Range(1, 100000).Select(
                (n) =>
                {
                    return new Bar() { Security = "Safe", Value = n, Date = DateTime.UtcNow.AddMinutes(n) };
                }).ToList();

            CancellationToken token;
            var MaxBarsPerRequest = 1000;
            var start = 0;
            var m_client = InfluxDB.Client.InfluxDBClientFactory.CreateV1("http://localhost:8086", "admin", "admin".ToCharArray(), "SCTech", "autogen");
            for (; ; )
            {
                var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray();
                if (historyBars.Length == 0)
                {
                    return;
                }
                if (start != 0)
                    await Task.Delay(100, token);
                start += MaxBarsPerRequest;
                Console.WriteLine(
                    $"Write bars for {historyBars.First().Security}. From: {historyBars.First().Date}, To: {historyBars.Last().Date}");
                using var api = m_client.GetWriteApi();
                api.WriteMeasurements("SCTech", "-", WritePrecision.S,
                    historyBars);
                api.Flush();
            }
        }

        [Measurement("History")]
        class Bar
        {
            [Column("Security", IsTag = true)]
            public string Security { get; set; }
            [Column("Time", IsTimestamp = true)]
            public DateTime Date { get; set; }
            [Column("Value")]
            public int Value { get; set; }
        }

@NektoDron
Author

My code is part of a big application. It can't run separately. Maybe the problem is that I have about 10 fields per history bar (different parameters).
Now I'm using the WriteRecord method, but the result is the same.

        public static string GetInfluxRecord(this IDataBar bar, string broker, string security, ChartInterval interval, bool convertTime)
        {
            var b = new StringBuilder();

            void Append(string field, double v, bool isFirst = false)
            {
                if (!isFirst && DoubleUtil.IsZero(v))
                    return;
                if (!isFirst)
                    b.Append(',');
                b.Append(field);
                b.Append('=');
                b.Append(v.ToString(CultureInfo.InvariantCulture));
            }

            void AppendInt(string field, int v)
            {
                if (v == 0)
                    return;
                b.Append(',');
                b.Append(field);
                b.Append('=');
                b.Append(v);
            }

            b.Append(HistoryBarConstant.Measurement);
            b.Append(",broker=");
            b.Append(broker);
            b.Append(",security=");
            b.Append(security);
            b.Append(",interval=");
            b.Append(interval);
            b.Append(' ');
            Append("open", bar.Open, true);
            Append("low", bar.Low);
            Append("high", bar.High);
            Append("close", bar.Close);
            Append("volume", bar.Volume);
            Append("interest", bar.Interest);
            AppendInt("ticks_count", bar.TicksCount);

            if (bar is IBar br)
            {
                Append("bid", br.Bid);
                Append("bid_qty", br.BidQty);
                Append("ask", br.Ask);
                Append("ask_qty", br.AskQty);
                Append("step_price", br.StepPrice);
            }

            var date = convertTime ? bar.Date.ToUniversalTime() : new DateTime(bar.Date.Ticks, DateTimeKind.Utc);
            b.Append(' ');
            b.Append(date.Subtract(new DateTime(1970, 1, 1)).TotalSeconds);

            return b.ToString();
        }

And, as I wrote later, this fix in the WriteApi constructor helps me:

            _subject
                //
                // Batching
                //
                .Publish(connectedSource =>
                {
                    return connectedSource
                        .Window(tempBoundary)
                        .Merge(Observable.Defer(() =>
                        {
                            connectedSource
                                .Window(TimeSpan.FromMilliseconds(writeOptions.FlushInterval), writeOptions.BatchSize,
                                    writeOptions.WriteScheduler)
                                .Merge(_flush)
                                .Subscribe(t => {},
                                    exception =>
                                    {
                                        _disposed = true;
                                        Console.WriteLine($"The unhandled exception occurs: {exception}");
                                    },
                                    () =>
                                    {
                                        _disposed = true;
                                        Console.WriteLine("The WriteApi was disposed.");
                                    });

                            return Observable.Empty<IObservable<BatchWriteData>>();
                        }));
                })

This happens because, in Dispose, the call "if (!_subject.IsDisposed) _subject.OnCompleted();" doesn't set "_disposed" for me. With my correction, it is set at "if (!_flush.IsDisposed) _flush.OnCompleted()".

@chungou1996

chungou1996 commented Apr 23, 2020

@NektoDron
Hi,
I tried using 10 or more fields, and it works fine.
I think this problem may occur when the asynchronous task hangs, so that after OnCompleted the thread does not return.
You can trace the call chain, for example:

 return Observable
                        .Defer(() =>
                            service.PostWriteAsyncWithIRestResponse(org, bucket,
                                    Encoding.UTF8.GetBytes(lineProtocol), null,
                                    "identity", "text/plain; charset=utf-8", null, "application/json", null, precision)
                                .ToObservable())
                        .RetryWhen(f => f.SelectMany(e =>

Track PostWriteAsyncWithIRestResponse

// make the HTTP request
            IRestResponse localVarResponse = (IRestResponse) await this.Configuration.ApiClient.CallApiAsync(localVarPath,
                Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams,
                localVarPathParams, localVarHttpContentType);

Track CallApiAsync

When I run on the UI thread, the asynchronous method deadlocks, which looks like your case.

@NektoDron
Author

Where can I trace it? I'm using WriteApi only in this simple code, and I don't want to go down to such a low level.

@bednar bednar removed this from the 1.8.0 milestone May 13, 2020
@NektoDron
Author

I've tried the latest version, and this bug isn't fixed.
I am using a console application, and it has no UI thread.

@rock77115

I guess the server can't accept that much data in a short time, because after I changed my send function to use a queue, it runs OK.
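
One way to read "use a queue" is a producer/consumer setup in which a single consumer sends batches one at a time, so the server never receives more than one request at once. A minimal sketch with BlockingCollection; the batch contents and the Send function are placeholders, since the comment does not show the actual code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Bounded queue: at most 4 batches may wait at any time.
var queue = new BlockingCollection<string[]>(boundedCapacity: 4);
var sent = new List<int>();

// Placeholder for the real upload call; here it only records the batch size.
void Send(string[] batch) => sent.Add(batch.Length);

// Single consumer: takes batches in order until the producer signals completion.
var consumer = Task.Run(() =>
{
    foreach (var batch in queue.GetConsumingEnumerable())
    {
        Send(batch);
    }
});

// Producer: Add blocks while 4 batches are already waiting, which throttles it.
for (var i = 0; i < 10; i++)
{
    queue.Add(new[] { $"record {i}a", $"record {i}b" });
}
queue.CompleteAdding();

consumer.Wait();
Console.WriteLine($"sent {sent.Count} batches");
```

The bounded capacity is the design choice that matters: it makes the producer wait for the consumer instead of piling up unsent data in memory.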

@michaelahojna
Contributor

The issue is no longer valid. It was fixed by the migration to System.Net.Http.HttpClient - https://github.com/influxdata/influxdb-client-csharp/blob/master/CHANGELOG.md#400-2022-03-18

@bednar bednar closed this as not planned Sep 26, 2022
@bednar bednar added this to the 4.6.0 milestone Sep 26, 2022