I'm trying to test multithreaded processing of a data set that contains the numbers from 1 to N. For example, I want to sum all of these numbers:

1) Holds the sum (result):

import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;

public class ResultHolder {

    public static AtomicLong total_time = new AtomicLong(0);

    public static Long sum = 0l;

    public Long getSum() {

        return sum;

    } // END: getSum()

    @PostConstruct
    public void init() {

    } // END: init()

    public void setSum(Long sum) {

        ResultHolder.sum = sum; // sum is static, so it is shared by all instances

    } // END: setSum()

    public void printSum() {
        System.out.println("Sum is " + sum);
    }

    public void clearSum() {
        sum = 0l;
    }

} // ENDC: ResultHolder
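For comparison, here is a hedged sketch of a holder that needs no synchronized block: java.util.concurrent.atomic.LongAdder lets several threads add at the same time. The class name LongAdderHolder is hypothetical, and it is not wired into Spring here.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical alternative to ResultHolder: threads call add() without locking.
public class LongAdderHolder {
    private final LongAdder sum = new LongAdder();

    public void add(long value) {
        sum.add(value); // contention is spread over internal cells
    }

    public long getSum() {
        return sum.sum(); // accurate when no other thread is adding concurrently
    }

    public void clear() {
        sum.reset();
    }

    public static void main(String[] args) throws InterruptedException {
        LongAdderHolder holder = new LongAdderHolder();
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            final long from = t * 1_000L + 1;
            workers[t] = new Thread(() -> {
                long local = 0;                  // primitive partial sum per thread
                for (long n = from; n < from + 1_000; n++) local += n;
                holder.add(local);               // one shared update per thread
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        System.out.println(holder.getSum()); // sum of 1..4000 = 8002000
    }
}
```

Each thread keeps its own primitive partial sum and touches the shared holder only once, which is the same pattern the question's SumProcessor uses with its synchronized block.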

2) Processes one part of the number set:

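import java.util.ArrayList;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;

public class SumProcessor {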

    private static int global_id = 0;
    final public int processor_id;
    private final ArrayList<Long> numbers;
    private Long processor_sum = 0l;

    @Autowired
    private final ResultHolder sumHoldder = null;

    public SumProcessor(ArrayList<Long> numbers) {

        this.numbers = numbers;

        processor_id = ++global_id;

    } // END: constructor

    public void work() throws Exception {

        long t1 = new java.util.Date().getTime();

        int i = 0;

        try {

            if (numbers == null) throw new Exception("Failed to get the array of numbers.");
            for (i = 0; i < numbers.size(); i++) {
                Long o = null;
                try {
                    o = numbers.get(i);
                    if (o == null) throw new Exception("no number");
                } catch (Exception e) {
                    throw new Exception("Error extracting a number from the array: " + e);
                }
                processor_sum += o;
            } // END: for

            if (sumHoldder == null) throw new Exception("No sum holder");

            synchronized (sumHoldder) {
                sumHoldder.setSum(sumHoldder.getSum() + processor_sum);
            }

            long t2 = new java.util.Date().getTime();

            this.sumHoldder.total_time.addAndGet(t2 - t1);

        } catch (Exception e) {

            System.out.println("Work() error (" + i + ") " + e);

        }

        return;

    } //END: work()

    @PostConstruct
    public void init() {

        System.out.println("Initializated B: " + this);

    } //END: init()

    @PreDestroy
    public void destroy() {

        System.out.println("Destroy B: " + this);

    } //END: destroy()

    @Override
    public String toString() {
        return "" +
                "Processor " + processor_id + " " +
                "contain " + numbers.size() + " " +
                "numbers from " + numbers.get(0) +
                " to " + numbers.get(numbers.size() - 1);

    } //END: toString()

} //END: class SumProcessor
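One cost worth ruling out, offered as an assumption rather than a measured result: work() accumulates into a boxed Long, so every processor_sum += o unboxes, adds and boxes again, and Long.valueOf only caches values from -128 to 127, so almost every iteration creates a new Long object. The lists themselves also hold millions of boxed Long objects. A minimal sketch of the same loop over a primitive long[] with a primitive accumulator (sumRange is a hypothetical helper):

```java
// Hypothetical helper: sums a[from..to) with a primitive accumulator (no boxing).
public class PrimitiveSum {
    public static long sumRange(long[] a, int from, int to) {
        long total = 0;                 // primitive local, no objects are created
        for (int i = from; i < to; i++) {
            total += a[i];
        }
        return total;
    }

    public static void main(String[] args) {
        int size = 1_000_000;
        long[] numbers = new long[size]; // 8 bytes per element, no per-element objects
        for (int i = 0; i < size; i++) numbers[i] = i + 1L;
        System.out.println(sumRange(numbers, 0, size)); // 1..1_000_000 -> 500000500000
    }
}
```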

3) A very simple profiler (calculates the processing time):

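import java.util.Date;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;

@Aspect
public class MethodLoggerBasic {

    @Pointcut("execution(* *.work(..))")
    void around_work() {}

    @Around("around_work()")
    public void logMethodName(ProceedingJoinPoint joinPoint) throws Throwable {

        long starttime = new Date().getTime();
        joinPoint.proceed();
        long endtime = new Date().getTime();
        long time = endtime - starttime;
        MainApp.time += time; // unsynchronized read-modify-write, called from several threads

    } // END: logMethodName()
} // ENDC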
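On the profiler: both total_time and MainApp.time add up the duration of each work() call, so with four threads they report the sum of the four per-thread durations, not the elapsed time of the parallel phase. A hedged sketch that times the whole phase with System.nanoTime() instead; the sum() task body is a placeholder, not the author's SumProcessor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WallClockTiming {
    // Placeholder work: sums from..to inclusive with a primitive accumulator.
    static long sum(long from, long to) {
        long s = 0;
        for (long n = from; n <= to; n++) s += n;
        return s;
    }

    public static void main(String[] args) throws Exception {
        final long size = 40_000_000L;
        final int parts = 4;
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        List<Callable<Long>> tasks = new ArrayList<>();
        long partSize = size / parts;
        for (int p = 0; p < parts; p++) {
            long from = p * partSize + 1;
            long to = (p + 1) * partSize;
            tasks.add(() -> sum(from, to));
        }

        long start = System.nanoTime();                 // start of the parallel phase
        long total = 0;
        for (Future<Long> f : pool.invokeAll(tasks)) {  // invokeAll waits for every task
            total += f.get();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        pool.shutdown();

        System.out.println("Sum is " + total + ", wall-clock " + elapsedMs + " ms");
    }
}
```

System.nanoTime() is intended for measuring elapsed time, whereas new Date().getTime() reads the wall clock, which can be adjusted while the program runs.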

4) Main program (can run the processing sequentially or in parallel):

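import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class MainApp {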

static AnnotationConfigApplicationContext context;

public static long time = 0l;
public final static int SIZE = 40_000_000;
public final static int DIVIDE_FACTOR = 4;
public static ArrayList<Long>[] numbers = new ArrayList[DIVIDE_FACTOR];

public static ArrayList<SumProcessor> processors = new ArrayList<>();

public static void main(String[] args) throws Exception {

    context = new AnnotationConfigApplicationContext(myConfig.class);

    // form 4 datasets

    int part_size = SIZE / DIVIDE_FACTOR;

    int i;
    int j;

    for (j = 0; j < DIVIDE_FACTOR; j++) {
        numbers[j] = new ArrayList<>();
        for (i = 0; i < (int) part_size; i++) {
            numbers[j].add(((j * part_size) + i + 1l));
        }
    }

    // create 4 processors (bean)

    for (i = 0; i < DIVIDE_FACTOR; i++) {
        SumProcessor bean = context.getBean(SumProcessor.class, numbers[i]);
        if (bean == null) throw new Exception("Error receiving bean SumProcessor.class");
        processors.add(bean);
    }

    // creates 4 threads for processors
    thread_process thread1 = new thread_process();
    thread_process thread2 = new thread_process();
    thread_process thread3 = new thread_process();
    thread_process thread4 = new thread_process();

    ResultHolder a;

    a = context.getBean(ResultHolder.class);

    try {

        boolean isByPool = true; // flag

        time = 0;

        if (isByPool) {

            System.out.println("-------------------");
            System.out.println("Multithread compute");
            System.out.println("-------------------");
            ExecutorService pool = new ThreadPoolExecutor(
                    4,
                    4,
                    0,
                    TimeUnit.MICROSECONDS,
                    new ArrayBlockingQueue<>(4)
            );

            List<Callable<Boolean>> tasks = new ArrayList<>();

            tasks.add(thread1);
            tasks.add(thread2);
            tasks.add(thread3);
            tasks.add(thread4);

            pool.invokeAll(tasks);

            pool.shutdown();

            pool.awaitTermination(60, TimeUnit.SECONDS);

        } else {

            thread1.start();
            thread2.start();
            thread3.start();
            thread4.start();

            thread1.join();
            thread2.join();
            thread3.join();
            thread4.join();

        }

        a.printSum();
        a.clearSum();

        System.out.println("total time is " + a.total_time);
        System.out.println("basic time is " + MainApp.time);

        System.out.println("-------------");
        System.out.println("Single thread");
        System.out.println("-------------");

        ArrayList<Long> numbers_total = new ArrayList<>();
        for (i = 0; i < SIZE; i++) {
            numbers_total.add((i + 1l));
        }

        SumProcessor sumProcessor = context.getBean(SumProcessor.class, numbers_total);

        a.total_time.set(0l);
        time = 0l;

        sumProcessor.work();

        a.printSum();

        System.out.println("total time is " + a.total_time);
        System.out.println("basic time is " + MainApp.time);

    } catch (Exception e) {

        throw new Exception("MainApp error: " + e);

    }

    context.close();

} // END: main

} // END: class MainApp

5) Thread process:

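import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class thread_process extends Thread implements Callable<Boolean> {

    // Hands each task the next processor. It must be atomic: with a plain
    // static int, index++ from two threads can return the same value.
    static final AtomicInteger index = new AtomicInteger(0);

    @Override
    public void run() {

        try {

            SumProcessor next = MainApp.processors.get(index.getAndIncrement());

            if (next == null) {

                System.out.println("No processor");

                System.exit(-1);

            }

            next.work();

            System.out.println("Thread " + this + " complete!");

        } catch (Exception e) {

            System.out.println("Error in thread " + this + ": " + e);

        }

    } //END: run()

    @Override
    public Boolean call() throws Exception {

        run();

        return true;

    } //END: call()
} //END: class thread_process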

The output is:

Initializated B: Processor 1 contain 10000000 numbers from 1 to 10000000
Initializated B: Processor 2 contain 10000000 numbers from 10000001 to 20000000
Initializated B: Processor 3 contain 10000000 numbers from 20000001 to 30000000
Initializated B: Processor 4 contain 10000000 numbers from 30000001 to 40000000
-------------------
Multithread compute
-------------------
Thread Thread[Thread-3,5,main] complete!
Thread Thread[Thread-4,5,main] complete!
Thread Thread[Thread-2,5,main] complete!
Thread Thread[Thread-1,5,main] complete!
Sum is 800000020000000
total time is 11254
basic time is 11254
-------------
Single thread
-------------
Initializated B: Processor 5 contain 40000000 numbers from 1 to 40000000
Sum is 800000020000000
total time is 6995
basic time is 6995

Is there a way to make the parallel version faster than the sequential one? Or perhaps this task doesn't need to be split at all? Or maybe my profiler is just not very good...

GitHub project

  • Sure, for example by using a ForkJoinPool. Commented Sep 10, 2019 at 11:46
  • To sum the numbers in a subset, you do not need to block other threads from accessing elements of another subset. You do not need blocking here at all; you just need to define the range of elements for each thread correctly. Commented Sep 10, 2019 at 17:41
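The first comment suggests a ForkJoinPool. A minimal sketch of that approach with RecursiveTask, assuming the numbers are stored in a primitive long[]; the class name RangeSum and the THRESHOLD value are arbitrary choices for illustration:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Splits the range in half until it is small enough to sum directly.
public class RangeSum extends RecursiveTask<Long> {
    static final int THRESHOLD = 100_000; // arbitrary cut-off, tune by measuring
    private final long[] data;
    private final int from, to;           // half-open range [from, to)

    public RangeSum(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long s = 0;
            for (int i = from; i < to; i++) s += data[i];
            return s;
        }
        int mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(data, from, mid);
        left.fork();                                        // left half runs asynchronously
        long right = new RangeSum(data, mid, to).compute(); // current thread does the right half
        return right + left.join();
    }

    public static void main(String[] args) {
        long[] numbers = new long[1_000_000];
        for (int i = 0; i < numbers.length; i++) numbers[i] = i + 1L;
        long total = ForkJoinPool.commonPool().invoke(new RangeSum(numbers, 0, numbers.length));
        System.out.println(total); // 500000500000
    }
}
```

No lock is involved: each subtask reads only its own slice, and partial results are combined through join().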

1 Answer

You are trying to perform a sequential task using multithreading, and that isn't a correct use of multithreading. Here you have a resource on which some work must be done. You use multiple threads to distribute that work, but at the same time you block one thread while another thread is using the resource. So why have worker threads in the first place if you don't want them to access the resource in parallel?

If it isn't necessary, you can drop the Set implementation of the dataset and use a List or an array, where elements can be accessed by index without blocking the worker threads.
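A hedged sketch of that idea, assuming a primitive long[]: each thread sums its own index range and writes the result into its own slot of a results array, so no lock is needed while summing. Thread.join() guarantees that the main thread sees each worker's write. The names PartitionedSum and parallelSum are hypothetical.

```java
// Each worker owns one slot of `partial`, so no synchronization is needed
// while summing; join() makes the workers' writes visible to the caller.
public class PartitionedSum {
    public static long parallelSum(long[] data, int threads) throws InterruptedException {
        long[] partial = new long[threads];
        Thread[] workers = new Thread[threads];
        int chunk = (data.length + threads - 1) / threads;   // ceiling division
        for (int t = 0; t < threads; t++) {
            final int id = t;
            final int from = Math.min(data.length, t * chunk);
            final int to = Math.min(data.length, from + chunk);
            workers[t] = new Thread(() -> {
                long s = 0;
                for (int i = from; i < to; i++) s += data[i];
                partial[id] = s;                              // only this thread writes slot id
            });
            workers[t].start();
        }
        long total = 0;
        for (int t = 0; t < threads; t++) {
            workers[t].join();
            total += partial[t];
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] numbers = new long[1_000_000];
        for (int i = 0; i < numbers.length; i++) numbers[i] = i + 1L;
        System.out.println(parallelSum(numbers, 4)); // 500000500000
    }
}
```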


Update 1: Just add one more line after the pool.shutdown() call:

pool.shutdown(); // starts an orderly shutdown: no new tasks are accepted, already submitted tasks still run
pool.awaitTermination(60, TimeUnit.SECONDS); // blocks the calling (main) thread until all tasks finish or the timeout expires
// ...
// now you can do your single thread task
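A small hedged demonstration of the difference between the two calls: shutdown() returns immediately while submitted tasks keep running, and awaitTermination() blocks until they finish or the timeout expires, returning false on timeout. (As the asker points out in the comments, invokeAll() itself already waits for all of its tasks to complete.) The sleeping tasks are placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(200);          // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();                        // no new tasks; does not wait
        System.out.println("terminated right after shutdown(): " + pool.isTerminated());
        boolean done = pool.awaitTermination(60, TimeUnit.SECONDS); // blocks until done or timeout
        System.out.println("terminated after awaitTermination(): " + done);
    }
}
```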

Also, don't create too many threads: a single thread is fast enough to handle a million array elements.
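Rather than choosing the thread count by hand, one alternative (not what this answer uses) is to let a parallel stream split the work. It runs on the common ForkJoinPool, whose default parallelism is derived from the number of available processors, and LongStream works on primitive longs, so there is no boxing. A minimal sketch:

```java
import java.util.stream.LongStream;

public class StreamSum {
    static long sequential(long n) {
        return LongStream.rangeClosed(1, n).sum();             // single thread
    }

    static long parallel(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();  // common ForkJoinPool
    }

    public static void main(String[] args) {
        long n = 40_000_000L;
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println(sequential(n) + " " + parallel(n));
    }
}
```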


Update 2: I don't know why, but moving the single-threaded part out of the try block seems to give me the expected result.

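import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class MainApp {

    static AnnotationConfigApplicationContext context;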

    public static long time = 0;
    public final static int SIZE = 28_000_000;
    public final static int DIVIDE_FACTOR = 4;
    public static ArrayList<Long>[] numbers = new ArrayList[DIVIDE_FACTOR];

    public static ArrayList<SumProcessor> processors = new ArrayList<>();

    public static void main(String[] args) throws Exception {

        context = new AnnotationConfigApplicationContext(AppConfig.class);

        ResultHolder a = context.getBean(ResultHolder.class);

        // form 4 datasets

        int part_size = SIZE / DIVIDE_FACTOR;

        int i;
        int j;

        for (j = 0; j < DIVIDE_FACTOR; j++) {
            numbers[j] = new ArrayList<>(part_size);
            for (i = 0; i < (int) part_size; i++) {
                numbers[j].add(((j * part_size) + i + 1l));
            }
        }

        // create 4 processors (bean)

        for (i = 0; i < DIVIDE_FACTOR; i++) {
            SumProcessor bean = context.getBean(SumProcessor.class, numbers[i]);
            if (bean == null) throw new Exception("Error receive bean SumProcessor.class");
            processors.add(bean);
        }

        // creates 4 threads for processors
        thread_process thread1 = new thread_process();
        thread_process thread2 = new thread_process();
        thread_process thread3 = new thread_process();
        thread_process thread4 = new thread_process();

        try {
            boolean isByThread = true; // flag
            time = 0;
            System.out.println("-------------------");
            System.out.println("Multithread compute");
            System.out.println("-------------------");
            ExecutorService pool = new ThreadPoolExecutor(
                    4,
                    4,
                    0,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(4) // or new ArrayBlockingQueue<>(4)
            );
            List<Callable<Boolean>> tasks = new ArrayList<>();
            tasks.add(thread1);
            tasks.add(thread2);
            tasks.add(thread3);
            tasks.add(thread4);
            List<Future<Boolean>> futures = pool.invokeAll(tasks);
            pool.shutdown();
            pool.awaitTermination(60, TimeUnit.SECONDS);
            System.out.println("Time is: " + time);

            a.printSum();
            a.clearSum();
            time = 0;

        } catch (Exception e) {
            throw new Exception("MainApp error: " + e);

        } // <---- moved single thread out of try block

        ArrayList<Long> numbers_total = new ArrayList<>(SIZE);
        for (i = 0; i < SIZE; i++) {
            numbers_total.add((i + 1l));
        }

        System.out.println("-------------");
        System.out.println("Single thread");
        System.out.println("-------------");
        SumProcessor sumProcessor = context.getBean(SumProcessor.class, numbers_total);
        sumProcessor.work();
        System.out.println("Time is: " + time);
        a.printSum();
        a.clearSum();
        time = 0;

        context.close();
    } // END: main
}

Output:

Initialized B: Processor 1 contain 7000000 numbers from 1 to 7000000
Initialized B: Processor 2 contain 7000000 numbers from 7000001 to 14000000
Initialized B: Processor 3 contain 7000000 numbers from 14000001 to 21000000
Initialized B: Processor 4 contain 7000000 numbers from 21000001 to 28000000
-------------------
Multithread compute
-------------------
Thread[Thread-3,5,main] complete task.
Thread[Thread-2,5,main] complete task.
Thread[Thread-1,5,main] complete task.
Thread[Thread-4,5,main] complete task.
Time is: 5472
Sum is 392000014000000
-------------
Single thread
-------------
Initialized B: Processor 5 contain 28000000 numbers from 1 to 28000000
Time is: 10653
Sum is 392000014000000

Output (reverse order, single thread first):

-------------
Single thread
-------------
Initialized B: Processor 1 contain 28000000 numbers from 1 to 28000000
Time is: 2265
Sum is 392000014000000
Initialized B: Processor 2 contain 7000000 numbers from 1 to 7000000
Initialized B: Processor 3 contain 7000000 numbers from 7000001 to 14000000
Initialized B: Processor 4 contain 7000000 numbers from 14000001 to 21000000
Initialized B: Processor 5 contain 7000000 numbers from 21000001 to 28000000
-------------------
Multithread compute
-------------------
Thread[Thread-2,5,main] complete task.
Thread[Thread-4,5,main] complete task.
Thread[Thread-1,5,main] complete task.
Thread[Thread-3,5,main] complete task.
Time is: 2115
Sum is 392000014000000

7 Comments

pool.shutdown() does not wait for the tasks to finish. So your single (main) thread is actually competing with the worker threads. That is why you are getting unexpected results.
I think pool.invokeAll(tasks) already works correctly, because it waits for all tasks to complete, like threadN.join() without a pool. But anyway, the result is the same.
Can you try it again with only two threads and share the time taken by both methods?
13878 / 9023. Same result without the pool. I tried clearing the JVM cache, but got the same result. I tried on a Core2Duo and an i5-8600: the same. I'm doing something wrong, but what?
I agree, it is possibly some kind of JVM optimization. I get 2xxx/2xxx when running single-thread then multi-thread, and 5xxx/10xxx when running multi-thread then single-thread.
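Because the timings change with the order in which the two variants run, JIT compilation and garbage collection during the first measurements are plausible suspects, though nothing in this thread proves it. A hedged sketch of a crude warm-up loop that runs each variant several times and prints every round; for reliable numbers a benchmark harness such as JMH is the usual tool. The two workloads are placeholders, not the author's code.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

public class WarmupTiming {
    // Runs the task `rounds` times and prints the wall-clock time of each round.
    static void time(String label, LongSupplier task, int rounds) {
        for (int r = 1; r <= rounds; r++) {
            long start = System.nanoTime();
            long result = task.getAsLong();   // result is printed so the work is not discarded
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println(label + " round " + r + ": " + ms + " ms (sum " + result + ")");
        }
    }

    public static void main(String[] args) {
        long n = 40_000_000L;
        // Placeholder workloads; substitute the real single- and multi-threaded code.
        time("sequential", () -> LongStream.rangeClosed(1, n).sum(), 5);
        time("parallel", () -> LongStream.rangeClosed(1, n).parallel().sum(), 5);
    }
}
```

If the later rounds of both variants converge, the difference seen in the first run is more likely warm-up than threading.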