From f596683ca75fe94be22959034bab2092a9dc015d Mon Sep 17 00:00:00 2001
From: Sebastian Eibl <sebastian.eibl@fau.de>
Date: Fri, 17 Mar 2017 16:53:28 +0100
Subject: [PATCH] added union pe data structure

---
 src/core/math/GenericAABB.h                |    4 +-
 src/core/math/Quaternion.h                 |    6 +-
 src/pe/Types.h                             |    5 +-
 src/pe/basic.h                             |    1 +
 src/pe/collision/Collide.h                 |   48 +-
 src/pe/communication/DynamicMarshalling.h  |    4 +-
 src/pe/communication/Instantiate.h         |    5 +-
 src/pe/communication/Marshalling.cpp       |   16 +-
 src/pe/communication/Marshalling.h         |    7 +-
 src/pe/communication/rigidbody/Box.cpp     |    4 +-
 src/pe/communication/rigidbody/Box.h       |   19 +-
 src/pe/communication/rigidbody/Capsule.cpp |    4 +-
 src/pe/communication/rigidbody/Capsule.h   |   17 +-
 src/pe/communication/rigidbody/Sphere.cpp  |    4 +-
 src/pe/communication/rigidbody/Sphere.h    |   17 +-
 src/pe/communication/rigidbody/Union.h     |  146 +++
 src/pe/contact/Contact.cpp                 |    4 +-
 src/pe/cr/HCSITS.impl.h                    |   16 +-
 src/pe/fcd/SimpleFCD.h                     |   44 +-
 src/pe/rigidbody/RigidBody.cpp             |    1 -
 src/pe/rigidbody/RigidBody.h               |  124 +-
 src/pe/rigidbody/Union.h                   | 1218 ++++++++++++++++++++
 src/pe/rigidbody/UnionFactory.h            |  266 +++++
 src/pe/vtk/SphereVtkOutput.cpp             |   10 +
 src/pe/vtk/SphereVtkOutput.h               |    7 +
 tests/pe/CMakeLists.txt                    |    6 +
 tests/pe/Collision.cpp                     |   35 +-
 tests/pe/Marshalling.cpp                   |   49 +-
 tests/pe/SetBodyTypeIDs.cpp                |    3 +-
 tests/pe/ShadowCopy.cpp                    |  176 +++
 tests/pe/Union.cpp                         |  108 ++
 31 files changed, 2204 insertions(+), 170 deletions(-)
 create mode 100644 src/pe/communication/rigidbody/Union.h
 create mode 100644 src/pe/rigidbody/Union.h
 create mode 100644 src/pe/rigidbody/UnionFactory.h
 create mode 100644 tests/pe/ShadowCopy.cpp
 create mode 100644 tests/pe/Union.cpp

diff --git a/src/core/math/GenericAABB.h b/src/core/math/GenericAABB.h
index 0d3c43a26..502b45c2e 100644
--- a/src/core/math/GenericAABB.h
+++ b/src/core/math/GenericAABB.h
@@ -179,6 +179,8 @@ public:
    template< typename Engine >
    inline vector_type randomPoint( Engine & engine ) const;
 
+   bool checkInvariant() const;
+
    /**
     * \brief Deserializes an GenericAABB from a mpi::GenericRecvBuffer
     *
@@ -206,8 +208,6 @@ protected:
    inline GenericAABB( const GenericAABB & lhs, const GenericAABB & rhs );
 
 private:
-   bool checkInvariant() const;
-
    vector_type minCorner_; /// minimal values
    vector_type maxCorner_; /// maximal values
 };
diff --git a/src/core/math/Quaternion.h b/src/core/math/Quaternion.h
index fc6dd5891..b7b71b453 100644
--- a/src/core/math/Quaternion.h
+++ b/src/core/math/Quaternion.h
@@ -277,7 +277,8 @@ inline Quaternion<Type>::Quaternion( Vector3<Axis> axis, Type angle )
 {
    static_assert(boost::is_floating_point<Axis>::value, "Axis has to be floating point!" );
 
-   if( math::equal(std::fabs(angle), real_c(0))  ) {
+   auto axisLength = axis.length();
+   if( (floatIsEqual(axisLength, 0)) || (math::equal(std::fabs(angle), real_c(0)))  ) {
       reset();
       return;
    }
@@ -287,7 +288,8 @@ inline Quaternion<Type>::Quaternion( Vector3<Axis> axis, Type angle )
    const Type sina( std::sin( angle*Type(0.5) ) );
    const Type cosa( std::cos( angle*Type(0.5) ) );
 
-   axis = axis.getNormalized();
+   auto invAxisLength = real_c(1) / axisLength;
+   axis *= invAxisLength;
 
    v_[0] = cosa;
    v_[1] = sina * axis[0];
diff --git a/src/pe/Types.h b/src/pe/Types.h
index 8a0b34c75..8eb0d3560 100644
--- a/src/pe/Types.h
+++ b/src/pe/Types.h
@@ -84,6 +84,7 @@ class SliderJoint;
 class Sphere;
 class Spring;
 class TriangleMesh;
+template <typename BodyTypeTuple>
 class Union;
 
 typedef RigidBody             BodyType;            //!< Type of the rigid bodies.
@@ -117,10 +118,6 @@ typedef TriangleMesh          MeshType;             //!< Type of the triangle me
 typedef TriangleMesh*         MeshID;               //!< Handle for a triangle mesh primitive.
 typedef const TriangleMesh*   ConstMeshID;          //!< Handle for a constant triangle mesh primitive.
 
-typedef Union                 UnionType;           //!< Type of the union compound geometry.
-typedef Union*                UnionID;             //!< Handle for a union.
-typedef const Union*          ConstUnionID;        //!< Handle for a constant union.
-
 typedef Attachable            AttachableType;      //!< Type of the attachables.
 typedef Attachable*           AttachableID;        //!< Handle for an attachable.
 typedef const Attachable*     ConstAttachableID;   //!< Handle for a constant attachable.
diff --git a/src/pe/basic.h b/src/pe/basic.h
index f968b876b..47b6345e6 100644
--- a/src/pe/basic.h
+++ b/src/pe/basic.h
@@ -43,6 +43,7 @@
 #include "pe/rigidbody/CapsuleFactory.h"
 #include "pe/rigidbody/PlaneFactory.h"
 #include "pe/rigidbody/SphereFactory.h"
+#include "pe/rigidbody/UnionFactory.h"
 
 #include "pe/synchronization/SyncNextNeighbors.h"
 #include "pe/synchronization/SyncShadowOwners.h"
diff --git a/src/pe/collision/Collide.h b/src/pe/collision/Collide.h
index 95ce17582..2572faeaf 100644
--- a/src/pe/collision/Collide.h
+++ b/src/pe/collision/Collide.h
@@ -20,11 +20,14 @@
 //
 //======================================================================================================================
 
+#pragma once
+
 #include "pe/Types.h"
 #include "pe/rigidbody/Box.h"
 #include "pe/rigidbody/Capsule.h"
 #include "pe/rigidbody/Plane.h"
 #include "pe/rigidbody/Sphere.h"
+#include "pe/rigidbody/Union.h"
 #include "pe/contact/Contact.h"
 
 #include "GJKEPAHelper.h"
@@ -34,9 +37,17 @@
 #include "core/math/Shims.h"
 #include "geometry/GeometricalFunctions.h"
 
+#include <boost/tuple/tuple.hpp>
+
 namespace walberla {
 namespace pe {
 
+namespace fcd {
+//Forward Declaration
+template < typename BodyTypeTuple, typename BaseT >
+struct DoubleDispatch;
+}
+
 //*************************************************************************************************
 /*!\brief Contact generation between two colliding rigid bodies.
  * \ingroup contact_generation
@@ -74,9 +85,6 @@ bool collide( SphereID s1, SphereID s2, Container& container )
 {
    WALBERLA_ASSERT_UNEQUAL(s1->getSystemID(), s2->getSystemID(), "colliding with itself!\n" << s1 << "\n" << s2);
 
-   WALBERLA_CHECK_EQUAL(s1->getSuperBody(), s1);
-   WALBERLA_CHECK_EQUAL(s2->getSuperBody(), s2);
-
    Vec3 contactNormal = ( s1->getPosition() - s2->getPosition() );
    real_t penetrationDepth = ( contactNormal.length() - s1->getRadius() - s2->getRadius() );
 
@@ -1994,5 +2002,39 @@ bool collide( CapsuleID c, BoxID b, Container& container )
    return collide(b, c, container);
 }
 
+template <typename BodyTypeTuple, typename BodyB, typename Container>
+inline
+bool collide( Union<BodyTypeTuple>* bd1, BodyB* bd2, Container& container )
+{
+   bool collision = false;
+   for( auto it=bd1->begin(); it!=bd1->end(); ++it )
+   {
+      collision |= fcd::DoubleDispatch< BodyTypeTuple, boost::tuple<BodyB> >::execute(*it, bd2, container);
+   }
+   return collision;
+}
+
+template <typename BodyA, typename BodyTypeTuple, typename Container>
+inline
+bool collide( BodyA* bd1, Union<BodyTypeTuple>* bd2, Container& container )
+{
+   return collide (bd2, bd1, container);
+}
+
+template <typename BodyTypeTupleA, typename BodyTypeTupleB, typename Container>
+inline
+bool collide( Union<BodyTypeTupleA>* bd1, Union<BodyTypeTupleB>* bd2, Container& container )
+{
+   bool collision = false;
+   for( auto it1=bd1->begin(); it1!=bd1->end(); ++it1 )
+   {
+      for( auto it2=bd2->begin(); it2!=bd2->end(); ++it2 )
+      {
+         collision |= fcd::DoubleDispatch< BodyTypeTupleA, BodyTypeTupleB >::execute(*it1, *it2, container);
+      }
+   }
+   return collision;
+}
+
 }
 }
diff --git a/src/pe/communication/DynamicMarshalling.h b/src/pe/communication/DynamicMarshalling.h
index 508448921..24246cbe1 100644
--- a/src/pe/communication/DynamicMarshalling.h
+++ b/src/pe/communication/DynamicMarshalling.h
@@ -33,6 +33,7 @@
 #include "pe/communication/rigidbody/Box.h"
 #include "pe/communication/rigidbody/Capsule.h"
 #include "pe/communication/rigidbody/Sphere.h"
+#include "pe/communication/rigidbody/Union.h"
 
 #include "core/Abort.h"
 
@@ -95,7 +96,8 @@ struct UnmarshalDynamically{
       if (BodyTypeTuple::head_type::getStaticTypeID() == typeID)
       {
          typedef typename BodyTypeTuple::head_type BodyT;
-         return instantiate<BodyT>( buffer, domain, block );
+         BodyT* newBody;
+         return instantiate( buffer, domain, block, newBody );
       } else
       {
          return UnmarshalDynamically<typename BodyTypeTuple::tail_type>::execute(buffer, typeID, domain, block);
diff --git a/src/pe/communication/Instantiate.h b/src/pe/communication/Instantiate.h
index 14550645e..3190de2a2 100644
--- a/src/pe/communication/Instantiate.h
+++ b/src/pe/communication/Instantiate.h
@@ -28,6 +28,7 @@
 // Includes
 //*************************************************************************************************
 
+#include "core/Abort.h"
 #include "core/math/AABB.h"
 #include "core/math/Vector3.h"
 
@@ -55,9 +56,9 @@ void correctBodyPosition(const math::AABB& domain, const Vec3& center, Vec3& pos
 }
 
 template < class BodyT >
-BodyID instantiate( mpi::RecvBuffer& /*buffer*/, const math::AABB& /*domain*/, const math::AABB& /*block*/ )
+BodyT* instantiate( mpi::RecvBuffer& /*buffer*/, const math::AABB& /*domain*/, const math::AABB& /*block*/, BodyT*& /*newBody*/ )
 {
-   throw std::runtime_error( "Body instantiation not implemented!" );
+   WALBERLA_ABORT( "Body instantiation not implemented! (" << demangle(typeid(BodyT).name()) << ")" );
 }
 
 }  // namespace communication
diff --git a/src/pe/communication/Marshalling.cpp b/src/pe/communication/Marshalling.cpp
index ec69e8db8..ca1530f1d 100644
--- a/src/pe/communication/Marshalling.cpp
+++ b/src/pe/communication/Marshalling.cpp
@@ -47,7 +47,7 @@ void marshal( mpi::SendBuffer& buffer, const MPIRigidBodyTrait& obj ) {
  * \param hasSuperBody False if body is not part of a union. Subordinate bodies in unions do not encode velocities but encode relative positions.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, MPIRigidBodyTraitParameter& objparam, bool /*hasSuperBody*/ ) {
+void unmarshal( mpi::RecvBuffer& buffer, MPIRigidBodyTraitParameter& objparam ) {
    buffer >> objparam.owner_;
 }
 
@@ -66,6 +66,7 @@ void marshal( mpi::SendBuffer& buffer, const RigidBody& obj ) {
    buffer << obj.isCommunicating();
    buffer << obj.hasInfiniteMass();
    buffer << obj.getPosition();
+   buffer << obj.hasSuperBody();
    if( obj.hasSuperBody() )
    {
       buffer <<  obj.getRelPosition();
@@ -87,23 +88,24 @@ void marshal( mpi::SendBuffer& buffer, const RigidBody& obj ) {
  * \param hasSuperBody False if body is not part of a union. Subordinate bodies in unions do not encode velocities but encode relative positions.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, RigidBodyParameters& objparam, bool hasSuperBody ) {
-   unmarshal( buffer, objparam.mpiTrait_, hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, RigidBodyParameters& objparam ) {
+   unmarshal( buffer, objparam.mpiTrait_ );
 
    buffer >> objparam.sid_;
    buffer >> objparam.uid_;
    buffer >> objparam.communicating_;
    buffer >> objparam.infiniteMass_;
    buffer >> objparam.gpos_;
+   buffer >> objparam.hasSuperBody_;
 
-   if( hasSuperBody )
+   if( objparam.hasSuperBody_ )
    {
       buffer >> objparam.rpos_;
    }
 
    buffer >> objparam.q_;
 
-   if( !hasSuperBody )
+   if( !objparam.hasSuperBody_ )
    {
       buffer >> objparam.v_;
       buffer >> objparam.w_;
@@ -139,8 +141,8 @@ void marshal( mpi::SendBuffer& buffer, const GeomPrimitive& obj ) {
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, GeomPrimitiveParameters& objparam, bool hasSuperBody ) {
-   unmarshal( buffer, static_cast<RigidBodyParameters&>( objparam ), hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, GeomPrimitiveParameters& objparam ) {
+   unmarshal( buffer, static_cast<RigidBodyParameters&>( objparam ) );
    buffer >> objparam.material_;
 }
 
diff --git a/src/pe/communication/Marshalling.h b/src/pe/communication/Marshalling.h
index 42ecd2873..1fc935363 100644
--- a/src/pe/communication/Marshalling.h
+++ b/src/pe/communication/Marshalling.h
@@ -63,7 +63,7 @@ void marshal( mpi::SendBuffer& buffer, const MPIRigidBodyTrait& obj );
  * \param hasSuperBody False if body is not part of a union. Subordinate bodies in unions do not encode velocities but encode relative positions.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, MPIRigidBodyTraitParameter& objparam, bool hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, MPIRigidBodyTraitParameter& objparam );
 
 //*************************************************************************************************
 //*************************************************************************************************
@@ -77,6 +77,7 @@ struct RigidBodyParameters {
    bool communicating_, infiniteMass_;
    id_t sid_, uid_;
    Vec3 gpos_, rpos_, v_, w_;
+   bool hasSuperBody_;
    Quat q_;
 };
 
@@ -98,7 +99,7 @@ void marshal( mpi::SendBuffer& buffer, const RigidBody& obj );
  * \param hasSuperBody False if body is not part of a union. Subordinate bodies in unions do not encode velocities but encode relative positions.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, RigidBodyParameters& objparam, bool hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, RigidBodyParameters& objparam );
 
 //*************************************************************************************************
 //*************************************************************************************************
@@ -130,7 +131,7 @@ void marshal( mpi::SendBuffer& buffer, const GeomPrimitive& obj );
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, GeomPrimitiveParameters& objparam, bool hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, GeomPrimitiveParameters& objparam );
 
 }  // namespace communication
 }  // namespace pe
diff --git a/src/pe/communication/rigidbody/Box.cpp b/src/pe/communication/rigidbody/Box.cpp
index 330960a36..40217acb6 100644
--- a/src/pe/communication/rigidbody/Box.cpp
+++ b/src/pe/communication/rigidbody/Box.cpp
@@ -49,8 +49,8 @@ void marshal( mpi::SendBuffer& buffer, const Box& obj ) {
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, BoxParameters& objparam, bool hasSuperBody ) {
-   unmarshal( buffer, static_cast<GeomPrimitiveParameters&>( objparam ), hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, BoxParameters& objparam ) {
+   unmarshal( buffer, static_cast<GeomPrimitiveParameters&>( objparam ) );
    buffer >> objparam.lengths_;
 }
 //*************************************************************************************************
diff --git a/src/pe/communication/rigidbody/Box.h b/src/pe/communication/rigidbody/Box.h
index 9b5f43b7d..89324fbe6 100644
--- a/src/pe/communication/rigidbody/Box.h
+++ b/src/pe/communication/rigidbody/Box.h
@@ -58,21 +58,18 @@ void marshal( mpi::SendBuffer& buffer, const Box& obj );
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, BoxParameters& objparam, bool hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, BoxParameters& objparam );
 //*************************************************************************************************
-
-
-template <>
-inline BodyID instantiate<Box>( mpi::RecvBuffer& buffer, const math::AABB& domain, const math::AABB& block )
+inline BoxID instantiate( mpi::RecvBuffer& buffer, const math::AABB& domain, const math::AABB& block, BoxID& newBody )
 {
    BoxParameters subobjparam;
-   unmarshal( buffer, subobjparam, false );
+   unmarshal( buffer, subobjparam );
    correctBodyPosition(domain, block.center(), subobjparam.gpos_);
-   BoxID obj = new Box( subobjparam.sid_, subobjparam.uid_, subobjparam.gpos_, subobjparam.rpos_, subobjparam.q_, subobjparam.lengths_, subobjparam.material_, false, subobjparam.communicating_, subobjparam.infiniteMass_ );
-   obj->setLinearVel( subobjparam.v_ );
-   obj->setAngularVel( subobjparam.w_ );
-   obj->MPITrait.setOwner( subobjparam.mpiTrait_.owner_ );
-   return obj;
+   newBody = new Box( subobjparam.sid_, subobjparam.uid_, subobjparam.gpos_, subobjparam.rpos_, subobjparam.q_, subobjparam.lengths_, subobjparam.material_, false, subobjparam.communicating_, subobjparam.infiniteMass_ );
+   newBody->setLinearVel( subobjparam.v_ );
+   newBody->setAngularVel( subobjparam.w_ );
+   newBody->MPITrait.setOwner( subobjparam.mpiTrait_.owner_ );
+   return newBody;
 }
 
 }  // namespace communication
diff --git a/src/pe/communication/rigidbody/Capsule.cpp b/src/pe/communication/rigidbody/Capsule.cpp
index ee65bc40c..431f7ab02 100644
--- a/src/pe/communication/rigidbody/Capsule.cpp
+++ b/src/pe/communication/rigidbody/Capsule.cpp
@@ -51,9 +51,9 @@ void marshal( mpi::SendBuffer& buffer, const Capsule& obj )
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, CapsuleParameters& objparam, bool hasSuperBody )
+void unmarshal( mpi::RecvBuffer& buffer, CapsuleParameters& objparam )
 {
-   unmarshal( buffer, static_cast<GeomPrimitiveParameters&>( objparam ), hasSuperBody );
+   unmarshal( buffer, static_cast<GeomPrimitiveParameters&>( objparam ) );
    buffer >> objparam.radius_;
    buffer >> objparam.length_;
 }
diff --git a/src/pe/communication/rigidbody/Capsule.h b/src/pe/communication/rigidbody/Capsule.h
index c39cd55d9..45598783c 100644
--- a/src/pe/communication/rigidbody/Capsule.h
+++ b/src/pe/communication/rigidbody/Capsule.h
@@ -58,21 +58,20 @@ void marshal( mpi::SendBuffer& buffer, const Capsule& obj );
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, CapsuleParameters& objparam, bool hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, CapsuleParameters& objparam );
 //*************************************************************************************************
 
 
-template <>
-inline BodyID instantiate<Capsule>( mpi::RecvBuffer& buffer, const math::AABB& domain, const math::AABB& block )
+inline CapsuleID instantiate( mpi::RecvBuffer& buffer, const math::AABB& domain, const math::AABB& block, CapsuleID& newBody )
 {
    CapsuleParameters subobjparam;
-   unmarshal( buffer, subobjparam, false );
+   unmarshal( buffer, subobjparam );
    correctBodyPosition(domain, block.center(), subobjparam.gpos_);
-   CapsuleID obj = new Capsule( subobjparam.sid_, subobjparam.uid_, subobjparam.gpos_, subobjparam.rpos_, subobjparam.q_, subobjparam.radius_, subobjparam.length_, subobjparam.material_, false, subobjparam.communicating_, subobjparam.infiniteMass_ );
-   obj->setLinearVel( subobjparam.v_ );
-   obj->setAngularVel( subobjparam.w_ );
-   obj->MPITrait.setOwner( subobjparam.mpiTrait_.owner_ );
-   return obj;
+   newBody = new Capsule( subobjparam.sid_, subobjparam.uid_, subobjparam.gpos_, subobjparam.rpos_, subobjparam.q_, subobjparam.radius_, subobjparam.length_, subobjparam.material_, false, subobjparam.communicating_, subobjparam.infiniteMass_ );
+   newBody->setLinearVel( subobjparam.v_ );
+   newBody->setAngularVel( subobjparam.w_ );
+   newBody->MPITrait.setOwner( subobjparam.mpiTrait_.owner_ );
+   return newBody;
 }
 
 }  // namespace communication
diff --git a/src/pe/communication/rigidbody/Sphere.cpp b/src/pe/communication/rigidbody/Sphere.cpp
index b4a53757c..bd663162e 100644
--- a/src/pe/communication/rigidbody/Sphere.cpp
+++ b/src/pe/communication/rigidbody/Sphere.cpp
@@ -49,8 +49,8 @@ void marshal( mpi::SendBuffer& buffer, const Sphere& obj ) {
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, SphereParameters& objparam, bool hasSuperBody ) {
-   unmarshal( buffer, static_cast<GeomPrimitiveParameters&>( objparam ), hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, SphereParameters& objparam ) {
+   unmarshal( buffer, static_cast<GeomPrimitiveParameters&>( objparam ) );
    buffer >> objparam.radius_;
 }
 //*************************************************************************************************
diff --git a/src/pe/communication/rigidbody/Sphere.h b/src/pe/communication/rigidbody/Sphere.h
index 4a234fd8c..665a9a870 100644
--- a/src/pe/communication/rigidbody/Sphere.h
+++ b/src/pe/communication/rigidbody/Sphere.h
@@ -58,21 +58,20 @@ void marshal( mpi::SendBuffer& buffer, const Sphere& obj );
  * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
  * \return void
  */
-void unmarshal( mpi::RecvBuffer& buffer, SphereParameters& objparam, bool hasSuperBody );
+void unmarshal( mpi::RecvBuffer& buffer, SphereParameters& objparam );
 //*************************************************************************************************
 
 
-template <>
-inline BodyID instantiate<Sphere>( mpi::RecvBuffer& buffer, const math::AABB& domain, const math::AABB& block )
+inline SphereID instantiate( mpi::RecvBuffer& buffer, const math::AABB& domain, const math::AABB& block, SphereID& newBody )
 {
    SphereParameters subobjparam;
-   unmarshal( buffer, subobjparam, false );
+   unmarshal( buffer, subobjparam );
    correctBodyPosition(domain, block.center(), subobjparam.gpos_);
-   SphereID obj = new Sphere( subobjparam.sid_, subobjparam.uid_, subobjparam.gpos_, subobjparam.rpos_, subobjparam.q_, subobjparam.radius_, subobjparam.material_, false, subobjparam.communicating_, subobjparam.infiniteMass_ );
-   obj->setLinearVel( subobjparam.v_ );
-   obj->setAngularVel( subobjparam.w_ );
-   obj->MPITrait.setOwner( subobjparam.mpiTrait_.owner_ );
-   return obj;
+   newBody = new Sphere( subobjparam.sid_, subobjparam.uid_, subobjparam.gpos_, subobjparam.rpos_, subobjparam.q_, subobjparam.radius_, subobjparam.material_, false, subobjparam.communicating_, subobjparam.infiniteMass_ );
+   newBody->setLinearVel( subobjparam.v_ );
+   newBody->setAngularVel( subobjparam.w_ );
+   newBody->MPITrait.setOwner( subobjparam.mpiTrait_.owner_ );
+   return newBody;
 }
 
 }  // namespace communication
diff --git a/src/pe/communication/rigidbody/Union.h b/src/pe/communication/rigidbody/Union.h
new file mode 100644
index 000000000..d645d91c7
--- /dev/null
+++ b/src/pe/communication/rigidbody/Union.h
@@ -0,0 +1,146 @@
+//======================================================================================================================
+//
+//  This file is part of waLBerla. waLBerla is free software: you can
+//  redistribute it and/or modify it under the terms of the GNU General Public
+//  License as published by the Free Software Foundation, either version 3 of
+//  the License, or (at your option) any later version.
+//
+//  waLBerla is distributed in the hope that it will be useful, but WITHOUT
+//  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+//  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+//  for more details.
+//
+//  You should have received a copy of the GNU General Public License along
+//  with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
+//
+//! \file Union.h
+//! \ingroup pe
+//! \author Tobias Preclik
+//! \author Sebastian Eibl <sebastian.eibl@fau.de>
+//! \brief Marshalling of objects for data transmission or storage.
+//
+//======================================================================================================================
+
+#pragma once
+
+
+//*************************************************************************************************
+// Includes
+//*************************************************************************************************
+
+#include "pe/communication/Instantiate.h"
+#include "pe/communication/Marshalling.h"
+#include "pe/rigidbody/Union.h"
+
+namespace walberla {
+namespace pe {
+namespace communication {
+
+//forward declaration
+template < typename BodyTypeTuple >
+struct MarshalDynamically;
+
+//forward declaration
+template < typename BodyTypeTuple >
+struct UnmarshalDynamically;
+
+struct UnionParameters : public GeomPrimitiveParameters {
+   real_t         m_;
+   Mat3           I_;
+   math::AABB     aabb_;
+   size_t         size_;
+};
+
+//*************************************************************************************************
+/*!\brief Marshalling a box primitive.
+ *
+ * \param buffer The buffer to be filled.
+ * \param obj The object to be marshalled.
+ * \return void
+ */
+template < typename BodyTypeTuple >
+void marshal( mpi::SendBuffer& buffer, const Union<BodyTypeTuple>& obj )
+{
+   // Material of union is not relevant for reconstruction thus marshal RigidBody instead of GeomPrimitive.
+   marshal( buffer, static_cast<const RigidBody&>( obj ) );
+
+   buffer << obj.getMass();               // Encoding the total mass
+   buffer << obj.getBodyInertia();        // Encoding the moment of inertia
+
+   // Preparing the axis-aligned bounding box
+   buffer << obj.getAABB();               // Encoding the axis-aligned bounding box
+
+   buffer << static_cast<size_t> (obj.size());                  // Encoding the number of contained bodies
+
+   // Encoding the contained primitives
+   const typename Union<BodyTypeTuple>::ConstIterator begin( obj.begin() );
+   const typename Union<BodyTypeTuple>::ConstIterator end  ( obj.end()   );
+   for(  typename Union<BodyTypeTuple>::ConstIterator body=begin; body!=end; ++body )
+   {
+      buffer << body->getTypeID();
+      MarshalDynamically<BodyTypeTuple>::execute( buffer, **body );
+   }
+}
+
+//*************************************************************************************************
+
+//*************************************************************************************************
+/*!\brief Unmarshalling a box primitive.
+ *
+ * \param buffer The buffer from where to read.
+ * \param objparam The object to be reconstructed.
+ * \param hasSuperBody False if body is not part of a union. Passed on to rigid body unmarshalling.
+ * \return void
+ */
+inline
+void unmarshal( mpi::RecvBuffer& buffer, UnionParameters& objparam )
+{
+   // Material of union is not relevant for reconstruction thus marshal RigidBody instead of GeomPrimitive.
+   unmarshal( buffer, static_cast<RigidBodyParameters&>( objparam ) );
+
+   buffer >> objparam.m_;
+   buffer >> objparam.I_;
+   buffer >> objparam.aabb_;
+   //std::cout << "mass of union: " << objparam.m_ << std::endl;
+
+   // Decode the number of contained bodies
+   buffer >> objparam.size_;
+   //std::cout << "size of union: " << size << std::endl;
+}
+//*************************************************************************************************
+
+
+template <typename BodyTypeTuple>
+inline Union<BodyTypeTuple>* instantiate( mpi::RecvBuffer& buffer, const math::AABB& domain, const math::AABB& block, Union<BodyTypeTuple>*& newBody )
+{
+   UnionParameters subobjparam;
+   unmarshal( buffer, subobjparam );
+   correctBodyPosition(domain, block.center(), subobjparam.gpos_);
+   newBody = new Union<BodyTypeTuple>( subobjparam.sid_,
+                                       subobjparam.uid_,
+                                       subobjparam.gpos_,
+                                       subobjparam.rpos_,
+                                       subobjparam.q_,
+                                       false,
+                                       subobjparam.communicating_,
+                                       subobjparam.infiniteMass_ );
+   newBody->setLinearVel( subobjparam.v_ );
+   newBody->setAngularVel( subobjparam.w_ );
+   newBody->MPITrait.setOwner( subobjparam.mpiTrait_.owner_ );
+
+   // Decoding the contained primitives
+   for( size_t i = 0; i < subobjparam.size_; ++i )
+   {
+      decltype ( static_cast<BodyID>(nullptr)->getTypeID() ) type;
+      buffer >> type;
+      BodyID obj = UnmarshalDynamically<BodyTypeTuple>::execute(buffer, type, domain, block);
+      obj->setRemote( true );
+      newBody->add(obj);
+   }
+
+   return newBody;
+}
+
+}  // namespace communication
+}  // namespace pe
+}  // namespace walberla
diff --git a/src/pe/contact/Contact.cpp b/src/pe/contact/Contact.cpp
index 197ad84b6..4e49cd54b 100644
--- a/src/pe/contact/Contact.cpp
+++ b/src/pe/contact/Contact.cpp
@@ -64,8 +64,8 @@ Contact::Contact( GeomID g1, GeomID g2, const Vec3& gpos, const Vec3& normal, re
    WALBERLA_ASSERT( !( b1_->hasInfiniteMass() && b2_->hasInfiniteMass() ), "Invalid contact between two rigid bodies with infinite masses" );
 
    // Registering the contact with both attached rigid bodies
-   if( !b1_->hasInfiniteMass() ) b1_->registerContact( this );
-   if( !b2_->hasInfiniteMass() ) b2_->registerContact( this );
+//   if( !b1_->hasInfiniteMass() ) b1_->registerContact( this );
+//   if( !b2_->hasInfiniteMass() ) b2_->registerContact( this );
 
    // Merging the contact graph
 //   if( !b1_->hasInfiniteMass() && !b2_->hasInfiniteMass() )
diff --git a/src/pe/cr/HCSITS.impl.h b/src/pe/cr/HCSITS.impl.h
index 7a15ca02e..c629bffd7 100644
--- a/src/pe/cr/HCSITS.impl.h
+++ b/src/pe/cr/HCSITS.impl.h
@@ -1774,12 +1774,8 @@ inline void HardContactSemiImplicitTimesteppingSolvers::integratePositions( Body
    body->resetNode();
    body->clearContacts();
 
-   if( body->isAwake() ) {
-      if (body->getSystemID() == 6917529027641081896)
-      {
-         //WALBERLA_LOG_DEVEL("pos: " << body->getPosition() << "\nvel: " << body->getLinearVel() << "\nv: " << v);
-      }
-
+   if( body->isAwake() )
+   {
       // Calculating the translational displacement
       body->setPosition( body->getPosition() + v * dt );
 
@@ -1794,14 +1790,6 @@ inline void HardContactSemiImplicitTimesteppingSolvers::integratePositions( Body
       body->setLinearVel( v );
       body->setAngularVel( w );
 
-      //      if( body->getType() == unionType ) {
-      //         // Updating the contained bodies
-      //         UnionID u( static_cast<UnionID>( body ) );
-
-      //         for( BodyIterator subbody=u->begin(); subbody!=u->end(); ++subbody )
-      //            u->updateBody( *subbody, dq );
-      //      }
-
       // Setting the axis-aligned bounding box
       body->calcBoundingBox();
 
diff --git a/src/pe/fcd/SimpleFCD.h b/src/pe/fcd/SimpleFCD.h
index d74bf613e..bf1c7731f 100644
--- a/src/pe/fcd/SimpleFCD.h
+++ b/src/pe/fcd/SimpleFCD.h
@@ -32,51 +32,53 @@ namespace walberla{
 namespace pe{
 namespace fcd {
 
-template < typename BodyA, typename BodyTypeTuple, typename BaseT >
+template < typename TypeA, typename TypeListB >
 struct SingleDispatch{
-   static void execute(RigidBody* a, RigidBody* b, Contacts& contacts){
-      static_assert(boost::is_base_of<RigidBody, typename BodyTypeTuple::head_type>::value, "only downcasting allowed!");
-      if (BodyTypeTuple::head_type::getStaticTypeID() == b->getTypeID())
+   static bool execute(RigidBody* a, RigidBody* b, Contacts& contacts){
+      static_assert(boost::is_base_of<RigidBody, typename TypeListB::head_type>::value, "only downcasting allowed!");
+      if (TypeListB::head_type::getStaticTypeID() == b->getTypeID())
       {
-         typedef typename BodyTypeTuple::head_type BodyB;
+         typedef typename TypeListB::head_type TypeB;
 
-         auto bd1 = static_cast<BodyA *>(a);
-         auto bd2 = static_cast<BodyB *>(b);
+         auto bd1 = static_cast<TypeA *>(a);
+         auto bd2 = static_cast<TypeB *>(b);
 
-         collide(bd1, bd2, contacts);
+         return collide(bd1, bd2, contacts);
       } else
       {
-         SingleDispatch<BodyA, typename BodyTypeTuple::tail_type, BaseT>::execute(a, b, contacts);
+         return SingleDispatch<TypeA, typename TypeListB::tail_type>::execute(a, b, contacts);
       }
    }
 };
 
-template < typename BodyA, typename BaseT>
-struct SingleDispatch< BodyA, boost::tuples::null_type, BaseT>{
-   static void execute(RigidBody* /*a*/, RigidBody* b, Contacts& /*contacts*/){
+template < typename TypeA >
+struct SingleDispatch< TypeA, boost::tuples::null_type >{
+   static bool execute(RigidBody* /*a*/, RigidBody* b, Contacts& /*contacts*/){
       WALBERLA_ABORT("SingleDispatch: Type of body " << b->getSystemID() << " could not be determined (" << b->getTypeID() << ")");
+      return false;
    }
 };
 
-template < typename BodyTypeTuple, typename BaseT = BodyTypeTuple>
+template < typename TypeListA, typename TypeListB = TypeListA>
 struct DoubleDispatch{
-   static void execute(RigidBody* a, RigidBody* b, Contacts& contacts){
+   static bool execute(RigidBody* a, RigidBody* b, Contacts& contacts){
       // Force a defined order of collision detection across processes
       if( b->getSystemID() < a->getSystemID() )
          std::swap( a, b );
-      static_assert(boost::is_base_of<RigidBody, typename BodyTypeTuple::head_type>::value, "only downcasting allowed!");
-      if (BodyTypeTuple::head_type::getStaticTypeID() == a->getTypeID()) {
-         SingleDispatch<typename BodyTypeTuple::head_type, BaseT, BaseT>::execute(a, b, contacts);
+      static_assert(boost::is_base_of<RigidBody, typename TypeListA::head_type>::value, "only downcasting allowed!");
+      if (TypeListA::head_type::getStaticTypeID() == a->getTypeID()) {
+         return SingleDispatch<typename TypeListA::head_type, TypeListB>::execute(a, b, contacts);
       } else {
-         DoubleDispatch<typename BodyTypeTuple::tail_type, BaseT>::execute(a, b, contacts);
+         return DoubleDispatch<typename TypeListA::tail_type, TypeListB>::execute(a, b, contacts);
       }
    }
 };
 
-template < typename BaseT>
-struct DoubleDispatch< boost::tuples::null_type, BaseT>{
-   static void execute(RigidBody* /*a*/, RigidBody* /*b*/, Contacts& /*contacts*/){
+template < typename TypeListB>
+struct DoubleDispatch< boost::tuples::null_type, TypeListB>{
+   static bool execute(RigidBody* /*a*/, RigidBody* /*b*/, Contacts& /*contacts*/){
       WALBERLA_ABORT("DoubleDispatch: Type could not be determined");
+      return false;
    }
 };
 
diff --git a/src/pe/rigidbody/RigidBody.cpp b/src/pe/rigidbody/RigidBody.cpp
index 81e816638..8512ba329 100644
--- a/src/pe/rigidbody/RigidBody.cpp
+++ b/src/pe/rigidbody/RigidBody.cpp
@@ -60,7 +60,6 @@ RigidBody::RigidBody( id_t const typeID, id_t sid, id_t uid )
    , uid_    (uid)            // User-specific body ID
    , contacts_()              // Vector of the currently attached contacts
    , typeID_(typeID)          // geometry type
-   , superBodies_()           // All superordinate bodies containing this rigid body
 {
    sb_ = this;           // The superordinate rigid body
 }
diff --git a/src/pe/rigidbody/RigidBody.h b/src/pe/rigidbody/RigidBody.h
index c8b00db12..d6b3a9a44 100644
--- a/src/pe/rigidbody/RigidBody.h
+++ b/src/pe/rigidbody/RigidBody.h
@@ -44,12 +44,18 @@
 namespace walberla{
 namespace pe{
 
+template <typename BodyTypeTuple>
+class Union;
+
 class RigidBody : public Node
                 , public ccd::HashGridsBodyTrait
                 , public cr::HCSITSBodyTrait
                 , private NonCopyable
 {
 private:
+   template <typename BodyTypeTuple>
+   friend class Union;
+
    //**Type definitions****************************************************************************
    typedef PtrVector<RigidBody,NoDelete>   Bodies;       //!< Vector for superordinate bodies containing this rigid body.
    typedef PtrVector<Contact,NoDelete>     Contacts;     //!< Vector for attached contacts.
@@ -153,6 +159,9 @@ public:
    //**Set functions*******************************************************************************
    /*!\name Set functions */
    //@{
+   inline  void setSB  ( BodyID body );
+   inline  void resetSB() ;
+
    inline  void setFinite     ( const bool finite );
    inline  void setMass       ( bool infinite );
    inline  void setVisible    ( bool visible );
@@ -460,12 +469,6 @@ protected:
    //**********************************************************************************************
 private:
    id_t typeID_; //< identify geometry type
-   //**Member variables****************************************************************************
-   /*!\name Member variables */
-   //@{
-   Bodies superBodies_;   //!< All superordinate bodies containing this rigid body.
-   //@}
-   //**********************************************************************************************
 };
 
 //=================================================================================================
@@ -1264,11 +1267,8 @@ inline id_t     RigidBody::getTypeID() const{
  */
 inline void RigidBody::setRelLinearVel( real_t vx, real_t vy, real_t vz )
 {
-#if MOBILE_INFINITE
-   if( !hasSuperBody() ) {
-#else
-   if( !hasSuperBody() ) {
-#endif
+   if( !hasSuperBody() )
+   {
       v_ = R_ * Vec3( vx, vy, vz );
       wake();
    }
@@ -1302,11 +1302,8 @@ inline void RigidBody::setRelLinearVel( const Vec3& lvel )
  */
 inline void RigidBody::setLinearVel( real_t vx, real_t vy, real_t vz )
 {
-#if MOBILE_INFINITE
-   if( !hasSuperBody() ) {
-#else
-   if( !hasSuperBody() ) {
-#endif
+   if( !hasSuperBody() )
+   {
       v_ = Vec3( vx, vy, vz );
       wake();
    }
@@ -1343,11 +1340,8 @@ inline void RigidBody::setLinearVel( const Vec3& lvel )
  */
 inline void RigidBody::setRelAngularVel( real_t ax, real_t ay, real_t az )
 {
-#if MOBILE_INFINITE
-   if( !hasSuperBody() ) {
-#else
-   if( !hasSuperBody() ) {
-#endif
+   if( !hasSuperBody() )
+   {
       w_ = R_ * Vec3( ax, ay, az );
       wake();
    }
@@ -1382,11 +1376,8 @@ inline void RigidBody::setRelAngularVel( const Vec3& avel )
  */
 inline void RigidBody::setAngularVel( real_t ax, real_t ay, real_t az )
 {
-#if MOBILE_INFINITE
-   if( !hasSuperBody() ) {
-#else
-   if( !hasSuperBody() ) {
-#endif
+   if( !hasSuperBody() )
+   {
       w_ = Vec3( ax, ay, az );
       wake();
    }
@@ -1467,6 +1458,26 @@ inline const Vec3& RigidBody::getTorque() const
 }
 //*************************************************************************************************
 
+//*************************************************************************************************
+/*!\brief Sets the super body for the current body.
+ *
+ *  \attention Behaviour changed compared to setSuperBody of old PE!!!
+ */
+inline  void RigidBody::setSB  ( BodyID body )
+{
+   sb_ = body;
+}
+
+//*************************************************************************************************
+/*!\brief Resets the super body for the current body.
+ *
+ *  \attention Behaviour changed compared to setSuperBody of old PE!!!
+ */
+inline  void RigidBody::resetSB()
+{
+   sb_ = this;
+}
+
 
 //*************************************************************************************************
 /*!\brief Set the force acting at the body's center of mass.
@@ -1479,11 +1490,7 @@ inline void RigidBody::setForce( const Vec3& f )
    // Increasing the force on this rigid body
    force_ = f;
 
-   if( superBodies_.isEmpty() ) {
-      // Waking this rigid body
-      WALBERLA_ASSERT( !hasSuperBody(), "Invalid superbody detected" );
-      wake();
-   }
+   wake();
 }
 //*************************************************************************************************
 
@@ -1499,11 +1506,7 @@ inline void RigidBody::setTorque( const Vec3& tau )
    // Increasing the force on this rigid body
    torque_ = tau;
 
-   if( superBodies_.isEmpty() ) {
-      // Waking this rigid body
-      WALBERLA_ASSERT( !hasSuperBody(), "Invalid superbody detected" );
-      wake();
-   }
+   wake();
 }
 //*************************************************************************************************
 
@@ -1583,20 +1586,14 @@ inline void RigidBody::addForce( real_t fx, real_t fy, real_t fz )
  */
 inline void RigidBody::addForce( const Vec3& f )
 {
-   // Increasing the force on this rigid body
-   force_ += f;
-
-   // Increasing the force and torque on the superordinate bodies
-   if( !superBodies_.isEmpty() ) {
-      for( Bodies::Iterator it=superBodies_.begin(); it!=superBodies_.end(); ++it ) {
-         it->addForceAtPos( f, gpos_ );
-      }
-   }
-
-   // Waking this rigid body
-   else {
-      WALBERLA_ASSERT( !hasSuperBody(), "Invalid superbody detected" );
+   if (sb_ == this)
+   {
+      // Increasing the force on this rigid body
+      force_ += f;
       wake();
+   } else
+   {
+      sb_->addForceAtPos( f, gpos_ );
    }
 }
 //*************************************************************************************************
@@ -1767,20 +1764,15 @@ inline void RigidBody::addForceAtPos( real_t fx, real_t fy, real_t fz, real_t px
  */
 inline void RigidBody::addForceAtPos( const Vec3& f, const Vec3& gpos )
 {
-   // Increasing the force and torque on this rigid body
-   force_  += f;
-   torque_ += ( gpos - gpos_ ) % f;
-
-   // Increasing the force and torque on the superordinate bodies
-   if( !superBodies_.isEmpty() ) {
-      for( Bodies::Iterator it=superBodies_.begin(); it!=superBodies_.end(); ++it ) {
-         it->addForceAtPos( f, gpos );
-      }
-   }
-
-   // Waking this rigid body
-   else {
+   if (sb_ == this)
+   {
+      // Increasing the force and torque on this rigid body
+      force_  += f;
+      torque_ += ( gpos - gpos_ ) % f;
       wake();
+   } else
+   {
+      sb_->addForceAtPos( f, gpos );
    }
 }
 //*************************************************************************************************
@@ -3546,7 +3538,7 @@ inline void RigidBody::deregisterAttachable( AttachableID attachable )
  */
 inline void RigidBody::handleModification()
 {
-   WALBERLA_ASSERT( false, "Invalid call of default handle function" );
+   WALBERLA_ABORT( "Invalid call of default handle function" );
 }
 //*************************************************************************************************
 
@@ -3563,7 +3555,7 @@ inline void RigidBody::handleModification()
  */
 inline void RigidBody::handleTranslation()
 {
-   WALBERLA_ASSERT( false, "Invalid call of default handle function" );
+   WALBERLA_ABORT( "Invalid call of default handle function" );
 }
 //*************************************************************************************************
 
@@ -3580,7 +3572,7 @@ inline void RigidBody::handleTranslation()
  */
 inline void RigidBody::handleRotation()
 {
-   WALBERLA_ASSERT( false, "Invalid call of default handle function" );
+   WALBERLA_ABORT( "Invalid call of default handle function" );
 }
 //*************************************************************************************************
 
@@ -3597,7 +3589,7 @@ inline void RigidBody::handleRotation()
  */
 inline void RigidBody::handleFixation()
 {
-   WALBERLA_ASSERT( false, "Invalid call of default handle function" );
+   WALBERLA_ABORT( "Invalid call of default handle function" );
 }
 //*************************************************************************************************
 
diff --git a/src/pe/rigidbody/Union.h b/src/pe/rigidbody/Union.h
new file mode 100644
index 000000000..0b979bb02
--- /dev/null
+++ b/src/pe/rigidbody/Union.h
@@ -0,0 +1,1218 @@
+//======================================================================================================================
+//
+//  This file is part of waLBerla. waLBerla is free software: you can
+//  redistribute it and/or modify it under the terms of the GNU General Public
+//  License as published by the Free Software Foundation, either version 3 of
+//  the License, or (at your option) any later version.
+//
+//  waLBerla is distributed in the hope that it will be useful, but WITHOUT
+//  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+//  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+//  for more details.
+//
+//  You should have received a copy of the GNU General Public License along
+//  with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
+//
+//! \file Union.h
+//! \ingroup RigidBody
+//! \author Klaus Iglberger
+//! \author Sebastian Eibl <sebastian.eibl@fau.de>
+//
+//======================================================================================================================
+
+#pragma once
+
+
+//*************************************************************************************************
+// Includes
+//*************************************************************************************************
+
+#include <pe/Config.h>
+#include <pe/rigidbody/RigidBody.h>
+#include <pe/Types.h>
+
+#include <core/debug/Debug.h>
+#include <core/math/Matrix3.h>
+#include <core/math/Vector3.h>
+#include <core/DataTypes.h>
+
+#include <cmath>
+#include <iomanip>
+#include <iostream>
+#include <stdexcept>
+
+#include <boost/tuple/tuple.hpp>
+
+namespace walberla {
+namespace pe {
+
+//=================================================================================================
+//
+//  CLASS DEFINITION
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Base class for the sphere geometry.
+ * \ingroup sphere
+ *
+ * The Sphere class represents the base class for the sphere geometry. It provides
+ * the basic functionality of a sphere. For a full description of the sphere geometry,
+ * see the Sphere class description.
+ */
+template <typename BodyTypeTuple>
+class Union : public RigidBody
+{
+public:
+   //**Type definitions****************************************************************************
+   typedef BodyTypeTuple                     BodyTypeTupleT;
+
+   typedef PtrVector<RigidBody, PtrDelete>   Bodies;             //!< Iterator over the contained rigid bodies.
+   typedef Bodies::Iterator                  Iterator;           //!< Iterator over the contained rigid bodies.
+   typedef Bodies::ConstIterator             ConstIterator;      //!< Constant iterator over the contained rigid bodies.
+   //**++++++++++++++++****************************************************************************
+
+   //**Forward declarations for nested classes*****************************************************
+//   template< typename C > class CastIterator;
+//   template< typename C > class ConstCastIterator;
+   //**********************************************************************************************
+
+   //**Constructors********************************************************************************
+   /*!\name Constructors */
+   //@{
+   explicit Union( id_t sid, id_t uid, const Vec3& gpos, const Vec3& rpos, const Quat& q,
+                    const bool global, const bool communicating, const bool infiniteMass );
+   //@}
+   //**********************************************************************************************
+
+   //**Destructor**********************************************************************************
+   /*!\name Destructor */
+   //@{
+   virtual ~Union();
+   //@}
+   //**********************************************************************************************
+   //**********************************************************************************************
+
+public:
+   //**Get functions*******************************************************************************
+   /*!\name Get functions */
+   //@{
+                          inline bool                 isEmpty() const;
+                          inline size_t               size()    const;
+   template< typename C > inline size_t               size()    const {return bodies_.size<C>();}
+                          inline BodyID               getBody( size_t index );
+                          inline ConstBodyID          getBody( size_t index ) const;
+
+                          inline Iterator                     begin()       {return bodies_.begin();}
+                          inline ConstIterator                begin() const {return bodies_.begin();}
+   template< typename C > inline Bodies::CastIterator<C>      begin()       {return bodies_.begin<C>();}
+   template< typename C > inline Bodies::ConstCastIterator<C> begin() const {return bodies_.begin<C>();}
+
+                          inline Iterator                     end()         {return bodies_.end();}
+                          inline ConstIterator                end() const   {return bodies_.end();}
+   template< typename C > inline Bodies::CastIterator<C>      end()         {return bodies_.end<C>();}
+   template< typename C > inline Bodies::ConstCastIterator<C> end() const   {return bodies_.end<C>();}
+
+   virtual inline real_t getVolume()         const;
+   //@}
+   //**********************************************************************************************
+
+   //**Set functions*******************************************************************************
+   /*!\name Set functions */
+   //@{
+   virtual void setRemote( bool remote ) WALBERLA_OVERRIDE;
+   //@}
+   //**********************************************************************************************
+
+   //**Simulation functions************************************************************************
+   /*!\name Simulation functions */
+   //@{
+   virtual void update( const Vec3& dp ) WALBERLA_OVERRIDE;  // Translation update of a subordinate rigid body
+   virtual void update( const Quat& dq ) WALBERLA_OVERRIDE;  // Rotation update of a subordinate rigid body
+   //@}
+   //**********************************************************************************************
+
+   //**Signal functions***************************************************************************
+   /*!\name Signal functions */
+   //@{
+   virtual void handleModification() WALBERLA_OVERRIDE;
+   virtual void handleTranslation() WALBERLA_OVERRIDE;
+   virtual void handleRotation() WALBERLA_OVERRIDE;
+   //@}
+   //**********************************************************************************************
+
+   //**Rigid body manager functions****************************************************************
+   /*!\name Rigid body manager functions */
+   //@{
+   void add   ( BodyID body );
+   //@}
+   //**********************************************************************************************
+
+   //**Utility functions***************************************************************************
+   /*!\name Utility functions */
+   //@{
+   static inline id_t getStaticTypeID();
+   //@}
+   //**********************************************************************************************
+
+   //**Output functions****************************************************************************
+   /*!\name Output functions */
+   //@{
+   virtual void print( std::ostream& os, const char* tab ) const;
+   //@}
+   //**********************************************************************************************
+
+protected:
+   //**Utility functions***************************************************************************
+   /*!\name Utility functions */
+   //@{
+   virtual void setPositionImpl       ( real_t px, real_t py, real_t pz )         WALBERLA_OVERRIDE;
+   virtual void setOrientationImpl    ( real_t r, real_t i, real_t j, real_t k )  WALBERLA_OVERRIDE;
+   virtual void translateImpl         ( real_t dx, real_t dy, real_t dz )         WALBERLA_OVERRIDE;
+   virtual void rotateImpl            ( const Quat& dq )                          WALBERLA_OVERRIDE;
+   virtual void rotateAroundOriginImpl( const Quat& dq )                          WALBERLA_OVERRIDE;
+   virtual void rotateAroundPointImpl ( const Vec3& point, const Quat& dq )       WALBERLA_OVERRIDE;
+   virtual bool containsRelPointImpl   ( real_t px, real_t py, real_t pz ) const  WALBERLA_OVERRIDE;
+   virtual bool isSurfaceRelPointImpl  ( real_t px, real_t py, real_t pz ) const  WALBERLA_OVERRIDE;
+   //@}
+   //**********************************************************************************************
+
+   //**Utility functions***************************************************************************
+   /*!\name Utility functions */
+   //@{
+   inline virtual void calcBoundingBox() WALBERLA_OVERRIDE;  // Calculation of the axis-aligned bounding box
+                  void calcCenterOfMass(); ///< updates the center of mass (gpos)
+   inline         void calcInertia();      // Calculation of the moment of inertia
+   //@}
+   //**********************************************************************************************
+
+   //**Member variables****************************************************************************
+   /*!\name Member variables */
+   //@{
+   PtrVector<RigidBody, PtrDelete> bodies_;  //!< Rigid bodies contained in the union.
+   //@}
+   //**********************************************************************************************
+private:
+   std::vector<id_t> containedTypeIDs_;
+
+   static id_t staticTypeID_;  //< type id of sphere, will be set by SetBodyTypeIDs
+   static void setStaticTypeID(id_t typeID) {staticTypeID_ = typeID;}
+
+   //** friend declaration
+   /// needed to be able to set static type ids with setStaticTypeID
+   template <class T>
+   friend struct SetBodyTypeIDs;
+};
+//*************************************************************************************************
+
+//=================================================================================================
+//
+//  CONSTRUCTOR
+//
+//=================================================================================================
+
+
+//*************************************************************************************************
+//*************************************************************************************************
+/*!\brief Constructor for the Sphere class.
+ *
+ * \param sid Unique system-specific ID for the sphere.
+ * \param uid User-specific ID for the sphere.
+ * \param gpos Global geometric center of the sphere.
+ * \param q The orientation of the sphere's body frame in the global world frame.
+ * \param global specifies if the sphere should be created in the global storage
+ * \param communicating specifies if the sphere should take part in synchronization (syncNextNeighbour, syncShadowOwner)
+ * \param infiniteMass specifies if the sphere has infinite mass and will be treated as an obstacle
+ */
+template <typename BodyTypeTuple>
+Union<BodyTypeTuple>::Union( id_t sid, id_t uid, const Vec3& gpos, const Vec3& rpos, const Quat& q,
+                const bool global, const bool communicating, const bool /*infiniteMass*/ )
+   : RigidBody( getStaticTypeID(), sid, uid )  // Initialization of the parent class
+{
+   // Initializing the instantiated union
+   gpos_   = gpos;                   // Setting the global center of mass
+   rpos_   = rpos;                   // Setting the relative position
+   q_      = q;                      // Setting the orientation
+   R_      = q_.toRotationMatrix();  // Setting the rotation matrix
+
+   calcCenterOfMass();
+   calcInertia();
+
+   setGlobal( global );
+//   setMass( infiniteMass );
+   setCommunicating( communicating );
+   setFinite( true );
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  DESTRUCTOR
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Destructor for the Sphere class.
+ */
+template <typename BodyTypeTuple>
+Union<BodyTypeTuple>::~Union()
+{
+   // Clearing the bodies
+   bodies_.clear();
+
+   // Logging the destruction of the sphere
+   WALBERLA_LOG_DETAIL( "Destroyed union " << sid_ );
+}
+//*************************************************************************************************
+
+//*************************************************************************************************
+/*!\brief Returns the volume of the union.
+ *
+ * \return The volume of the union.
+ */
+template <typename BodyTypeTuple>
+inline real_t Union<BodyTypeTuple>::getVolume() const
+{
+   real_t volume(0);
+   for( auto bodyIt=bodies_.begin(); bodyIt!=bodies_.end(); ++bodyIt )
+      volume += bodyIt->getVolume();
+   return volume;
+}
+//*************************************************************************************************
+
+//*************************************************************************************************
+/*!\brief Setting the remote flag of the union and all contained bodies.
+ *
+ * \param remote \a true to declare the union remote, \a false declare it local.
+ * \return void
+ *
+ * This function sets the remote flag of the union and all contained rigid bodies. Note that
+ * this function should not be used explicitly, but is automatically called during the MPI
+ * communication to set the remote status of a union within the simulation world. Using
+ * this function explicitly may lead to simulation errors during a parallel simulation!
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::setRemote( bool remote )
+{
+   remote_ = remote;
+
+   // Setting the remote flag of the contained rigid bodies
+   for( auto bIt=bodies_.begin(); bIt!=bodies_.end(); ++bIt )
+      bIt->setRemote( remote );
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Calculation of the bounding box of the union.
+ *
+ * \return void
+ *
+ * This function updates the axis-aligned bounding box of the union according to the current
+ * position and orientation of the contained rigid bodies.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::calcBoundingBox()
+{
+   // Setting the bounding box of an empty union
+   if( bodies_.isEmpty() ) {
+      aabb_ = math::AABB(
+            gpos_[0] - real_t(0.01),
+            gpos_[1] - real_t(0.01),
+            gpos_[2] - real_t(0.01),
+            gpos_[0] + real_t(0.01),
+            gpos_[1] + real_t(0.01),
+            gpos_[2] + real_t(0.01)
+            );
+   }
+
+   // Using the bounding box of the first contained bodies as initial bounding box
+   // and merging it with the bounding boxes of all other bodies
+   else {
+      aabb_ = bodies_[0]->getAABB();
+      for( auto b=bodies_.begin()+1; b!=bodies_.end(); ++b )
+         aabb_.merge( b->getAABB() );
+   }
+
+   WALBERLA_ASSERT( aabb_.checkInvariant() , "Invalid bounding box detected" );
+   WALBERLA_ASSERT( aabb_.contains( gpos_ ), "Invalid bounding box detected" );
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Calculation of the total mass and center of mass.
+ *
+ * \return void
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::calcCenterOfMass()
+{
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+
+   // Initializing the total mass and the inverse mass
+   mass_    = real_t(0);
+   invMass_ = real_t(0);
+
+   // Don't calculate the center of mass of an empty union
+   if( bodies_.isEmpty() ) return;
+
+   // Calculating the center of mass of a single body
+   if( bodies_.size() == 1 )
+   {
+      const BodyID body( bodies_[0] );
+      gpos_ = body->getPosition();
+      mass_ = body->getMass();
+      if( !isFixed() && mass_ > real_t(0) )
+         invMass_ = real_t(1) / mass_;
+   }
+
+   // Calculating the center of mass of a union containing several bodies
+   else
+   {
+      // Resetting the center of mass
+      gpos_.reset();
+
+      // Calculating the center of mass of a finite union
+      if( finite_ )
+      {
+         // Accumulating the mass of all contained rigid bodies
+         for( auto b=bodies_.begin(); b!=bodies_.end(); ++b ) {
+            WALBERLA_ASSERT( b->isFinite(), "Invalid infinite body in finite union detected" );
+            mass_ += b->getMass();
+            gpos_ += b->getPosition() * b->getMass();
+         }
+
+         // Calculating the center of mass for unions with non-zero mass
+         if( mass_ > real_t(0) ) {
+            if( !isFixed() ) invMass_ = real_t(1) / mass_;
+            gpos_ /= mass_;
+         }
+
+         // Calculating the center of mass for unions with a mass of zero
+         else {
+            size_t counter( 0 );
+            gpos_.reset();
+
+            for( auto b=bodies_.begin(); b!=bodies_.end(); ++b ) {
+               gpos_ += b->getPosition();
+               ++counter;
+            }
+
+            gpos_ /= counter;
+         }
+      }
+
+      // Calculating the center of mass of an infinite union
+      else {
+         size_t counter( 0 );
+
+         for( auto b=bodies_.begin(); b!=bodies_.end(); ++b ) {
+            if( b->isFinite() ) continue;
+            gpos_ += b->getPosition();
+            ++counter;
+         }
+
+         gpos_ /= counter;
+      }
+   }
+
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Calculation of the moment of inertia in reference to the body frame of the union.
+ *
+ * \return void
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::calcInertia()
+{
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+
+   // Initializing the body moment of inertia and the inverse moment of inertia
+   I_    = real_t(0);
+   Iinv_ = real_t(0);
+
+   // Don't calculate the moment of inertia of an infinite or empty union
+   if( !isFinite() || bodies_.isEmpty() || floatIsEqual(mass_, real_t(0)) ) return;
+
+   // Calculating the global moment of inertia
+   real_t mass;
+   Vec3   pos;
+
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+   {
+      mass = b->getMass();
+      pos  = b->getPosition() - gpos_;
+
+      I_ += b->getInertia();
+      I_[0] += mass * ( pos[1]*pos[1] + pos[2]*pos[2] );
+      I_[1] -= mass * pos[0] * pos[1];
+      I_[2] -= mass * pos[0] * pos[2];
+      I_[3] -= mass * pos[0] * pos[1];
+      I_[4] += mass * ( pos[0]*pos[0] + pos[2]*pos[2] );
+      I_[5] -= mass * pos[1] * pos[2];
+      I_[6] -= mass * pos[0] * pos[2];
+      I_[7] -= mass * pos[1] * pos[2];
+      I_[8] += mass * ( pos[0]*pos[0] + pos[1]*pos[1] );
+   }
+
+   // Rotating the moment of inertia from the global frame of reference to the body frame of reference
+   I_ = R_ * I_ * R_;
+
+   // Calculating the inverse of the body moment of inertia
+   if( !isFixed() ) Iinv_ = I_.getInverse();
+
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  SET FUNCTIONS
+//
+//=================================================================================================
+
+
+//*************************************************************************************************
+/*!\brief Setting the global position of the union.
+ *
+ * \param gpos The global position.
+ * \return void
+ * \exception std::logic_error Invalid translation of a global union inside an exclusive section.
+ *
+ * Setting the global position of the union's center of mass. If the union contains an infinite
+ * rigid body, the function shifts the union to reposition its anchor point according to the
+ * given global coordinate.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::setPositionImpl( real_t px, real_t py, real_t pz )
+{
+   Vec3 gpos = Vec3( px, py, pz );
+
+   // Calculating the position change
+   const Vec3 dp( gpos - gpos_ );
+
+   // Setting the global position
+   gpos_ = gpos;
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dp );
+
+   Union<BodyTypeTuple>::calcBoundingBox();    // Setting the axis-aligned bounding box
+   wake();               // Waking the union from sleep mode
+   signalTranslation();  // Signaling the position change to the superordinate body
+}
+//*************************************************************************************************
+
+
+
+//*************************************************************************************************
+/*!\brief Setting the global orientation of the union.
+ *
+ * \param r The value for the real part.
+ * \param i The value for the first imaginary part.
+ * \param j The value for the second imaginary part.
+ * \param k The value for the third imaginary part.
+ * \return void
+ * \exception std::logic_error Invalid rotation of a global union inside an exclusive section.
+ *
+ * Setting the orientation/rotation of the entire union. This function causes all contained
+ * primitives to rotate around the center of mass of the union (if the union is finite) or around
+ * the anchor point of the union (if the union is infinite). The orientation of the rigid bodies
+ * within the union in reference to the body frame of the union is not changed.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::setOrientationImpl( real_t r, real_t i, real_t j, real_t k )
+{
+   const Quat q ( r, i, j, k );
+   const Quat dq( q * q_.getInverse() );
+
+   q_ = q;
+   R_ = q_.toRotationMatrix();
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dq );
+
+   Union<BodyTypeTuple>::calcBoundingBox();  // Setting the axis-aligned bounding box
+   wake();             // Waking the union from sleep mode
+   signalRotation();   // Signaling the change of orientation to the superordinate body
+}
+//*************************************************************************************************
+
+
+
+//=================================================================================================
+//
+//  SIMULATION FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Translation update of a subordinate union.
+ *
+ * \param dp Change in the global position of the superordinate union.
+ * \return void
+ *
+ * This update function is triggered by the superordinate body in case of a translational
+ * movement. This movement involves a change in the global position and the axis-aligned
+ * bounding box.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::update( const Vec3& dp )
+{
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+   WALBERLA_ASSERT( hasSuperBody(), "Invalid superordinate body detected" );
+
+   // Updating the global position
+   gpos_ += dp;
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dp );
+
+   // Setting the axis-aligned bounding box
+   Union<BodyTypeTuple>::calcBoundingBox();
+
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Rotation update of a subordinate union.
+ *
+ * \param dq Change in the orientation of the superordinate union.
+ * \return void
+ *
+ * This update function is triggered by the superordinate body in case of a rotational movement.
+ * This movement involves a change in the global position, the orientation/rotation and the
+ * axis-aligned bounding box of the box.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::update( const Quat& dq )
+{
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+   WALBERLA_ASSERT( hasSuperBody(), "Invalid superordinate body detected" );
+
+   // Calculating the new global position
+   gpos_ = sb_->getPosition() + ( sb_->getRotation() * rpos_ );
+
+   // Calculating the new orientation and rotation
+   q_ = dq * q_;
+   R_ = q_.toRotationMatrix();
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dq );
+
+   // Setting the axis-aligned bounding box
+   Union<BodyTypeTuple>::calcBoundingBox();
+
+   // Checking the state of the union
+   WALBERLA_ASSERT( checkInvariants(), "Invalid union state detected" );
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  RIGID BODY MANAGER FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Adding a rigid body to the union.
+ *
+ * \param body The rigid body to be added to the union.
+ * \return void
+ * \exception std::logic_error Invalid adding into a global union inside an exclusive section.
+ * \exception std::logic_error Global flags of body and union do not match.
+ *
+ * This function adds another rigid body to the union (as for instance a sphere, a box, a
+ * capsule, or another union). It updates all union properties that change due to the new
+ * rigid body: the center of mass, the relative position of all contained rigid bodies, the
+ * attached sections of all contained links, the moment of inertia, and the axis-aligned
+ * bounding box.\n
+ * The union takes full responsibility for the newly added body, including the necessary
+ * memory management. After adding the body to the union, the body is considered part of the
+ * union. All functions called on the union (as for instance all kinds of set functions,
+ * translation or rotation functions) additionally affect the rigid body. However, the body
+ * can still individually be positioned, oriented, translated, rotated, or made (in-)visible
+ * within the union.\n\n
+ *
+ *
+ * \section union_add_infinite Adding infinite rigid bodies
+ *
+ * Adding an infinite rigid body (as for instance a plane) to the union makes the union an
+ * infinite rigid body. This additionally resets the linear and angular velocity of the union
+ * and fixes its global position. Note that removing the last infinite body from an union will
+ * not restore previous settings such as the velocities and will not unfix the union!\n\n
+ *
+ *
+ * \section union_add_global Global bodies/unions
+ *
+ * Adding a global rigid body (i.e. a body defined inside the pe_GLOBAL_SECTION) to the union
+ * requires the union to be also global. Adding a non-global rigid body to a union requires
+ * the union to be also non-global. The attempt to add a global rigid body to a non-global
+ * union or a non-global body to a global union results in a \a std::logic_error exception.
+ *
+ *
+ * \section union_add_rules Additional rules
+ *
+ * The following rules apply for the mobility and visibility of the resulting union:
+ *  - If either the union or the added rigid body is fixed, the new compound will also be fixed.
+ *    For instance, adding a fixed rigid body to an union will fix the union, and adding a rigid
+ *    body to a fixed union will fix the body.
+ *  - Neither the (in-)visibility of the added rigid body nor (in-)visibility of the union will
+ *    change due to the add operation. For instance adding a visible rigid body to an invisible
+ *    union will not change the visibility of the body. Neither is the visibility of the union
+ *    changed. In order to change the visiblity, the setVisible() function can be either called
+ *    individually for the rigid body (to exclusively make the body (in-)visible) or the entire
+ *    union (to make the entire union (in-)visible.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::add( BodyID body )
+{
+   // Checking for "self-assignment"
+   if( body == BodyID( this ) ) return;
+
+   // Checking the global flags of the body and the union in MPI parallel simulations
+   if( body->isGlobal() ^ global_ )
+      throw std::logic_error( "Global flags of body and union do not match" );
+
+   // Registering the rigid body
+   body->setSB(this);
+   bodies_.pushBack( body );
+
+   // Updating the axis-aligned bounding box
+   if( bodies_.size() == 1 )
+      aabb_ = body->getAABB();
+   else
+      aabb_.merge( body->getAABB() );
+
+   // Setting the union's total mass and center of mass
+   calcCenterOfMass();
+
+   // Setting the contained primitives' relative position in reference to the center of mass
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      ( *b )->calcRelPosition();
+
+   // Setting the moment of inertia
+   calcInertia();
+
+   // Signaling the internal modification to the superordinate body
+   signalModification();
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  TRANSLATION FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Translation of the center of mass of the union by the displacement vector \a dp.
+ *
+ * \param dp The displacement vector.
+ * \return void
+ * \exception std::logic_error Invalid translation of a global union inside an exclusive section.
+ *
+ * Changing the global position of the entire union's center of mass. All contained rigid bodies
+ * are moved by the same displacement.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::translateImpl( real_t dx, real_t dy, real_t dz )
+{
+   Vec3 dp(dx, dy, dz);
+
+   // Changing the global position/reference point
+   gpos_ += dp;
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dp );
+
+   Union<BodyTypeTuple>::calcBoundingBox();    // Setting the axis-aligned bounding box
+   wake();               // Waking the union from sleep mode
+   signalTranslation();  // Signaling the position change to the superordinate body
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  ROTATION FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Rotation of the union by the quaternion \a dq.
+ *
+ * \param dq The quaternion for the rotation.
+ * \return void
+ * \exception std::logic_error Invalid rotation of a global union inside an exclusive section.
+ *
+ * Changing the orientation/rotation of the entire union. This function causes all contained
+ * rigid bodies to rotate around the center of mass of the union (if the union is finite) or
+ * around the anchor point of the union (if the union is infinite). The orientation of the
+ * bodies within the union in reference to the body frame of the union is not changed.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::rotateImpl( const Quat& dq )
+{
+   if (isFixed())
+      WALBERLA_ABORT("Trying to rotate a fixed body: " << *this);
+
+   q_ = dq * q_;                // Updating the orientation of the union
+   R_ = q_.toRotationMatrix();  // Updating the rotation of the union
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dq );
+
+   Union<BodyTypeTuple>::calcBoundingBox();  // Setting the axis-aligned bounding box
+   wake();             // Waking the union from sleep mode
+   signalRotation();   // Signaling the change of orientation to the superordinate body
+}
+//*************************************************************************************************
+
+//*************************************************************************************************
+/*!\brief Rotation of the union around the origin of the global world frame.
+ *
+ * \param xangle Rotation around the x-axis (radian measure).
+ * \param yangle Rotation around the y-axis (radian measure).
+ * \param zangle Rotation around the z-axis (radian measure).
+ * \return void
+ * \exception std::logic_error Invalid rotation of a global union inside an exclusive section.
+ *
+ * This function rotates the entire union around the origin of the global world frame and
+ * changes both the global position and the orientation/rotation of the union. Additionally,
+ * all contained rigid bodies change their position and orientation accordingly. The rotations
+ * are applied in the order x, y, z. The orientation of the bodies within the union in
+ * reference to the body frame of the union is not changed.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::rotateAroundOriginImpl( const Quat& dq )
+{
+   q_    = dq * q_;                // Updating the orientation of the union
+   R_    = q_.toRotationMatrix();  // Updating the rotation of the union
+   gpos_ = dq.rotate( gpos_ );     // Updating the global position of the union
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dq );
+
+   Union<BodyTypeTuple>::calcBoundingBox();    // Setting the axis-aligned bounding box
+   wake();               // Waking the union from sleep mode
+   signalTranslation();  // Signaling the position change to the superordinate body
+}
+//*************************************************************************************************
+
+//*************************************************************************************************
+/*!\brief Rotation of the union around a specific global coordinate.
+ *
+ * \param point The global center of the rotation.
+ * \param axis The global rotation axis.
+ * \param angle The rotation angle (radian measure).
+ * \return void
+ * \exception std::logic_error Invalid rotation of a global union inside an exclusive section.
+ *
+ * This function rotates the entire union around the given global coordiante \a point and
+ * changes both the global position and the orientation/rotation of the union. Additionally,
+ * all contained rigid bodies change their position and orientation accordingly. The orientation
+ * of the bodies within the union in reference to the body frame of the union is not changed.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::rotateAroundPointImpl( const Vec3& point, const Quat &dq )
+{
+   const Vec3 dp( gpos_ - point );
+
+   q_    = dq * q_;                  // Updating the orientation of the union
+   R_    = q_.toRotationMatrix();    // Updating the rotation of the union
+   gpos_ = point + dq.rotate( dp );  // Updating the global position of the union
+
+   // Updating the contained bodies
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      (*b)->update( dq );
+
+   Union<BodyTypeTuple>::calcBoundingBox();    // Setting the axis-aligned bounding box
+   wake();               // Waking the union from sleep mode
+   signalTranslation();  // Signaling the position change to the superordinate body
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  UTILITY FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Checks, whether a point in body relative coordinates lies inside the union.
+ *
+ * \param px The x-component of the relative coordinate.
+ * \param py The y-component of the relative coordinate.
+ * \param pz The z-component of the relative coordinate.
+ * \return \a true if the point lies inside the sphere, \a false if not.
+ */
+template <typename BodyTypeTuple>
+bool Union<BodyTypeTuple>::containsRelPointImpl( real_t px, real_t py, real_t pz ) const
+{
+   const Vec3 gpos( pointFromBFtoWF( px, py, pz ) );
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      if( b->containsPoint( gpos ) ) return true;
+   return false;
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Checks, whether a point in body relative coordinates lies on the surface of the union.
+ *
+ * \param px The x-component of the relative coordinate.
+ * \param py The y-component of the relative coordinate.
+ * \param pz The z-component of the relative coordinate.
+ * \return \a true if the point lies on the surface of the sphere, \a false if not.
+ *
+ * The tolerance level of the check is pe::surfaceThreshold.
+ */
+template <typename BodyTypeTuple>
+bool Union<BodyTypeTuple>::isSurfaceRelPointImpl( real_t px, real_t py, real_t pz ) const
+{
+   bool surface( false );
+   const Vec3 gpos( pointFromBFtoWF( px, py, pz ) );
+
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b ) {
+      if( b->containsPoint( gpos ) ) return false;
+      else if( b->isSurfacePoint( gpos ) ) surface = true;
+   }
+
+   return surface;
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  FUNCTIONS FOR INTERNAL CHANGES IN COMPOUND GEOMETRIES
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Signals an internal modification of a contained subordiante rigid body.
+ *
+ * \return void
+ *
+ * In case one of the contained rigid bodies is interally modified, this function is called to
+ * recalculate the changed properties of the union.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::handleModification()
+{
+   // Setting the finite flag
+   finite_ = true;
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b ) {
+      if( !b->isFinite() ) {
+         finite_ = false;
+         break;
+      }
+   }
+
+   // Setting the finiteness of the union
+   if( !finite_ ) setFinite( false );
+
+   // Setting the union's total mass and center of mass
+   calcCenterOfMass();
+
+   // Setting the contained primitives' relative position in reference to the center of mass
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      ( *b )->calcRelPosition();
+
+   // Setting the moment of inertia
+   calcInertia();
+
+   // Updating the axis-aligned bounding box
+   Union<BodyTypeTuple>::calcBoundingBox();
+
+   // Signaling the internal modification to the superordinate body
+   signalModification();
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Signals a position change of a contained subordiante rigid body.
+ *
+ * \return void
+ *
+ * In case one of the contained rigid bodies changes its position, this function is called
+ * to recalculate the changed properties of the union.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::handleTranslation()
+{
+   // Setting the union's total mass and center of mass
+   calcCenterOfMass();
+
+   // Setting the contained bodies' relative position in reference to the center of mass
+   for( auto b=bodies_.begin(); b!=bodies_.end(); ++b )
+      ( *b )->calcRelPosition();
+
+   // Setting the moment of inertia
+   calcInertia();
+
+   Union<BodyTypeTuple>::calcBoundingBox();    // Setting the axis-aligned bounding box
+   wake();               // Waking the union from sleep mode
+   signalTranslation();  // Signaling the position change to the superordinate body
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Signals an orientation change of a contained subordiante rigid body.
+ *
+ * \return void
+ *
+ * In case one of the contained rigid bodies changes its orientation, this function is called
+ * to recalculate the changed properties of the union.
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::handleRotation()
+{
+   // Setting the moment of inertia
+   calcInertia();
+
+   Union<BodyTypeTuple>::calcBoundingBox();  // Setting the axis-aligned bounding box
+   wake();             // Waking the union from sleep mode
+   signalRotation();   // Signaling the change of orientation to the superordinate body
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  OUTPUT FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Output of the current state of an union.
+ *
+ * \param os Reference to the output stream.
+ * \param tab Indentation in front of every line of the union output.
+ * \return void
+ */
+template <typename BodyTypeTuple>
+void Union<BodyTypeTuple>::print( std::ostream& os, const char* tab ) const
+{
+   using std::setw;
+
+   os << tab << " Union " << uid_ << " with " << bodies_.size();
+   if( bodies_.size() == 1 ) os << " rigid body\n";
+   else os << " rigid bodies\n";
+
+   os << tab << "   Fixed: " << isFixed() << " , sleeping: " << !isAwake() << "\n";
+
+   os << tab << "   System ID         = " << sid_ << "\n"
+      << tab << "   Total mass        = ";
+   if( isFinite() ) os << mass_ << "\n";
+   else os << "*infinite*\n";
+
+   os << tab << "   Global position   = " << gpos_ << "\n"
+      << tab << "   Relative position = " << rpos_ << "\n"
+      << tab << "   Linear velocity   = " << v_ << "\n"
+      << tab << "   Angular velocity  = " << w_ << "\n";
+
+   os << tab << "   Bounding box      = " << aabb_ << "\n"
+      << tab << "   Quaternion        = " << q_ << "\n"
+      << tab << "   Rotation matrix   = ( " << setw(9) << R_[0] << " , " << setw(9) << R_[1] << " , " << setw(9) << R_[2] << " )\n"
+      << tab << "                       ( " << setw(9) << R_[3] << " , " << setw(9) << R_[4] << " , " << setw(9) << R_[5] << " )\n"
+      << tab << "                       ( " << setw(9) << R_[6] << " , " << setw(9) << R_[7] << " , " << setw(9) << R_[8] << " )\n";
+
+   if( isFinite() ) {
+      os << std::setiosflags(std::ios::right)
+         << tab << "   Moment of inertia = ( " << setw(9) << I_[0] << " , " << setw(9) << I_[1] << " , " << setw(9) << I_[2] << " )\n"
+         << tab << "                       ( " << setw(9) << I_[3] << " , " << setw(9) << I_[4] << " , " << setw(9) << I_[5] << " )\n"
+         << tab << "                       ( " << setw(9) << I_[6] << " , " << setw(9) << I_[7] << " , " << setw(9) << I_[8] << " )\n"
+         << std::resetiosflags(std::ios::right);
+   }
+
+   // Extending the indentation for the nested bodies and links
+   std::string longtab( tab );
+   longtab.append( "  " );
+
+   // Printing all contained bodies
+   for( auto bIt=bodies_.begin(); bIt!=bodies_.end(); ++bIt ) {
+      os << "\n";
+      bIt->print( os, longtab.c_str() );
+   }
+}
+//*************************************************************************************************
+
+
+
+
+//=================================================================================================
+//
+//  GLOBAL OPERATORS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Global output operator for unions.
+ * \ingroup union
+ *
+ * \param os Reference to the output stream.
+ * \param u Reference to a constant union object.
+ * \return Reference to the output stream.
+ */
+template <typename BodyTypeTuple>
+std::ostream& operator<<( std::ostream& os, const Union<BodyTypeTuple>& u )
+{
+   os << "--" << "UNION PARAMETERS"
+      << "--------------------------------------------------------------\n";
+   u.print( os, "" );
+   os << "--------------------------------------------------------------------------------\n"
+      << std::endl;
+   return os;
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Global output operator for union handles.
+ * \ingroup union
+ *
+ * \param os Reference to the output stream.
+ * \param u Constant union handle.
+ * \return Reference to the output stream.
+ */
+template <typename BodyTypeTuple>
+std::ostream& operator<<( std::ostream& os, Union<BodyTypeTuple> const * u )
+{
+   os << "--" << "UNION PARAMETERS"
+      << "--------------------------------------------------------------\n";
+   u->print( os, "" );
+   os << "--------------------------------------------------------------------------------\n"
+      << std::endl;
+   return os;
+}
+//*************************************************************************************************
+
+
+//=================================================================================================
+//
+//  GET FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Returns whether the union contains any bodies or not.
+ *
+ * \return \a true if the union is empty, \a false if it not.
+ */
+template <typename BodyTypeTuple>
+inline bool Union<BodyTypeTuple>::isEmpty() const
+{
+   return bodies_.size() == 0;
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Returns the number of rigid bodies contained in the union.
+ *
+ * \return The number of rigid bodies.
+ */
+template <typename BodyTypeTuple>
+inline size_t Union<BodyTypeTuple>::size() const
+{
+   return bodies_.size();
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Returns a handle to the indexed rigid body.
+ *
+ * \param index Access index. The index has to be in the range \f$[0..size-1]\f$.
+ * \return Handle to the accessed body.
+ *
+ * \b Note: No runtime check is performed to insure the validity of the access index.
+ */
+template <typename BodyTypeTuple>
+inline BodyID Union<BodyTypeTuple>::getBody( size_t index )
+{
+   WALBERLA_ASSERT_LESS( index, bodies_.size(), "Invalid access index" );
+   return bodies_[index];
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Returns a constant handle to the indexed rigid body.
+ *
+ * \param index Access index. The index has to be in the range \f$[0..size-1]\f$.
+ * \return Constant handle to the accessed body.
+ *
+ * \b Note: No runtime check is performed to insure the validity of the access index.
+ */
+template <typename BodyTypeTuple>
+inline ConstBodyID Union<BodyTypeTuple>::getBody( size_t index ) const
+{
+   WALBERLA_ASSERT_LESS( index, bodies_.size(), "Invalid access index" );
+   return bodies_[index];
+}
+//*************************************************************************************************
+
+
+//*************************************************************************************************
+/*!\brief Returns unique type id of this type
+ *
+ * \return geometry specific type id
+ */
+template <typename BodyTypeTuple>
+inline id_t Union<BodyTypeTuple>::getStaticTypeID()
+{
+   return staticTypeID_;
+}
+
+template <typename BodyTypeTuple>
+id_t Union<BodyTypeTuple>::staticTypeID_ = std::numeric_limits<id_t>::max();
+
+} // namespace pe
+}  // namespace walberla
diff --git a/src/pe/rigidbody/UnionFactory.h b/src/pe/rigidbody/UnionFactory.h
new file mode 100644
index 000000000..31096e92c
--- /dev/null
+++ b/src/pe/rigidbody/UnionFactory.h
@@ -0,0 +1,266 @@
+//======================================================================================================================
+//
+//  This file is part of waLBerla. waLBerla is free software: you can
+//  redistribute it and/or modify it under the terms of the GNU General Public
+//  License as published by the Free Software Foundation, either version 3 of
+//  the License, or (at your option) any later version.
+//
+//  waLBerla is distributed in the hope that it will be useful, but WITHOUT
+//  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+//  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+//  for more details.
+//
+//  You should have received a copy of the GNU General Public License along
+//  with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
+//
+//! \file UnionFactory.h
+//! \author Klaus Iglberger
+//! \author Tobias Preclik
+//! \author Sebastian Eibl <sebastian.eibl@fau.de>
+//
+//======================================================================================================================
+
+#pragma once
+
+#include "pe/Materials.h"
+#include "pe/rigidbody/BodyStorage.h"
+#include "pe/rigidbody/Box.h"
+#include "pe/rigidbody/Capsule.h"
+#include "pe/rigidbody/Plane.h"
+#include "pe/rigidbody/Sphere.h"
+#include "pe/rigidbody/Union.h"
+#include "pe/Types.h"
+
+#include "blockforest/BlockForest.h"
+#include "core/debug/Debug.h"
+
+namespace walberla {
+namespace pe {
+
+//=================================================================================================
+//
+//  BOX SETUP FUNCTIONS
+//
+//=================================================================================================
+
+//*************************************************************************************************
+/*!\brief Setup of a new Union.
+ * \ingroup rigidbody
+ *
+ * \param globalStorage process local global storage
+ * \param blocks storage of all the blocks on this process
+ * \param storageID BlockDataID of the BlockStorage block datum
+ * \param uid The user-specific ID of the union.
+ * \param gpos The global position of the center of the union.
+ * \param global specifies if the union should be created in the global storage
+ * \param communicating specifies if the union should take part in synchronization (syncNextNeighbour, syncShadowOwner)
+ * \param infiniteMass specifies if the union has infinite mass and will be treated as an obstacle
+ * \return Handle for the new union.
+ * \exception std::invalid_argument Invalid box radius.
+ * \exception std::invalid_argument Invalid global box position.
+ */
+template <typename BodyTypeTuple>
+Union<BodyTypeTuple>* createUnion(   BodyStorage& globalStorage, BlockStorage& blocks, BlockDataID storageID,
+                                     id_t uid, const Vec3& gpos,
+                                     bool global = false, bool communicating = true, bool infiniteMass = false )
+{
+   WALBERLA_ASSERT_UNEQUAL( Union<BodyTypeTuple>::getStaticTypeID(), std::numeric_limits<id_t>::max(), "Union TypeID not initalized!");
+
+   Union<BodyTypeTuple>* bd = NULL;
+
+   if (global)
+   {
+      const id_t sid = UniqueID<RigidBody>::createGlobal();
+      WALBERLA_ASSERT_EQUAL(communicating, false);
+      WALBERLA_ASSERT_EQUAL(infiniteMass, true);
+      bd = new Union<BodyTypeTuple>(sid, uid, gpos, Vec3(0,0,0), Quat(), global, false, true);
+      globalStorage.add(bd);
+   } else
+   {
+      for (auto it = blocks.begin(); it != blocks.end(); ++it){
+         IBlock* block = (&(*it));
+         if (block->getAABB().contains(gpos))
+         {
+            const id_t sid( UniqueID<RigidBody>::create() );
+
+            Storage* bs = block->getData<Storage>(storageID);
+            bd = new Union<BodyTypeTuple>(sid, uid, gpos, Vec3(0,0,0), Quat(), global, communicating, infiniteMass);
+            bd->MPITrait.setOwner(Owner(MPIManager::instance()->rank(), block->getId().getID()));
+            (*bs)[0].add(bd);
+         }
+      }
+   }
+
+   if (bd != NULL)
+   {
+      // Logging the successful creation of the box
+      WALBERLA_LOG_DETAIL(
+               "Created union " << bd->getSystemID() << "\n"
+            << "   User-ID  = " << bd->getID()
+               );
+   }
+
+   return bd;
+}
+
+template <typename BodyTypeTuple>
+BoxID createBox( Union<BodyTypeTuple>* un,
+                 id_t uid, const Vec3& gpos, const Vec3& lengths,
+                 MaterialID material,
+                 bool global, bool communicating, bool infiniteMass )
+{
+   if (Box::getStaticTypeID() == std::numeric_limits<id_t>::max())
+      throw std::runtime_error("Box TypeID not initalized!");
+
+   // union not on this process/block -> terminate creation
+   if (un == NULL)
+      throw std::invalid_argument( "createSphere: Union argument is NULL" );
+
+   // main union not on this process/block -> terminate creation
+   if ( un->isRemote() )
+      throw std::logic_error( "createSphere: Union is remote" );
+
+   // Checking the side lengths
+   if( lengths[0] <= real_t(0) || lengths[1] <= real_t(0) || lengths[2] <= real_t(0) )
+      throw std::invalid_argument( "Invalid side length" );
+
+   BoxID box = NULL;
+   id_t  sid = 0;
+
+   if (global)
+   {
+      sid = UniqueID<RigidBody>::createGlobal();
+      WALBERLA_ASSERT_EQUAL(communicating, false);
+      WALBERLA_ASSERT_EQUAL(infiniteMass, true);
+   } else
+   {
+      sid = UniqueID<RigidBody>::create();
+   }
+
+   box = new Box(sid, uid, gpos, Vec3(0,0,0), Quat(), lengths, material, global, communicating, infiniteMass);
+   box->MPITrait.setOwner( un->MPITrait.getOwner() );
+   un->add(box);
+
+   if (box != NULL)
+   {
+      // Logging the successful creation of the box
+      WALBERLA_LOG_DETAIL(
+                "Created box " << box->getSystemID() << "\n"
+             << "   User-ID         = " << uid << "\n"
+             << "   Global position = " << gpos << "\n"
+             << "   side length     = " << lengths << "\n"
+             << "   LinVel          = " << box->getLinearVel() << "\n"
+             << "   Material        = " << Material::getName( material )
+               );
+   }
+
+   return box;
+}
+
+template <typename BodyTypeTuple>
+CapsuleID createCapsule( Union<BodyTypeTuple>* un,
+                         id_t uid, const Vec3& gpos, const real_t radius, const real_t length,
+                         MaterialID material,
+                         bool global, bool communicating, bool infiniteMass )
+{
+   if (Capsule::getStaticTypeID() == std::numeric_limits<id_t>::max())
+      throw std::runtime_error("Capsule TypeID not initalized!");
+
+   // union not on this process/block -> terminate creation
+   if (un == NULL)
+      throw std::invalid_argument( "createSphere: Union argument is NULL" );
+
+   // main union not on this process/block -> terminate creation
+   if ( un->isRemote() )
+      throw std::logic_error( "createSphere: Union is remote" );
+
+   // Checking the radius
+   if( radius <= real_c(0) )
+      throw std::invalid_argument( "Invalid capsule radius" );
+
+   // Checking the length
+   if( length <= real_c(0) )
+      throw std::invalid_argument( "Invalid capsule length" );
+
+   CapsuleID capsule = NULL;
+   id_t      sid     = 0;
+
+   if (global)
+   {
+      sid = UniqueID<RigidBody>::createGlobal();
+      WALBERLA_ASSERT_EQUAL(communicating, false);
+      WALBERLA_ASSERT_EQUAL(infiniteMass, true);
+   } else
+   {
+      sid = UniqueID<RigidBody>::create();
+   }
+
+   capsule = new Capsule(sid, uid, gpos, Vec3(0,0,0), Quat(), radius, length, material, global, communicating, infiniteMass);
+   capsule->MPITrait.setOwner( un->MPITrait.getOwner() );
+   un->add(capsule);
+
+   if (capsule != NULL)
+   {
+      WALBERLA_LOG_DETAIL("Created capsule " << capsule->getSystemID() << "\n" << capsule);
+   }
+
+   return capsule;
+}
+
+template <typename BodyTypeTuple>
+SphereID createSphere( Union<BodyTypeTuple>* un,
+                       id_t uid, const Vec3& gpos, real_t radius,
+                       MaterialID material = Material::find("iron"),
+                       bool global = false, bool communicating = true, bool infiniteMass = false )
+{
+   if (Sphere::getStaticTypeID() == std::numeric_limits<id_t>::max())
+      throw std::runtime_error("Sphere TypeID not initalized!");
+
+   // union not on this process/block -> terminate creation
+   if (un == NULL)
+      throw std::invalid_argument( "createSphere: Union argument is NULL" );
+
+   // main union not on this process/block -> terminate creation
+   if ( un->isRemote() )
+      throw std::logic_error( "createSphere: Union is remote" );
+
+   // Checking the radius
+   if( radius <= real_c(0) )
+      throw std::invalid_argument( "Invalid sphere radius" );
+
+   id_t sid(0);
+   SphereID sphere = NULL;
+
+   if (global)
+   {
+      sid = UniqueID<RigidBody>::createGlobal();
+      WALBERLA_ASSERT_EQUAL(communicating, false);
+      WALBERLA_ASSERT_EQUAL(infiniteMass, true);
+
+   } else
+   {
+      sid = UniqueID<RigidBody>::create();
+   }
+
+   sphere = new Sphere(sid, uid, gpos, Vec3(0,0,0), Quat(), radius, material, global, communicating, infiniteMass);
+   sphere->MPITrait.setOwner( un->MPITrait.getOwner() );
+   un->add( sphere );
+
+   if (sphere != NULL)
+   {
+      // Logging the successful creation of the sphere
+      WALBERLA_LOG_DETAIL(
+                "Created sphere " << sphere->getSystemID() << " as part of union " << un->getSystemID() << "\n"
+             << "   User-ID         = " << uid << "\n"
+             << "   Global position = " << gpos << "\n"
+             << "   Radius          = " << radius << "\n"
+             << "   LinVel          = " << sphere->getLinearVel() << "\n"
+             << "   Material        = " << Material::getName( material )
+               );
+   }
+
+   return sphere;
+}
+
+}  // namespace pe
+}  // namespace walberla
diff --git a/src/pe/vtk/SphereVtkOutput.cpp b/src/pe/vtk/SphereVtkOutput.cpp
index fce3a2f55..71e192dc6 100644
--- a/src/pe/vtk/SphereVtkOutput.cpp
+++ b/src/pe/vtk/SphereVtkOutput.cpp
@@ -39,6 +39,7 @@ std::vector< SphereVtkOutput::Attributes > SphereVtkOutput::getAttributes() cons
    attributes.push_back( Attributes( vtk::typeToString< float >(), "velocity", uint_c(3) ) );
    attributes.push_back( Attributes( vtk::typeToString< float >(), "orientation", uint_c(3) ) );
    attributes.push_back( Attributes( vtk::typeToString< int >(),   "rank", uint_c(1) ) );
+   attributes.push_back( Attributes( vtk::typeToString< id_t >(),   "id", uint_c(1) ) );
 
    return attributes;
 }
@@ -55,6 +56,15 @@ void SphereVtkOutput::configure()
       {
          if (it->getTypeID() == Sphere::getStaticTypeID())
             bodies_.push_back( static_cast<ConstSphereID> (*it) );
+         if (it->getTypeID() == Union<boost::tuple<Sphere> >::getStaticTypeID())
+         {
+            auto un = static_cast<Union<boost::tuple<Sphere> > const * > (*it);
+            for( auto it2 = un->begin(); it2 != un->end(); ++it2 )
+            {
+               if (it2->getTypeID() == Sphere::getStaticTypeID())
+                  bodies_.push_back( static_cast<ConstSphereID> (*it2) );
+            }
+         }
       }
    }
 }
diff --git a/src/pe/vtk/SphereVtkOutput.h b/src/pe/vtk/SphereVtkOutput.h
index ac7f8cbfa..756a2273e 100644
--- a/src/pe/vtk/SphereVtkOutput.h
+++ b/src/pe/vtk/SphereVtkOutput.h
@@ -22,6 +22,7 @@
 
 #include "pe/rigidbody/RigidBody.h"
 #include "pe/rigidbody/Sphere.h"
+#include "pe/rigidbody/Union.h"
 
 #include "core/DataTypes.h"
 #include "core/Set.h"
@@ -86,6 +87,9 @@ void SphereVtkOutput::push( std::ostream& os, const uint_t data, const uint_t po
    case 4:
       vtk::toStream( os, numeric_cast< int >(bodies_.at( point )->MPITrait.getOwner().rank_) );
       break;
+   case 5:
+      vtk::toStream( os, numeric_cast< id_t >(bodies_.at( point )->getTopSuperBody()->getSystemID()) );
+      break;
    }
 
 }
@@ -114,6 +118,9 @@ void SphereVtkOutput::push( vtk::Base64Writer& b64, const uint_t data, const uin
    case 4:
       b64 << numeric_cast< int >(bodies_.at( point )->MPITrait.getOwner().rank_);
       break;
+   case 5:
+      b64 << numeric_cast< id_t >(bodies_.at( point )->getTopSuperBody()->getSystemID());
+      break;
    }
 }
 
diff --git a/tests/pe/CMakeLists.txt b/tests/pe/CMakeLists.txt
index 48b7a58f6..2f34fd783 100644
--- a/tests/pe/CMakeLists.txt
+++ b/tests/pe/CMakeLists.txt
@@ -60,6 +60,9 @@ waLBerla_execute_test( NAME   PE_RIGIDBODY )
 waLBerla_compile_test( NAME   PE_SERIALIZEDESERIALIZE FILES SerializeDeserialize.cpp DEPENDS core blockforest  )
 waLBerla_execute_test( NAME   PE_SERIALIZEDESERIALIZE PROCESSES 8 )
 
+waLBerla_compile_test( NAME   PE_SHADOWCOPY FILES ShadowCopy.cpp DEPENDS core blockforest  )
+waLBerla_execute_test( NAME   PE_SHADOWCOPY )
+
 waLBerla_compile_test( NAME   PE_SIMPLECCD FILES SimpleCCD.cpp DEPENDS core  )
 waLBerla_execute_test( NAME   PE_SIMPLECCD )
 
@@ -86,3 +89,6 @@ waLBerla_execute_test( NAME   PE_SYNCHRONIZATIONLARGEBODY27 COMMAND $<TARGET_FIL
 
 waLBerla_compile_test( NAME   PE_STATICTYPEIDS FILES SetBodyTypeIDs.cpp DEPENDS core  )
 waLBerla_execute_test( NAME   PE_STATICTYPEIDS )
+
+waLBerla_compile_test( NAME   PE_UNION FILES Union.cpp DEPENDS core  )
+waLBerla_execute_test( NAME   PE_UNION )
diff --git a/tests/pe/Collision.cpp b/tests/pe/Collision.cpp
index 0af0edfcb..89b81fbdf 100644
--- a/tests/pe/Collision.cpp
+++ b/tests/pe/Collision.cpp
@@ -19,20 +19,26 @@
 //
 //======================================================================================================================
 
+#include "pe/collision/Collide.h"
+#include "pe/collision/GJKEPAHelper.h"
+
 #include "pe/contact/Contact.h"
+#include "pe/fcd/SimpleFCD.h"
 #include "pe/Materials.h"
+
 #include "pe/rigidbody/Box.h"
+#include "pe/rigidbody/Capsule.h"
 #include "pe/rigidbody/Sphere.h"
+#include "pe/rigidbody/Plane.h"
+#include "pe/rigidbody/Union.h"
+
 #include "pe/rigidbody/SetBodyTypeIDs.h"
 #include "pe/Types.h"
-#include "pe/collision/Collide.h"
-#include "pe/collision/GJKEPAHelper.h"
 
+#include "core/debug/TestSubsystem.h"
 #include "core/DataTypes.h"
 #include "core/math/Vector2.h"
 
-#include "core/debug/TestSubsystem.h"
-
 using namespace walberla;
 using namespace walberla::pe;
 
@@ -238,6 +244,26 @@ void CapsuleTest2()
 
 }
 
+void UnionTest()
+{
+   typedef Union< boost::tuple<Sphere> > UnionT;
+   MaterialID iron = Material::find("iron");
+   UnionT  un1(120, 0, Vec3(0,0,0), Vec3(0,0,0), Quat(), false, true, false);
+   UnionT  un2(121, 0, Vec3(real_t(1.5),0,0), Vec3(0,0,0), Quat(), false, true, false);
+   SphereID sp1 = new Sphere(123, 1, Vec3(0,0,0), Vec3(0,0,0), Quat(), 1, iron, false, true, false);
+   un1.add(sp1);
+   SphereID sp2 = new Sphere(124, 2, Vec3(real_t(1.5),0,0), Vec3(0,0,0), Quat(), 1, iron, false, true, false);
+   un2.add(sp2);
+
+   std::vector<Contact> contacts;
+
+   // SPHERE <-> SPHERE
+   WALBERLA_LOG_INFO("UNION <-> UNION");
+   fcd::DoubleDispatch< boost::tuple<UnionT, Sphere> >::execute(&un1, &un2, contacts);
+   checkContact( contacts.at(0),
+                 Contact( sp1, sp2, Vec3(real_t(0.75), 0, 0), Vec3(-1, 0, 0), real_t(-0.5)) );
+}
+
 typedef boost::tuple<Box, Capsule, Plane, Sphere> BodyTuple ;
 
 int main( int argc, char** argv )
@@ -252,6 +278,7 @@ int main( int argc, char** argv )
     BoxTest();
     CapsuleTest();
 //    CapsuleTest2();
+    UnionTest();
 
     return EXIT_SUCCESS;
 }
diff --git a/tests/pe/Marshalling.cpp b/tests/pe/Marshalling.cpp
index be1d037b6..79e9dc9cd 100644
--- a/tests/pe/Marshalling.cpp
+++ b/tests/pe/Marshalling.cpp
@@ -24,6 +24,8 @@
 #include "pe/rigidbody/Box.h"
 #include "pe/rigidbody/Capsule.h"
 #include "pe/rigidbody/Sphere.h"
+#include "pe/rigidbody/UnionFactory.h"
+#include "pe/rigidbody/Union.h"
 #include "pe/communication/DynamicMarshalling.h"
 #include "pe/rigidbody/SetBodyTypeIDs.h"
 #include "pe/Materials.h"
@@ -34,13 +36,17 @@ using namespace walberla;
 using namespace walberla::pe;
 using namespace walberla::pe::communication;
 
-typedef boost::tuple<Box, Capsule, Sphere> BodyTuple ;
+typedef boost::tuple<Sphere>       UnionTypeTuple;
+typedef Union< UnionTypeTuple >    UnionT;
+typedef UnionT*                    UnionID;
+
+typedef boost::tuple<Box, Capsule, Sphere, UnionT> BodyTuple ;
 
 void testBox()
 {
    MaterialID iron = Material::find("iron");
 
-   Box b1(759846, 1234794, Vec3(real_c(1), real_c(2), real_c(3)), Vec3(0,0,0), Quat(), Vec3(1,2,3), iron, false, false, false);
+   Box b1(759846, 1234794, Vec3(real_c(1), real_c(2), real_c(3)), Vec3(0,0,0), Quat(), Vec3(1,2,3), iron, false, true, false);
    b1.setLinearVel(Vec3(real_c(5.2), real_c(6.3), real_c(7.4)));
    b1.setAngularVel(Vec3(real_c(1.2), real_c(2.3), real_c(3.4)));
 
@@ -103,6 +109,44 @@ void testSphere()
    WALBERLA_CHECK_EQUAL(s1.getSystemID(), s2->getSystemID());
 }
 
+void testUnion()
+{
+   UnionT u1(159, 423, Vec3(real_c(1), real_c(2), real_c(3)), Vec3(0,0,0), Quat(), false, false, false);
+   SphereID s11 = createSphere< UnionTypeTuple >(&u1, 1234794, Vec3(real_c(1), real_c(2), real_c(3)), 2);
+   SphereID s21 = createSphere< UnionTypeTuple >(&u1, 4567789, Vec3(real_c(3), real_c(2), real_c(3)), real_c(1.5));
+   WALBERLA_CHECK_NOT_NULLPTR( s11 );
+   WALBERLA_CHECK_NOT_NULLPTR( s21 );
+
+   mpi::SendBuffer sb;
+   MarshalDynamically<BodyTuple>::execute(sb, u1);
+   mpi::RecvBuffer rb(sb);
+
+   UnionID u2 = static_cast<UnionID> (UnmarshalDynamically<BodyTuple>::execute(rb, UnionT::getStaticTypeID(), math::AABB(Vec3(-100,-100,-100), Vec3(100,100,100)), math::AABB(Vec3(-100,-100,-100), Vec3(100,100,100))));
+   WALBERLA_CHECK_NOT_NULLPTR( u2 );
+
+   WALBERLA_CHECK_EQUAL(u1.size(), 2);
+   WALBERLA_CHECK_EQUAL(u1.size(), u2->size());
+
+   //getting spheres of second union
+   SphereID s12 = static_cast<SphereID> (*(u2->begin()));
+   SphereID s22 = static_cast<SphereID> (*(++(u2->begin())));
+   WALBERLA_CHECK_UNEQUAL( s12, s22 );
+
+   WALBERLA_CHECK_FLOAT_EQUAL( s11->getPosition(),    s12->getPosition());
+   WALBERLA_CHECK_FLOAT_EQUAL( s11->getLinearVel(),   s12->getLinearVel());
+   WALBERLA_CHECK_FLOAT_EQUAL( s11->getAngularVel(),  s12->getAngularVel());
+   WALBERLA_CHECK_FLOAT_EQUAL( s11->getRadius(),      s12->getRadius());
+   WALBERLA_CHECK_EQUAL(       s11->getID(),          s12->getID());
+   WALBERLA_CHECK_EQUAL(       s11->getSystemID(),    s12->getSystemID());
+
+   WALBERLA_CHECK_FLOAT_EQUAL( s21->getPosition(),    s22->getPosition());
+   WALBERLA_CHECK_FLOAT_EQUAL( s21->getLinearVel(),   s22->getLinearVel());
+   WALBERLA_CHECK_FLOAT_EQUAL( s21->getAngularVel(),  s22->getAngularVel());
+   WALBERLA_CHECK_FLOAT_EQUAL( s21->getRadius(),      s22->getRadius());
+   WALBERLA_CHECK_EQUAL(       s21->getID(),          s22->getID());
+   WALBERLA_CHECK_EQUAL(       s21->getSystemID(),    s22->getSystemID());
+}
+
 int main( int argc, char** argv )
 {
    walberla::debug::enterTestMode();
@@ -114,6 +158,7 @@ int main( int argc, char** argv )
    testSphere();
    testBox();
    testCapsule();
+   testUnion();
 
    return EXIT_SUCCESS;
 }
diff --git a/tests/pe/SetBodyTypeIDs.cpp b/tests/pe/SetBodyTypeIDs.cpp
index e5231a8eb..389adbd15 100644
--- a/tests/pe/SetBodyTypeIDs.cpp
+++ b/tests/pe/SetBodyTypeIDs.cpp
@@ -27,6 +27,7 @@
 #include "pe/rigidbody/Capsule.h"
 #include "pe/rigidbody/Plane.h"
 #include "pe/rigidbody/Sphere.h"
+#include "pe/rigidbody/Union.h"
 
 namespace walberla {
 namespace pe {
@@ -102,7 +103,7 @@ int main( int argc, char** argv )
    WALBERLA_CHECK_UNEQUAL(A::getStaticTypeID(), C::getStaticTypeID());
    WALBERLA_CHECK_UNEQUAL(B::getStaticTypeID(), C::getStaticTypeID());
 
-   typedef boost::tuple<Plane, Sphere, Box, Capsule> BodyTuple2 ;
+   typedef boost::tuple<Plane, Sphere, Box, Capsule, Union< boost::tuple<Sphere, Box> > > BodyTuple2 ;
    SetBodyTypeIDs<BodyTuple2>::execute();
    WALBERLA_CHECK_UNEQUAL(Plane::getStaticTypeID(), Sphere::getStaticTypeID());
    WALBERLA_CHECK_UNEQUAL(Plane::getStaticTypeID(), Box::getStaticTypeID());
diff --git a/tests/pe/ShadowCopy.cpp b/tests/pe/ShadowCopy.cpp
new file mode 100644
index 000000000..b709ae03e
--- /dev/null
+++ b/tests/pe/ShadowCopy.cpp
@@ -0,0 +1,176 @@
+//======================================================================================================================
+//
+//  This file is part of waLBerla. waLBerla is free software: you can 
+//  redistribute it and/or modify it under the terms of the GNU General Public
+//  License as published by the Free Software Foundation, either version 3 of 
+//  the License, or (at your option) any later version.
+//  
+//  waLBerla is distributed in the hope that it will be useful, but WITHOUT 
+//  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
+//  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License 
+//  for more details.
+//  
+//  You should have received a copy of the GNU General Public License along
+//  with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
+//
+//! \file ShadowCopy.cpp
+//! \ingroup pe_tests
+//! \author Sebastian Eibl <sebastian.eibl@fau.de>
+//
+//======================================================================================================================
+
+#include "pe/basic.h"
+#include "pe/utility/GetBody.h"
+#include "pe/utility/DestroyBody.h"
+
+#include "blockforest/Initialization.h"
+#include "core/all.h"
+#include "domain_decomposition/all.h"
+
+#include "core/debug/TestSubsystem.h"
+
+using namespace walberla;
+using namespace walberla::pe;
+
+typedef Union< boost::tuple<Sphere> > UnionT;
+typedef boost::tuple<Sphere, UnionT> BodyTuple ;
+
+int main( int argc, char** argv )
+{
+   walberla::debug::enterTestMode();
+   walberla::MPIManager::instance()->initializeMPI( &argc, &argv );
+
+   shared_ptr<BodyStorage> globalBodyStorage = make_shared<BodyStorage>();
+
+   // create blocks
+   shared_ptr< StructuredBlockForest > forest = blockforest::createUniformBlockGrid(
+            math::AABB(0,0,0,15,15,15),
+            uint_c( 3), uint_c( 3), uint_c( 3), // number of blocks in x,y,z direction
+            uint_c( 1), uint_c( 1), uint_c( 1), // how many cells per block (x,y,z)
+            false,                               // max blocks per process
+            true, true, true,                   // full periodicity
+            false);
+
+   SetBodyTypeIDs<BodyTuple>::execute();
+
+   auto storageID           = forest->addBlockData(createStorageDataHandling<BodyTuple>(), "Storage");
+                              forest->addBlockData(ccd::createHashGridsDataHandling( globalBodyStorage, storageID ), "HCCD");
+                              forest->addBlockData(fcd::createSimpleFCDDataHandling<BodyTuple>(), "FCD");
+
+
+//   logging::Logging::instance()->setStreamLogLevel(logging::Logging::DETAIL);
+//   logging::Logging::instance()->setFileLogLevel(logging::Logging::DETAIL);
+//   logging::Logging::instance()->includeLoggingToFile("ShadowCopy");
+
+   bool syncShadowOwners = false;
+   boost::function<void(void)> syncCall;
+   if (!syncShadowOwners)
+   {
+      syncCall = boost::bind( pe::syncNextNeighbors<BodyTuple>, boost::ref(forest->getBlockForest()), storageID, static_cast<WcTimingTree*>(NULL), real_c(0.0), false );
+   } else
+   {
+      syncCall = boost::bind( pe::syncShadowOwners<BodyTuple>, boost::ref(forest->getBlockForest()), storageID, static_cast<WcTimingTree*>(NULL), real_c(0.0), false );
+   }
+
+   WALBERLA_LOG_PROGRESS_ON_ROOT( " *** SPHERE *** ");
+   SphereID sp = pe::createSphere(
+            *globalBodyStorage,
+            forest->getBlockStorage(),
+            storageID,
+            999999999,
+            Vec3(real_t(4.9),2,2),
+            real_c(1.2));
+   auto sid = sp->getSystemID();
+   sp->setLinearVel(1,2,3);
+   sp->setAngularVel(1,2,3);
+   syncCall();
+   sp->setPosition(7,2,2);
+   syncCall();
+   sp = static_cast<SphereID> (getBody( *globalBodyStorage, forest->getBlockStorage(), storageID, sid, StorageSelect::LOCAL ));
+   WALBERLA_CHECK_NOT_NULLPTR(sp);
+   WALBERLA_CHECK_FLOAT_EQUAL( sp->getLinearVel(), Vec3(1,2,3) );
+   WALBERLA_CHECK_FLOAT_EQUAL( sp->getAngularVel(), Vec3(1,2,3) );
+   WALBERLA_CHECK_FLOAT_EQUAL( sp->getRadius(), real_t(1.2) );
+   destroyBodyBySID( *globalBodyStorage, forest->getBlockStorage(), storageID, sid );
+
+   WALBERLA_LOG_PROGRESS_ON_ROOT( " *** UNION *** ");
+   MaterialID iron = Material::find("iron");
+   UnionT* un   = createUnion< boost::tuple<Sphere> >( *globalBodyStorage, forest->getBlockStorage(), storageID, 0, Vec3(2,2,2) );
+   SphereID sp1 = new Sphere( 10, 0, Vec3(real_t(4.9),2,2), Vec3(0,0,0), Quat(), real_t(1)  , iron, false, true, false );
+   SphereID sp2 = new Sphere( 11, 0, Vec3(3,2,2)          , Vec3(0,0,0), Quat(), real_t(1.5), iron, false, true, false );
+   un->add(sp1);
+   un->add(sp2);
+   un->setPosition( Vec3( real_t(4.9), 2, 2) );
+   auto relPosSp1 = sp1->getRelPosition();
+   auto relPosSp2 = sp2->getRelPosition();
+   sid = un->getSystemID();
+   syncCall();
+
+   un->setPosition( Vec3( real_t(5.9), 2, 2) );
+   WALBERLA_LOG_PROGRESS_ON_ROOT( un->getPosition() );
+   auto posUnion  = un->getPosition();
+   syncCall();
+
+   un  = static_cast<UnionT*> (getBody( *globalBodyStorage, forest->getBlockStorage(), storageID, sid, StorageSelect::LOCAL ));
+   sp1 = static_cast<SphereID> (*(un->begin()));
+   sp2 = static_cast<SphereID> (*(++(un->begin())));
+   WALBERLA_CHECK_NOT_NULLPTR(sp1);
+   WALBERLA_CHECK_NOT_NULLPTR(sp2);
+   WALBERLA_CHECK_EQUAL( sp1->getTypeID(), Sphere::getStaticTypeID() );
+   WALBERLA_CHECK_EQUAL( sp2->getTypeID(), Sphere::getStaticTypeID() );
+   WALBERLA_CHECK_FLOAT_EQUAL( sp1->getRadius(), real_t(1.0) );
+   WALBERLA_CHECK_FLOAT_EQUAL( sp2->getRadius(), real_t(1.5) );
+   WALBERLA_CHECK_FLOAT_EQUAL( un->getPosition(), posUnion );
+   WALBERLA_CHECK_FLOAT_EQUAL( relPosSp1, sp1->getRelPosition() );
+   WALBERLA_CHECK_FLOAT_EQUAL( relPosSp2, sp2->getRelPosition() );
+
+   un->setPosition(real_t(7.5),2,2);
+   WALBERLA_LOG_PROGRESS_ON_ROOT( un->getPosition() );
+   syncCall();
+
+   un->setPosition(real_t(9.9),2,2);
+   WALBERLA_LOG_PROGRESS_ON_ROOT( un->getPosition() );
+   syncCall();
+
+   un->setPosition(real_t(10.9),2,2);
+   WALBERLA_LOG_PROGRESS_ON_ROOT( un->getPosition() );
+   syncCall();
+
+   un  = static_cast<UnionT*> (getBody( *globalBodyStorage, forest->getBlockStorage(), storageID, sid, StorageSelect::LOCAL ));
+   un->setPosition(real_t(12.5),2,2);
+   WALBERLA_LOG_PROGRESS_ON_ROOT( un->getPosition() );
+   syncCall();
+
+   un->setPosition(real_t(14.9),2,2);
+   WALBERLA_LOG_PROGRESS_ON_ROOT( un->getPosition() );
+   syncCall();
+
+   un->setPosition(real_t(15.9),2,2);
+   WALBERLA_LOG_PROGRESS_ON_ROOT( un->getPosition() );
+   syncCall();
+
+   posUnion = Vec3(real_t(0.9),2,2);
+   un  = static_cast<UnionT*> (getBody( *globalBodyStorage, forest->getBlockStorage(), storageID, sid, StorageSelect::LOCAL ));
+   sp1 = static_cast<SphereID> (*(un->begin()));
+   sp2 = static_cast<SphereID> (*(++(un->begin())));
+   WALBERLA_CHECK_NOT_NULLPTR(sp1);
+   WALBERLA_CHECK_NOT_NULLPTR(sp2);
+   WALBERLA_CHECK_EQUAL( sp1->getTypeID(), Sphere::getStaticTypeID() );
+   WALBERLA_CHECK_EQUAL( sp2->getTypeID(), Sphere::getStaticTypeID() );
+   WALBERLA_CHECK_FLOAT_EQUAL( sp1->getRadius(), real_t(1.0) );
+   WALBERLA_CHECK_FLOAT_EQUAL( sp2->getRadius(), real_t(1.5) );
+   WALBERLA_CHECK_FLOAT_EQUAL( un->getPosition(), posUnion );
+   WALBERLA_CHECK_FLOAT_EQUAL( relPosSp1, sp1->getRelPosition() );
+   WALBERLA_CHECK_FLOAT_EQUAL( relPosSp2, sp2->getRelPosition() );
+
+   for (auto blockIt = forest->begin(); blockIt != forest->end(); ++blockIt)
+   {
+      for (auto bodyIt = LocalBodyIterator::begin(*blockIt, storageID); bodyIt != LocalBodyIterator::end(); ++bodyIt)
+      {
+         WALBERLA_LOG_DEVEL( blockIt->getAABB() );
+         WALBERLA_LOG_DEVEL(*bodyIt );
+      }
+   }
+
+   return EXIT_SUCCESS;
+}
diff --git a/tests/pe/Union.cpp b/tests/pe/Union.cpp
new file mode 100644
index 000000000..4e274d22d
--- /dev/null
+++ b/tests/pe/Union.cpp
@@ -0,0 +1,108 @@
+//======================================================================================================================
+//
+//  This file is part of waLBerla. waLBerla is free software: you can
+//  redistribute it and/or modify it under the terms of the GNU General Public
+//  License as published by the Free Software Foundation, either version 3 of
+//  the License, or (at your option) any later version.
+//
+//  waLBerla is distributed in the hope that it will be useful, but WITHOUT
+//  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+//  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+//  for more details.
+//
+//  You should have received a copy of the GNU General Public License along
+//  with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
+//
+//! \file ParallelEquivalence.cpp
+//! \ingroup pe2_tests
+//! \author Sebastian Eibl <sebastian.eibl@fau.de>
+//
+//======================================================================================================================
+
+
+#include "blockforest/all.h"
+#include "core/all.h"
+#include "domain_decomposition/all.h"
+
+#include "pe/basic.h"
+#include "pe/rigidbody/Union.h"
+#include "pe/ccd/SimpleCCDDataHandling.h"
+#include "pe/synchronization/SyncNextNeighbors.h"
+#include "pe/vtk/BodyVtkOutput.h"
+#include "pe/vtk/SphereVtkOutput.h"
+
+#include "CheckVitalParameters.h"
+
+#include "core/debug/TestSubsystem.h"
+#include "vtk/VTKOutput.h"
+
+#include <boost/tuple/tuple.hpp>
+
+#include <algorithm>
+#include <vector>
+
+using namespace walberla;
+using namespace walberla::pe;
+using namespace walberla::blockforest;
+
+typedef Union< boost::tuple<Sphere> >          UnionType ;
+typedef boost::tuple<Sphere, Plane, UnionType> BodyTuple ;
+
+int main( int argc, char ** argv )
+{
+   walberla::debug::enterTestMode();
+
+   walberla::MPIManager::instance()->initializeMPI( &argc, &argv );
+
+   shared_ptr<BodyStorage> globalBodyStorage = make_shared<BodyStorage>();
+
+   // create blocks
+   shared_ptr< StructuredBlockForest > forest = blockforest::createUniformBlockGrid(
+            uint_c( 1), uint_c( 1), uint_c( 1), // number of blocks in x,y,z direction
+            uint_c( 1), uint_c( 1), uint_c( 1), // how many cells per block (x,y,z)
+            real_c(10),                         // dx: length of one cell in physical coordinates
+            0,                                  // max blocks per process
+            false, false,                       // include metis / force metis
+            false, false, false );                 // full periodicity
+
+   SetBodyTypeIDs<BodyTuple>::execute();
+
+   auto storageID           = forest->addBlockData(createStorageDataHandling<BodyTuple>(), "Storage");
+   auto ccdID               = forest->addBlockData(ccd::createHashGridsDataHandling( globalBodyStorage, storageID ), "CCD");
+   auto fcdID               = forest->addBlockData(fcd::createSimpleFCDDataHandling<BodyTuple>(), "FCD");
+
+   cr::HCSITS cr(globalBodyStorage, forest->getBlockStoragePointer(), storageID, ccdID, fcdID);
+   cr.setMaxIterations( uint_c(10) );
+   cr.setRelaxationModel( cr::HCSITS::InelasticGeneralizedMaximumDissipationContact );
+   cr.setRelaxationParameter( real_t(0.7) );
+   cr.setErrorReductionParameter( real_t(0.8) );
+   cr.setGlobalLinearAcceleration( Vec3(0,0,-1) );
+
+   createPlane( *globalBodyStorage, 0, Vec3(0,0,1), Vec3(0,0,0) );
+
+   MaterialID iron = Material::find("iron");
+   UnionType* un   = createUnion< boost::tuple<Sphere> >( *globalBodyStorage, forest->getBlockStorage(), storageID, 0, Vec3(5,5,5) );
+   SphereID sp1 = new Sphere( 10, 0, Vec3(5,5,1), Vec3(0,0,0), Quat(), real_t(1)  , iron, false, true, false );
+   SphereID sp2 = new Sphere( 11, 0, Vec3(6,5,2), Vec3(0,0,0), Quat(), real_t(1.5), iron, false, true, false );
+   un->add(sp1);
+   un->add(sp2);
+
+   auto vtkOutput   = make_shared<SphereVtkOutput>(storageID, forest->getBlockStorage()) ;
+   auto vtkWriter   = vtk::createVTKOutput_PointData(vtkOutput, "Bodies", 1, "vtk_out", "simulation_step", false, false);
+
+   for (unsigned int i = 0; i < 10000; ++i)
+   {
+      vtkWriter->write( true );
+      cr.timestep( real_t(0.001) );
+   }
+
+   //WALBERLA_CHECK_FLOAT_EQUAL( sp1->getLinearVel().length(), real_t(0) );
+   //WALBERLA_CHECK_FLOAT_EQUAL( sp2->getLinearVel().length(), real_t(0) );
+   WALBERLA_CHECK_FLOAT_EQUAL_EPSILON( sp1->getPosition()[2], real_t(1)  , real_t(0.001) );
+   WALBERLA_CHECK_FLOAT_EQUAL_EPSILON( sp2->getPosition()[2], real_t(1.5), real_t(0.001) );
+   WALBERLA_CHECK_FLOAT_EQUAL( (sp1->getPosition() - sp2->getPosition()).length(), real_t(sqrt(2)) );
+
+   //WALBERLA_LOG_DEVEL(un);
+
+   return EXIT_SUCCESS;
+}
-- 
GitLab