分布式集群管理

使用Zookeeper实现分布式集群管理,功能如下:

  • 查看线上服务节点的资源使用情况
  • 服务离线通知
  • 服务资源超出阈值告警

架构设计:

monitor.png

具体实现:

  • 服务端收集服务状态信息发送给Zookeeper
  • 监控管理中心监听 Zookeeper /my-manager 节点下的子节点(每个服务对应一个临时顺序节点)状态,做事件响应

代码:

服务端:使用 Java Agent(-javaagent)技术采集服务端信息

  • pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>my-zookeeper-demo</artifactId>
        <groupId>com.my</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <artifactId>agent</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <inceptionYear>2020-Now</inceptionYear>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.8</version>
        </dependency>
        <!-- Test-only: with compile scope the shade plugin bundled JUnit into the agent jar. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.11</version>
        </dependency>
        <!-- Lombok only generates code at compile time; "provided" keeps it out of the shaded jar. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.9.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <archive>
                        <manifestEntries>
                            <Project-name>monitor-collector</Project-name>
                            <Project-version>${project.version}</Project-version>
                            <Premain-Class>agent.MonitorAgent</Premain-Class>
                            <Can-Redefine-Classes>true</Can-Redefine-Classes>
                            <Can-Retransform-Classes>true</Can-Retransform-Classes>
                        </manifestEntries>
                    </archive>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- The shaded jar is what gets passed to -javaagent, so its manifest must
                                     name the agent entry point. The previous Main-Class pointed at a
                                     non-existent class (com.cbt.agent.Agent). -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <manifestEntries>
                                        <Premain-Class>agent.MonitorAgent</Premain-Class>
                                        <Can-Redefine-Classes>true</Can-Redefine-Classes>
                                        <Can-Retransform-Classes>true</Can-Retransform-Classes>
                                    </manifestEntries>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

dependency-reduced-pom.xml(由 maven-shade-plugin 在 package 阶段自动生成,无需手动维护)

<?xml version="1.0" encoding="UTF-8"?>
<!-- NOTE(review): this appears to be the dependency-reduced POM that maven-shade-plugin writes
     during `package`; it is regenerated on every build, so edit pom.xml instead of this file.
     It lists no <dependencies> because every dependency was shaded into the jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <parent>
    <artifactId>my-zookeeper-demo</artifactId>
    <groupId>com.my</groupId>
    <version>1.0.0-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>
  <artifactId>agent</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <inceptionYear>2020-Now</inceptionYear>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.2</version>
        <configuration>
          <archive>
            <manifestEntries>
              <Project-name>monitor-collector</Project-name>
              <Project-version>${project.version}</Project-version>
              <Premain-Class>agent.MonitorAgent</Premain-Class>
              <Can-Redefine-Classes>true</Can-Redefine-Classes>
              <Can-Retransform-Classes>true</Can-Retransform-Classes>
            </manifestEntries>
          </archive>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer>
                  <mainClass>com.cbt.agent.Agent</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
  </properties>
</project>

  • domain
package agent.domain;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

/**
 * Operating-system / process metrics snapshot reported by the monitoring agent.
 *
 * <p>{@code @Data} already generates equals/hashCode, so no separate
 * {@code @EqualsAndHashCode} is needed. Fields stay public because the agent assigns them
 * directly.
 *
 * @author Zijian Liao
 * @since 1.0.0
 */
@Data
@Accessors(chain = true)
public class OSInfo {
    /** Host address of the reporting machine. */
    public String ip;
    /** Process CPU usage, in percent of all available processors. */
    public Double cpu;
    /** Used heap memory, in MiB (as populated by the agent). */
    public long usedMemorySize;
    /** Maximum heap memory, in MiB (as populated by the agent). */
    public long usableMemorySize;
    /** JVM runtime name from {@code RuntimeMXBean#getName()}; often of the form {@code pid@host}. */
    public String pid;
    /** Time of the sample, in epoch milliseconds. */
    public long lastUpdateTime;

}

package agent.domain;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.ThreadMXBean;

/**
 * Estimates the CPU usage of the current JVM process from the summed CPU time of its threads.
 *
 * <p>Each call to {@link #getProcessCpu()} reports the usage accumulated since the previous call
 * (for the first call: since this class was initialised).
 */
public class CPUMonitorCalc {

    private static final CPUMonitorCalc instance = new CPUMonitorCalc();

    private final OperatingSystemMXBean osMxBean;
    private final ThreadMXBean threadBean;
    /** {@link System#nanoTime()} at the previous sample. */
    private long preTime = System.nanoTime();
    /** Summed thread CPU time (ns) observed at the previous sample. */
    private long preUsedTime = 0;

    private CPUMonitorCalc() {
        osMxBean = ManagementFactory.getOperatingSystemMXBean();
        threadBean = ManagementFactory.getThreadMXBean();
    }

    /** Returns the process-wide singleton. */
    public static CPUMonitorCalc getInstance() {
        return instance;
    }

    /**
     * Returns the process CPU usage since the previous call.
     *
     * @return usage in percent of all available processors; never negative, and {@code 0.0} when
     *     no wall-clock time has elapsed since the previous call
     */
    public double getProcessCpu() {
        long totalTime = 0;
        for (long id : threadBean.getAllThreadIds()) {
            long cpuTime = threadBean.getThreadCpuTime(id);
            // -1 means the thread died after getAllThreadIds() or CPU timing is disabled;
            // adding it would silently corrupt the sum.
            if (cpuTime > 0) {
                totalTime += cpuTime;
            }
        }
        long curTime = System.nanoTime();
        long usedTime = totalTime - preUsedTime;
        long totalPassedTime = curTime - preTime;
        preTime = curTime;
        preUsedTime = totalTime;
        if (totalPassedTime <= 0) {
            // Coarse nanoTime resolution: avoid NaN / Infinity from dividing by zero.
            return 0.0;
        }
        // Threads that terminated between two samples take their CPU time with them, which can
        // make the delta negative; report 0 instead of a negative percentage.
        long clampedUsed = Math.max(0L, usedTime);
        return (((double) clampedUsed) / totalPassedTime / osMxBean.getAvailableProcessors()) * 100;
    }
}
  • agent
package agent;

import agent.domain.CPUMonitorCalc;
import agent.domain.OSInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.I0Itec.zkclient.ZkClient;

import java.lang.instrument.Instrumentation;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

/**
 * agent
 *
 * @author Zijian Liao
 * @since 1.0.0
 */
public class MonitorAgent {

    private static final String CLUSTER = "172.20.140.111:2181,172.20.140.220:2181,172.20.140.28:2181";
    private static final String ROOT_PATH = "/my-manager";
    private static final String SERVER_PATH = ROOT_PATH + "/server";


    public static void premain(String agentArgs, Instrumentation inst) {
        System.out.println("搜集服务信息插件启动中....");
        ZkClient zkClient = new ZkClient(CLUSTER);
        if (!zkClient.exists(ROOT_PATH)) {
            zkClient.createPersistent(ROOT_PATH);
        }
        //创建临时节点
        String nodePath = zkClient.createEphemeralSequential(SERVER_PATH, getOsInfo());
        //上报信息
        Thread thread = new Thread(()->{
            while(true){
                String osInfo = getOsInfo();
                zkClient.writeData(nodePath, osInfo);
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"zk-monitor");

        thread.setDaemon(true);
        thread.start();
        System.out.println("搜集服务信息插件已启动");
    }

    private static String getOsInfo() {
        OSInfo bean = new OSInfo();
        bean.lastUpdateTime = System.currentTimeMillis();
        bean.ip = getLocalIp();
        bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
        MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
        bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
        bean.pid = ManagementFactory.getRuntimeMXBean().getName();
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(bean);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    private static String getLocalIp() {
        InetAddress addr = null;
        try {
            addr = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        return addr.getHostAddress();
    }

}

打包后运行服务端时,只需加上 JVM 参数 -javaagent:D:\agent-1.0.0-SNAPSHOT.jar 即可挂载监控插件

监控中心

@Controller
public class MonitorController implements InitializingBean {

    private static final String CLUSTER = "172.20.140.111:2181,172.20.140.220:2181,172.20.140.28:2181";
    private static final String ROOT_PATH = "/my-manager";
    private static final String SERVER_PATH = ROOT_PATH + "/server";
    ZkClient zkClient;

    @RequestMapping("/list")
    public String list(Model model){
        List<OSInfo> items = new ArrayList<>();
        List<String> children = zkClient.getChildren(ROOT_PATH);
        for (String child : children) {
            String path = ROOT_PATH + "/" + child;
            System.out.println(path);
            OSInfo osInfo = convert(zkClient.readData(path));
            System.out.println(osInfo);
            items.add(osInfo);
        }
        model.addAttribute("items", items);
        return "list";
    }

    ObjectMapper mapper = new ObjectMapper();
    private OSInfo convert(String json) {
        try {
            return mapper.readValue(json, OSInfo.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        zkClient = new ZkClient(CLUSTER);
    }
}

分布式注册中心

详见dubbo篇

分布式锁

使用Zookeeper实现一把分布式锁,具体功能

  • 同时只有一个服务能获取到锁
  • 未获取到锁的线程进入阻塞
  • 服务解锁后通知其他服务唤醒阻塞线程

具体实现:

  • 各个服务获取锁时在lock结点下创建临时顺序节点(EPHEMERAL_SEQUENTIAL)
  • 获取到lock结点下所有子结点
  • 判断自己是不是最前的结点
  • 是则获取到锁,否则监听前一个结点,并阻塞自己
  • 服务解锁后,删除自己这个结点,Zookeeper通知下一个监听结点
  • 结点重新获取lock结点下所有子结点判断自己是不是最前的结点(检查一次,也可以不检查),唤醒阻塞线程,获取到了锁
  • 重复以上逻辑

代码:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;

/**
 * Distributed lock on top of ZooKeeper ephemeral sequential nodes.
 *
 * <p>Each {@link #lock} call appends a node {@code /my-lock/<lockId><sequence>}. The holder is the
 * node with the smallest sequence for that lockId. Every other waiter watches only its direct
 * predecessor and is woken when that node is deleted.
 */
public class ZookeeperLock {
    private static final String CLUSTER = "172.20.140.111:2181,172.20.140.220:2181,172.20.140.28:2181";
    private ZkClient zkClient;
    private static final String ROOT_PATH = "/my-lock";
    /** Width of the zero-padded counter ZooKeeper appends to sequential node names. */
    private static final int SEQUENCE_LENGTH = 10;

    /** Connects to the cluster and makes sure the lock root node exists. */
    public ZookeeperLock() {
        zkClient = new ZkClient(CLUSTER);
        // createParents=true does not throw if another process created the root concurrently.
        zkClient.createPersistent(ROOT_PATH, true);
    }

    /**
     * Acquires the lock named {@code lockId}, waiting at most {@code timeout} nanoseconds.
     *
     * @param lockId name of the lock; nodes of other lockIds under the same root are ignored
     * @param timeout maximum time to wait, in nanoseconds
     * @return the acquired lock, to be released with {@link #unlock(Lock)}
     * @throws RuntimeException if the lock was not acquired in time (or the thread was interrupted)
     */
    public Lock lock(String lockId, long timeout) {
        // 创建临时节点 (create the ephemeral sequential node)
        Lock lockNode = createLockNode(lockId);
        // Register the waiter before any watcher can fire; otherwise an early unpark() is lost.
        lockNode.waiter = Thread.currentThread();
        tryActiveLock(lockNode);
        long deadline = System.nanoTime() + timeout;
        while (!lockNode.isActive() && !Thread.currentThread().isInterrupted()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break;
            }
            System.out.println(Thread.currentThread().getName()+"park");
            // parkNanos may return early (spurious wake-up); the loop re-checks the state.
            LockSupport.parkNanos(this, remaining);
            System.out.println(Thread.currentThread().getName()+"unpark");
        }
        if (!lockNode.isActive()) {
            // Leave the queue: an abandoned node would block its successor until our session ends.
            zkClient.delete(lockNode.getPath());
            throw new RuntimeException(" lock  timeout");
        }
        return lockNode;
    }

    /**
     * Releases a lock returned by {@link #lock}. Calling it again on the same lock is a no-op.
     *
     * @param lock the held lock
     */
    public void unlock(Lock lock) {
        if (lock.isActive()) {
            lock.setActive(false);
            zkClient.delete(lock.getPath());
        }
    }

    /**
     * Marks {@code lockNode} active if it is first in its lockId's queue; otherwise arranges to be
     * re-evaluated when the direct predecessor node is deleted.
     */
    private Lock tryActiveLock(Lock lockNode) {
        // Only this lockId's nodes, in sequence order (same prefix, fixed-width counter).
        List<String> queue = zkClient.getChildren(ROOT_PATH)
                .stream()
                .filter(name -> belongsTo(lockNode.lockId, name))
                .sorted()
                .map(name -> ROOT_PATH + "/" + name)
                .collect(Collectors.toList());

        int position = queue.indexOf(lockNode.getPath());
        if (position < 0) {
            // Our own node is gone (timed out and deleted, or session expired): nothing to wait for.
            return lockNode;
        }
        if (position == 0) {
            lockNode.setActive(true);
            return lockNode;
        }

        String upNodePath = queue.get(position - 1);
        // Only one of the watcher and the existence re-check below may react to the deletion.
        AtomicBoolean handled = new AtomicBoolean();
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // Only the predecessor's deletion matters.
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                if (!handled.compareAndSet(false, true)) {
                    return;
                }
                System.out.println("节点删除:" + dataPath);
                zkClient.unsubscribeDataChanges(upNodePath, this);
                if (tryActiveLock(lockNode).isActive()) {
                    LockSupport.unpark(lockNode.waiter);
                }
            }
        };
        zkClient.subscribeDataChanges(upNodePath, listener);
        // A predecessor deleted before the watch was registered never produces a delete event.
        if (!zkClient.exists(upNodePath) && handled.compareAndSet(false, true)) {
            zkClient.unsubscribeDataChanges(upNodePath, listener);
            return tryActiveLock(lockNode);
        }
        return lockNode;
    }

    /** Returns whether {@code nodeName} is a sequential node created for {@code lockId}. */
    private static boolean belongsTo(String lockId, String nodeName) {
        return nodeName.length() == lockId.length() + SEQUENCE_LENGTH
                && nodeName.startsWith(lockId);
    }

    /**
     * Creates this caller's ephemeral sequential node for {@code lockId}.
     *
     * @param lockId name of the lock
     * @return an inactive {@link Lock} wrapping the created node path
     */
    public Lock createLockNode(String lockId) {
        String nodePath = zkClient.createEphemeralSequential(ROOT_PATH + "/" + lockId, "w");
        return new Lock(lockId, nodePath);
    }


    /** Handle for one lock request: the node path plus its (cross-thread) activation state. */
    public static class Lock {
        private final String lockId;
        private String path;
        /** Written by ZkClient event threads and read by the waiting thread, hence volatile. */
        private volatile boolean active;
        /** Thread blocked in {@link ZookeeperLock#lock} for this node; unparked when granted. */
        private volatile Thread waiter;

        public Lock(String lockId, String path) {
            this.lockId = lockId;
            this.path = path;
        }

        public String getPath() {
            return path;
        }

        public void setPath(String path) {
            this.path = path;
        }

        public boolean isActive() {
            return active;
        }

        public void setActive(boolean active) {
            this.active = active;
        }
    }

    /** Demo: ten threads contend for the same lock. */
    public static void main(String[] args) {
        ZookeeperLock lock = new ZookeeperLock();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + "准备获取锁");
                    Lock lockNode = lock.lock("lockKey", TimeUnit.SECONDS.toNanos(10));
                    try {
                        System.out.println(Thread.currentThread().getName() + "获取到锁");
                        Thread.sleep(5000);
                    } finally {
                        // Always release, even if the critical section is interrupted.
                        lock.unlock(lockNode);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }, "Thread-" + i).start();
        }
        countDownLatch.countDown();
    }
}