生产消费者模式 之 实现一个积分系统

首先从需求开始分析:  1、积分系统要有任务(做任务赚积分) 要有规则(每条任务对应什么规则 或者对应多个规则  每条规则 对应的积分)

                                          
                                           2、开始设计数据库: 主要的表有(资源表,规则表)


                                          生产消费者模式 之 实现一个积分系统生产消费者模式 之 实现一个积分系统

     思想(当系统初始化的时候加载这些数据 并生成一个容器,每次有人做任务得到积分的时候就从这个容器里面拿)

    这时候就需要用到设计模式:

    先定义一个生产的接口:

  public interface IProducerService {


/**
* 生产

* @param service
* @return
*/
Map<Object, WolfBlockingQueue> producer(WolfRulesModel model);


}


 public class ProducerService implements IProducerService, IService {


@Override
public Map<Object, WolfBlockingQueue> producer(WolfRulesModel model) {
Map<Object, WolfBlockingQueue> map = new HashMap<>();
// WolfRulesModel model = service.exec();


List<PcmResource> list = model.getPcmResources();


list.stream().forEach(t -> {
WolfBlockingQueue queue = new WolfBlockingQueue(t.queueSize());

for(int i = t.queueSize(); i > 0; i--) {
queue.offer(i);
}
if (t.getResourcesId() == null) {
// TODO
} else {
map.put(t.getResourcesId(), queue);
}
});
return map;
}


}


再定义一个消费的接口:


  public interface IPcmService extends IWolfService {


/**
* 根据规则进行生产*阻塞队列,用户可根据规则需要进行周期性的下单生产

* @param service
*/
boolean producer(WolfRulesModel model);


/**
* 根据约定规则消费某种资源

* @param id
*            消费者
* @param resourceId
*            资源ID
* @return
*/
<T extends PcmResource> ResultIntegralModel consume(String id, T resourceId);


/**
* 返回队列的大小

* @param id
* @param resourceId
* @return
*/
Integer getQueueSize(String id, Object resourceId);



public class ProducerConsumerService implements IPcmService, IService {


private UserMap map = new UserMap(200);


private List<WolfRulesModel> rules;


@Resource
private IProducerService producerService;


@Override
public boolean producer(WolfRulesModel model) {
if (model == null || model.getPcmResources() == null || model.getPcmResources().isEmpty()) {
// 不为null集合创建队列
return false;
}
boolean flag = false;
if (rules == null) {
rules = new ArrayList<>(3);
} else {
flag = rules.contains(model);
}
rules.add(model);


// 修改规则时,清理所有用户的资源数据,使其重建
if (flag) {
map.values().stream().forEach(t -> {
model.getPcmResources().stream().forEach(r -> t.remove(r.getResourcesId()));
});
}


return true;
}


@Override
public <T extends PcmResource> ResultIntegralModel consume(String id, T resource) {
WolfBlockingQueue queue = initByUser(id, resource.getResourcesId());
if (queue != null) {
int size = queue.size();
boolean bool = queue.consume(resource);
ResultIntegralModel resultModel = new ResultIntegralModel();
resultModel.setIntegralResult(bool);
resultModel.setIntegral(size);
return resultModel;
}
return null;
}


public WolfBlockingQueue initByUser(String id, Object resourceId) {
Map<Object, WolfBlockingQueue> queues = map.get(id);
if (queues == null || queues.isEmpty()) {
final Map<Object, WolfBlockingQueue> m = new ConcurrentHashMap<>(rules.size());
rules.forEach(t -> {
m.putAll(producerService.producer(t));
});
map.put(id, m);
queues = m;
} else if (queues.get(resourceId) == null) {
// TODO 没有设置西区该资源的规则
}
return queues.get(resourceId);
}


@Override
public void wolfInit() {
// TODO Auto-generated method stub


}


@Override
public void wolfClose() {
// TODO Auto-generated method stub


}


@Override
public Integer getQueueSize(String userId, Object resourceId) {
Map<Object, WolfBlockingQueue> queues = map.get(userId);
if (queues != null) {
WolfBlockingQueue queue = queues.get(resourceId);
if (queue != null) {
return queue.size();
}
}
PcmResource resource = null;
for (WolfRulesModel m : rules) {
Optional<PcmResource> o = m.getPcmResources().stream().filter(r -> {
return r.getResourcesId().equals(resourceId);
}).findAny();
if (o.isPresent()) {
resource = o.get();
}
}
if (resource != null) {
return resource.queueSize();
} else {
// TODO 异常
}
return null;
}
}



 再帖一下消费的方法:


ResultIntegralModel resultModel = pcmService.consume(userId, new PcmResource() {


@Override
public Integer queueSize() {
IntegralRule rule = ruleService.selectByPrimaryKey(resurce.getRuleId());
IRuleService service = ruleFactory.getRuleService(rule.getRuleModel());
service.queueSize(rule.getRuleCondition());
return service.currentSize();
}


@Override
public Object getResourcesId() {
return resurce.getId();
}
});


生产消费  生产什么 消费什么所以需要把资源作为对象带进去生产带进去消费  其中用到了WolfBlockingQueue 阴塞队列  当消费时需要实例化资源对象 new PcmResource 并重写queueSize  getResourcesId方法  再根据当前的算法 计算出  return service.currentSize();  return resurce.getId(); 其中消费的时候 注意 调用initByUser方法  因为每个规则 生成一个容器 而做积分任务又是根据人来的  所以要调用initByUser方法 生成当前人的一个容器userMap  这个时候 特别注意要用ConcurrentHashMap 不要用hashMap 因为hashMap是线程不安全的  开始我用的就是hashMap 结果出问题了  

public WolfBlockingQueue initByUser(String id, Object resourceId) {
Map<Object, WolfBlockingQueue> queues = map.get(id);
if (queues == null || queues.isEmpty()) {
final Map<Object, WolfBlockingQueue> m = new ConcurrentHashMap<>(rules.size());
rules.forEach(t -> {
m.putAll(producerService.producer(t));   //开始生产
});
map.put(id, m);
queues = m;
} else if (queues.get(resourceId) == null) {
// TODO 没有设置西区该资源的规则
}
return queues.get(resourceId);
}


private UserMap map = new UserMap(200);


public class UserMap extends HashMap<String, Map<Object, WolfBlockingQueue>> {


private static final long serialVersionUID = 6738114262753077630L;


public UserMap() {
super();
// TODO Auto-generated constructor stub
}


public UserMap(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor);
// TODO Auto-generated constructor stub
}


public UserMap(int initialCapacity) {
super(initialCapacity);
// TODO Auto-generated constructor stub
}


}

 最后讲一句 : 开始初始化规则容器的时候 需要用到定时器 因为有些规则 是周期性的  


private void init(IntegralRule record) {
logger.info("生产规则服务启动了init方法 等待定时器生产 =====");
IRuleService service = ruleFactory
.getRuleService(record.getRuleModel());


// 启动定时器
timerService.schedule(new TaskService(record, service), 0);
}

这里封闭了一个定时器模块  作为一个服务向外提供接口