package cn.edu.sdk;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
public class MyAQS {
/*
* 当前加锁状态 记录加锁的次数 (没有实现重入 AQS 没有实现重入 而是在ReentrankLock中实现重入)
* */
private volatile int state = 0;
/*
* 记录当前加锁的线程 这两个属性 和 synchronized monitor对象 中的是一样的 owner 和 recursion
* */
private Thread threadHolder;
/*
* 使用一个队列进行加锁失败的线程的保存
* */
private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
/*
* 使用unsafe类 进行cas操作
* */
private static Unsafe unsafe;
static {
try {
unsafe = MyAQS.getUnsafeInstance();
} catch (Exception e) {
e.printStackTrace();
}
}
public static Unsafe getUnsafeInstance() throws Exception {
// 通过反射获取rt.jar下的Unsafe类
Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafeInstance.setAccessible(true);
// return (Unsafe) theUnsafeInstance.get(null);是等价的
return (Unsafe) theUnsafeInstance.get(Unsafe.class);
}
public int getState() {
return state;
}
public void setState(int state) {
state = state;
}
public Thread getThreadHolder() {
return threadHolder;
}
public void setThreadHolder(Thread threadHolder) {
this.threadHolder = threadHolder;
}
public boolean acquire() throws Exception {
// 使用cas判断是否是能进行加锁
int c = getState();
if (c == 0){
long offset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
// 这里使用的是公平锁的思想 没有排队的才能进行判断是否是能加锁
// 要不就是排序对队列中没有 等待的线程(第一次加锁)
// 要不就是 当前线程是 队列中的第一个
// 没有这个条件就是 导致只有一个线程才能唤醒
Thread thread = Thread.currentThread();
if ( (waiters.size() == 0 || thread == waiters.peek())&& unsafe.compareAndSwapInt(state, offset, 0 , 1)){
// 进行持有锁线程的设置
threadHolder = thread;
return true;
}
}
return false;
}
// 加锁方法
public void lock() throws Exception {
if (acquire()){
// 获取到了锁
return;
}
// 没有加锁成功的话
Thread c = Thread.currentThread();
waiters.add(c);
for(;;){
// 只有加上锁才能跳出循环
if ((c == waiters.peek() ) && acquire()){
// 加锁 就是从队列中出队
waiters.poll();
break;
}
// 为了减少cpu的消耗 将没有加锁的线程放到 阻塞线程 等待队列中
// Thread.yield(); 不一定成功 还有还有优先级的问题
// TimeUnit.SECONDS.sleep(1); 加锁操作的事件不确定 也不好 浪费cpu
// 也是有问题的 阻塞之后需要重新启动
LockSupport.park();
// 进行排队 同时需要这个队列是线程安全的 防止一个线程永远的阻塞
}
}
public void unLock() throws Exception {
// 判断是否是持有锁的线程进行解锁
if (threadHolder != Thread.currentThread()){
throw new RuntimeException(" 不是持有锁的线程不能枷锁 ");
}
// 进行解锁 唤醒排队的线程 进行加锁
// 使用cas 进行解锁
long offset = unsafe.objectFieldOffset(MyAQS.class.getDeclaredField("state"));
boolean isUnLock = unsafe.compareAndSwapInt(state, offset, 1, 0);
if (isUnLock){
// 通知排队的线程加锁
setThreadHolder(null);
//获取队列头的线程 进行加锁
Thread first = waiters.peek();
if (first != null){
// 进行唤醒 唤醒的是队列中的头部的thread
LockSupport.unpark(first);
}
}
}
}
package cn.edu.sdk;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class TestThread {
public static void main(String[] args) {
Order order = new Order();
AtomicInteger integer = new AtomicInteger(0);
for (int i = 0; i < 50; i++) {
new Thread(() ->{
for (int j = 0; j < 20; j++) {
order.descOrder();
integer.getAndIncrement();
}
}, String.valueOf(i)).start();
}
// 两个线程 一个就是 主线程 另一个就是 GC线程 保证前面的五十个线程执行完
while (Thread.activeCount() > 2){
Thread.yield();
}
//可以看出一共是操作1000次 实现加锁操作:
System.out.println(integer);
}
}
class Order{
private int orders = 10;
// 线程操作资源类
private MyAQS myAQS = new MyAQS();
public void descOrder() {
try {
myAQS.lock();
if (orders > 0){
// 模拟订单操作的时间200毫秒
TimeUnit.MILLISECONDS.sleep(200);
System.out.println("线程" + Thread.currentThread().getName() + "---->获取订单" + orders --);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
myAQS.unLock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- igbc.cn 版权所有 湘ICP备2023023988号-5
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务