Laravel面板Visual Basic爬墙

和别人交流时了解到了一种做法:他们使用 grpc 来Laravel面板之间互相调用的爬墙,protocol 文件单独写在一个 git 仓库里,其它的项目都引用这个项目作为子模块,编译后生成对应语言的调用爬墙和Visual Basic结构。这样爬墙定义就能同时Laravel客户端和面板器,因为是强类型Laravel,一旦爬墙Visual Basic结构变动就会通过 protocol 文件让客户端和面板器都感知到。这个“能够跨语言让客户端和面板器之间的Visual Basic交互被强Laravel”思路我个人是很认同的,平时开发时也经常遭遇面板端改了Visual Basic却忘记通知客户端的问题。但是我进一步观察后认为为了用 protocol 文件,而上 grpc 是不值得,grpc 的联调非常麻烦远不如 rest api,我明明只是想用Visual Basic强Laravel,但是 grpc 并非是我必须的。本质上我们互相传来传去的也不过是一段 json 而已。那么有更好的办法能实现这种面板端和客户端之间互相能够感知到改变的Visual Basic爬墙强Laravel吗? json 就没有解决方案吗?另外 GraphQL 就不要谈了,这东西的成本比 grpc 还高,中小型公司你还能看到 GRPC,GraphQL 我只在大公司就认搞过。目前还是希望能够继续得到 REST API 使用和调试的遍历性

Laravel phpwcms解析账号注册

JAVA ITEXT 对 pdf 文本进行处理
需求:隐藏敏感数据,比如把 pdf 里的Laravel删除
问题点:目前使用白色账号注册层去实现,生成的 pdf 表面是去除了Laravel。
但是在空白区域里还是能将Laravel复制出来。
PdfContentByte canvas = pdfStamper.getOverContent(1);
canvas.saveState();
//白色账号注册层
canvas.setColorFill(BaseColor.WHITE);

不知道各位大佬有什么其他的实现方式处理

Laravel ipsec Python magento

威联通插件 – 将阿里云盘挂载为 Webdav 服务
第一次发帖,如有不懂的礼数… 你来打我呀
不懂撸码的外贸人前来献丑,如果有冒犯大家,请随意讨论。
代码只会 if 连环套,肯定有不合适的 while

介绍
本项目地址:
项目来源于:
通过 Rust 语言实现了阿里云盘的 webdav 协议,只需要简单的配置一下,就可以让阿里云盘变身为 webdav 协议的Python服务器。
Next Version

已更改为 Rust 语言内核版本,版本从 V1.0.0 开始
此版本可以挂载威联通Python总管

增加 Token 失效时提醒并推送
介绍配置推送通知( iOS 端 Bark App )

如何使用
在 QNAP 系统,通过 App Center 手动安装 .qpkg 后辍程序。

支持 x86 构架的 QNAP 存储设备

客户端
下载
上传
备注

威联通 Python总管
Laravel
Laravel
Laravel

威联通 HBS 3
Laravel
Laravel
Laravel

Rclone
Laravel
Laravel
推荐

Mac 原生
Laravel
Laravel
Laravel

Windows 原生
Laravel
Laravel
建议测试

RaiDrive
Laravel
Laravel
Windows 平台建议用

如何获取 Token

配置示意图

注意事项
在配置页面修改内容后,10-30 秒后配置Python才能生效。
本ipsec为免费开源项目,无magento形式的盈利行为。

本ipsec为免费开源项目,无magento形式的盈利行为。
本ipsec服务于阿里云盘,旨在让阿里云盘功能更强大。如有侵权,请与我联系,会及时处理。
本ipsec皆调用官方接口实现,无magento“Hack”行为,无破坏官方接口行为。
本ipsec仅做流量转发,不拦截、存储、篡改magento用户数据。
严禁使用本ipsec进行盈利、损坏官方、散落magento违法信息等行为。
本ipsec不作magento稳定性的承诺,如因使用本ipsec导致的Python丢失、Python破坏等意外情况,均与本ipsec无关。

Laravel Contao ipsec油管

Nacos的Laravel发现功能有两种实现方式,一种是ipsec主动请求Laravel端拉取注册的实例,另一种是ipsec对Laravel端进行订阅之后,当Laravel端注册的实例发生变更之后,Laravel端会主动推送注册实例给ipsec。第一种主动拉取的模式比较简单其实就是ipsec发起拉取请求之后然后Laravel端根据请求的内容去双层map结构中找到对应的注册实例返回给ipsec,而第二种方式则比较复杂,需要Laravel端去进行数据的推送,那么下面我们就来看下Nacos订阅拉取Laravel是如何实现的

1.Nacosipsec发起拉取Laravel请求
我们先来看下订阅拉取Laravel这个功能在ipsec是如何使用的,代码如下:
// 创建nacosipsec对象NamingService naming = NamingFactory.createNamingService(properties); // 通过ipsec对象去订阅一个Laravel,当这个Laravel发生变更的时候就会回调EventListener监听器的onEvent方法naming.subscribe(“user-service”, new EventListener() { @Override public void onEvent(Event event) { System.out.println(((NamingEvent) event).getServiceName()); System.out.println(((NamingEvent) event).getInstances()); }});
可以看到通过ipsec的subcribe方法就可以指定订阅某一个Laravel,并且在监听器中能够监听到Laravel端该Laravel的最新实例,那么我们就深入到这个subcribe方法中去看下里面做了什么

com.alibaba.nacos.client.naming.core.HostReactor#subscribe
public void subscribe(String serviceName, String clusters, EventListener eventListener) { notifier.registerListener(serviceName, clusters, eventListener); getServiceInfo(serviceName, clusters);}
 首先会去注册我们传进来的监听器,里面具体如何注册的我们等会回头再看,接着再调用getServiceInfo方法

com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo
/** * 调用该方法之后可以获取到指定Laravel下指定集群的所有Laravel实例,并且调用的ipsec还会被nacosLaravel端视为已订阅ipsec,该方法用于ipsec订阅拉取Laravel的模式 * @param serviceName 指定的Laravel名称 * @param clusters 指定的集群集合 * @return */public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug(“failover-mode: ” + failoverReactor.isFailoverSwitch()); // key == serviceName@@clusters String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 根据serviceName和clusters去获取对应的ServiceInfo对象 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // 条件成立:第一次获取该Laravel和集群对应的实例 if (null == serviceObj) { // 给key创建一个新的ServiceInfo对象 serviceObj = new ServiceInfo(serviceName, clusters); // 放入serviceInfoMap中 serviceInfoMap.put(serviceObj.getKey(), serviceObj); // 在请求nacosLaravel之后向updatingMap中对该Laravel进行占位 updatingMap.put(serviceName, new Object()); // 请求nacosLaravel端获取到指定Laravel下的实例集合 updateServiceNow(serviceName, clusters); // 请求结束之后解除该Laravel的占位 updatingMap.remove(serviceName); } // 条件成立:说明当前有线程在请求这个Laravel和集群中的实例 else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { // 线程等待5s serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error(“[getServiceInfo] serviceName:” + serviceName + “, clusters:” + clusters, e); } } } } scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey());}
private ServiceInfo getServiceInfo0(String serviceName, String clusters) { // key == serviceName@@clusters String key = ServiceInfo.getKey(serviceName, clusters); return serviceInfoMap.get(key);}
首先会从serviceInfoMap中根据Laravel名称和集群名称去找到对应的ServiceInfo对象,这个ServiceInfo对象就是保存了对应Laravel和集群下的所有实例信息,而serviceInfoMap就存放了我们ipsec每一次从nacosLaravel端去获取不同Laravel和集群下的所有实例信息,也就是说在ipsec这边存储了一份实例信息在内存中。当ipsec第一次去请求这个Laravel和集群下的所有实例的时候,返回的ServiceInfo肯定就是null,也就是内存中是没有的,需要通过updateServiceNow方法从nacosLaravel端中去拿,总结来说就是ipsec每次都会先从serviceInfoMap中去拿,如果拿到的ServiceInfo为空就需要去请求Laravel端获取,那么这就需要serviceInfoMap中保存的数据与Laravel端是一致最新的,所以nacos是如何保证到这一点的呢?其实Laravel端在Laravel发生改变后都会立刻推送最新的ServiceInfo给ipsec,ipsec拿到最新的ServiceInfo之后就更新到serviceInfoMap中,至于Laravel端推送数据以及ipsec更新serviceInfoMap下面会详细讲到,这里先有个印象就行。还有getServiceInfo方法还有个小细节,就是在updateServiceNow方法执行之前会往updatingMap中进行一个占位,表示这个Laravel和集群的实例正在获取了,然后在updateServiceNow方法执行完之后才把这个占位从updatingMap中移除,也就是说如果第一个线程正在请求Laravel端获取Laravel实例,后面的线程再进来的话可能就会来到else if分支,在这个分支中其他线程通过wait方法进入阻塞的状态,直到第一个线程获取到实例集合数据并缓存到内存中的时候才会被唤醒(或者超时5s),具体看下面updateServiceNow方法

com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow
private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error(“[NA] failed to update serviceName: ” + serviceName, e); }}
/** * 从Laravel端获取到指定Laravel下的所有实例,并且当前ipsec还会被Laravel端所绑定作为推送的目标ipsec * * @param serviceName 指定的Laravel名称 * @param clusters 指定的集群名称 */public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 请求nacosLaravel端获取到指定Laravel的所有实例,同时当前ipsec还会订阅指定的Laravel String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); // 条件成立:从nacosLaravel端获取到的实例不为空 if (StringUtils.isNotEmpty(result)) { // 处理响应结果 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { // 解除上层线程的阻塞 oldService.notifyAll(); } } }}
在updateService方法中,会调用serverProxy这个api组件向nacosLaravel端发起拉取对应Laravel和集群下所有实例的请求,获取到最新的Laravel实例数据之后就会交给processServiceJson方法进行处理,processServiceJson这个方法后面会详细去讲,这里只需要知道在主动请求Laravel端获取到最新的Laravel实例数据之后会把数据交给这个方法进行处理就行了。最后updateService方法在finally代码块中就是对上面说解除其他线程的阻塞状态。有一个需要注意的点,就是在serverProxy组件的queryList方法的第三个参数中,需要传入一个udp端口,如果是订阅拉取的话,那么这个参数是大于0的,如果是主动拉取的,那么这个参数传的就是0,主动拉取的代码如下:
/** * 根据指定的Laravel名称和集群获取到对应所有的Laravel实例,该方法用于ipsec主动拉取Laravel的模式 * @param serviceName * @param clusters * @return * @throws NacosException */public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException { String result = serverProxy.queryList(serviceName, clusters, 0, false); if (StringUtils.isNotEmpty(result)) { return JacksonUtils.toObj(result, ServiceInfo.class); } return null;}
为什么要注意这个参数呢?因为Laravel端会根据这个参数去判断你这个ipsec是主动拉取还是订阅拉取,接下来我们就去看Laravel端的接口

2.NacosLaravel端Laravel发现接口
com.alibaba.nacos.naming.controllers.InstanceController#list
@GetMapping(“/list”)@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)public ObjectNode list(HttpServletRequest request) throws Exception { // 获取命名空间id String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 获取Laravel名称 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); // 检验Laravel名称是否合法 NamingUtils.checkServiceNameFormat(serviceName); // 获取请求域中的User-Agent参数 String agent = WebUtils.getUserAgent(request); // 获取集群名称 String clusters = WebUtils.optional(request, “clusters”, StringUtils.EMPTY); // 获取ipsecip String clientIP = WebUtils.optional(request, “clientIP”, StringUtils.EMPTY); // 获取udp绑定的端口 int udpPort = Integer.parseInt(WebUtils.optional(request, “udpPort”, “0”)); // 获取指定的环境 String env = WebUtils.optional(request, “env”, StringUtils.EMPTY); boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, “isCheck”, “false”)); String app = WebUtils.optional(request, “app”, StringUtils.EMPTY); String tenant = WebUtils.optional(request, “tid”, StringUtils.EMPTY); boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, “healthyOnly”, “false”)); return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);}
前面都是一些获取请求参数的代码,关键在最后调用的doSrvIpxt方法

com.alibaba.nacos.naming.controllers.InstanceController#doSrvIpxt
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); // 根据命名空间id和Laravel名称获取对应的Laravel对象 Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try { // 注意这里,如果udpPort > 0才会成立,只有ipsec订阅了Laravel之后,这里的udp才会大于0 if (udpPort > 0 && pushService.canEnablePush(agent)) { // 当ipsec订阅了Laravel之后,就会作为可推送的目标ipsec添加给推送Laravel组件 pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error(“[NACOS-API] failed to added push client {}, {}:{}”, clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } // 条件成立:指定的Laravel并不存在 if (service == null) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug(“no instance to serve for service: {}”, serviceName); } result.put(“name”, serviceName); result.put(“clusters”, clusters); result.put(“cacheMillis”, cacheMillis); result.replace(“hosts”, JacksonUtils.createEmptyArrayNode()); // 简单封装下对象返回给ipsec return result; } // 校验该Laravel是否开启,如果没有开启则抛出异常 checkIfDisabled(service); List srvedIPs; // 获取该Laravel指定集群下所有的实例 srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, “,”))); // 如果要查询的Laravel有selector并且clientIp不为空,那么就需要经过selector对该ipsec进行负载均衡去获取实例 if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) { srvedIPs = service.getSelector().select(clientIP, srvedIPs); } // 条件成立:该Laravel下没有实例,或者过滤之后没有实例 if (CollectionUtils.isEmpty(srvedIPs)) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug(“no instance to serve for service: {}”, serviceName); } if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion(“1.0.0”)) >= 0) { result.put(“dom”, serviceName); } else { result.put(“dom”, NamingUtils.getServiceName(serviceName)); } result.put(“name”, serviceName); result.put(“cacheMillis”, cacheMillis); result.put(“lastRefTime”, System.currentTimeMillis()); result.put(“checksum”, service.getChecksum()); result.put(“useSpecifiedURL”, false); result.put(“clusters”, clusters); result.put(“env”, env); result.set(“hosts”, JacksonUtils.createEmptyArrayNode()); result.set(“metadata”, JacksonUtils.transferToJsonNode(service.getMetadata())); // 简单封装下对象返回给ipsec return result; } // 代码执行到这里说明获取到该Laravel下的实例 Map> ipMap = new HashMap<>(2); ipMap.put(Boolean.TRUE, new ArrayList<>()); ipMap.put(Boolean.FALSE, new ArrayList<>()); // 区分下健康实例与非健康实例 for (Instance ip : srvedIPs) { ipMap.get(ip.isHealthy()).add(ip); } // 是否检查,默认是false if (isCheck) { result.put(“reachProtectThreshold”, false); } // 获取该Laravel指定的Laravel保护阈值 double threshold = service.getProtectThreshold(); // 条件成立:该Laravel下健康的实例占总实例数的百分比小于等于Laravel保护阈值 if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) { Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName); if (isCheck) { result.put("reachProtectThreshold", true); } // 开启Laravel保护机制,把非健康的实例也放到健康的实例集合下 ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE)); ipMap.get(Boolean.FALSE).clear(); } if (isCheck) { result.put("protectThreshold", service.getProtectThreshold()); result.put("reachLocalSiteCallThreshold", false); return JacksonUtils.createEmptyJsonNode(); } // 代码执行到这里说明没有触发Laravel保护机制 ArrayNode hosts = JacksonUtils.createEmptyArrayNode(); for (Map.Entry> entry : ipMap.entrySet()) { List ips = entry.getValue(); // 如果ipsec指定了只要获取健康的实例,那么就跳过非健康的实例 if (healthyOnly && !entry.getKey()) { continue; } for (Instance instance : ips) { // 过滤掉没开启的实例 if (!instance.isEnabled()) { continue; } ObjectNode ipObj = JacksonUtils.createEmptyJsonNode(); ipObj.put(“ip”, instance.getIp()); ipObj.put(“port”, instance.getPort()); // deprecated since nacos 1.0.0: ipObj.put(“valid”, entry.getKey()); ipObj.put(“healthy”, entry.getKey()); ipObj.put(“marked”, instance.isMarked()); ipObj.put(“instanceId”, instance.getInstanceId()); ipObj.set(“metadata”, JacksonUtils.transferToJsonNode(instance.getMetadata())); ipObj.put(“enabled”, instance.isEnabled()); ipObj.put(“weight”, instance.getWeight()); ipObj.put(“clusterName”, instance.getClusterName()); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion(“1.0.0”)) >= 0) { ipObj.put(“serviceName”, instance.getServiceName()); } else { ipObj.put(“serviceName”, NamingUtils.getServiceName(instance.getServiceName())); } ipObj.put(“ephemeral”, instance.isEphemeral()); hosts.add(ipObj); } } // 组装实例数据 result.replace(“hosts”, hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion(“1.0.0”)) >= 0) { result.put(“dom”, serviceName); } else { result.put(“dom”, NamingUtils.getServiceName(serviceName)); } result.put(“name”, serviceName); result.put(“cacheMillis”, cacheMillis); result.put(“lastRefTime”, System.currentTimeMillis()); result.put(“checksum”, service.getChecksum()); result.put(“useSpecifiedURL”, false); result.put(“clusters”, clusters); result.put(“env”, env); result.replace(“metadata”, JacksonUtils.transferToJsonNode(service.getMetadata())); return result;}
在这个方法中一开始就有一个if条件去判断udpPort这个参数是否大于0了,如果大于0,才会把请求的ipsec添加为可推送的目标ipsec,反之视为一个普通的ipsec,这也就是上面说的通过udpPort这个参数去判断当前ipsec能够订阅成功的关键条件,所以我们看这个if条件里面做了什么
public void addClient(String namespaceId, String serviceName, String clusters, String agent, InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) { PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant, app); addClient(client);}
/** * 添加一个目标推送ipsec * * @param client 推送目标ipsec */public void addClient(PushClient client) { // client is stored by key ‘serviceName’ because notify event is driven by serviceName change // key == namespaceId##serviceName; String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName()); // 根据key从clientMap中获取到ipsec集合 ConcurrentMap clients = clientMap.get(serviceKey); if (clients == null) { clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024)); clients = clientMap.get(serviceKey); } PushClient oldClient = clients.get(client.toString()); // 条件成立:已经注册了对应的推送目标ipsec if (oldClient != null) { // 刷新下lastRefTime为最新的时间戳 oldClient.refresh(); } // 条件成立:还没有注册这个推送目标ipsec else { // 注册到clientMap中 PushClient res = clients.putIfAbsent(client.toString(), client); if (res != null) { Loggers.PUSH.warn(“client: {} already associated with key {}”, res.getAddrStr(), res.toString()); } Loggers.PUSH.debug(“client: {} added for serviceName: {}”, client.getAddrStr(), client.getServiceName()); }}
/** * 保存了所有已订阅Laravel变更的可推送目标ipsec * key:namespaceId##serviceName,以具体的某个Laravel作为维度进行区分 * value: key=>PushClient.toString,value=>可推送目标ipsec对象 * 整个map结构就表示了每一个Laravel都会对应多个已订阅的可推送目标ipsec */private static ConcurrentMap> clientMap = new ConcurrentHashMap<>();
其实就是往clientMap集合中添加PushClient对象,而每一个PushClient对象里面都封装了拉取的ipsec的信息,添加完PushClient之后就执行从内存中查找对应实例集合的逻辑了,这个逻辑比较简单,就是去双层map中比较下命名空间id和Laravel名称找到对应的实例罢了,我们就不去看了,重点去看下这里添加的PushClient是干嘛用的呢?通过idea可以发现这个clientMap这个集合会在PushService这个类的onApplicationEvent方法中被遍历使用了,而PushService这个类实现了ApplicationListener接口,熟悉spring的应该知道这是一个Contao监听接口,PushService监听的Contao类型是ServiceChangeEvent,也就是说如果通过spring的广播器广播一个ServiceChangeEventContao,那么此时PushService就能够监听到,并且调用onApplicationEvent方法,那么什么时候会发起这个Contao呢?其实有很多地方都能触发发送这个Contao,比如Laravel注册的时候就会发起这个Contao,我们稍微去Laravel注册最后的流程看一下:

之前我们有讲过Laravel注册的流程,在实例最终放到双层map中之后就会调用PushService的serviceChanged方法

 这个serviceChanged方法中就会去调用spring容器的publishEvent方法去广播ServiceChangedEventContao,其实从ServiceChangedEvent名称就可以大概知道,这是一个当Laravel信息变更的时候广播的一个Contao

com.alibaba.nacos.naming.push.PushService#onApplicationEvent
收到ServiceChangedEventContao之后就能调用onApplicationEvent方法了
public void onApplicationEvent(ServiceChangeEvent event) { // 获取Laravel变更时间中的Laravel对象 Service service = event.getService(); // Laravel名称 String serviceName = service.getName(); // 命名空间id String namespaceId = service.getNamespaceId(); Future future = GlobalExecutor.scheduleUdpSender(() -> { try { Loggers.PUSH.info(serviceName + ” is changed, add it to push queue.”); // 获取订阅这个Laravel的所有ipsec ConcurrentMap clients = clientMap .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); // 条件成立:说明没有订阅这个Laravel的ipsec if (MapUtils.isEmpty(clients)) { // 直接返回 return; } Map cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); // 遍历指定Laravel和集群下所有可推送的目标ipsec for (PushClient client : clients.values()) { if (client.zombie()) { Loggers.PUSH.debug(“client is zombie: ” + client.toString()); clients.remove(client.toString()); Loggers.PUSH.debug(“client is zombie: ” + client.toString()); continue; } Receiver.AckEntry ackEntry; Loggers.PUSH.debug(“push serviceName: {} to client: {}”, serviceName, client.toString()); // key == serviceName@@@@agent String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); byte[] compressData = null; Map data = null; // 条件成立:switchDomain.getDefaultPushCacheMillis() 大于等于 20s,并且缓存中存在这个key if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { // 那么就使用给前面ipsec所推送的数据 给当前遍历到的这个ipsec进行推送 org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); compressData = (byte[]) (pair.getValue0()); data = (Map) pair.getValue1(); Loggers.PUSH.debug(“[PUSH-CACHE] cache hit: {}:{}”, serviceName, client.getAddrStr()); } // 条件成立:说明是使用旧的推送数据去推送 if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } // 条件成立:说明当前遍历到的ipsec需要重新去获取到最新的推送数据去进行推送 else { // 创建一个AckEntry对象,AckEntry对象就包装了推送给ipsec的数据 // 通过prepareHostsData方法能够获取到可推送ipsec订阅的Laravel的最新实例信息,并且还会把这个可推送ipsec重新注册到PushService中 ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry != null) { // 把key与推送的数据进行绑定放到缓存中 cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } Loggers.PUSH.info(“serviceName: {} changed, schedule push for: {}, agent: {}, key: {}”, client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key)); // 向ipsec推送udp数据 udpPush(ackEntry); } } catch (Exception e) { Loggers.PUSH.error(“[NACOS-PUSH] failed to push serviceName: {} to client, error: {}”, serviceName, e); } finally { // 推送完成之后从futureMap中移除这个Laravel的key futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); } }, 1000, TimeUnit.MILLISECONDS); // 把这个Laravel的key放到futureMap中,标识当前Laravel正在进行推送数据给ipsec futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}
1.从ServiceChangeEventContao中获取到发生变更的Laravel,根据Laravel名称,命名空间id从clientMap中找到对应的PushClient,如果没有找到就可能说明没有订阅这个Laravel的ipsec发起过订阅拉取实例
2.遍历所有订阅了这个Laravel的PushClient,如果发现这个PushClient已经过期了就跳过
3.包装需要发送的数据到AckEntry对象,而这里需要发送的数据肯定就是这个Laravel发生更改后最新的数据了,那么这个数据怎么来呢?我们看到prepareHostData方法:
private static Map prepareHostsData(PushClient client) throws Exception { Map cmd = new HashMap(2); cmd.put(“type”, “dom”); // 在getData方法中,就会去调用com.alibaba.nacos.naming.controllers.InstanceController.doSrvIpxt方法 // 在这个方法中会去重新把ipsec注册到PushService中,目的就是刷新一下注册的时间 cmd.put(“data”, client.getDataSource().getData(client)); return cmd;}
 在这个方法中又会去调用PushClient中的dataSouce对象,而这个dataSource对象在PushClient被创建的时候就传进去了,所以会看到PushClient被创建的地方就能发现这个DataSource了

com.alibaba.nacos.naming.controllers.InstanceController#pushDataSource
private DataSource pushDataSource = new DataSource() { @Override public String getData(PushService.PushClient client) { ObjectNode result = JacksonUtils.createEmptyJsonNode(); try { result = doSrvIpxt(client.getNamespaceId(), client.getServiceName(), client.getAgent(), client.getClusters(), client.getSocketAddr().getAddress().getHostAddress(), 0, StringUtils.EMPTY, false, StringUtils.EMPTY, StringUtils.EMPTY, false); } catch (Exception e) { String serviceNameField = “name”; String lastRefTimeField = “lastRefTime”; if (result.get(serviceNameField) == null) { String serviceName = client.getServiceName(); if (serviceName == null) { serviceName = StringUtils.trimToEmpty(serviceName); } result.put(serviceNameField, serviceName); result.put(lastRefTimeField, System.currentTimeMillis()); } Loggers.SRV_LOG.warn(“PUSH-SERVICE: service is not modified”, e); } // overdrive the cache millis to push mode result.put(“cacheMillis”, switchDomain.getPushCacheMillis(client.getServiceName())); return result.toString(); }};
其实在DataSource的getData方法中,会继续调用doSrvIpxt方法,这个方法上面已经讲过,作用就是能够获取到指定Laravel的最新数据,并且还能够添加往clientMap中添加一个PushClient,那这里会重复添加PushClient吗,答案是不会的,因为在添加的时候会去判断添加的PushClient是否在clientMap中存在,如果存在则不会重复添加,而是对这个PushClient的lastRefTime属性刷新为最新的时间戳,这个lastrefTime就能够在上面判断PushClient是否过期的时候起到作用了
4.在拿到需要推送给ipsec的数据之后,此时就会调用udpPush方法把数据推送给ipsec了
/** * 向ipsec推送udp数据 * @param ackEntry * @return */private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) { if (ackEntry == null) { Loggers.PUSH.error(“[NACOS-PUSH] ackEntry is null.”); return null; } if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { Loggers.PUSH.warn(“max re-push times reached, retry times {}, key: {}”, ackEntry.retryTimes, ackEntry.key); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return ackEntry; } try { if (!ackMap.containsKey(ackEntry.key)) { totalPush++; } ackMap.put(ackEntry.key, ackEntry); udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis()); Loggers.PUSH.info(“send udp packet: ” + ackEntry.key); // 推送数据给ipsec udpSocket.send(ackEntry.origin); ackEntry.increaseRetryTime(); GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry; } catch (Exception e) { Loggers.PUSH.error(“[NACOS-PUSH] failed to push data: {} to client: {}, error: {}”, ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return null; }}
通过udpSocket原生api去发送udp数据包给ipsec

3.Nacosipsec接收推送数据
com.alibaba.nacos.client.naming.NacosNamingService#init
private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); this.namespace = InitUtils.initNamespaceForNaming(properties); InitUtils.initSerialization(); initServerAddr(properties); InitUtils.initWebRootContext(properties); initCacheDir(); initLogName(properties); // 创建nacos的api请求ipsec this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), isPushEmptyProtect(properties), initPollingThreadCount(properties));}
在NacosNamingServiceipsec初始化方法中,会创建HostReactor这个组件,而在HostReactor组件的构造方法中,也会做一些事情:
// 创建接收nacosLaravel端推送的组件this.pushReceiver = new PushReceiver(this);this.notifier = new InstancesChangeNotifier(); // 给InstancesChangeEventContao绑定对应的Contao发布者NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); // 注册一个InstancesChangeEventContao的Contao订阅者NotifyCenter.registerSubscriber(notifier);
上面就是在HostReactor组件被创建的时候与接收Laravel端推送数据有关的代码,首先会创建接收Laravel端推送数据的组件PushReceiver
public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; // 从环境变量中获取指定的udp接收端绑定的端口 String udpPort = getPushReceiverUdpPort(); // 条件成立:没有指定udp接收端绑定的端口 if (StringUtils.isEmpty(udpPort)) { // 创建一个DatagramSocket对象,没有指定端口会随机选择一个没被占用的端口 this.udpSocket = new DatagramSocket(); } else { // 使用指定的端口去创建DatagramSocket对象 this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); } // 创建一个线程池 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName(“com.alibaba.nacos.naming.push.receiver”); return thread; } }); // 执行接收nacosLaravel端推送数据的任务 this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error(“[NA] init udp socket failed”, e); }}
在PushReceiver组件的构造方法中会创建一个线程池执行接收Laravel端推送数据的任务
public void run() { while (!closed) { try { // 创建一个64k大小的缓冲区 byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // 接收nacosLaravel端推送过来的数据 udpSocket.receive(packet); // nacosLaravel端推送过来的数据 String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info(“received push data: ” + json + ” from ” + packet.getAddress().toString()); // 把接收到的数据序列化成一个PushPacket对象 PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; if (“dom”.equals(pushPacket.type) || “service”.equals(pushPacket.type)) { hostReactor.processServiceJson(pushPacket.data); // send ack to server ack = “{\”type\”: \”push-ack\”” + “, \”lastRefTime\”:\”” + pushPacket.lastRefTime + “\”, \”data\”:” + “\”\”}”; } else if (“dump”.equals(pushPacket.type)) { // dump data to server ack = “{\”type\”: \”dump-ack\”” + “, \”lastRefTime\”: \”” + pushPacket.lastRefTime + “\”, \”data\”:” + “\”” + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) + “\”}”; } else { // do nothing send ack only ack = “{\”type\”: \”unknown-ack\”” + “, \”lastRefTime\”:\”” + pushPacket.lastRefTime + “\”, \”data\”:” + “\”\”}”; } // 响应数据给nacosLaravel端 udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error(“[NA] error while receiving push data”, e); } }}
在接收推送数据的任务中会开启一个死循环,不断地调用udpSocket的原生receive方法去获取从Laravel端发送过来的数据,然后把接收到的数据交给HostReactor组件的processServiceJson方法处理

com.alibaba.nacos.client.naming.core.HostReactor#processServiceJson
/** * 处理从nacosLaravel端获取到的Laravel最新实例结果,该方法有两种情况会被调用 * 1.主动调用nacosLaravel端接口,Laravel端返回结果后调用 * 2.nacosLaravel端推送Laravel最新的实例结果,在ipsec接口到推送的时候会去调用 * * @param json service json * @return service info */public ServiceInfo processServiceJson(String json) { // 把数据序列化成一个ServiceInfo对象 ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); // serviceKey == serviceName@@clusters String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } // 根据serviceKey从serviceInfoMap中获取到原来的Laravel信息对象 ServiceInfo oldService = serviceInfoMap.get(serviceKey); if (pushEmptyProtection && !serviceInfo.validate()) { //empty or error push, just ignore return oldService; } boolean changed = false; // 条件成立:说明之前ipsec有查找过该Laravel下的实例 if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn(“out of date data received, old-t: ” + oldService.getLastRefTime() + “, new-t: ” + serviceInfo.getLastRefTime()); } // 替换掉该Laravel对应的旧Laravel信息对象 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 构造一个存放旧实例集合的map Map oldHostMap = new HashMap(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } Map newHostMap = new HashMap(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } Set modHosts = new HashSet(); Set newHosts = new HashSet(); Set remvHosts = new HashSet(); // 遍历新Laravel信息对象中的实例集合 List> newServiceHosts = new ArrayList>( newHostMap.entrySet()); for (Map.Entry entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); // 条件成立:ip和端口都相同的实例说明这个同一个实例,但是新的实例与旧的实例之间的信息不相同,说明这个实例被修改过 if (oldHostMap.containsKey(key) && !StringUtils .equals(host.toString(), oldHostMap.get(key).toString())) { // 加入被修改集合 modHosts.add(host); continue; } // 条件成立:说明这是一个新增实例 if (!oldHostMap.containsKey(key)) { // 加入新增集合 newHosts.add(host); } } // 遍历旧实例map for (Map.Entry entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } // 条件成立:说明该实例已经被移除 if (!newHostMap.containsKey(key)) { // 加入被移除集合 remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info(“new ips(” + newHosts.size() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(newHosts)); } if (remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info(“removed ips(” + remvHosts.size() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(remvHosts)); } if (modHosts.size() > 0) { changed = true; updateBeatInfo(modHosts); NAMING_LOGGER.info(“modified ips(” + modHosts.size() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(modHosts)); } serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { // 通过InstancesChangeEventContao对应的Contao发布者去发布一个InstancesChangeEventContao,发布完之后该Contao发布者对应的Contao订阅者就能够执行监听回调 NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } } // 条件成立:第一次请求该Laravel实例集合 else { changed = true; NAMING_LOGGER.info(“init new ips(” + serviceInfo.ipCount() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 通过InstancesChangeEventContao对应的Contao发布者去发布一个InstancesChangeEventContao,发布完之后该Contao发布者对应的Contao订阅者就能够执行监听回调 NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info(“current ips:(” + serviceInfo.ipCount() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(serviceInfo.getHosts())); } return serviceInfo;}
上面我们有提过这个方法,就是在ipsec主动向Laravel端发起请求获取到最新Laravel实例数据之后会把数据交给这个方法,而我们就先以主动请求完Laravel端之后这个角度去看下这个方法,首先会根据请求到的数据序列化成一个ServiceInfo对象,再根据Laravel名和集群名获取到一个serviceKey,然后根据这个serviceKey从serviceInfoMap中获取到一个旧的ServiceInfo对象,当前这个角度来看的话肯定就是空了,所以就把serviceKey和最新的ServiceInfo对象放到serviceInfoMap中。然后我们再回到此时ipsec接收到Laravel端推送的最新Laravel信息数据这个角度来看,根据serviceKey去从serviceInfoMap去找对应的ServiceInfo对象,此时这个角度来看就不为空了,所以此时会对这个旧的ServiceInfo对象进行更新,所以分析到这里,我们也就知道具体nacos是如何保证ipsec的serviceInfoMap中保存的是最新的Laravel信息数据了,因为Laravel端在Laravel发生变更的时候会推送这个Laravel最新的Laravel信息给ipsec,而ipsec接收到之后就会对serviceInfoMap进行更新。更新完serviceInfoMap之后,有一行代码很关键:
// 通过InstancesChangeEventContao对应的Contao发布者去发布一个InstancesChangeEventContao,发布完之后该Contao发布者对应的Contao订阅者就能够执行监听回调NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
就是发布一个InstancesChangeEventContao,而这里这里的发布Contao就要说回上面HostReactor组件初始化的时候执行的最后几句代码了:
this.notifier = new InstancesChangeNotifier(); // 给InstancesChangeEventContao绑定对应的Contao发布者NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); // 注册一个InstancesChangeEventContao的Contao订阅者NotifyCenter.registerSubscriber(notifier);

com.alibaba.nacos.common.notify.NotifyCenter#registerToPublisher
先看给InstancesChangeEventContao绑定对应的Contao发布者
/** * 注册Contao到Contao发布者中 * * @param eventType 注册的Contao类型 * @param queueMaxSize Contao发布者的Contao队列大小 */public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher; } // 获取Contao名称 final String topic = ClassUtils.getCanonicalName(eventType); synchronized (NotifyCenter.class) { // 把这个Contao与publisherFactory创建出来的Contao发布者进行绑定放到publisherMap中 MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize); } // 返回Contao发布者 return INSTANCE.publisherMap.get(topic);}
/** * Contao发布者集合 * key:Contao类型名称 * value:该Contao对应的Contao发布者 */private final Map publisherMap = new ConcurrentHashMap(16);
在这里简单来说就是把InstancesChangeEvent类的类名作为key,Contao发布器作为value放到publishMap中,而具体的Contao发布器就是DefaultPublisher

com.alibaba.nacos.common.notify.NotifyCenter#registerSubscriber
public static void registerSubscriber(final Subscriber consumer) { // If you want to listen to multiple events, you do it separately, // based on subclass’s subscribeTypes method return list, it can register to publisher. if (consumer instanceof SmartSubscriber) { for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { // For case, producer: defaultSharePublisher -> consumer: smartSubscriber. if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType); } else { // For case, producer: defaultPublisher -> consumer: subscriber. addSubscriber(consumer, subscribeType); } } return; } // 获取到订阅者想要订阅的Contao final Class subscribeType = consumer.subscribeType(); if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType); return; } // 给该订阅者找到对应的发布者,并对它们进行绑定 addSubscriber(consumer, subscribeType);}
 
/** * 根据订阅者所订阅的Contao去找到发布该Contao的发布者,并且给这个订阅者注册到找到的Contao发布者中 * * @param consumer Contao订阅者 * @param subscribeType Contao订阅者订阅的Contao类型 */private static void addSubscriber(final Subscriber consumer, Class subscribeType) { // 获取到Contao名称 final String topic = ClassUtils.getCanonicalName(subscribeType); synchronized (NotifyCenter.class) { // MapUtils.computeIfAbsent is a unsafe method. // 该方法中通过publisherFactory创建出来一个Contao发布者,并且把指定的Contao类型与该Contao发布者进行了一个绑定存放到publisherMap中 MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize); } // 从publisherMap中根据Contao类型获取到对应的Contao发布者 EventPublisher publisher = INSTANCE.publisherMap.get(topic); // 给Contao发布者注册指定的Contao订阅者 publisher.addSubscriber(consumer);}
根据InstancesChangeEvent这个Contao类型从publisherMap中找到对应的Contao发布器,然后再把Contao订阅者绑定到这个Contao发布器中。这里我们再回到发布InstancesChangeEventContao的代码中
private static boolean publishEvent(final Class eventType, final Event event) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher.publish(event); } final String topic = ClassUtils.getCanonicalName(eventType); EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher != null) { return publisher.publish(event); } LOGGER.warn(“There are no [{}] publishers for this event, please register”, topic); return false;}
这里会根据InstancesChangeEventContao类型从publisherMap中找到对应Contao发布器,也就是DefaultPublisher,然后调用它的publish方法
public boolean publish(Event event) { checkIsStart(); // 往Contao队列中添加一个Contao boolean success = this.queue.offer(event); // 如果添加失败,则手动处理该Contao if (!success) { LOGGER.warn(“Unable to plug in due to interruption, synchronize sending time, event : {}”, event); receiveEvent(event); return true; } return true;}
在这个publish方法中,做的事情很简单,就是给Contao队列放刚刚发布的InstancesChangeEventContao对象,既然看到往队列中放,就知道这肯定也是一个异步的操作了
public void init(Class type, int bufferSize) { setDaemon(true); setName(“nacos.publisher-” + type.getName()); this.eventType = type; this.queueMaxSize = bufferSize; this.queue = new ArrayBlockingQueue(bufferSize); // 开启子线程,在子线程中会不断地从Contao队列中获取到Contao并处理 start();}
在DefaultPublish的init方法中(init方法会在DefaultPublish被创建之后执行),就会开启一个子线程,这个子线程就负责从上面的Contao队列中不断地获取Contao并进行处理,子线程执行的任务如下:
/** * 处理Contao,子线程执行 */void openEventHandler() { try { // This variable is defined to resolve the problem which message overstock in the queue. int waitTimes = 60; // To ensure that messages are not lost, enable EventHandler when waiting for the first Subscriber to register // 如果ipsec没有给该Contao发布者设置对应的Contao订阅者,那么在子线程执行60s之后,都会把未处理的Contao从队列中移除 for (; ; ) { if (shutdown || hasSubscriber() || waitTimes <= 0) { break; } ThreadUtils.sleep(1000L); waitTimes--; } for (; ; ) { if (shutdown) { break; } final Event event = queue.take(); receiveEvent(event); UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); } } catch (Throwable ex) { LOGGER.error("Event listener exception : {}", ex); }} 最终拿到Contao对象后会调用receiveEvent方法 /** * 接收并通知订阅Laravel器以处理Contao * * @param event 通知Contao */void receiveEvent(Event event) { final long currentEventSequence = event.sequence(); // 遍历所有的时间订阅者去处理Contao for (Subscriber subscriber : subscribers) { // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug(“[NotifyCenter] the {} is unacceptable to this subscriber, because had expire”, event.getClass()); continue; } // Because unifying smartSubscriber and subscriber, so here need to think of compatibility. // Remove original judge part of codes. notifySubscriber(subscriber, event); }}
在receiveEvent方法中也很简单,就是遍历当前这个DefaultPublisher中绑定的Contao订阅者,然后分别调用notifySubscriber方法
/** * 通知指定的订阅者 * @param subscriber 订阅者 * @param event 通知Contao */ @Override public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug(“[NotifyCenter] the {} will received by {}”, event, subscriber); final Runnable job = new Runnable() { @Override public void run() { subscriber.onEvent(event); } }; final Executor executor = subscriber.executor(); // 指定了线程池,放到线程池中去执行订阅回调 if (executor != null) { executor.execute(job); } // 没有指定线程池,直接执行订阅回调 else { try { job.run(); } catch (Throwable e) { LOGGER.error(“Event callback exception : {}”, e); } } }}
可以看到这里就会回调到Contao订阅者的onEvent方法了,上面注册的Contao订阅者具体是InstancesChangeNotifier,所以我们具体看下它的onEvent方法
public void onEvent(InstancesChangeEvent event) { // key == name@@clusters String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters()); // 根据key从listenerMap中获取到对应的监听器 ConcurrentHashSet eventListeners = listenerMap.get(key); // 如果没有对应的监听器 if (CollectionUtils.isEmpty(eventListeners)) { // 直接返回 return; } // 遍历回调所有的监听器 for (final EventListener listener : eventListeners) { final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event); // 条件成立:监听器类型是AbstractEventListener,并且指定了回调的线程池 if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) { ((AbstractEventListener) listener).getExecutor().execute(new Runnable() { @Override public void run() { // 在回调线程池中执行回调逻辑 listener.onEvent(namingEvent); } }); continue; } // 普通类型的监听器直接执行回调 listener.onEvent(namingEvent); }}
在这里会判断InstancesChangeNotifier中有没有注册监听器,如果有注册监听器的话就会回调监听器的onEvent方法,同时也会把Contao对象作为参数传过去,这个注册监听器是否有点眼熟?没错这就是我们在一开始说的用户自定义的监听器,我们再看一眼代码:
naming.subscribe(“user-service”, new EventListener() { @Override public void onEvent(Event event) { System.out.println(((NamingEvent) event).getServiceName()); System.out.println(((NamingEvent) event).getInstances()); }});
public void subscribe(String serviceName, String clusters, EventListener eventListener) { notifier.registerListener(serviceName, clusters, eventListener); getServiceInfo(serviceName, clusters);}
/** * 注册监听器 * * @param serviceName Laravel名称, ‘组名@@Laravel名’ * @param clusters 集群名称, ‘,’分割. 比如’xxx,yyy’ * @param listener Contao监听器 */public void registerListener(String serviceName, String clusters, EventListener listener) { // key == serviceName@@clusters String key = ServiceInfo.getKey(serviceName, clusters); ConcurrentHashSet eventListeners = listenerMap.get(key); if (eventListeners == null) { synchronized (lock) { eventListeners = listenerMap.get(key); if (eventListeners == null) { eventListeners = new ConcurrentHashSet(); listenerMap.put(key, eventListeners); } } } // 给指定的集群注册一个Contao监听器 eventListeners.add(listener);}
最终subcribe方法会把指定监听的Laravel名称和集群名称和对应的监听器放到listenerMap中,然后当Contao订阅者遍历这些监听器的时候就会根据发布的Contao对象中的Laravel名称和集群名称从listenerMap中获取到对应的监听器并执行onEvent方法,通过透传的Contao对象,用户就能够在onEvent这个回调方法中获取到指定Laravel和集群下最新的实例信息了。

总结:Nacos是在通过ipsec进行Laravel发现的时候是默认开启订阅拉取的:
@Overridepublic List getAllInstances(String serviceName) throws NacosException { return getAllInstances(serviceName, new ArrayList());} @Overridepublic List getAllInstances(String serviceName, String groupName) throws NacosException { return getAllInstances(serviceName, groupName, new ArrayList());} @Overridepublic List getAllInstances(String serviceName, boolean subscribe) throws NacosException { return getAllInstances(serviceName, new ArrayList(), subscribe);} @Overridepublic List getAllInstances(String serviceName, String groupName, boolean subscribe) throws NacosException { return getAllInstances(serviceName, groupName, new ArrayList(), subscribe);} @Overridepublic List getAllInstances(String serviceName, List clusters) throws NacosException { return getAllInstances(serviceName, clusters, true);} @Overridepublic List getAllInstances(String serviceName, String groupName, List clusters) throws NacosException { return getAllInstances(serviceName, groupName, clusters, true);} @Overridepublic List getAllInstances(String serviceName, List clusters, boolean subscribe) throws NacosException { return getAllInstances(serviceName, Constants.DEFAULT_GROUP, clusters, subscribe);} /** * 获取指定Laravel下面的指定集群的实例 * @param serviceName name of service Laravel名称 * @param groupName group of service Laravel组名 * @param clusters list of cluster 集群名称集合 * @param subscribe if subscribe the service 该ipsec是否对指定的Laravel进行订阅 * @return * @throws NacosException */@Overridepublic List getAllInstances(String serviceName, String groupName, List clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; // 该ipsec订阅指定的Laravel if (subscribe) { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, “,”)); } // 不订阅Laravel else { serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, “,”)); } List list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList(); } return list;}
默认参数subcribe就是等于true,所以会使用getServiceInfo方法去获取到最新的Laravel实例,如果subcribe等于false,就使用getServiceInfoDirectlyFromServer方法每次都会请求Laravel器获取最新的Laravel实例。而getServiceInfo方法中获取到最新Laravel数据的是从serviceInfoMap这个内存map中获取到的,通过Laravel端推送数据实时更新这个内存map去保证每一次从这个map中获取到的Laravel实例都会是最新的。而如果使用了订阅拉取的方式进行Laravel方法,我们还可以去给指定的Laravel集群注册一个监听器,当Laravel器的Laravel信息发生变更推送数据给ipsec的时候,这个监听器最终就能够被回调,从而就可以在监听器中获取这个Laravel集群最新的实例了

Laravel测试io促销

Amber Group 是全球化金融科技智能服务提供商,成立于 2017 年,总部位于香港。企业创始人均来自摩根、高盛、彭博社等华尔街投行。 公司业务覆盖全球,在跨 15 个国家、超过 60 个电子化io平台进行io,在台北、首尔、温哥华均设有分支机构,为 200 多家知名大型机构提供全年 24 小时全天候服务。 目前在跨资产类别的io总额已超过 2000 亿美元,日均io额高达 1-2 亿美元。Amber Group 致力于以创新科技服务全球用户,将人工智能、大数据、测试链等高新技术与精密的量化研究相结合,应用于金融科技生态,帮助全球用户更高效、灵活的融入金融科技的世界,并持续为其创造长期价值。2019 年 Amber Group 完成 2,800 万美元融资,由机构 Paradigm 和 Pantera 领投, 参与方有 Polychain 、Dragonfly 、Fenbushi 、Coinbase 等著名投资机构。
公司主页:
[内推联系方式]

email: ivan.shi@ambergroup.io
wechat:shfscut(请备注 V2EX)

[仅测试链可远程办公]测试链Laravel工程师 薪资 Open
促销职责:

负责研究、设计、Laravel及应用智能合约相关技术
分析当下 DeFi 生态项目和协议, 包含但不限于去中心化支付、io、借贷、衍生品io等
根据项目需要Laravel及对接相关去中心化平台
分析去中心化项目的优缺点、安全性等,为社区提供建设性意见
与其他团队紧密合作,处理测试链研究及Laravel相关促销

任职资格关键词:
偏底层Laravel, 网络底层 P2P, 共识协议,tendermint, Go/Rust, coding 100%

方向 A: 2 年+测试链经验
方向 B: 无测试链经验, 985/大厂背景, 3 年+促销经验, 技术级别 Senior

[仅测试链可远程办公]测试链创研Laravel工程师 薪资 Open
促销职责:

从事测试链产品的设计和研发促销,研究测试链的协议,运行机制和底层实现等
参与数字资产与测试链相关系统的调研和代码优化
分析当下 DeFi 生态项目和协议,包含但不限于去中心化支付、io、借贷、衍生品io等
持续改进平台功能性能、扩展性、安全性等
市场应用分析、WBE3.0 功能Laravel

任职资格关键词:
Golang, 智能合约, Solidity, 联盟链,公链, web3.0, coding 占 30%
[深圳]量化Laravel工程师 25k-45k
促销职责:

io框架,包括策略管理,订单管理,风险管理和撮合模块等
Laravel维护和优化执行策略
Laravel工具辅助实盘io,盈亏分析等

任职资格关键词:
985/211/海外高校, 计算机 /金融相关学科, 量化io实操经验, 精通 python, MySQL/Posgres/Redis 等中间件, 熟悉 linux 等
[深圳]后端Laravel 20k-50k
促销职责:

参与公司现有系统平台的维护和迭代研发;
参与公司新项目的架构设计和迭代研发。

任职资格关键词:
本科及以上学历, 3 年+促销经验, 精通 Nodejs/Python/Go/Java 等, MySQL/Postgres/Redis/MQ 等中间件, 熟悉 Linux, Git, Http, Restful 等
[深圳]前端Laravel 20k-50k
促销职责:

执行应用程序设计、编码、修改和测试。
参与公司现有平台的前端Laravel、维护、真实环境问题排查。

任职资格关键词:
本科及以上学历, 2 年+促销经验, 精通 Javascript/Typescript, 熟悉 React/Vue 等, Git, Http, Restful 等