Java线程间协作

编程实现线程间协作,可借助volatile、wait和notifyAll协调线程。

异步任务间的线程协作,可使用Future和CompletableFuture

线程间有序

CountDownLatch

package com.xieyonghui.art;

import java.util.concurrent.CountDownLatch;

public class Dq {

  public static void main(String[] args) {
    CountDownLatch cd = new CountDownLatch(1);
    CountDownLatch cd1 = new CountDownLatch(1);
    Thread t = new Thread(() -> {
      System.out.println(1);
      try {
        cd.countDown();
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
    t.start();
    t = new Thread(() -> {
      try {
        cd.await();
        System.out.println(2);
        cd1.countDown();
      } catch (Exception e1) {
        e1.printStackTrace();
      }
    });
    t.start();
    t = new Thread(() -> {
      try {
        cd1.await();
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println(3);
    });
    t.start();

  }

}

阻塞队列

package com.xieyonghui.art;

import java.util.concurrent.SynchronousQueue;

public class Dq {

  public static void main(String[] args) {
    SynchronousQueue sq = new SynchronousQueue();
    SynchronousQueue sq1 = new SynchronousQueue();
    Thread t = new Thread(() -> {
      System.out.println(1);
      try {
        sq.put(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    t.start();
    t = new Thread(() -> {
      try {
        sq.take();
        System.out.println(2);
        sq1.put(1);
      } catch (Exception e1) {
        e1.printStackTrace();
      }
    });
    t.start();
    t = new Thread(() -> {
      try {
        sq1.take();
      } catch (Exception e) {
        e.printStackTrace();
      }
      System.out.println(3);
    });
    t.start();

  }

}

锁和状态位

package com.xieyonghui.art;

public class Aqs {

  volatile static int state = 3;

  public static void main(String[] args) {

    String lock_a = "lock_A";
    String lock_b = "lock_B";
    new Thread(() -> {
      System.out.println(1);
      synchronized (lock_a) {
        lock_a.notify();
        state--;
      }
    }).start();

    new Thread(() -> {
      if (state != 2) {
        try {
          synchronized (lock_a) {
            lock_a.wait();
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
      System.out.println(2);
      synchronized (lock_b) {
        lock_b.notify();
        state--;
      }

    }).start();

    new Thread(() -> {
      if (state != 1) {
        try {
          synchronized (lock_b) {
            lock_b.wait();
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
      System.out.println(3);
    }).start();

  }

}

状态位

package com.xieyonghui.art;

public class SequenceThreads {

  public static volatile int flag = 1;

  private static final String lock = "Lock";

  public static void main(String[] args) {
    Thread t = new Thread(() -> {
      System.out.println(1);
      n(2);
    });
    t.start();
    t = new Thread(() -> {
      check(2);
      System.out.println(2);
      n(3);
    });
    t.start();
    t = new Thread(() -> {
      check(3);
      System.out.println(3);
    });
    t.start();

  }

  public static void check(int s) {
    while (flag != s) {
      try {
        synchronized (lock) {
          lock.wait();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public static void n(int s) {
    try {
      synchronized (lock) {
        flag = s;
        lock.notifyAll();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

ReentrantLock

package com.xieyonghui.art;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Ts2 {

  public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    Condition a = lock.newCondition();
    Condition b = lock.newCondition();
    CountDownLatch c = new CountDownLatch(2);
    Thread t = new Thread(() -> {
      System.out.println(1);
      try {
        c.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      lock.lock();
      a.signal();
      lock.unlock();
    });
    t.start();
    t = new Thread(() -> {
      try {
        lock.lock();
        c.countDown();
        a.await();
        System.out.println(2);
        b.signal();
        lock.unlock();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    t.start();
    t = new Thread(() -> {
      try {
        lock.lock();
        c.countDown();
        b.await();
        lock.unlock();
        System.out.println(3);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    t.start();
  }

}