微信搜索lxw1234bigdata | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

图数据库Neo4j使用例子

编程语言 lxw1234@qq.com 9160℃ 0评论

准备开发一个任务调度系统,其中使用Neo4j来保存和查询任务之间的依赖关系,

整理了一个服务类,记录一下。

使用的Neo4j版本为:2.1.3

 

服务类:

 

 

	
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.log4j.Logger;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.index.Index;
import org.neo4j.graphdb.traversal.Evaluators;
import org.neo4j.graphdb.traversal.PathEvaluator;
import org.neo4j.graphdb.traversal.TraversalDescription;
import org.neo4j.graphdb.traversal.Traverser;
import org.neo4j.kernel.impl.util.FileUtils;

/**
 * Neo4j服务类
 * @author lxw
 *
 */
public class Neo4jService implements Runnable {
	
	private static Logger log = Logger.getLogger(Neo4jService.class);
	
	private final String DB_PATH = "neo4jdb/skynet-neo4j.db";
	
	private GraphDatabaseService db;
	private Index index = null;
	
	public static final String INDEX_ID_KEY = "idx_id";
	public static final String JOB_ID_KEY = "job_id";
	
	private boolean running = true;
	
	

	private enum RelTypes implements RelationshipType{
        KNOWS,
    }
	
	public Neo4jService() {	
		init();
	}
	
	public void init() {
		clearDB();
		db = new GraphDatabaseFactory().newEmbeddedDatabase(DB_PATH);
		registerShutdownHook();
		Transaction tx = db.beginTx();
		try {
			this.index = db.index().forNodes("jobNodes");
			tx.success();
		} catch (Exception e) {
			log.error(e);
			log.error("init() error.."); 
		} finally {
			tx.close();
		}
	}
	
	//添加节点
	public boolean addNode(int job_id) {
		Transaction tx = db.beginTx();
		try {
			Node jobNode = db.createNode();
			jobNode.setProperty(JOB_ID_KEY, job_id);
			this.index.add(jobNode, INDEX_ID_KEY, job_id);
			tx.success();
			log.info("addNode: [" + job_id + "].");
		} catch (Exception e) {
			log.error(e);
			log.error("addNode() error ..");
			return false;
		} finally {
			tx.close();
		}
		return true;
	}
	
	/**
	 * 删除节点依赖
	 * @param parent_job_id
	 * @param son_job_id
	 * @return
	 */
	public boolean deleteNodeRelationShip(int parent_job_id,int son_job_id) {
		Transaction tx = db.beginTx();
		try {
			Node parentJobNode = getNodeByJob(parent_job_id);
			Node sonJobNode = getNodeByJob(son_job_id);
			Iterable relationships = parentJobNode.getRelationships(RelTypes.KNOWS, Direction.OUTGOING);
			for(Relationship r : relationships) {
				if(r.getEndNode().equals(sonJobNode)) {
					r.delete();
				}
			}
			tx.success();
			log.info("deleteNodeRelationShip: [" + parent_job_id + "," + son_job_id + "].");
		} catch (Exception e) {
			log.error(e);
			log.error("deleteNodeRelationShip() error .. paren_job_id is [" + parent_job_id 
					+ "], son_job_id is [" + son_job_id + "]."); 
		} finally {
			tx.close();
		}
		return false;
	}
	
	//删除节点
	public boolean deleteNode(int job_id) {		
		Transaction tx = db.beginTx();
		try {
			Node jobNode = getNodeByJob(job_id);
			jobNode.delete();
			tx.success();
			log.info("deleteNode: [" + job_id + "].");
		} catch (Exception e) {
			log.error(e);
			log.error("deleteNode() error ..");
			return false;
		} finally {
			tx.close();
		}
		return true;
	}
	
	//添加节点依赖
	public boolean addNodeRelationShip(int parent_job_id,int son_job_id) {
		Transaction tx = db.beginTx();
		try {
			Node parentJobNode = getNodeByJob(parent_job_id);
			Node sonJobNode = getNodeByJob(son_job_id);
			parentJobNode.createRelationshipTo(sonJobNode, RelTypes.KNOWS);
			tx.success();
			log.info("addNodeRelationShip: [" + parent_job_id + "," + son_job_id + "].");
		} catch (Exception e) {
			log.error(e);
			log.error("addNodeRelationShip() error .. paren_job_id is [" + parent_job_id 
					+ "], son_job_id is [" + son_job_id + "]."); 
		} finally {
			tx.close();
		}
		return false;
	}
	
	//获取节点的子任务,包括自身
	public List getJobSons (int job_id,int depth) {
		List jobSons = new ArrayList();
		PathEvaluator<?> path = null;
		if(depth == 0 || depth == -1) {
			path = Evaluators.excludeStartPosition();
		} else {
			path = Evaluators.toDepth(depth);
		}
		Node jobNode = getNodeByJob(job_id);
		
		if(null != jobNode) {
			Transaction tx = db.beginTx();
			try {
				TraversalDescription td = db.traversalDescription()
		                .breadthFirst()
		                .relationships( RelTypes.KNOWS, Direction.OUTGOING )
		                .evaluator(path);
				Traverser t = td.traverse(jobNode);
				
				for(Path p : t) {
					log.info("level:" + p.length());
					log.info("id:" + p.endNode().getProperty(JOB_ID_KEY));
					jobSons.add((Integer) p.endNode().getProperty(JOB_ID_KEY));
				}
				tx.success();
			} catch (Exception e) {
				log.error(e);
				log.error("getJobSons error .. job_id is [" + job_id + "]");
			} finally {
				tx.close();
			}
		}
		return jobSons;
	}
	
	
	//获取节点的父任务,包括自身
	public List getJobParents (int job_id,int depth) {
		List jobParents = new ArrayList();
		PathEvaluator<?> path = null;
		if(depth == 0 || depth == -1) {
			path = Evaluators.excludeStartPosition();
		} else {
			path = Evaluators.toDepth(depth);
		}
		Node jobNode = getNodeByJob(job_id);
		
		if(null != jobNode) {
			Transaction tx = db.beginTx();
			try {
				TraversalDescription td = db.traversalDescription()
		                .breadthFirst()
		                .relationships( RelTypes.KNOWS, Direction.INCOMING )
		                .evaluator(path);
				Traverser t = td.traverse(jobNode);
				
				for(Path p : t) {
					log.info("level:" + p.length());
					log.info("id:" + p.endNode().getProperty(JOB_ID_KEY));
					jobParents.add((Integer) p.endNode().getProperty(JOB_ID_KEY));
				}
				tx.success();
			} catch (Exception e) {
				log.error(e);
				log.error("getJobParents error .. job_id is [" + job_id + "]");
			} finally {
				tx.close();
			}
		}
		return jobParents;
	}
	
	
	//获取节点的一级父任务,不包括自身
	public List getJobFirstParents (int job_id) {
		List jobParents = new ArrayList();
		PathEvaluator<?> path = Evaluators.atDepth(1);
		Node jobNode = getNodeByJob(job_id);
		
		if(null != jobNode) {
			Transaction tx = db.beginTx();
			try {
				TraversalDescription td = db.traversalDescription()
		                .breadthFirst()
		                .relationships( RelTypes.KNOWS, Direction.INCOMING )
		                .evaluator(path);
				Traverser t = td.traverse(jobNode);
				
				for(Path p : t) {
					log.info("level:" + p.length());
					log.info("id:" + p.endNode().getProperty(JOB_ID_KEY));
					jobParents.add((Integer) p.endNode().getProperty(JOB_ID_KEY));
				}
				tx.success();
			} catch (Exception e) {
				log.error(e);
				log.error("getJobFirstParents error .. job_id is [" + job_id + "]");
			} finally {
				tx.close();
			}
		}
		return jobParents;
	}
	
	//获取节点的一级子任务,不包括自身
	public List getJobFirstSons (int job_id) {
		List jobSons = new ArrayList();
		PathEvaluator<?> path = Evaluators.atDepth(1);
		Node jobNode = getNodeByJob(job_id);
		
		if(null != jobNode) {
			Transaction tx = db.beginTx();
			try {
				TraversalDescription td = db.traversalDescription()
		                .breadthFirst()
		                .relationships( RelTypes.KNOWS, Direction.OUTGOING )
		                .evaluator(path);
				Traverser t = td.traverse(jobNode);
				
				for(Path p : t) {
					log.info("level:" + p.length());
					log.info("id:" + p.endNode().getProperty(JOB_ID_KEY));
					jobSons.add((Integer) p.endNode().getProperty(JOB_ID_KEY));
				}
				tx.success();
			} catch (Exception e) {
				log.error(e);
				log.error("getJobFirstParents error .. job_id is [" + job_id + "]");
			} finally {
				tx.close();
			}
		}
		return jobSons;
	}
	
	//根据job_id获取Node
	public Node getNodeByJob(int job_id) {
		Node node = null;
		Transaction tx = db.beginTx();
		try {
			node = this.index.get(INDEX_ID_KEY,job_id).getSingle();
			tx.success();
		} catch (Exception e) {
			log.error(e);
			log.error("getNodeByJob() error, job_id is [" + job_id + "].");
		} finally {
			tx.close();
		}
		return node;
	}
	
	private void registerShutdownHook() {
        // Registers a shutdown hook for the Neo4j instance so that it
        // shuts down nicely when the VM exits (even if you "Ctrl-C" the
        // running example before it's completed)
        Runtime.getRuntime()
                .addShutdownHook( new Thread()
                {
                    @Override
                    public void run()
                    {
                    	log.info("neo4j shutdown hook ... ");
                        db.shutdown();
                    }
                } );
    }
	
	private void clearDB() {  
        try {  
            FileUtils.deleteRecursively(new File(DB_PATH));  
        }  
        catch(IOException e) {  
            throw new RuntimeException(e);  
        }  
    }

	@Override
	public void run() {
		while (running) {
			log.info(new Date() + " ### Neo4jService 运行正常!");
			try {
				Thread.sleep(20000);
			} catch (InterruptedException e) {
				log.error(e);
			}
		}
	}  
	
	public void setRunning(boolean running) {
		this.running = running;
	}
	
	
}

测试类:

	
import java.util.List;


public class TestNeo4j {
	public static void main(String[] args) {
		Neo4jService nj = new Neo4jService();
		Thread t = new Thread(nj,"Neo4jThread");
		t.start();
		nj.addNode(0);
		nj.addNode(1);
		nj.addNode(2);
		nj.addNode(3);
		nj.addNode(4);
		nj.addNode(5);
		nj.addNode(6);
		nj.addNode(7);
		nj.addNode(8);
		nj.addNode(9);
		nj.addNode(10);
		
		
		nj.addNodeRelationShip(0,1);
		nj.addNodeRelationShip(0,2);
		nj.addNodeRelationShip(0,3);
		nj.addNodeRelationShip(1,4);
		nj.addNodeRelationShip(1,5);
		nj.addNodeRelationShip(1,6);
		
		nj.addNodeRelationShip(2,7);
		nj.addNodeRelationShip(2,9);
		
		nj.addNodeRelationShip(3,8);
		nj.addNodeRelationShip(3,10);
		
		nj.addNodeRelationShip(8,9);
		nj.addNodeRelationShip(10,9);
		
		List sons = nj.getJobFirstParents(9);
		for(Integer i : sons) {
			System.out.println(i);
		}
		
		nj.deleteNodeRelationShip(2, 9);
		
		sons = nj.getJobFirstParents(9);
		System.out.println("after delete 2--9");
		for(Integer i : sons) {
			System.out.println(i);
		}
		
		nj.setRunning(false);
		
		
	}
}

任务的依赖关系如图:

任务依赖

任务依赖

 

测试输出:

14/09/11 15:39:03 INFO neo4j.Neo4jService: Thu Sep 11 15:39:03 CST 2014 ### Neo4jService 运行正常!
14/09/11 15:39:03 INFO neo4j.Neo4jService: addNode: [0].
14/09/11 15:39:03 INFO neo4j.Neo4jService: addNode: [1].
14/09/11 15:39:03 INFO neo4j.Neo4jService: addNode: [2].
14/09/11 15:39:03 INFO neo4j.Neo4jService: addNode: [3].
14/09/11 15:39:03 INFO neo4j.Neo4jService: addNode: [4].
14/09/11 15:39:03 INFO neo4j.Neo4jService: addNode: [5].
14/09/11 15:39:03 INFO neo4j.Neo4jService: addNode: [6].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNode: [7].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNode: [8].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNode: [9].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNode: [10].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [0,1].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [0,2].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [0,3].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [1,4].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [1,5].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [1,6].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [2,7].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [2,9].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [3,8].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [3,10].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [8,9].
14/09/11 15:39:04 INFO neo4j.Neo4jService: addNodeRelationShip: [10,9].
14/09/11 15:39:04 INFO neo4j.Neo4jService: level:1
14/09/11 15:39:04 INFO neo4j.Neo4jService: id:2
14/09/11 15:39:04 INFO neo4j.Neo4jService: level:1
14/09/11 15:39:04 INFO neo4j.Neo4jService: id:8
14/09/11 15:39:04 INFO neo4j.Neo4jService: level:1
14/09/11 15:39:04 INFO neo4j.Neo4jService: id:10
2
8
10
14/09/11 15:39:04 INFO neo4j.Neo4jService: deleteNodeRelationShip: [2,9].
14/09/11 15:39:04 INFO neo4j.Neo4jService: level:1
14/09/11 15:39:04 INFO neo4j.Neo4jService: id:10
14/09/11 15:39:04 INFO neo4j.Neo4jService: level:1
14/09/11 15:39:04 INFO neo4j.Neo4jService: id:8
after delete 2–9
10
8
14/09/11 15:39:23 INFO neo4j.Neo4jService: neo4j shutdown hook …

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » 图数据库Neo4j使用例子

喜欢 (1)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址