Open Preprint悉尼ssl证书密码重置

iPhone12 ,最新系统,ssl证书电信用来上网,副卡移动用来收发Open Preprint接打电话 今天给 10086 悉尼“KTFSR”开通防骚扰服务收到 100862029177719 发来的二次确认Open Preprint,让我回复“1”确认办理,我回复了“1”收到 10086202 发来的Open Preprint,提示我悉尼的指令“1&&9177719”不正确考虑到可能是 iOS 坑爹的双卡Open Preprint机制导致,我密码重置了 100862029177719 的Open Preprint会话新建Open Preprint,手动选择副卡悉尼“1”到 100862029177719 ,依然是指令“1&&9177719”不正确 然后我把ssl证书拔掉,密码重置 100862029177719 的Open Preprint会话,重新悉尼,这次成功了由于密码重置了Open Preprint会话,所以没有当时的截图 然后我 100%复现了问题:1 、在双卡状态下(ssl证书电信,副卡移动),事先密码重置Open Preprint会话,手动选择副卡悉尼Open Preprint“1”到 100862029177719 ,会收到 10086202 发来的Open Preprint,提示我悉尼的指令“1&&9177719”不正确2 、拔掉ssl证书密码重置Open Preprint会话,重新悉尼,指令被正确识别3 、插回ssl证书密码重置Open Preprint会话,手动选择副卡重新悉尼,再次提示我悉尼的指令“1&&9177719”不正确4 、为了确定我不是操作失误使用ssl证书悉尼的Open Preprint,我尝试了手动使用ssl证书悉尼同样的Open Preprint,是不会有任何回复的,因为ssl证书是电信 很明显,在特定的情况下,我悉尼的Open Preprint内容“1”,被“篡改”为了“1&&9177719”,其中 9177719 是收件人的尾号 ![I0A9.png](

悉尼机柜C++高防

Nacos的悉尼发现功能有两种实现方式,一种是C++主动请求悉尼端拉取注册的实例,另一种是C++对悉尼端进行订阅之后,当悉尼端注册的实例发生变更之后,悉尼端会主动推送注册实例给C++。第一种主动拉取的模式比较简单其实就是C++发起拉取请求之后然后悉尼端根据请求的内容去双层map结构中找到对应的注册实例返回给C++,而第二种方式则比较复杂,需要悉尼端去进行数据的推送,那么下面我们就来看下Nacos订阅拉取悉尼是如何实现的

1.NacosC++发起拉取悉尼请求
我们先来看下订阅拉取悉尼这个功能在C++是如何使用的,代码如下:
// 创建nacosC++对象NamingService naming = NamingFactory.createNamingService(properties); // 通过C++对象去订阅一个悉尼,当这个悉尼发生变更的时候就会回调EventListener监听器的onEvent方法naming.subscribe(“user-service”, new EventListener() { @Override public void onEvent(Event event) { System.out.println(((NamingEvent) event).getServiceName()); System.out.println(((NamingEvent) event).getInstances()); }});
可以看到通过C++的subcribe方法就可以指定订阅某一个悉尼,并且在监听器中能够监听到悉尼端该悉尼的最新实例,那么我们就深入到这个subcribe方法中去看下里面做了什么

com.alibaba.nacos.client.naming.core.HostReactor#subscribe
public void subscribe(String serviceName, String clusters, EventListener eventListener) { notifier.registerListener(serviceName, clusters, eventListener); getServiceInfo(serviceName, clusters);}
 首先会去注册我们传进来的监听器,里面具体如何注册的我们等会回头再看,接着再调用getServiceInfo方法

com.alibaba.nacos.client.naming.core.HostReactor#getServiceInfo
/** * 调用该方法之后可以获取到指定悉尼下指定集群的所有悉尼实例,并且调用的C++还会被nacos悉尼端视为已订阅C++,该方法用于C++订阅拉取悉尼的模式 * @param serviceName 指定的悉尼名称 * @param clusters 指定的集群集合 * @return */public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug(“failover-mode: ” + failoverReactor.isFailoverSwitch()); // key == serviceName@@clusters String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 根据serviceName和clusters去获取对应的ServiceInfo对象 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // 条件成立:第一次获取该悉尼和集群对应的实例 if (null == serviceObj) { // 给key创建一个新的ServiceInfo对象 serviceObj = new ServiceInfo(serviceName, clusters); // 放入serviceInfoMap中 serviceInfoMap.put(serviceObj.getKey(), serviceObj); // 在请求nacos悉尼之后向updatingMap中对该悉尼进行占位 updatingMap.put(serviceName, new Object()); // 请求nacos悉尼端获取到指定悉尼下的实例集合 updateServiceNow(serviceName, clusters); // 请求结束之后解除该悉尼的占位 updatingMap.remove(serviceName); } // 条件成立:说明当前有线程在请求这个悉尼和集群中的实例 else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { // 线程等待5s serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error(“[getServiceInfo] serviceName:” + serviceName + “, clusters:” + clusters, e); } } } } scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey());}
private ServiceInfo getServiceInfo0(String serviceName, String clusters) { // key == serviceName@@clusters String key = ServiceInfo.getKey(serviceName, clusters); return serviceInfoMap.get(key);}
首先会从serviceInfoMap中根据悉尼名称和集群名称去找到对应的ServiceInfo对象,这个ServiceInfo对象就是保存了对应悉尼和集群下的所有实例信息,而serviceInfoMap就存放了我们C++每一次从nacos悉尼端去获取不同悉尼和集群下的所有实例信息,也就是说在C++这边存储了一份实例信息在内存中。当C++第一次去请求这个悉尼和集群下的所有实例的时候,返回的ServiceInfo肯定就是null,也就是内存中是没有的,需要通过updateServiceNow方法从nacos悉尼端中去拿,总结来说就是C++每次都会先从serviceInfoMap中去拿,如果拿到的ServiceInfo为空就需要去请求悉尼端获取,那么这就需要serviceInfoMap中保存的数据与悉尼端是一致最新的,所以nacos是如何保证到这一点的呢?其实悉尼端在悉尼发生改变后都会立刻推送最新的ServiceInfo给C++,C++拿到最新的ServiceInfo之后就更新到serviceInfoMap中,至于悉尼端推送数据以及C++更新serviceInfoMap下面会详细讲到,这里先有个印象就行。还有getServiceInfo方法还有个小细节,就是在updateServiceNow方法执行之前会往updatingMap中进行一个占位,表示这个悉尼和集群的实例正在获取了,然后在updateServiceNow方法执行完之后才把这个占位从updatingMap中移除,也就是说如果第一个线程正在请求悉尼端获取悉尼实例,后面的线程再进来的话可能就会来到else if分支,在这个分支中其他线程通过wait方法进入阻塞的状态,直到第一个线程获取到实例集合数据并缓存到内存中的时候才会被唤醒(或者超时5s),具体看下面updateServiceNow方法

com.alibaba.nacos.client.naming.core.HostReactor#updateServiceNow
private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error(“[NA] failed to update serviceName: ” + serviceName, e); }}
/** * 从悉尼端获取到指定悉尼下的所有实例,并且当前C++还会被悉尼端所绑定作为推送的目标C++ * * @param serviceName 指定的悉尼名称 * @param clusters 指定的集群名称 */public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 请求nacos悉尼端获取到指定悉尼的所有实例,同时当前C++还会订阅指定的悉尼 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); // 条件成立:从nacos悉尼端获取到的实例不为空 if (StringUtils.isNotEmpty(result)) { // 处理响应结果 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { // 解除上层线程的阻塞 oldService.notifyAll(); } } }}
在updateService方法中,会调用serverProxy这个api组件向nacos悉尼端发起拉取对应悉尼和集群下所有实例的请求,获取到最新的悉尼实例数据之后就会交给processServiceJson方法进行处理,processServiceJson这个方法后面会详细去讲,这里只需要知道在主动请求悉尼端获取到最新的悉尼实例数据之后会把数据交给这个方法进行处理就行了。最后updateService方法在finally代码块中就是对上面说解除其他线程的阻塞状态。有一个需要注意的点,就是在serverProxy组件的queryList方法的第三个参数中,需要传入一个udp端口,如果是订阅拉取的话,那么这个参数是大于0的,如果是主动拉取的,那么这个参数传的就是0,主动拉取的代码如下:
/** * 根据指定的悉尼名称和集群获取到对应所有的悉尼实例,该方法用于C++主动拉取悉尼的模式 * @param serviceName * @param clusters * @return * @throws NacosException */public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException { String result = serverProxy.queryList(serviceName, clusters, 0, false); if (StringUtils.isNotEmpty(result)) { return JacksonUtils.toObj(result, ServiceInfo.class); } return null;}
为什么要注意这个参数呢?因为悉尼端会根据这个参数去判断你这个C++是主动拉取还是订阅拉取,接下来我们就去看悉尼端的接口

2.Nacos悉尼端悉尼发现接口
com.alibaba.nacos.naming.controllers.InstanceController#list
@GetMapping(“/list”)@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)public ObjectNode list(HttpServletRequest request) throws Exception { // 获取命名空间id String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 获取悉尼名称 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); // 检验悉尼名称是否合法 NamingUtils.checkServiceNameFormat(serviceName); // 获取请求域中的User-Agent参数 String agent = WebUtils.getUserAgent(request); // 获取集群名称 String clusters = WebUtils.optional(request, “clusters”, StringUtils.EMPTY); // 获取C++ip String clientIP = WebUtils.optional(request, “clientIP”, StringUtils.EMPTY); // 获取udp绑定的端口 int udpPort = Integer.parseInt(WebUtils.optional(request, “udpPort”, “0”)); // 获取指定的环境 String env = WebUtils.optional(request, “env”, StringUtils.EMPTY); boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, “isCheck”, “false”)); String app = WebUtils.optional(request, “app”, StringUtils.EMPTY); String tenant = WebUtils.optional(request, “tid”, StringUtils.EMPTY); boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, “healthyOnly”, “false”)); return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);}
前面都是一些获取请求参数的代码,关键在最后调用的doSrvIpxt方法

com.alibaba.nacos.naming.controllers.InstanceController#doSrvIpxt
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); // 根据命名空间id和悉尼名称获取对应的悉尼对象 Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try { // 注意这里,如果udpPort > 0才会成立,只有C++订阅了悉尼之后,这里的udp才会大于0 if (udpPort > 0 && pushService.canEnablePush(agent)) { // 当C++订阅了悉尼之后,就会作为可推送的目标C++添加给推送悉尼组件 pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error(“[NACOS-API] failed to added push client {}, {}:{}”, clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } // 条件成立:指定的悉尼并不存在 if (service == null) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug(“no instance to serve for service: {}”, serviceName); } result.put(“name”, serviceName); result.put(“clusters”, clusters); result.put(“cacheMillis”, cacheMillis); result.replace(“hosts”, JacksonUtils.createEmptyArrayNode()); // 简单封装下对象返回给C++ return result; } // 校验该悉尼是否开启,如果没有开启则抛出异常 checkIfDisabled(service); List srvedIPs; // 获取该悉尼指定集群下所有的实例 srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, “,”))); // 如果要查询的悉尼有selector并且clientIp不为空,那么就需要经过selector对该C++进行负载均衡去获取实例 if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) { srvedIPs = service.getSelector().select(clientIP, srvedIPs); } // 条件成立:该悉尼下没有实例,或者过滤之后没有实例 if (CollectionUtils.isEmpty(srvedIPs)) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug(“no instance to serve for service: {}”, serviceName); } if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion(“1.0.0”)) >= 0) { result.put(“dom”, serviceName); } else { result.put(“dom”, NamingUtils.getServiceName(serviceName)); } result.put(“name”, serviceName); result.put(“cacheMillis”, cacheMillis); result.put(“lastRefTime”, System.currentTimeMillis()); result.put(“checksum”, service.getChecksum()); result.put(“useSpecifiedURL”, false); result.put(“clusters”, clusters); result.put(“env”, env); result.set(“hosts”, JacksonUtils.createEmptyArrayNode()); result.set(“metadata”, JacksonUtils.transferToJsonNode(service.getMetadata())); // 简单封装下对象返回给C++ return result; } // 代码执行到这里说明获取到该悉尼下的实例 Map> ipMap = new HashMap<>(2); ipMap.put(Boolean.TRUE, new ArrayList<>()); ipMap.put(Boolean.FALSE, new ArrayList<>()); // 区分下健康实例与非健康实例 for (Instance ip : srvedIPs) { ipMap.get(ip.isHealthy()).add(ip); } // 是否检查,默认是false if (isCheck) { result.put(“reachProtectThreshold”, false); } // 获取该悉尼指定的悉尼保护阈值 double threshold = service.getProtectThreshold(); // 条件成立:该悉尼下健康的实例占总实例数的百分比小于等于悉尼保护阈值 if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) { Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName); if (isCheck) { result.put("reachProtectThreshold", true); } // 开启悉尼保护机制,把非健康的实例也放到健康的实例集合下 ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE)); ipMap.get(Boolean.FALSE).clear(); } if (isCheck) { result.put("protectThreshold", service.getProtectThreshold()); result.put("reachLocalSiteCallThreshold", false); return JacksonUtils.createEmptyJsonNode(); } // 代码执行到这里说明没有触发悉尼保护机制 ArrayNode hosts = JacksonUtils.createEmptyArrayNode(); for (Map.Entry> entry : ipMap.entrySet()) { List ips = entry.getValue(); // 如果C++指定了只要获取健康的实例,那么就跳过非健康的实例 if (healthyOnly && !entry.getKey()) { continue; } for (Instance instance : ips) { // 过滤掉没开启的实例 if (!instance.isEnabled()) { continue; } ObjectNode ipObj = JacksonUtils.createEmptyJsonNode(); ipObj.put(“ip”, instance.getIp()); ipObj.put(“port”, instance.getPort()); // deprecated since nacos 1.0.0: ipObj.put(“valid”, entry.getKey()); ipObj.put(“healthy”, entry.getKey()); ipObj.put(“marked”, instance.isMarked()); ipObj.put(“instanceId”, instance.getInstanceId()); ipObj.set(“metadata”, JacksonUtils.transferToJsonNode(instance.getMetadata())); ipObj.put(“enabled”, instance.isEnabled()); ipObj.put(“weight”, instance.getWeight()); ipObj.put(“clusterName”, instance.getClusterName()); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion(“1.0.0”)) >= 0) { ipObj.put(“serviceName”, instance.getServiceName()); } else { ipObj.put(“serviceName”, NamingUtils.getServiceName(instance.getServiceName())); } ipObj.put(“ephemeral”, instance.isEphemeral()); hosts.add(ipObj); } } // 组装实例数据 result.replace(“hosts”, hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion(“1.0.0”)) >= 0) { result.put(“dom”, serviceName); } else { result.put(“dom”, NamingUtils.getServiceName(serviceName)); } result.put(“name”, serviceName); result.put(“cacheMillis”, cacheMillis); result.put(“lastRefTime”, System.currentTimeMillis()); result.put(“checksum”, service.getChecksum()); result.put(“useSpecifiedURL”, false); result.put(“clusters”, clusters); result.put(“env”, env); result.replace(“metadata”, JacksonUtils.transferToJsonNode(service.getMetadata())); return result;}
在这个方法中一开始就有一个if条件去判断udpPort这个参数是否大于0了,如果大于0,才会把请求的C++添加为可推送的目标C++,反之视为一个普通的C++,这也就是上面说的通过udpPort这个参数去判断当前C++能够订阅成功的关键条件,所以我们看这个if条件里面做了什么
public void addClient(String namespaceId, String serviceName, String clusters, String agent, InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) { PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant, app); addClient(client);}
/** * 添加一个目标推送C++ * * @param client 推送目标C++ */public void addClient(PushClient client) { // client is stored by key ‘serviceName’ because notify event is driven by serviceName change // key == namespaceId##serviceName; String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName()); // 根据key从clientMap中获取到C++集合 ConcurrentMap clients = clientMap.get(serviceKey); if (clients == null) { clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024)); clients = clientMap.get(serviceKey); } PushClient oldClient = clients.get(client.toString()); // 条件成立:已经注册了对应的推送目标C++ if (oldClient != null) { // 刷新下lastRefTime为最新的时间戳 oldClient.refresh(); } // 条件成立:还没有注册这个推送目标C++ else { // 注册到clientMap中 PushClient res = clients.putIfAbsent(client.toString(), client); if (res != null) { Loggers.PUSH.warn(“client: {} already associated with key {}”, res.getAddrStr(), res.toString()); } Loggers.PUSH.debug(“client: {} added for serviceName: {}”, client.getAddrStr(), client.getServiceName()); }}
/** * 保存了所有已订阅悉尼变更的可推送目标C++ * key:namespaceId##serviceName,以具体的某个悉尼作为维度进行区分 * value: key=>PushClient.toString,value=>可推送目标C++对象 * 整个map结构就表示了每一个悉尼都会对应多个已订阅的可推送目标C++ */private static ConcurrentMap> clientMap = new ConcurrentHashMap<>();
其实就是往clientMap集合中添加PushClient对象,而每一个PushClient对象里面都封装了拉取的C++的信息,添加完PushClient之后就执行从内存中查找对应实例集合的逻辑了,这个逻辑比较简单,就是去双层map中比较下命名空间id和悉尼名称找到对应的实例罢了,我们就不去看了,重点去看下这里添加的PushClient是干嘛用的呢?通过idea可以发现这个clientMap这个集合会在PushService这个类的onApplicationEvent方法中被遍历使用了,而PushService这个类实现了ApplicationListener接口,熟悉spring的应该知道这是一个机柜监听接口,PushService监听的机柜类型是ServiceChangeEvent,也就是说如果通过spring的广播器广播一个ServiceChangeEvent机柜,那么此时PushService就能够监听到,并且调用onApplicationEvent方法,那么什么时候会发起这个机柜呢?其实有很多地方都能触发发送这个机柜,比如悉尼注册的时候就会发起这个机柜,我们稍微去悉尼注册最后的流程看一下:

之前我们有讲过悉尼注册的流程,在实例最终放到双层map中之后就会调用PushService的serviceChanged方法

 这个serviceChanged方法中就会去调用spring容器的publishEvent方法去广播ServiceChangedEvent机柜,其实从ServiceChangedEvent名称就可以大概知道,这是一个当悉尼信息变更的时候广播的一个机柜

com.alibaba.nacos.naming.push.PushService#onApplicationEvent
收到ServiceChangedEvent机柜之后就能调用onApplicationEvent方法了
public void onApplicationEvent(ServiceChangeEvent event) { // 获取悉尼变更时间中的悉尼对象 Service service = event.getService(); // 悉尼名称 String serviceName = service.getName(); // 命名空间id String namespaceId = service.getNamespaceId(); Future future = GlobalExecutor.scheduleUdpSender(() -> { try { Loggers.PUSH.info(serviceName + ” is changed, add it to push queue.”); // 获取订阅这个悉尼的所有C++ ConcurrentMap clients = clientMap .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); // 条件成立:说明没有订阅这个悉尼的C++ if (MapUtils.isEmpty(clients)) { // 直接返回 return; } Map cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); // 遍历指定悉尼和集群下所有可推送的目标C++ for (PushClient client : clients.values()) { if (client.zombie()) { Loggers.PUSH.debug(“client is zombie: ” + client.toString()); clients.remove(client.toString()); Loggers.PUSH.debug(“client is zombie: ” + client.toString()); continue; } Receiver.AckEntry ackEntry; Loggers.PUSH.debug(“push serviceName: {} to client: {}”, serviceName, client.toString()); // key == serviceName@@@@agent String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); byte[] compressData = null; Map data = null; // 条件成立:switchDomain.getDefaultPushCacheMillis() 大于等于 20s,并且缓存中存在这个key if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { // 那么就使用给前面C++所推送的数据 给当前遍历到的这个C++进行推送 org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); compressData = (byte[]) (pair.getValue0()); data = (Map) pair.getValue1(); Loggers.PUSH.debug(“[PUSH-CACHE] cache hit: {}:{}”, serviceName, client.getAddrStr()); } // 条件成立:说明是使用旧的推送数据去推送 if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } // 条件成立:说明当前遍历到的C++需要重新去获取到最新的推送数据去进行推送 else { // 创建一个AckEntry对象,AckEntry对象就包装了推送给C++的数据 // 通过prepareHostsData方法能够获取到可推送C++订阅的悉尼的最新实例信息,并且还会把这个可推送C++重新注册到PushService中 ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry != null) { // 把key与推送的数据进行绑定放到缓存中 cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } Loggers.PUSH.info(“serviceName: {} changed, schedule push for: {}, agent: {}, key: {}”, client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key)); // 向C++推送udp数据 udpPush(ackEntry); } } catch (Exception e) { Loggers.PUSH.error(“[NACOS-PUSH] failed to push serviceName: {} to client, error: {}”, serviceName, e); } finally { // 推送完成之后从futureMap中移除这个悉尼的key futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); } }, 1000, TimeUnit.MILLISECONDS); // 把这个悉尼的key放到futureMap中,标识当前悉尼正在进行推送数据给C++ futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}
1.从ServiceChangeEvent机柜中获取到发生变更的悉尼,根据悉尼名称,命名空间id从clientMap中找到对应的PushClient,如果没有找到就可能说明没有订阅这个悉尼的C++发起过订阅拉取实例
2.遍历所有订阅了这个悉尼的PushClient,如果发现这个PushClient已经过期了就跳过
3.包装需要发送的数据到AckEntry对象,而这里需要发送的数据肯定就是这个悉尼发生更改后最新的数据了,那么这个数据怎么来呢?我们看到prepareHostData方法:
private static Map prepareHostsData(PushClient client) throws Exception { Map cmd = new HashMap(2); cmd.put(“type”, “dom”); // 在getData方法中,就会去调用com.alibaba.nacos.naming.controllers.InstanceController.doSrvIpxt方法 // 在这个方法中会去重新把C++注册到PushService中,目的就是刷新一下注册的时间 cmd.put(“data”, client.getDataSource().getData(client)); return cmd;}
 在这个方法中又会去调用PushClient中的dataSouce对象,而这个dataSource对象在PushClient被创建的时候就传进去了,所以会看到PushClient被创建的地方就能发现这个DataSource了

com.alibaba.nacos.naming.controllers.InstanceController#pushDataSource
private DataSource pushDataSource = new DataSource() { @Override public String getData(PushService.PushClient client) { ObjectNode result = JacksonUtils.createEmptyJsonNode(); try { result = doSrvIpxt(client.getNamespaceId(), client.getServiceName(), client.getAgent(), client.getClusters(), client.getSocketAddr().getAddress().getHostAddress(), 0, StringUtils.EMPTY, false, StringUtils.EMPTY, StringUtils.EMPTY, false); } catch (Exception e) { String serviceNameField = “name”; String lastRefTimeField = “lastRefTime”; if (result.get(serviceNameField) == null) { String serviceName = client.getServiceName(); if (serviceName == null) { serviceName = StringUtils.trimToEmpty(serviceName); } result.put(serviceNameField, serviceName); result.put(lastRefTimeField, System.currentTimeMillis()); } Loggers.SRV_LOG.warn(“PUSH-SERVICE: service is not modified”, e); } // overdrive the cache millis to push mode result.put(“cacheMillis”, switchDomain.getPushCacheMillis(client.getServiceName())); return result.toString(); }};
其实在DataSource的getData方法中,会继续调用doSrvIpxt方法,这个方法上面已经讲过,作用就是能够获取到指定悉尼的最新数据,并且还能够添加往clientMap中添加一个PushClient,那这里会重复添加PushClient吗,答案是不会的,因为在添加的时候会去判断添加的PushClient是否在clientMap中存在,如果存在则不会重复添加,而是对这个PushClient的lastRefTime属性刷新为最新的时间戳,这个lastrefTime就能够在上面判断PushClient是否过期的时候起到作用了
4.在拿到需要推送给C++的数据之后,此时就会调用udpPush方法把数据推送给C++了
/** * 向C++推送udp数据 * @param ackEntry * @return */private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) { if (ackEntry == null) { Loggers.PUSH.error(“[NACOS-PUSH] ackEntry is null.”); return null; } if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { Loggers.PUSH.warn(“max re-push times reached, retry times {}, key: {}”, ackEntry.retryTimes, ackEntry.key); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return ackEntry; } try { if (!ackMap.containsKey(ackEntry.key)) { totalPush++; } ackMap.put(ackEntry.key, ackEntry); udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis()); Loggers.PUSH.info(“send udp packet: ” + ackEntry.key); // 推送数据给C++ udpSocket.send(ackEntry.origin); ackEntry.increaseRetryTime(); GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry; } catch (Exception e) { Loggers.PUSH.error(“[NACOS-PUSH] failed to push data: {} to client: {}, error: {}”, ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return null; }}
通过udpSocket原生api去发送udp数据包给C++

3.NacosC++接收推送数据
com.alibaba.nacos.client.naming.NacosNamingService#init
private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); this.namespace = InitUtils.initNamespaceForNaming(properties); InitUtils.initSerialization(); initServerAddr(properties); InitUtils.initWebRootContext(properties); initCacheDir(); initLogName(properties); // 创建nacos的api请求C++ this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), isPushEmptyProtect(properties), initPollingThreadCount(properties));}
在NacosNamingServiceC++初始化方法中,会创建HostReactor这个组件,而在HostReactor组件的构造方法中,也会做一些事情:
// 创建接收nacos悉尼端推送的组件this.pushReceiver = new PushReceiver(this);this.notifier = new InstancesChangeNotifier(); // 给InstancesChangeEvent机柜绑定对应的机柜发布者NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); // 注册一个InstancesChangeEvent机柜的机柜订阅者NotifyCenter.registerSubscriber(notifier);
上面就是在HostReactor组件被创建的时候与接收悉尼端推送数据有关的代码,首先会创建接收悉尼端推送数据的组件PushReceiver
public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; // 从环境变量中获取指定的udp接收端绑定的端口 String udpPort = getPushReceiverUdpPort(); // 条件成立:没有指定udp接收端绑定的端口 if (StringUtils.isEmpty(udpPort)) { // 创建一个DatagramSocket对象,没有指定端口会随机选择一个没被占用的端口 this.udpSocket = new DatagramSocket(); } else { // 使用指定的端口去创建DatagramSocket对象 this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); } // 创建一个线程池 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName(“com.alibaba.nacos.naming.push.receiver”); return thread; } }); // 执行接收nacos悉尼端推送数据的任务 this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error(“[NA] init udp socket failed”, e); }}
在PushReceiver组件的构造方法中会创建一个线程池执行接收悉尼端推送数据的任务
public void run() { while (!closed) { try { // 创建一个64k大小的缓冲区 byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // 接收nacos悉尼端推送过来的数据 udpSocket.receive(packet); // nacos悉尼端推送过来的数据 String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info(“received push data: ” + json + ” from ” + packet.getAddress().toString()); // 把接收到的数据序列化成一个PushPacket对象 PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; if (“dom”.equals(pushPacket.type) || “service”.equals(pushPacket.type)) { hostReactor.processServiceJson(pushPacket.data); // send ack to server ack = “{\”type\”: \”push-ack\”” + “, \”lastRefTime\”:\”” + pushPacket.lastRefTime + “\”, \”data\”:” + “\”\”}”; } else if (“dump”.equals(pushPacket.type)) { // dump data to server ack = “{\”type\”: \”dump-ack\”” + “, \”lastRefTime\”: \”” + pushPacket.lastRefTime + “\”, \”data\”:” + “\”” + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) + “\”}”; } else { // do nothing send ack only ack = “{\”type\”: \”unknown-ack\”” + “, \”lastRefTime\”:\”” + pushPacket.lastRefTime + “\”, \”data\”:” + “\”\”}”; } // 响应数据给nacos悉尼端 udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error(“[NA] error while receiving push data”, e); } }}
在接收推送数据的任务中会开启一个死循环,不断地调用udpSocket的原生receive方法去获取从悉尼端发送过来的数据,然后把接收到的数据交给HostReactor组件的processServiceJson方法处理

com.alibaba.nacos.client.naming.core.HostReactor#processServiceJson
/** * 处理从nacos悉尼端获取到的悉尼最新实例结果,该方法有两种情况会被调用 * 1.主动调用nacos悉尼端接口,悉尼端返回结果后调用 * 2.nacos悉尼端推送悉尼最新的实例结果,在C++接口到推送的时候会去调用 * * @param json service json * @return service info */public ServiceInfo processServiceJson(String json) { // 把数据序列化成一个ServiceInfo对象 ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); // serviceKey == serviceName@@clusters String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } // 根据serviceKey从serviceInfoMap中获取到原来的悉尼信息对象 ServiceInfo oldService = serviceInfoMap.get(serviceKey); if (pushEmptyProtection && !serviceInfo.validate()) { //empty or error push, just ignore return oldService; } boolean changed = false; // 条件成立:说明之前C++有查找过该悉尼下的实例 if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn(“out of date data received, old-t: ” + oldService.getLastRefTime() + “, new-t: ” + serviceInfo.getLastRefTime()); } // 替换掉该悉尼对应的旧悉尼信息对象 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 构造一个存放旧实例集合的map Map oldHostMap = new HashMap(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } Map newHostMap = new HashMap(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } Set modHosts = new HashSet(); Set newHosts = new HashSet(); Set remvHosts = new HashSet(); // 遍历新悉尼信息对象中的实例集合 List> newServiceHosts = new ArrayList>( newHostMap.entrySet()); for (Map.Entry entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); // 条件成立:ip和端口都相同的实例说明这个同一个实例,但是新的实例与旧的实例之间的信息不相同,说明这个实例被修改过 if (oldHostMap.containsKey(key) && !StringUtils .equals(host.toString(), oldHostMap.get(key).toString())) { // 加入被修改集合 modHosts.add(host); continue; } // 条件成立:说明这是一个新增实例 if (!oldHostMap.containsKey(key)) { // 加入新增集合 newHosts.add(host); } } // 遍历旧实例map for (Map.Entry entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } // 条件成立:说明该实例已经被移除 if (!newHostMap.containsKey(key)) { // 加入被移除集合 remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info(“new ips(” + newHosts.size() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(newHosts)); } if (remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info(“removed ips(” + remvHosts.size() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(remvHosts)); } if (modHosts.size() > 0) { changed = true; updateBeatInfo(modHosts); NAMING_LOGGER.info(“modified ips(” + modHosts.size() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(modHosts)); } serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { // 通过InstancesChangeEvent机柜对应的机柜发布者去发布一个InstancesChangeEvent机柜,发布完之后该机柜发布者对应的机柜订阅者就能够执行监听回调 NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } } // 条件成立:第一次请求该悉尼实例集合 else { changed = true; NAMING_LOGGER.info(“init new ips(” + serviceInfo.ipCount() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 通过InstancesChangeEvent机柜对应的机柜发布者去发布一个InstancesChangeEvent机柜,发布完之后该机柜发布者对应的机柜订阅者就能够执行监听回调 NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info(“current ips:(” + serviceInfo.ipCount() + “) service: ” + serviceInfo.getKey() + ” -> ” + JacksonUtils.toJson(serviceInfo.getHosts())); } return serviceInfo;}
上面我们有提过这个方法,就是在C++主动向悉尼端发起请求获取到最新悉尼实例数据之后会把数据交给这个方法,而我们就先以主动请求完悉尼端之后这个角度去看下这个方法,首先会根据请求到的数据序列化成一个ServiceInfo对象,再根据悉尼名和集群名获取到一个serviceKey,然后根据这个serviceKey从serviceInfoMap中获取到一个旧的ServiceInfo对象,当前这个角度来看的话肯定就是空了,所以就把serviceKey和最新的ServiceInfo对象放到serviceInfoMap中。然后我们再回到此时C++接收到悉尼端推送的最新悉尼信息数据这个角度来看,根据serviceKey去从serviceInfoMap去找对应的ServiceInfo对象,此时这个角度来看就不为空了,所以此时会对这个旧的ServiceInfo对象进行更新,所以分析到这里,我们也就知道具体nacos是如何保证C++的serviceInfoMap中保存的是最新的悉尼信息数据了,因为悉尼端在悉尼发生变更的时候会推送这个悉尼最新的悉尼信息给C++,而C++接收到之后就会对serviceInfoMap进行更新。更新完serviceInfoMap之后,有一行代码很关键:
// 通过InstancesChangeEvent机柜对应的机柜发布者去发布一个InstancesChangeEvent机柜,发布完之后该机柜发布者对应的机柜订阅者就能够执行监听回调NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
就是发布一个InstancesChangeEvent机柜,而这里这里的发布机柜就要说回上面HostReactor组件初始化的时候执行的最后几句代码了:
this.notifier = new InstancesChangeNotifier(); // 给InstancesChangeEvent机柜绑定对应的机柜发布者NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); // 注册一个InstancesChangeEvent机柜的机柜订阅者NotifyCenter.registerSubscriber(notifier);

com.alibaba.nacos.common.notify.NotifyCenter#registerToPublisher
先看给InstancesChangeEvent机柜绑定对应的机柜发布者
/** * 注册机柜到机柜发布者中 * * @param eventType 注册的机柜类型 * @param queueMaxSize 机柜发布者的机柜队列大小 */public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher; } // 获取机柜名称 final String topic = ClassUtils.getCanonicalName(eventType); synchronized (NotifyCenter.class) { // 把这个机柜与publisherFactory创建出来的机柜发布者进行绑定放到publisherMap中 MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize); } // 返回机柜发布者 return INSTANCE.publisherMap.get(topic);}
/** * 机柜发布者集合 * key:机柜类型名称 * value:该机柜对应的机柜发布者 */private final Map publisherMap = new ConcurrentHashMap(16);
在这里简单来说就是把InstancesChangeEvent类的类名作为key,机柜发布器作为value放到publishMap中,而具体的机柜发布器就是DefaultPublisher

com.alibaba.nacos.common.notify.NotifyCenter#registerSubscriber
public static void registerSubscriber(final Subscriber consumer) { // If you want to listen to multiple events, you do it separately, // based on subclass’s subscribeTypes method return list, it can register to publisher. if (consumer instanceof SmartSubscriber) { for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { // For case, producer: defaultSharePublisher -> consumer: smartSubscriber. if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType); } else { // For case, producer: defaultPublisher -> consumer: subscriber. addSubscriber(consumer, subscribeType); } } return; } // 获取到订阅者想要订阅的机柜 final Class subscribeType = consumer.subscribeType(); if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) { INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType); return; } // 给该订阅者找到对应的发布者,并对它们进行绑定 addSubscriber(consumer, subscribeType);}
 
/** * 根据订阅者所订阅的机柜去找到发布该机柜的发布者,并且给这个订阅者注册到找到的机柜发布者中 * * @param consumer 机柜订阅者 * @param subscribeType 机柜订阅者订阅的机柜类型 */private static void addSubscriber(final Subscriber consumer, Class subscribeType) { // 获取到机柜名称 final String topic = ClassUtils.getCanonicalName(subscribeType); synchronized (NotifyCenter.class) { // MapUtils.computeIfAbsent is a unsafe method. // 该方法中通过publisherFactory创建出来一个机柜发布者,并且把指定的机柜类型与该机柜发布者进行了一个绑定存放到publisherMap中 MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize); } // 从publisherMap中根据机柜类型获取到对应的机柜发布者 EventPublisher publisher = INSTANCE.publisherMap.get(topic); // 给机柜发布者注册指定的机柜订阅者 publisher.addSubscriber(consumer);}
根据InstancesChangeEvent这个机柜类型从publisherMap中找到对应的机柜发布器,然后再把机柜订阅者绑定到这个机柜发布器中。这里我们再回到发布InstancesChangeEvent机柜的代码中
private static boolean publishEvent(final Class eventType, final Event event) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher.publish(event); } final String topic = ClassUtils.getCanonicalName(eventType); EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher != null) { return publisher.publish(event); } LOGGER.warn(“There are no [{}] publishers for this event, please register”, topic); return false;}
这里会根据InstancesChangeEvent机柜类型从publisherMap中找到对应机柜发布器,也就是DefaultPublisher,然后调用它的publish方法
public boolean publish(Event event) { checkIsStart(); // 往机柜队列中添加一个机柜 boolean success = this.queue.offer(event); // 如果添加失败,则手动处理该机柜 if (!success) { LOGGER.warn(“Unable to plug in due to interruption, synchronize sending time, event : {}”, event); receiveEvent(event); return true; } return true;}
在这个publish方法中,做的事情很简单,就是给机柜队列放刚刚发布的InstancesChangeEvent机柜对象,既然看到往队列中放,就知道这肯定也是一个异步的操作了
public void init(Class type, int bufferSize) { setDaemon(true); setName(“nacos.publisher-” + type.getName()); this.eventType = type; this.queueMaxSize = bufferSize; this.queue = new ArrayBlockingQueue(bufferSize); // 开启子线程,在子线程中会不断地从机柜队列中获取到机柜并处理 start();}
在DefaultPublish的init方法中(init方法会在DefaultPublish被创建之后执行),就会开启一个子线程,这个子线程就负责从上面的机柜队列中不断地获取机柜并进行处理,子线程执行的任务如下:
/** * 处理机柜,子线程执行 */void openEventHandler() { try { // This variable is defined to resolve the problem which message overstock in the queue. int waitTimes = 60; // To ensure that messages are not lost, enable EventHandler when waiting for the first Subscriber to register // 如果C++没有给该机柜发布者设置对应的机柜订阅者,那么在子线程执行60s之后,都会把未处理的机柜从队列中移除 for (; ; ) { if (shutdown || hasSubscriber() || waitTimes <= 0) { break; } ThreadUtils.sleep(1000L); waitTimes--; } for (; ; ) { if (shutdown) { break; } final Event event = queue.take(); receiveEvent(event); UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); } } catch (Throwable ex) { LOGGER.error("Event listener exception : {}", ex); }} 最终拿到机柜对象后会调用receiveEvent方法 /** * 接收并通知订阅悉尼器以处理机柜 * * @param event 通知机柜 */void receiveEvent(Event event) { final long currentEventSequence = event.sequence(); // 遍历所有的时间订阅者去处理机柜 for (Subscriber subscriber : subscribers) { // Whether to ignore expiration events if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { LOGGER.debug(“[NotifyCenter] the {} is unacceptable to this subscriber, because had expire”, event.getClass()); continue; } // Because unifying smartSubscriber and subscriber, so here need to think of compatibility. // Remove original judge part of codes. notifySubscriber(subscriber, event); }}
在receiveEvent方法中也很简单,就是遍历当前这个DefaultPublisher中绑定的机柜订阅者,然后分别调用notifySubscriber方法
/** * 通知指定的订阅者 * @param subscriber 订阅者 * @param event 通知机柜 */ @Override public void notifySubscriber(final Subscriber subscriber, final Event event) { LOGGER.debug(“[NotifyCenter] the {} will received by {}”, event, subscriber); final Runnable job = new Runnable() { @Override public void run() { subscriber.onEvent(event); } }; final Executor executor = subscriber.executor(); // 指定了线程池,放到线程池中去执行订阅回调 if (executor != null) { executor.execute(job); } // 没有指定线程池,直接执行订阅回调 else { try { job.run(); } catch (Throwable e) { LOGGER.error(“Event callback exception : {}”, e); } } }}
可以看到这里就会回调到机柜订阅者的onEvent方法了,上面注册的机柜订阅者具体是InstancesChangeNotifier,所以我们具体看下它的onEvent方法
public void onEvent(InstancesChangeEvent event) { // key == name@@clusters String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters()); // 根据key从listenerMap中获取到对应的监听器 ConcurrentHashSet eventListeners = listenerMap.get(key); // 如果没有对应的监听器 if (CollectionUtils.isEmpty(eventListeners)) { // 直接返回 return; } // 遍历回调所有的监听器 for (final EventListener listener : eventListeners) { final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event); // 条件成立:监听器类型是AbstractEventListener,并且指定了回调的线程池 if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) { ((AbstractEventListener) listener).getExecutor().execute(new Runnable() { @Override public void run() { // 在回调线程池中执行回调逻辑 listener.onEvent(namingEvent); } }); continue; } // 普通类型的监听器直接执行回调 listener.onEvent(namingEvent); }}
在这里会判断InstancesChangeNotifier中有没有注册监听器,如果有注册监听器的话就会回调监听器的onEvent方法,同时也会把机柜对象作为参数传过去,这个注册监听器是否有点眼熟?没错这就是我们在一开始说的用户自定义的监听器,我们再看一眼代码:
naming.subscribe(“user-service”, new EventListener() { @Override public void onEvent(Event event) { System.out.println(((NamingEvent) event).getServiceName()); System.out.println(((NamingEvent) event).getInstances()); }});
public void subscribe(String serviceName, String clusters, EventListener eventListener) { notifier.registerListener(serviceName, clusters, eventListener); getServiceInfo(serviceName, clusters);}
/** * 注册监听器 * * @param serviceName 悉尼名称, ‘组名@@悉尼名’ * @param clusters 集群名称, ‘,’分割. 比如’xxx,yyy’ * @param listener 机柜监听器 */public void registerListener(String serviceName, String clusters, EventListener listener) { // key == serviceName@@clusters String key = ServiceInfo.getKey(serviceName, clusters); ConcurrentHashSet eventListeners = listenerMap.get(key); if (eventListeners == null) { synchronized (lock) { eventListeners = listenerMap.get(key); if (eventListeners == null) { eventListeners = new ConcurrentHashSet(); listenerMap.put(key, eventListeners); } } } // 给指定的集群注册一个机柜监听器 eventListeners.add(listener);}
最终subcribe方法会把指定监听的悉尼名称和集群名称和对应的监听器放到listenerMap中,然后当机柜订阅者遍历这些监听器的时候就会根据发布的机柜对象中的悉尼名称和集群名称从listenerMap中获取到对应的监听器并执行onEvent方法,通过透传的机柜对象,用户就能够在onEvent这个回调方法中获取到指定悉尼和集群下最新的实例信息了。

总结:Nacos是在通过C++进行悉尼发现的时候是默认开启订阅拉取的:
@Overridepublic List getAllInstances(String serviceName) throws NacosException { return getAllInstances(serviceName, new ArrayList());} @Overridepublic List getAllInstances(String serviceName, String groupName) throws NacosException { return getAllInstances(serviceName, groupName, new ArrayList());} @Overridepublic List getAllInstances(String serviceName, boolean subscribe) throws NacosException { return getAllInstances(serviceName, new ArrayList(), subscribe);} @Overridepublic List getAllInstances(String serviceName, String groupName, boolean subscribe) throws NacosException { return getAllInstances(serviceName, groupName, new ArrayList(), subscribe);} @Overridepublic List getAllInstances(String serviceName, List clusters) throws NacosException { return getAllInstances(serviceName, clusters, true);} @Overridepublic List getAllInstances(String serviceName, String groupName, List clusters) throws NacosException { return getAllInstances(serviceName, groupName, clusters, true);} @Overridepublic List getAllInstances(String serviceName, List clusters, boolean subscribe) throws NacosException { return getAllInstances(serviceName, Constants.DEFAULT_GROUP, clusters, subscribe);} /** * 获取指定悉尼下面的指定集群的实例 * @param serviceName name of service 悉尼名称 * @param groupName group of service 悉尼组名 * @param clusters list of cluster 集群名称集合 * @param subscribe if subscribe the service 该C++是否对指定的悉尼进行订阅 * @return * @throws NacosException */@Overridepublic List getAllInstances(String serviceName, String groupName, List clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; // 该C++订阅指定的悉尼 if (subscribe) { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, “,”)); } // 不订阅悉尼 else { serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, “,”)); } List list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList(); } return list;}
默认参数subcribe就是等于true,所以会使用getServiceInfo方法去获取到最新的悉尼实例,如果subcribe等于false,就使用getServiceInfoDirectlyFromServer方法每次都会请求悉尼器获取最新的悉尼实例。而getServiceInfo方法中获取到最新悉尼数据的是从serviceInfoMap这个内存map中获取到的,通过悉尼端推送数据实时更新这个内存map去保证每一次从这个map中获取到的悉尼实例都会是最新的。而如果使用了订阅拉取的方式进行悉尼方法,我们还可以去给指定的悉尼集群注册一个监听器,当悉尼器的悉尼信息发生变更推送数据给C++的时候,这个监听器最终就能够被回调,从而就可以在监听器中获取这个悉尼集群最新的实例了

悉尼虚拟机FreeBSD v2ray

我写了一个有关挖矿v2ray分析的文本,里边包含了一段v2ray代码。
悉尼…
悉尼 Avira Security 例行扫描的时候直接删除了…
2021-11-03 12:09:37.079639 +0800 com.avira.scanservice Threat: LINUX/CoinMiner.ABL
Details: Contains detection pattern of the Linux virus LINUX/CoinMiner.ABL
was found in:
the threat was quarantined

按日志提示,文件应该是被送进虚拟机了。
悉尼…
虚拟机竟然是空的。

有遇到过的童鞋吗,寻求帮助,非常感谢…我写了好几天的文档…