开发手册 欢迎您!
软件开发者资料库

Java Stream处理大量数据时显示处理进度信息方法及示例代码

Java Stream处理数百万个元素。Map-Reduce算法需要几毫秒的时间,因此任务完成大约需要20分钟。本文主要介绍Java Stream处理大数据量信息显示处理进度信息及示例代码。

提示进度信息类似如下:

 5% (08s)
10% (14s)
15% (20s)
...

1、通过peek显示进度信息

Stream myStream = readData();
final AtomicInteger loader = new AtomicInteger();
int fivePercent = elementsCount / 20;
MyResult result = myStream
.map(row -> process(row))
.peek(stat -> {
if (loader.incrementAndGet() % fivePercent == 0) {
System.out.println(loader.get() + " elements on " + elementsCount + " treated");
System.out.println((5*(loader.get() / fivePercent)) + "%");
}
})
.reduce(MyStat::aggregate);

2、使用map显示进度信息

import java.util.Iterator;import java.util.Locale;import java.util.Spliterator;import java.util.Spliterators;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Function;import java.util.function.LongConsumer;import java.util.stream.Stream;import java.util.stream.StreamSupport;public class StreamProgress{    public static void main(String[] args)    {        int size = 250;        Stream stream = readData(size);        LongConsumer progressConsumer = progress ->         {            // "Filter" the output here: Report only every 10th element            if (progress % 10 == 0)            {                double relative = (double) progress / (size - 1);                double percent = relative * 100;                System.out.printf(Locale.ENGLISH,                    "Progress %8d, relative %2.5f, percent %3.2f\n",                    progress, relative, percent);            }        };        Integer result = stream            .map(element -> process(element))            .map(progressMapper(progressConsumer))            .reduce(0, (a, b) -> a + b);        System.out.println("result " + result);    }    private static  Function progressMapper(        LongConsumer progressConsumer)    {        AtomicLong counter = new AtomicLong(0);        return t ->         {            long n = counter.getAndIncrement();            progressConsumer.accept(n);            return t;        };    }    private static Integer process(Integer element)    {        return element * 2;    }    private static Stream readData(int size)    {        Iterator iterator = new Iterator()        {            int n = 0;            @Override            public Integer next()            {                try                {                    Thread.sleep(10);                }                catch (InterruptedException e)                {                    e.printStackTrace();                }                return n++;            }            @Override            public boolean hasNext()            {                return n < size;            }        };        return StreamSupport.stream(            Spliterators.spliteratorUnknownSize(                iterator, Spliterator.ORDERED), false);    }}